"""Three-button NeoPixel menu for two doors.

Door status is received over MQTT and shown on a 36-pixel strip that is
split into three 12-pixel segments ("cellar", "center", "aerie").  Button
presses walk a small state machine that can trigger ``ssh <user>@<host>``
calls to open or close a door.
"""

import subprocess
import threading
import time

import board
import digitalio
import neopixel
import paho.mqtt.client as mqtt
from adafruit_led_animation import color
from adafruit_led_animation.animation.chase import Chase
from adafruit_led_animation.animation.rainbowcomet import RainbowComet


class SubStrip:
    """Write-only view onto a subset of the pixels of a parent strip.

    Supports ``len()``, item/slice assignment, ``fill`` and ``show`` — the
    operations this file and the animations it creates call on it.
    """

    def __init__(self, strip, pixel: list[int]):
        """Wrap *strip*; *pixel* lists the parent indices that form this view."""
        self.strip: neopixel.NeoPixel = strip
        self._pixel_map = pixel

    def __len__(self):
        return len(self._pixel_map)

    def __setitem__(self, key, value):
        """Assign one pixel (int key) or several pixels (slice key)."""
        if isinstance(key, slice):
            # Resolve the slice against this view's length, then translate
            # every local index to the parent's index.
            local_indices = range(*key.indices(len(self._pixel_map)))
            for i, v in zip(local_indices, value):
                self.strip[self._pixel_map[i]] = v
        else:
            self.strip[self._pixel_map[key]] = value

    def show(self):
        """Push the parent strip's buffer to the LEDs (whole strip, not just this view)."""
        self.strip.show()

    def fill(self, color):
        """Set every pixel of this view to *color*."""
        for p in self._pixel_map:
            self.strip[p] = color


class Door:
    """Open/close a door by running ``ssh <user>@<host>`` in a background thread."""

    def __init__(self, host, user_open, user_close):
        self.host = host
        self.user_open = user_open
        self.user_close = user_close
        # True while an ssh call is running; read by StateMenu.tick().
        self.inprogress = False

    def _ssh(self, host, name):
        """Start a thread that runs ``ssh name@host`` and tracks ``inprogress``."""

        def _ssh_(host_, name_):
            self.inprogress = True
            try:
                print("[ssh]", f"{name_}@{host_}")
                subprocess.check_call(["ssh", f"{name_}@{host_}"], stdin=subprocess.DEVNULL)
            except (subprocess.CalledProcessError, OSError) as ex:
                # OSError covers the case where ssh cannot be started at all.
                print("[ssh]", ex)
            finally:
                # Always clear the flag: StateMenu.tick() refuses to time out
                # while it is set, so leaving it True would freeze the menu.
                self.inprogress = False

        threading.Thread(target=_ssh_, args=(host, name)).start()

    def open(self):
        """Run the ssh call for the "open" user."""
        self._ssh(self.host, self.user_open)

    def close(self):
        """Run the ssh call for the "close" user."""
        self._ssh(self.host, self.user_close)


class FoodoorMQTT:
    """MQTT client that caches the latest ``foobar/<area>/foodoor/status`` payloads."""

    def __init__(self, areas):
        """Create the client for the given *areas*; does not connect yet."""
        self.areas = areas
        # Callables invoked with the topic string after each received message.
        self.message_cbs = []
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self._connect_lock = threading.Condition()
        # Predicate guarded by _connect_lock so a notify that happens before
        # connect() starts waiting is not lost.
        self._connected = False
        self._data = {}

    def _subscribe(self):
        """Subscribe (QoS 1) to the status topic of every configured area."""
        for area in self.areas:
            self.client.subscribe(f"foobar/{area}/foodoor/status", 1)

    def connect(self, host="mqtt.chaospott.de", timeout=None):
        """Connect to *host*, start the network loop and wait for ``on_connect``.

        *timeout* is the maximum number of seconds to wait for the connect
        callback; ``None`` (the default) waits indefinitely.  Any failure,
        including a timeout, is printed rather than raised.
        """
        try:
            with self._connect_lock:
                self._connected = False
            self.client.connect(host)
            self.client.loop_start()
            with self._connect_lock:
                if not self._connect_lock.wait_for(lambda: self._connected, timeout=timeout):
                    raise TimeoutError(f"no on_connect callback within {timeout} s")
        except Exception as e:
            print(f"Verbindungsfehler zu MQTT-Server: {e}")

    def disconnect(self):
        """Stop the background network loop."""
        self.client.loop_stop()

    def on_connect(self, client, userdata, flags, rc):
        """Connect callback: (re)subscribe and wake up a waiting ``connect()``."""
        self._subscribe()
        with self._connect_lock:
            self._connected = True
            self._connect_lock.notify_all()

    def on_message(self, client, userdata, msg):
        """Message callback: cache the payload by topic and run ``message_cbs``."""
        # Decode once; errors="replace" so a non-UTF-8 payload cannot raise
        # inside the callback.
        payload = msg.payload.decode(errors="replace")
        print(f"MQTT-Server Message: {msg.topic} {payload}")
        self._data[msg.topic] = payload
        for cb in self.message_cbs:
            cb(msg.topic)

    def get_door(self, area):
        """Return the last status payload for *area*, or ``None`` if none was received."""
        return self._data.get(f"foobar/{area}/foodoor/status")

    def is_open(self, area):
        """Return True only if the last status payload for *area* is exactly "open"."""
        return self.get_door(area) == "open"


