Code cleanup

This commit is contained in:
T
2025-09-05 22:53:31 +02:00
parent c54e2649ef
commit e13881b07b
7 changed files with 233 additions and 256 deletions

216
1og/button.py Normal file
View File

@@ -0,0 +1,216 @@
import json
import subprocess
import threading
import time
import urllib.request
import RPi.GPIO as gpio
import board
import neopixel
def ssh(host, name):
def ssh_(host_, name_):
print("[ssh]", f"{name_}@{host_}")
subprocess.check_call(["ssh", f"{name_}@{host_}"], stdin=subprocess.DEVNULL)
threading.Thread(target=ssh_, args=(host, name)).start()
class Door:
def __init__(self, host, user_open, user_close):
self.host = host
self.user_open = user_open
self.user_close = user_close
def open(self):
ssh(self.host, self.user_open)
def close(self):
ssh(self.host, self.user_close)
class ButtonStrip:
def __init__(self, strip, leds):
self.leds = leds
self.strip = strip
def draw(self, color):
for led in self.leds:
self.strip[led] = color
class ButtonHandler(threading.Thread):
def __init__(self, pin, func, edge='both', bouncetime=100):
super().__init__(daemon=True)
self.pin = pin
self.func = func
self.edge = edge
self.bouncetime = bouncetime / 1000
self.lastpinval = gpio.input(self.pin)
self.lock = threading.Lock()
def __call__(self, *args):
if not self.lock.acquire(blocking=False):
return
t = threading.Timer(self.bouncetime, self.read, args=args)
t.start()
def read(self, *args):
pinval = gpio.input(self.pin)
if ((pinval == 0 and self.lastpinval == 1 and self.edge in ['falling', 'both'])
or (pinval == 1 and self.lastpinval == 0 and self.edge in ['rising', 'both'])):
self.func(*args)
self.lastpinval = pinval
self.lock.release()
class ButtonSensor:
def __init__(self, pin, callback):
self.pin = pin
gpio.setup(pin, gpio.IN, pull_up_down=gpio.PUD_UP)
self.handler = ButtonHandler(pin, callback, edge='falling', bouncetime=100)
self.handler.start()
gpio.add_event_detect(pin, gpio.BOTH, callback=self.handler)
class ButtonPress(Exception):
def __init__(self, pin):
super().__init__()
self.pin = pin
class MultiSensor:
def __init__(self, pins):
self.pressed = None
self.buttons = [ButtonSensor(pin, self.press) for pin in pins]
def press(self, pin):
self.pressed = pin
def sleepSecond(self):
for i in range(100):
if self.pressed is not None:
pressed_pin = self.pressed
self.pressed = None
raise ButtonPress(pressed_pin)
time.sleep(0.01)
def sleep(self, delay):
for i in range(delay):
self.sleepSecond()
class APILoader(threading.Thread):
def __init__(self, delay):
super().__init__()
self.delay = delay
self.data = None
self.loadData()
def getData(self):
if not self.is_alive():
print("API thread is dead. Trying to restart...")
self.start()
return self.data
def run(self):
while True:
try:
self.loadData()
except Exception as e:
print("Unable to load API data:", e)
time.sleep(self.delay)
def loadData(self):
# Load api data
print("Loading API data...")
response = urllib.request.urlopen("https://status-v2.chaospott.de/status.json").read()
data = json.loads(response.decode("utf-8"))
self.data = {door["location"]: not door["value"] for door in data["sensors"]["door_locked"]}
def main():
print("[*] Initializing...")
# Initialize GPIO board
gpio.setmode(gpio.BCM)
# Create LED-Strip handle
gpio.setup(4, gpio.IN, pull_up_down=gpio.PUD_UP)
strip = neopixel.NeoPixel(board.D18, 36, brightness=0.5, pixel_order=neopixel.GRB, auto_write=False)
# Create sub-handle for each button handle
leds = {
"cellar": ButtonStrip(strip, list(range(1, 12))),
"center": ButtonStrip(strip, list(range(12, 24))),
"aerie": ButtonStrip(strip, list(range(24, 36))),
}
# Create button press sensor
buttons = {
22: "cellar",
23: "aerie",
24: "center",
}
sensor = MultiSensor(buttons.keys())
doors = {
"aerie": Door("10.42.1.28", "open", "close"),
"cellar": Door("10.42.1.20", "open", "close"),
}
# Initialize API interface
loader = APILoader(4)
loader.start()
print("[*] Starting...")
while True:
for led in leds:
leds[led].draw((0, 0, 0))
data = loader.getData()
for door in data:
if door in leds:
leds[door].draw((0, 255, 0) if data[door] else (255, 0, 0))
strip.show()
# Wait for a button press
try:
sensor.sleep(2)
except ButtonPress as press:
selection = buttons[press.pin]
if selection == "aerie":
leds["cellar"].draw((255, 0, 0))
elif selection == "cellar":
leds["aerie"].draw((255, 0, 0))
else:
continue
leds["center"].draw((0, 255, 0))
leds[selection].draw((0, 0, 0))
strip.show()
try:
sensor.sleep(5)
except ButtonPress as press:
action = buttons[press.pin]
if action == "center":
doors[selection].open()
elif action != selection:
doors[selection].close()
if __name__ == '__main__':
main()

