From e13881b07bd9e502f4030e06cebad6a8289c0922 Mon Sep 17 00:00:00 2001 From: T Date: Fri, 5 Sep 2025 22:53:31 +0200 Subject: [PATCH] Code cleanup --- README.md => 1og/README.md | 0 1og/button.py | 216 ++++++++++++++++ .../buttons-platzierung.png | Bin pins.png => 1og/pins.png | Bin 1og/service.sh | 17 ++ button.py | 232 ------------------ service.sh | 24 -- 7 files changed, 233 insertions(+), 256 deletions(-) rename README.md => 1og/README.md (100%) create mode 100644 1og/button.py rename buttons-platzierung.png => 1og/buttons-platzierung.png (100%) rename pins.png => 1og/pins.png (100%) create mode 100644 1og/service.sh delete mode 100644 button.py delete mode 100644 service.sh diff --git a/README.md b/1og/README.md similarity index 100% rename from README.md rename to 1og/README.md diff --git a/1og/button.py b/1og/button.py new file mode 100644 index 0000000..5c556de --- /dev/null +++ b/1og/button.py @@ -0,0 +1,216 @@ +import json +import subprocess +import threading +import time +import urllib.request + +import RPi.GPIO as gpio +import board +import neopixel + + +def ssh(host, name): + def ssh_(host_, name_): + print("[ssh]", f"{name_}@{host_}") + subprocess.check_call(["ssh", f"{name_}@{host_}"], stdin=subprocess.DEVNULL) + + threading.Thread(target=ssh_, args=(host, name)).start() + + +class Door: + def __init__(self, host, user_open, user_close): + self.host = host + self.user_open = user_open + self.user_close = user_close + + def open(self): + ssh(self.host, self.user_open) + + def close(self): + ssh(self.host, self.user_close) + + +class ButtonStrip: + def __init__(self, strip, leds): + self.leds = leds + self.strip = strip + + def draw(self, color): + for led in self.leds: + self.strip[led] = color + + +class ButtonHandler(threading.Thread): + def __init__(self, pin, func, edge='both', bouncetime=100): + super().__init__(daemon=True) + + self.pin = pin + self.func = func + self.edge = edge + self.bouncetime = bouncetime / 1000 + + self.lastpinval = gpio.input(self.pin) + self.lock = threading.Lock() + + def __call__(self, *args): + if not self.lock.acquire(blocking=False): + return + + t = threading.Timer(self.bouncetime, self.read, args=args) + t.start() + + def read(self, *args): + pinval = gpio.input(self.pin) + + if ((pinval == 0 and self.lastpinval == 1 and self.edge in ['falling', 'both']) + or (pinval == 1 and self.lastpinval == 0 and self.edge in ['rising', 'both'])): + self.func(*args) + + self.lastpinval = pinval + self.lock.release() + + +class ButtonSensor: + def __init__(self, pin, callback): + self.pin = pin + + gpio.setup(pin, gpio.IN, pull_up_down=gpio.PUD_UP) + self.handler = ButtonHandler(pin, callback, edge='falling', bouncetime=100) + self.handler.start() + gpio.add_event_detect(pin, gpio.BOTH, callback=self.handler) + + +class ButtonPress(Exception): + def __init__(self, pin): + super().__init__() + self.pin = pin + + +class MultiSensor: + def __init__(self, pins): + self.pressed = None + self.buttons = [ButtonSensor(pin, self.press) for pin in pins] + + def press(self, pin): + self.pressed = pin + + def sleepSecond(self): + for i in range(100): + if self.pressed is not None: + pressed_pin = self.pressed + self.pressed = None + raise ButtonPress(pressed_pin) + time.sleep(0.01) + + def sleep(self, delay): + for i in range(delay): + self.sleepSecond() + + +class APILoader(threading.Thread): + def __init__(self, delay): + super().__init__() + self.delay = delay + self.data = None + self.loadData() + + def getData(self): + if not self.is_alive(): + print("API thread is dead. Trying to restart...") + self.start() + return self.data + + def run(self): + while True: + try: + self.loadData() + except Exception as e: + print("Unable to load API data:", e) + + time.sleep(self.delay) + + def loadData(self): + # Load api data + print("Loading API data...") + response = urllib.request.urlopen("https://status-v2.chaospott.de/status.json").read() + data = json.loads(response.decode("utf-8")) + + self.data = {door["location"]: not door["value"] for door in data["sensors"]["door_locked"]} + + +def main(): + print("[*] Initializing...") + # Initialize GPIO board + gpio.setmode(gpio.BCM) + + # Create LED-Strip handle + gpio.setup(4, gpio.IN, pull_up_down=gpio.PUD_UP) + strip = neopixel.NeoPixel(board.D18, 36, brightness=0.5, pixel_order=neopixel.GRB, auto_write=False) + + # Create sub-handle for each button handle + leds = { + "cellar": ButtonStrip(strip, list(range(1, 12))), + "center": ButtonStrip(strip, list(range(12, 24))), + "aerie": ButtonStrip(strip, list(range(24, 36))), + } + + # Create button press sensor + buttons = { + 22: "cellar", + 23: "aerie", + 24: "center", + } + sensor = MultiSensor(buttons.keys()) + + doors = { + "aerie": Door("10.42.1.28", "open", "close"), + "cellar": Door("10.42.1.20", "open", "close"), + } + + # Initialize API interface + loader = APILoader(4) + loader.start() + + print("[*] Starting...") + while True: + for led in leds: + leds[led].draw((0, 0, 0)) + + data = loader.getData() + for door in data: + if door in leds: + leds[door].draw((0, 255, 0) if data[door] else (255, 0, 0)) + + strip.show() + + # Wait for a button press + try: + sensor.sleep(2) + except ButtonPress as press: + selection = buttons[press.pin] + + if selection == "aerie": + leds["cellar"].draw((255, 0, 0)) + elif selection == "cellar": + leds["aerie"].draw((255, 0, 0)) + else: + continue + + leds["center"].draw((0, 255, 0)) + leds[selection].draw((0, 0, 0)) + + strip.show() + + try: + sensor.sleep(5) + except ButtonPress as press: + action = buttons[press.pin] + + if action == "center": + doors[selection].open() + elif action != selection: + doors[selection].close() + + +if __name__ == '__main__': + main() diff --git a/buttons-platzierung.png b/1og/buttons-platzierung.png similarity index 100% rename from buttons-platzierung.png rename to 1og/buttons-platzierung.png diff --git a/pins.png b/1og/pins.png similarity index 100% rename from pins.png rename to 1og/pins.png diff --git a/1og/service.sh b/1og/service.sh new file mode 100644 index 0000000..304ebe4 --- /dev/null +++ b/1og/service.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +# Generate systemd service file + +FILE=/etc/systemd/system/buttond.service + +cat > $FILE << EOF +[Unit] +Description = buttonctl + +[Service] +Type = simple +ExecStart = /usr/bin/python3 $PWD/button.py + +[Install] +WantedBy=multi-user.target +Alias=buttond.service +EOF diff --git a/button.py b/button.py deleted file mode 100644 index 616ec66..0000000 --- a/button.py +++ /dev/null @@ -1,232 +0,0 @@ -import time - -start = time.time() -import RPi.GPIO as gpio -from neopixel import NeoPixel, GRB -from board import D18 -from json import loads -from urllib.request import urlopen -from os.path import expanduser -from os import system -import threading -from sys import stdout - -print("Imports took", round(time.time() - start, 2), "seconds", file=stdout) - - -def ssh_(host, name): - print("[ssh]", host, name, file=stdout) - system("ssh " + str(name) + "@" + host) - -def ssh(host, name): - threading.Thread(target=ssh_, args=(host, name)).start() - - - -class Door: - def __init__(self, host, users): - self.host = host - self.users = users - - def open(self): - ssh(self.host, self.users[0]) - - def close(self): - ssh(self.host, self.users[1]) - - -class ButtonStrip: - def __init__(self, strip, leds): - self.leds = leds - self.strip = strip - - def draw(self, color): - for led in self.leds: - self.strip[led] = color - - -class ButtonHandler(threading.Thread): - def __init__(self, pin, func, edge='both', bouncetime=100): - super().__init__(daemon=True) - - self.edge = edge - self.func = func - self.pin = pin - self.bouncetime = float(bouncetime)/1000 - - self.lastpinval = gpio.input(self.pin) - self.lock = threading.Lock() - - def __call__(self, *args): - if not self.lock.acquire(blocking=False): - return - - t = threading.Timer(self.bouncetime, self.read, args=args) - t.start() - - def read(self, *args): - pinval = gpio.input(self.pin) - - if ( - ((pinval == 0 and self.lastpinval == 1) and - (self.edge in ['falling', 'both'])) or - ((pinval == 1 and self.lastpinval == 0) and - (self.edge in ['rising', 'both'])) - ): - self.func(*args) - - self.lastpinval = pinval - self.lock.release() - - -class ButtonSensor: - def __init__(self, pin, callback): - self.pin = pin - - gpio.setup(pin, gpio.IN, pull_up_down=gpio.PUD_UP) - self.handler = ButtonHandler(pin, callback, - edge='falling', bouncetime=100) - self.handler.start() - gpio.add_event_detect(pin, gpio.BOTH, callback=self.handler) - - -class ButtonPress(Exception): - def __init__(self, pin): - Exception.__init__(self) - self.pin = pin - - def __str__(self): - return buttons[self.pin] - - -class MultiSensor: - def __init__(self, pins): - self.buttons = [] - self.pressed = None - for pin in pins: - self.buttons.append(ButtonSensor(pin, self.press)) - - def press(self, pin): - self.pressed = pin - - def sleepSecond(self): - for i in range(100): - if self.pressed is not None: - pressedPin = self.pressed - self.pressed = None - raise ButtonPress(pressedPin) - time.sleep(0.01) - - def sleep(self, delay): - for i in range(delay): - self.sleepSecond() - - -class APILoader: - def __init__(self, delay): - self.delay = delay - self.loadData() - - def start(self): - self.thread = threading.Thread(target=self.run) - self.thread.start() - - def getData(self): - if not self.thread.is_alive(): - print("API thread is dead. Trying to restart...", file=stdout) - self.start() - return self.data - - def run(self): - while True: - try: - self.loadData() - except Exception as e: - print("Unable to load API data:", e, file=stdout) - - time.sleep(self.delay) - - def loadData(self): - print("Loading API data...", file=stdout) - # Load api data - response = urlopen("https://status.chaospott.de/status.json").read() - data = loads(response.decode("utf-8")) - - self.data = {door["location"]: not door["value"] - for door in data["sensors"]["door_locked"]} - - - - -print("[*] Initializing...", file=stdout) -# Initialize GPIO board -gpio.setmode(gpio.BCM) - -# Create LED-Strip handle -gpio.setup(4, gpio.IN, pull_up_down=gpio.PUD_UP) -strip = NeoPixel(D18, 36, brightness=0.5, - pixel_order=GRB, auto_write=False) - -# Create subhandles for each button -leds = { - "cellar": ButtonStrip(strip, list(range(1, 12))), - "center": ButtonStrip(strip, list(range(12, 24))), - "aerie": ButtonStrip(strip, list(range(24, 36))), -} - -# Create button press sensor for input pins -buttons = { - 22: "cellar", - 23: "aerie", - 24: "center", -} -sensor = MultiSensor([i for i in buttons]) - -doors = { - "aerie": Door("10.42.1.28", ("open", "close")), - "cellar": Door("10.42.1.20", ("open", "close")), -} - -# Initialize API interface -loader = APILoader(4) -loader.start() - -print("[*] Starting...", file=stdout) -while True: - for led in leds: - leds[led].draw((0, 0, 0)) - - data = loader.getData() - for door in data: - if door in leds: - leds[door].draw((0, 255, 0) if data[door] else (255, 0, 0)) - - strip.show() - - # Wait for a button press - try: - sensor.sleep(2) - except ButtonPress as press: - selection = str(press) - - if selection == "aerie": - leds["cellar"].draw((255, 0, 0)) - elif selection == "cellar": - leds["aerie"].draw((255, 0, 0)) - else: - continue - - leds[selection].draw((0, 0, 0)) - leds["center"].draw((0, 255, 0)) - - strip.show() - - try: - sensor.sleep(5) - except ButtonPress as press: - action = str(press) - - if action == "center": - doors[selection].open() - elif action != selection: - doors[selection].close() diff --git a/service.sh b/service.sh deleted file mode 100644 index 5cac443..0000000 --- a/service.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/bin/bash -# Generate systemd service file - -FILE=/lib/systemd/system/buttond.service - -generate() { - echo [Unit] - echo Description = buttonctl - echo - echo [Service] - echo ExecStart = /usr/bin/python3 $PWD/button.py - echo Type = simple - echo - echo [Install] - echo WantedBy=multi-user.target - echo Alias=buttond.service -} - -if test -f $FILE -then - rm $FILE -fi -touch $FILE -generate >> $FILE