import paho.mqtt.client as mqtt
import time
import board
import busio
from adafruit_ht16k33 import segments
from enum import Enum
from gpiozero import Button
import socket  # to get local ip

__TOPIC_BRIGHTNESS__ = "brightness"
__TOPIC_SCROLL_INTERVAL__ = "scrollinterval"
__TOPIC_TEXT_STATIC__ = "textstatic"
__TOPIC_TEXT_ONCE__ = "textonce"
__TOPIC_TEXT_SCROLL__ = "textscroll"
__TOPIC_TEXT_MARQUEE__ = "textmarquee"
__TOPIC_TEXT_PAN__ = "textpan"
__TOPIC_BUTTON__ = "button"

NUMDIGITS = 8

# Seconds to wait between two reconnect attempts while disconnected.
_RECONNECT_INTERVAL = 30


class Mode(Enum):
    """Display modes selectable through the MQTT text topics."""

    STATIC = 1  # show text constantly. may cut digits
    ONCE = 3  # show once for some time
    SCROLL = 2  # scroll through once
    PAN = 4  # scroll back and forth
    MARQUEE = 5  # scroll through repeatedly


class Controller:
    """Drive two 4-digit 14-segment displays and two buttons over MQTT.

    Text, brightness and scroll interval are received on
    ``<base_topic>/<name>/set``; accepted values are published back on
    ``<base_topic>/<name>``. Button events are published on
    ``<base_topic>/button``.
    """

    connected = False
    displayL = None
    displayR = None
    basetopic = None
    mqtthost = None
    mqttport = None

    def __init__(self, base_topic, mqtt_host, mqtt_port=1883):
        """Set up buttons and displays, then start the MQTT connection.

        Args:
            base_topic: Topic prefix used for all subscriptions and publishes.
            mqtt_host: Host name or address of the MQTT broker.
            mqtt_port: TCP port of the MQTT broker.
        """
        self.scroll_interval = 0.25
        self.text = " "
        self.text_last = ""
        self.poscount = 0
        self.last_scrollupdate = 0
        self.mode = Mode.STATIC

        # https://gpiozero.readthedocs.io/en/stable/api_input.html
        # with deactivated pullup input is active high
        self.buttonLeft = Button(23, pull_up=False, hold_time=1)
        self.buttonRight = Button(24, pull_up=False, hold_time=1)

        self.buttonLeft.when_released = self.buttonLeft_released
        self.buttonLeft.when_held = self.buttonLeft_held
        self.buttonLeft_heldFlag = False  # if true, next released will be ignored

        self.buttonRight.when_released = self.buttonRight_released
        self.buttonRight.when_held = self.buttonRight_held
        self.buttonRight_heldFlag = False  # if true, next released will be ignored

        # Create the I2C interface.
        self.i2c = busio.I2C(board.SCL, board.SDA)

        # Create the LED segment class.
        # This creates a 14 segment 4 character display.
        # find address with "sudo i2cdetect -y 1" . you need to install:
        # sudo apt-get install -y python-smbus i2c-tools
        self.displayL = segments.Seg14x4(self.i2c, address=0x70)
        self.displayR = segments.Seg14x4(self.i2c, address=0x71)

        # Clear the display.
        self.displayL.fill(0)
        self.displayR.fill(0)

        # set brightness, range 0-1.0, 1.0 max brightness
        self.displayL.brightness = 1.0
        self.displayR.brightness = 1.0

        # Fixed: the original read the undefined name `basetopic`.
        self.basetopic = base_topic
        self.mqtthost = mqtt_host
        self.mqttport = mqtt_port

        # Fixed: this timestamp was read by the reconnect logic but never
        # initialised. Starting at "now" gives the initial connection time to
        # complete before a reconnect is attempted.
        self.last_reconnectTry = time.time()

        # Fixed: the original called connectToMQTT() without `self.`.
        self.connectToMQTT()

    def connectToMQTT(self):
        """Create a new MQTT client, register the callbacks and connect."""
        print("trying to connect to mqtt")
        self.mqtt = mqtt.Client()
        self.mqtt.on_connect = self.on_connect
        self.mqtt.on_message = self.on_message
        self.mqtt.on_disconnect = self.on_disconnect

        self.mqtt.connect(self.mqtthost, self.mqttport)

        self.topic = self.basetopic

    def on_disconnect(self, client, userdata, rc):
        """MQTT callback: mark as disconnected and show a placeholder text."""
        print("MQTT disconnected")
        self.mode = Mode.STATIC
        self.text = "- - - -"
        self.connected = False

    def on_message(self, client, userdata, message):
        """MQTT callback: apply a ``.../<name>/set`` command.

        The payload is decoded as UTF-8. Numeric settings that cannot be
        parsed or are out of range are reported and ignored.
        """
        msg = str(message.payload.decode("utf-8"))
        print("msg = " + msg)
        topic = message.topic

        if topic.endswith(__TOPIC_BRIGHTNESS__ + "/set"):
            self._set_brightness(msg)
        elif topic.endswith(__TOPIC_SCROLL_INTERVAL__ + "/set"):
            self._set_scroll_interval(msg)
        elif topic.endswith(__TOPIC_TEXT_STATIC__ + "/set"):
            # STATIC keeps the current poscount, as in the original code.
            self._set_text(Mode.STATIC, __TOPIC_TEXT_STATIC__,
                           msg.ljust(NUMDIGITS), reset_position=False)
        elif topic.endswith(__TOPIC_TEXT_ONCE__ + "/set"):
            # poscount is used for timing in ONCE mode
            self._set_text(Mode.ONCE, __TOPIC_TEXT_ONCE__,
                           msg.ljust(NUMDIGITS))
        elif topic.endswith(__TOPIC_TEXT_MARQUEE__ + "/set"):
            # add spaces left and right so the text enters and leaves the
            # full frame
            self._set_text(Mode.MARQUEE, __TOPIC_TEXT_MARQUEE__,
                           self._pad(msg, NUMDIGITS))
        elif topic.endswith(__TOPIC_TEXT_PAN__ + "/set"):
            self._set_text(Mode.PAN, __TOPIC_TEXT_PAN__,
                           self._pad(msg, int(NUMDIGITS / 4)))
        elif topic.endswith(__TOPIC_TEXT_SCROLL__ + "/set"):
            self._set_text(Mode.SCROLL, __TOPIC_TEXT_SCROLL__,
                           self._pad(msg, NUMDIGITS))

    @staticmethod
    def _pad(text, count):
        """Return *text* with *count* spaces added on each side."""
        margin = " " * count
        return margin + text + margin

    def _set_brightness(self, msg):
        """Parse and apply a brightness in the range 0.0 to 1.0."""
        try:
            val = float(msg)
        except ValueError:
            # Fixed: a non-numeric payload used to raise inside the callback.
            print("invalid brightness ", msg)
            return
        if 0.0 <= val <= 1.0:
            self.displayL.brightness = val
            self.displayR.brightness = val
            print("changed brightness to ", self.displayL.brightness)
            self.mqtt.publish(self.topic + "/" + __TOPIC_BRIGHTNESS__,
                              self.displayL.brightness, 1)
        else:
            print("invalid brightness ", val)

    def _set_scroll_interval(self, msg):
        """Parse and apply a scroll interval in seconds (must be > 0)."""
        try:
            val = float(msg)
        except ValueError:
            # Fixed: a non-numeric payload used to raise inside the callback.
            print("error not >0.0: ", msg)
            return
        if val > 0.0:
            self.scroll_interval = val
            print("changed scroll_interval to ", self.scroll_interval)
            self.mqtt.publish(self.topic + "/" + __TOPIC_SCROLL_INTERVAL__,
                              self.scroll_interval, 1)
        else:
            print("error not >0.0: ", val)

    def _set_text(self, mode, topic_name, text, reset_position=True):
        """Switch to *mode* with *text* and publish the new text state."""
        self.mode = mode
        if reset_position:
            self.poscount = 0  # start at beginning of new text
        self.text = text
        print("changed text to ", self.text)
        self.mqtt.publish(self.topic + "/" + topic_name, self.text, 1)

    def on_connect(self, client, userdata, flags, rc):
        """MQTT callback: subscribe, announce state and clear the display."""
        print("Connected to MQTT Broker")
        self.mqtt.subscribe(self.topic + "/#")
        self.connected = True
        self.mqtt.publish(self.topic + "/" + "$state", "ready", 1)
        self.mqtt.publish(self.topic + "/" + "$hostname",
                          socket.gethostname(), 1)
        self.mqtt.publish(self.topic + "/" + "$localip", self.get_ip(), 1)

        # clear display
        self.mode = Mode.STATIC
        self.text = ""

    def get_ip(self):
        """Return the local IP address, or ``'127.0.0.1'`` on failure."""
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # doesn't even have to be reachable
            s.connect(('10.255.255.255', 1))
            IP = s.getsockname()[0]
        except Exception:
            IP = '127.0.0.1'
        finally:
            s.close()
        return IP

    def writeSegment(self, _digit, _char, clear=False):
        """Write one character to digit 0-7 across both display modules.

        Digits 0-3 go to the left module, 4-7 to the right one. With
        ``clear`` set, the digit is blanked before the character is written.
        Indices outside 0-7 are ignored.
        """
        if 0 <= _digit < 4:
            if clear:
                self.displayL[_digit] = ' '
            self.displayL[_digit] = _char
        elif 4 <= _digit < 8:
            # Fixed: the original `elif _digit<8` also accepted negative
            # indices and passed them on to the right module.
            if clear:
                self.displayR[_digit - 4] = ' '
            self.displayR[_digit - 4] = _char

    def displayTextOffset(self, _text, _offset=0):
        """Show *_text* on both modules, starting at character *_offset*.

        This function is needed because library print functions only work
        for a single display module. This function handles the '.'
        character with its special cases.

        NOTE(review): the source of this method was truncated right after
        ``while d``; the loop below is a reconstruction. It assumes a '.'
        that follows a non-dot character shares that character's digit and
        any other '.' takes a digit of its own - confirm against the
        original implementation.
        """
        _text += " " * NUMDIGITS  # append NUMDIGITS spaces at end for when scrolling past end
        _text = _text[_offset:]  # move by offset

        i = 0  # counter for reading pos
        d = 0  # counter for writing segment pos
        while d < NUMDIGITS and i < len(_text):
            char = _text[i]
            if char == "." and i > 0 and _text[i - 1] != ".":
                # written without clearing so the previous character stays
                self.writeSegment(d - 1, ".")
            else:
                self.writeSegment(d, char, clear=True)
                d += 1
            i += 1

        # A dot directly after the last visible character still belongs to it.
        if i < len(_text) and _text[i] == "." and _text[i - 1] != ".":
            self.writeSegment(NUMDIGITS - 1, ".")

    def seglen(self, _text):
        """Return how many display digits *_text* occupies.

        NOTE(review): the source of this method was lost; this is a
        reconstruction that counts a '.' following a non-dot character as
        sharing that character's digit, matching displayTextOffset above.
        """
        count = 0
        previous = None
        for char in _text:
            if not (char == "." and previous is not None and previous != "."):
                count += 1
            previous = char
        return count

    def loop(self):
        """Run one update step: MQTT processing plus display animation.

        NOTE(review): the header and first lines of this method were lost in
        the source. The name ``loop`` and the ``if not self.connected`` guard
        are reconstructed - verify the name against the caller.
        """
        if not self.connected:
            if time.time() > self.last_reconnectTry + _RECONNECT_INTERVAL:
                try:
                    # Fixed: the original called connectToMQTT() without
                    # `self.`.
                    self.connectToMQTT()
                except OSError as err:
                    # Broker unreachable: keep running and retry later.
                    print("mqtt connect failed: ", err)
                self.last_reconnectTry = time.time()

        self.mqtt.loop(0.1)  # with block timeout

        if self.displayL is not None and self.displayR is not None:  # displays initialized
            text_changed = self.text != self.text_last
            tick_due = time.time() > self.last_scrollupdate + self.scroll_interval

            if self.mode == Mode.STATIC or self.mode == Mode.ONCE:
                if text_changed:
                    self.displayTextOffset(self.text, 0)
                if self.mode == Mode.ONCE and tick_due:
                    # time it stays depends on scrollspeed (how long it
                    # would take to pass over the frame)
                    if self.poscount >= NUMDIGITS:
                        self.clearAndStop()
                    self.poscount += 1  # used for timing
                    self.last_scrollupdate = time.time()

            elif self.mode == Mode.MARQUEE or self.mode == Mode.SCROLL:
                if tick_due or text_changed:  # time to update animation
                    self.displayTextOffset(self.text, self.poscount)
                    self.poscount += 1
                    steps = max(1, int(self.seglen(self.text) - NUMDIGITS + 1))
                    if self.mode == Mode.MARQUEE:
                        self.poscount %= steps
                    elif self.poscount >= steps:  # reached end for scroll once
                        self.clearAndStop()
                    self.last_scrollupdate = time.time()

            elif self.mode == Mode.PAN:
                if tick_due or text_changed:  # time to update animation
                    print("pos=", self.poscount)
                    # Fixed: clamp the scrollable width to zero so text that
                    # fits the display no longer causes a modulo by zero (or
                    # by a negative number) below.
                    span = max(0, self.seglen(self.text) - NUMDIGITS)
                    scrolloffset = self.poscount
                    if scrolloffset >= span:
                        scrolloffset = span - (scrolloffset - span)  # reverse
                    scrolloffset = max(0, scrolloffset)  # cap at zero
                    print("scrolloffset=", scrolloffset)
                    self.displayTextOffset(self.text, scrolloffset)
                    self.poscount += 1
                    # if poscount over scrollable width means scroll backwards
                    self.poscount %= max(1, span * 2)
                    self.last_scrollupdate = time.time()

            self.text_last = self.text

    def clearAndStop(self):
        """Blank the display and fall back to STATIC mode."""
        # Fixed: the original used `==` here, so the mode never changed and
        # the animation never stopped.
        self.mode = Mode.STATIC  # Stop animation
        self.text = " ".ljust(NUMDIGITS)  # empty
        self.displayTextOffset(self.text, 0)  # update immediately

    def _publish_button(self, payload):
        """Publish a button event if the MQTT connection is up."""
        if self.connected:
            self.mqtt.publish(self.topic + "/" + __TOPIC_BUTTON__, payload, 1)

    def buttonLeft_released(self):
        """Report a short press unless the release ends a hold."""
        if self.buttonLeft_heldFlag:
            self.buttonLeft_heldFlag = False
        else:
            print("button left pressed")
            self._publish_button("lpress")

    def buttonLeft_held(self):
        """Report a hold and make the following release be ignored."""
        print("button left held")
        self.buttonLeft_heldFlag = True
        self._publish_button("lhold")

    def buttonRight_released(self):
        """Report a short press unless the release ends a hold."""
        if self.buttonRight_heldFlag:
            self.buttonRight_heldFlag = False
        else:
            print("button right pressed")
            self._publish_button("rpress")

    def buttonRight_held(self):
        """Report a hold and make the following release be ignored."""
        print("button right held")
        self.buttonRight_heldFlag = True
        self._publish_button("rhold")