class StateMenu:
    """Button-driven state machine that renders onto the LED strip.

    States:
        OFF         all pixels black
        SHOW_STATE  outer segments show door status (green = open, red = not open)
        CHANGE      a door was selected; the two other segments offer open/close
        PROGRESS    an open/close request was issued
        RAINBOW     like SHOW_STATE plus a rainbow comet on the center segment
    """

    OFF = 0
    SHOW_STATE = 1
    CHANGE = 2
    PROGRESS = 3
    RAINBOW = 4

    def __init__(self):
        self.state = StateMenu.OFF
        self.change_target = None  # "aerie" / "cellar" while a door is selected
        self.target_state = None   # "open" / "close" once chosen
        self.anim = []             # animations advanced by tick()
        self.timeout = None        # time.monotonic() deadline, or None
        self.next_state = None     # state entered when PROGRESS times out
        self.reset_state()

        # Create everything draw() touches BEFORE the MQTT callback is
        # registered: a status message delivered right after subscribing
        # would otherwise call draw() on a half-initialised object.
        self.strip = neopixel.NeoPixel(
            board.D18, 36, brightness=0.5, pixel_order=neopixel.GRB, auto_write=False
        )
        self.strip_cellar = SubStrip(self.strip, pixel=list(range(12)))
        self.strip_center = SubStrip(self.strip, pixel=list(range(12, 24)))
        self.strip_aerie = SubStrip(self.strip, pixel=list(range(24, 36)))

        self.doors = {
            "cellar": Door("10.42.1.20", "open", "close"),
            "aerie": Door("10.42.1.28", "open", "close"),
        }

        self.mqtt = FoodoorMQTT(["oben", "unten"])
        # Redraw whenever a status message arrives.
        # NOTE(review): this runs on whichever thread paho invokes on_message
        # from, concurrently with tick() — confirm that is acceptable.
        self.mqtt.message_cbs.append(lambda *_: self.draw())
        self.mqtt.connect()

    def _status_color(self, area):
        """Return GREEN if the MQTT status of *area* is "open", else RED."""
        return color.GREEN if self.mqtt.is_open(area) else color.RED

    def reset_state(self):
        """Return to SHOW_STATE and drop any selection, animation and timeout."""
        self.state = StateMenu.SHOW_STATE
        self.change_target = None
        self.target_state = None
        self.anim.clear()
        self.timeout = None
        self.next_state = None

    def button_pressed(self, btn):
        """Advance the state machine for a press of *btn* and redraw.

        *btn* is one of "cellar", "center" or "aerie".
        """
        if self.state == StateMenu.OFF:
            self.set_state(StateMenu.SHOW_STATE)
        elif self.state == StateMenu.RAINBOW and btn == "center":
            self.reset_state()
        elif self.state in (StateMenu.SHOW_STATE, StateMenu.RAINBOW):
            if btn == "center":
                self.set_state(StateMenu.RAINBOW, 10)
            elif btn in ("aerie", "cellar"):
                self.set_state(StateMenu.CHANGE, 10)
                self.change_target = btn
        elif self.state == StateMenu.CHANGE:
            if btn == self.change_target:
                # Pressing the selected door's own button cancels.
                self.reset_state()
            elif self.change_target == "aerie":
                if btn == "center":
                    self.target_state = "open"
                elif btn == "cellar":
                    self.target_state = "close"
            elif self.change_target == "cellar":
                if btn == "aerie":
                    self.target_state = "open"
                elif btn == "center":
                    self.target_state = "close"

            # change_target is None here only if the press was a cancel.
            if self.change_target is not None:
                self.set_state(StateMenu.PROGRESS, 5)
                self.next_state = StateMenu.SHOW_STATE
                print(self.change_target, self.target_state)
                if self.target_state == "open":
                    self.doors[self.change_target].open()
                elif self.target_state == "close":
                    self.doors[self.change_target].close()
        elif self.state == StateMenu.PROGRESS:
            # Button presses are ignored while a request is shown as in progress.
            pass
        self.draw()

    def set_state(self, state, timeout=None):
        """Switch to *state*; *timeout* (seconds) arms the deadline checked by tick()."""
        self.state = state
        self.timeout = None if timeout is None else time.monotonic() + timeout

    def draw(self):
        """Fill the static segments and rebuild the animation list for the current state."""
        self.anim.clear()
        if self.state == StateMenu.OFF:
            self.strip.fill(color.BLACK)
        elif self.state == StateMenu.SHOW_STATE:
            self.strip_aerie.fill(self._status_color("oben"))
            self.strip_cellar.fill(self._status_color("unten"))
            self.strip_center.fill(color.BLACK)
        elif self.state == StateMenu.CHANGE:
            # Green chase on the segment whose button means "open", red chase
            # on the one that means "close" (see button_pressed).
            if self.change_target == "aerie":
                self.anim = [
                    Chase(self.strip_center, 0.5, color=color.GREEN, size=1, spacing=11),
                    Chase(self.strip_cellar, 0.5, color=color.RED, size=1, spacing=11),
                ]
            elif self.change_target == "cellar":
                self.anim = [
                    Chase(self.strip_aerie, 0.5, color=color.GREEN, size=1, spacing=11),
                    Chase(self.strip_center, 0.5, color=color.RED, size=1, spacing=11),
                ]
        elif self.state == StateMenu.PROGRESS:
            self.strip_center.fill(color.BLACK)
            progress_color = color.GREEN if self.target_state == "open" else color.RED
            if self.change_target == "aerie":
                self.strip_cellar.fill(self._status_color("unten"))
                self.anim = [Chase(self.strip_aerie, 0.05, size=1, color=progress_color)]
            elif self.change_target == "cellar":
                self.strip_aerie.fill(self._status_color("oben"))
                self.anim = [Chase(self.strip_cellar, 0.05, size=1, color=progress_color)]
        elif self.state == StateMenu.RAINBOW:
            self.strip_aerie.fill(self._status_color("oben"))
            self.strip_cellar.fill(self._status_color("unten"))
            self.anim = [RainbowComet(self.strip_center, 0.05, ring=True)]

    def tick(self):
        """Handle an expired timeout, advance animations and refresh the strip."""
        door_busy = bool(self.change_target and self.doors[self.change_target].inprogress)
        # The deadline is only honoured once the selected door's ssh call
        # has finished.
        if self.timeout and self.timeout < time.monotonic() and not door_busy:
            if self.next_state is not None:
                self.set_state(self.next_state, 10)
                self.next_state = None
            else:
                self.reset_state()
            self.draw()

        for a in self.anim:
            a.animate(show=False)
        self.strip.show()

    def deinit(self):
        """Release the LED strip and stop the MQTT network loop."""
        self.strip.deinit()
        self.mqtt.disconnect()