View File

Before

Width:  |  Height:  |  Size: 8.8 KiB

After

Width:  |  Height:  |  Size: 8.8 KiB

View File

Before

Width:  |  Height:  |  Size: 119 KiB

After

Width:  |  Height:  |  Size: 119 KiB

17
1og/service.sh Normal file
View File

@@ -0,0 +1,17 @@
#!/usr/bin/env bash
# Generate systemd service file
FILE=/etc/systemd/system/buttond.service
cat > $FILE << EOF
[Unit]
Description = buttonctl
[Service]
Type = simple
ExecStart = /usr/bin/python3 $PWD/button.py
[Install]
WantedBy=multi-user.target
Alias=buttond.service
EOF

232
button.py
View File

@@ -1,232 +0,0 @@
import time
start = time.time()
import RPi.GPIO as gpio
from neopixel import NeoPixel, GRB
from board import D18
from json import loads
from urllib.request import urlopen
from os.path import expanduser
from os import system
import threading
from sys import stdout
print("Imports took", round(time.time() - start, 2), "seconds", file=stdout)
def ssh_(host, name):
print("[ssh]", host, name, file=stdout)
system("ssh " + str(name) + "@" + host)
def ssh(host, name):
threading.Thread(target=ssh_, args=(host, name)).start()
class Door:
def __init__(self, host, users):
self.host = host
self.users = users
def open(self):
ssh(self.host, self.users[0])
def close(self):
ssh(self.host, self.users[1])
class ButtonStrip:
def __init__(self, strip, leds):
self.leds = leds
self.strip = strip
def draw(self, color):
for led in self.leds:
self.strip[led] = color
class ButtonHandler(threading.Thread):
def __init__(self, pin, func, edge='both', bouncetime=100):
super().__init__(daemon=True)
self.edge = edge
self.func = func
self.pin = pin
self.bouncetime = float(bouncetime)/1000
self.lastpinval = gpio.input(self.pin)
self.lock = threading.Lock()
def __call__(self, *args):
if not self.lock.acquire(blocking=False):
return
t = threading.Timer(self.bouncetime, self.read, args=args)
t.start()
def read(self, *args):
pinval = gpio.input(self.pin)
if (
((pinval == 0 and self.lastpinval == 1) and
(self.edge in ['falling', 'both'])) or
((pinval == 1 and self.lastpinval == 0) and
(self.edge in ['rising', 'both']))
):
self.func(*args)
self.lastpinval = pinval
self.lock.release()
class ButtonSensor:
def __init__(self, pin, callback):
self.pin = pin
gpio.setup(pin, gpio.IN, pull_up_down=gpio.PUD_UP)
self.handler = ButtonHandler(pin, callback,
edge='falling', bouncetime=100)
self.handler.start()
gpio.add_event_detect(pin, gpio.BOTH, callback=self.handler)
class ButtonPress(Exception):
def __init__(self, pin):
Exception.__init__(self)
self.pin = pin
def __str__(self):
return buttons[self.pin]
class MultiSensor:
def __init__(self, pins):
self.buttons = []
self.pressed = None
for pin in pins:
self.buttons.append(ButtonSensor(pin, self.press))
def press(self, pin):
self.pressed = pin
def sleepSecond(self):
for i in range(100):
if self.pressed is not None:
pressedPin = self.pressed
self.pressed = None
raise ButtonPress(pressedPin)
time.sleep(0.01)
def sleep(self, delay):
for i in range(delay):
self.sleepSecond()
class APILoader:
def __init__(self, delay):
self.delay = delay
self.loadData()
def start(self):
self.thread = threading.Thread(target=self.run)
self.thread.start()
def getData(self):
if not self.thread.is_alive():
print("API thread is dead. Trying to restart...", file=stdout)
self.start()
return self.data
def run(self):
while True:
try:
self.loadData()
except Exception as e:
print("Unable to load API data:", e, file=stdout)
time.sleep(self.delay)
def loadData(self):
print("Loading API data...", file=stdout)
# Load api data
response = urlopen("https://status.chaospott.de/status.json").read()
data = loads(response.decode("utf-8"))
self.data = {door["location"]: not door["value"]
for door in data["sensors"]["door_locked"]}
print("[*] Initializing...", file=stdout)
# Initialize GPIO board
gpio.setmode(gpio.BCM)
# Create LED-Strip handle
gpio.setup(4, gpio.IN, pull_up_down=gpio.PUD_UP)
strip = NeoPixel(D18, 36, brightness=0.5,
pixel_order=GRB, auto_write=False)
# Create subhandles for each button
leds = {
"cellar": ButtonStrip(strip, list(range(1, 12))),
"center": ButtonStrip(strip, list(range(12, 24))),
"aerie": ButtonStrip(strip, list(range(24, 36))),
}
# Create button press sensor for input pins
buttons = {
22: "cellar",
23: "aerie",
24: "center",
}
sensor = MultiSensor([i for i in buttons])
doors = {
"aerie": Door("10.42.1.28", ("open", "close")),
"cellar": Door("10.42.1.20", ("open", "close")),
}
# Initialize API interface
loader = APILoader(4)
loader.start()
print("[*] Starting...", file=stdout)
while True:
for led in leds:
leds[led].draw((0, 0, 0))
data = loader.getData()
for door in data:
if door in leds:
leds[door].draw((0, 255, 0) if data[door] else (255, 0, 0))
strip.show()
# Wait for a button press
try:
sensor.sleep(2)
except ButtonPress as press:
selection = str(press)
if selection == "aerie":
leds["cellar"].draw((255, 0, 0))
elif selection == "cellar":
leds["aerie"].draw((255, 0, 0))
else:
continue
leds[selection].draw((0, 0, 0))
leds["center"].draw((0, 255, 0))
strip.show()
try:
sensor.sleep(5)
except ButtonPress as press:
action = str(press)
if action == "center":
doors[selection].open()
elif action != selection:
doors[selection].close()

View File

@@ -1,24 +0,0 @@
#!/bin/bash
# Generate systemd service file
FILE=/lib/systemd/system/buttond.service
generate() {
echo [Unit]
echo Description = buttonctl
echo
echo [Service]
echo ExecStart = /usr/bin/python3 $PWD/button.py
echo Type = simple
echo
echo [Install]
echo WantedBy=multi-user.target
echo Alias=buttond.service
}
if test -f $FILE
then
rm $FILE
fi
touch $FILE
generate >> $FILE