import json import subprocess import threading import time import urllib.request import RPi.GPIO as gpio import board import neopixel def ssh(host, name): def ssh_(host_, name_): print("[ssh]", f"{name_}@{host_}") subprocess.check_call(["ssh", f"{name_}@{host_}"], stdin=subprocess.DEVNULL) threading.Thread(target=ssh_, args=(host, name)).start() class Door: def __init__(self, host, user_open, user_close): self.host = host self.user_open = user_open self.user_close = user_close def open(self): ssh(self.host, self.user_open) def close(self): ssh(self.host, self.user_close) class ButtonStrip: def __init__(self, strip, leds): self.leds = leds self.strip = strip def draw(self, color): for led in self.leds: self.strip[led] = color class ButtonHandler(threading.Thread): def __init__(self, pin, func, edge='both', bouncetime=100): super().__init__(daemon=True) self.pin = pin self.func = func self.edge = edge self.bouncetime = bouncetime / 1000 self.lastpinval = gpio.input(self.pin) self.lock = threading.Lock() def __call__(self, *args): if not self.lock.acquire(blocking=False): return t = threading.Timer(self.bouncetime, self.read, args=args) t.start() def read(self, *args): pinval = gpio.input(self.pin) if ((pinval == 0 and self.lastpinval == 1 and self.edge in ['falling', 'both']) or (pinval == 1 and self.lastpinval == 0 and self.edge in ['rising', 'both'])): self.func(*args) self.lastpinval = pinval self.lock.release() class ButtonSensor: def __init__(self, pin, callback): self.pin = pin gpio.setup(pin, gpio.IN, pull_up_down=gpio.PUD_UP) self.handler = ButtonHandler(pin, callback, edge='falling', bouncetime=100) self.handler.start() gpio.add_event_detect(pin, gpio.BOTH, callback=self.handler) class ButtonPress(Exception): def __init__(self, pin): super().__init__() self.pin = pin class MultiSensor: def __init__(self, pins): self.pressed = None self.buttons = [ButtonSensor(pin, self.press) for pin in pins] def press(self, pin): self.pressed = pin def sleepSecond(self): for i in range(100): if self.pressed is not None: pressed_pin = self.pressed self.pressed = None raise ButtonPress(pressed_pin) time.sleep(0.01) def sleep(self, delay): for i in range(delay): self.sleepSecond() class APILoader(threading.Thread): def __init__(self, delay): super().__init__() self.delay = delay self.data = None self.loadData() def getData(self): if not self.is_alive(): print("API thread is dead. Trying to restart...") self.start() return self.data def run(self): while True: try: self.loadData() except Exception as e: print("Unable to load API data:", e) time.sleep(self.delay) def loadData(self): # Load api data print("Loading API data...") response = urllib.request.urlopen("https://status-v2.chaospott.de/status.json").read() data = json.loads(response.decode("utf-8")) self.data = {door["location"]: not door["value"] for door in data["sensors"]["door_locked"]} def main(): print("[*] Initializing...") # Initialize GPIO board gpio.setmode(gpio.BCM) # Create LED-Strip handle gpio.setup(4, gpio.IN, pull_up_down=gpio.PUD_UP) strip = neopixel.NeoPixel(board.D18, 36, brightness=0.5, pixel_order=neopixel.GRB, auto_write=False) # Create sub-handle for each button handle leds = { "cellar": ButtonStrip(strip, list(range(1, 12))), "center": ButtonStrip(strip, list(range(12, 24))), "aerie": ButtonStrip(strip, list(range(24, 36))), } # Create button press sensor buttons = { 22: "cellar", 23: "aerie", 24: "center", } sensor = MultiSensor(buttons.keys()) doors = { "aerie": Door("10.42.1.28", "open", "close"), "cellar": Door("10.42.1.20", "open", "close"), } # Initialize API interface loader = APILoader(4) loader.start() print("[*] Starting...") while True: for led in leds: leds[led].draw((0, 0, 0)) data = loader.getData() for door in data: if door in leds: leds[door].draw((0, 255, 0) if data[door] else (255, 0, 0)) strip.show() # Wait for a button press try: sensor.sleep(2) except ButtonPress as press: selection = buttons[press.pin] if selection == "aerie": leds["cellar"].draw((255, 0, 0)) elif selection == "cellar": leds["aerie"].draw((255, 0, 0)) else: continue leds["center"].draw((0, 255, 0)) leds[selection].draw((0, 0, 0)) strip.show() try: sensor.sleep(5) except ButtonPress as press: action = buttons[press.pin] if action == "center": doors[selection].open() elif action != selection: doors[selection].close() if __name__ == '__main__': main()