def main():
    """Poll the three buttons forever and drive the menu until interrupted."""
    btn_cellar = digitalio.DigitalInOut(board.D22)
    btn_cellar.switch_to_input(digitalio.Pull.DOWN)
    btn_center = digitalio.DigitalInOut(board.D24)
    btn_center.switch_to_input(digitalio.Pull.DOWN)
    btn_aerie = digitalio.DigitalInOut(board.D23)
    btn_aerie.switch_to_input(digitalio.Pull.DOWN)

    # Order is the polling priority when several buttons are active at once.
    buttons = (
        ("cellar", btn_cellar),
        ("center", btn_center),
        ("aerie", btn_aerie),
    )

    menu = StateMenu()
    btn_locked = False  # True while a button is still held, to fire once per press
    try:
        while True:
            # A button counts as pressed while its input reads low.
            # NOTE(review): the inputs use Pull.DOWN yet are read active-low —
            # confirm this matches the wiring.
            pressed = next((name for name, pin in buttons if not pin.value), None)
            if pressed is None:
                btn_locked = False
            else:
                if not btn_locked:
                    menu.button_pressed(pressed)
                btn_locked = True

            menu.tick()
            time.sleep(.01)
    except KeyboardInterrupt:
        pass
    finally:
        # Release the strip and stop MQTT on any exit, not only on Ctrl-C.
        menu.deinit()


if __name__ == '__main__':
    main()