foodoord/debian/usr/sbin/foodoord_oben
2025-01-14 19:04:13 +01:00

202 lines
5.4 KiB
Python
Executable File

#!/usr/bin/env python3
# vim: ts=2 sw=2 et
import grp
import json
import os
import signal
import stat
import subprocess
import sys
import threading
import time
from configparser import ConfigParser
from dataclasses import dataclass
import paho.mqtt.client as mqtt
import pifacedigitalio
class FoodoorMQTT:
  """Publish the door state of one area to the MQTT broker.

  The state is published retained on ``foobar/<area>/foodoor/status`` as the
  payload ``"open"`` or ``"closed"``.
  """

  BROKER_HOST = "mqtt.chaospott.de"
  # Default number of seconds connect() waits for the broker's answer.
  CONNECT_TIMEOUT = 10.0
  # Payload published for each value of the ``locked`` flag.
  _STATE_PAYLOAD = {
    False: "open",
    True: "closed",
  }

  def __init__(self, area):
    """Create the MQTT client for ``area`` (used as a topic component)."""
    self.area = area
    self.client = mqtt.Client()
    self.client.on_connect = self.on_connect
    self.client.on_message = self.on_message
    # Guards ``_connected``. The flag (rather than a bare notify) makes sure
    # a connect callback that fires before connect() starts waiting is not
    # lost.
    self._connect_lock = threading.Condition()
    self._connected = False

  def connect(self, timeout=None):
    """Connect to the broker and wait until the connect callback has fired.

    Args:
      timeout: Maximum number of seconds to wait for the callback. Defaults
        to ``CONNECT_TIMEOUT``.

    Errors are reported on stdout and never raised, so a missing broker
    cannot keep the caller from starting. On timeout the network loop is
    left running.
    """
    if timeout is None:
      timeout = self.CONNECT_TIMEOUT
    try:
      with self._connect_lock:
        self._connected = False
      self.client.connect(self.BROKER_HOST)
      self.client.loop_start()
      with self._connect_lock:
        # wait_for() checks the flag before blocking, so it returns at once
        # if on_connect() already ran on the network thread.
        connected = self._connect_lock.wait_for(
          lambda: self._connected, timeout=timeout)
      if not connected:
        print(f"Verbindungsfehler zu MQTT-Server: keine Antwort nach {timeout}s")
    except Exception as e:
      print(f"Verbindungsfehler zu MQTT-Server: {e}")

  def disconnect(self):
    """Stop the background network loop started by connect()."""
    self.client.loop_stop()

  def on_connect(self, client, userdata, flags, rc):
    """Client connect callback: record the connection and wake connect()."""
    with self._connect_lock:
      self._connected = True
      self._connect_lock.notify_all()

  def on_message(self, client, userdata, msg):
    """Client message callback: log the received message."""
    print(f"MQTT-Server Message: {msg.topic} {msg.payload}")

  def send_state(self, locked: bool):
    """Publish the door state (retained, QoS 0).

    Args:
      locked: ``True`` publishes ``"closed"``, ``False`` publishes ``"open"``.

    Raises:
      KeyError: If ``locked`` is neither true-like 1/True nor 0/False.
    """
    self.client.publish(
      f"foobar/{self.area}/foodoor/status",
      self._STATE_PAYLOAD[locked],
      qos=0,
      retain=True,
    )
# Load the daemon settings; the sections are queried further down.
# Note: ConfigParser.read() skips files it cannot open instead of raising.
parser = ConfigParser()
parser.read(['/etc/foodoord.conf'])
@dataclass
class API:
  """Endpoint and credentials of one door-status HTTP API.

  Attributes:
    location: JSON key under which the lock state is reported.
    api_url: URL the state is POSTed to.
    consumer_key: Value sent as ``consumer_key``.
    consumer_secret: Value sent as ``consumer_secret``.
  """
  location: str
  api_url: str
  consumer_key: str
  consumer_secret: str

  # Seconds before a hanging curl call is killed. Deliberately left without
  # an annotation so the dataclass does not treat it as a field.
  REQUEST_TIMEOUT = 15

  def _payload(self, locked):
    """Return the JSON request body announcing ``locked`` for this location."""
    return json.dumps({
      "consumer_key": self.consumer_key,
      "consumer_secret": self.consumer_secret,
      self.location: locked,
    })

  def update_state(self, locked):
    """POST the lock state to ``api_url`` using curl.

    The body is fed to curl on stdin (``--data @-``) so the credentials do
    not show up in the process list, and the call is bounded by
    ``REQUEST_TIMEOUT`` so an unresponsive server cannot block the caller
    forever.

    Args:
      locked: Lock state to report.

    Raises:
      subprocess.CalledProcessError: curl exited with a non-zero status.
      subprocess.TimeoutExpired: curl did not finish within the timeout.
    """
    subprocess.run(
      [
        "/usr/bin/curl", "-XPOST",
        "--header", "Content-Type: application/json",
        "--data", "@-",
        self.api_url,
      ],
      input=self._payload(locked),
      text=True,
      check=True,
      timeout=self.REQUEST_TIMEOUT,
    )
# Both status APIs report the same location key; they differ only in the
# config section that supplies their URL and credentials.
APIv1 = API(
  location="aerie",
  api_url=parser.get('doorstatus', 'status_url'),
  consumer_key=parser.get('doorstatus', 'key'),
  consumer_secret=parser.get('doorstatus', 'secret'),
)
APIv2 = API(
  location="aerie",
  api_url=parser.get('doorstatusv2', 'status_url'),
  consumer_key=parser.get('doorstatusv2', 'key'),
  consumer_secret=parser.get('doorstatusv2', 'secret'),
)
class Foodoord:
  """Door daemon: drives the lock relays and LEDs of a PiFace Digital board.

  Commands (``open`` / ``close``) are read line by line from a FIFO, the two
  board inputs are handled through an input event listener, and every state
  change is reported via MQTT and the two HTTP status APIs.
  """

  # LED colours as bit flags; ORANGE selects both LEDs.
  RED = 0b1
  GREEN = 0b10
  ORANGE = RED | GREEN
  # Output numbers on the board.
  LEDS = {
    RED: 6,
    GREEN: 7,
  }
  RELAYS_LOCK = 0
  RELAYS_UNLOCK = 1
  # Input numbers on the board.
  DOOR_BELL = 0
  CLOSE_BUTTON = 1
  # FIFO the commands are read from.
  PIPE_PATH = "/var/run/foodoord.pipe"
  # How long each relay is held toggled, in seconds.
  LOCK_PULSE_SECONDS = 1
  UNLOCK_PULSE_SECONDS = 2

  def __init__(self):
    """Set up the MQTT publisher, the board and the input listeners."""
    self.status_open = False
    self.mqtt = FoodoorMQTT("oben")
    self.pifacedigital = pifacedigitalio.PiFaceDigital()
    self.listener = pifacedigitalio.InputEventListener()
    self.listener.register(self.DOOR_BELL, pifacedigitalio.IODIR_RISING_EDGE, self.doorbell, settle_time=10)
    self.listener.register(self.CLOSE_BUTTON, pifacedigitalio.IODIR_RISING_EDGE, self.close_button, settle_time=5)

  def signal_handler(self, _signal, _frame):
    """SIGTERM handler: stop listening, remove the FIFO, report locked, exit."""
    self.listener.deactivate()
    try:
      os.remove(self.PIPE_PATH)
    except FileNotFoundError:
      # Already gone; the rest of the shutdown must still run.
      pass
    self.update_api(True)
    self.set_led(self.RED)
    sys.exit(0)

  def update_api(self, locked):
    """Report ``locked`` to MQTT and both HTTP APIs, best effort.

    Each backend is tried independently. A failure is logged and does not
    keep the remaining backends from being updated, and is never raised.
    """
    # Lambdas defer every lookup into the try block below.
    backends = (
      ("MQTT", lambda: self.mqtt.send_state(locked)),
      ("APIv1", lambda: APIv1.update_state(locked)),
      ("APIv2", lambda: APIv2.update_state(locked)),
    )
    for name, publish in backends:
      try:
        publish()
      except Exception as e:
        print(f"Statusupdate ({name}) fehlgeschlagen: {e!r}")

  def set_led(self, color):
    """Switch on exactly the LEDs whose flag is set in ``color``."""
    for led, gpio in self.LEDS.items():
      if color & led:
        self.pifacedigital.leds[gpio].turn_on()
      else:
        self.pifacedigital.leds[gpio].turn_off()

  def _pulse_relay(self, relay, seconds):
    """Toggle ``relay``, wait ``seconds``, then toggle it back.

    The second toggle sits in a ``finally`` so the relay is restored even if
    the wait is aborted, e.g. by the SystemExit raised from signal_handler().
    """
    output = self.pifacedigital.relays[relay]
    output.toggle()
    try:
      time.sleep(seconds)
    finally:
      output.toggle()

  def doorbell(self, event):
    """Door bell input: pulse the unlock relay, but only while open."""
    if self.status_open:
      self._pulse_relay(self.RELAYS_UNLOCK, self.UNLOCK_PULSE_SECONDS)

  def close_button(self, _event):
    """Close button input: mark the door closed and report it.

    No relay is driven here; only the state, the APIs and the LED change.
    """
    self.status_open = False
    self.update_api(True)
    self.set_led(self.RED)

  def _setup_pipe(self):
    """Create the command FIFO, group-owned by ``foodoor`` with mode 0660."""
    try:
      os.mkfifo(self.PIPE_PATH)
      os.chown(self.PIPE_PATH, -1, grp.getgrnam('foodoor')[2])
      os.chmod(self.PIPE_PATH, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP)
    except FileExistsError:
      # Left over from an earlier run; reuse it as it is.
      pass
    except OSError as e:
      # Still best effort, but no longer silent.
      print(f"FIFO {self.PIPE_PATH} konnte nicht eingerichtet werden: {e}")

  def main(self):
    """Run the daemon: connect, arm the inputs and serve FIFO commands forever."""
    self.mqtt.connect()
    self.listener.activate()
    signal.signal(signal.SIGTERM, self.signal_handler)
    # Start settings
    self.set_led(self.RED)
    # Set up the FIFO the commands arrive on (sshd output, per the original
    # comments).
    self._setup_pipe()
    with open(self.PIPE_PATH, "r") as ssh_input:
      while True:
        # An empty read yields "" and matches neither command.
        pipe_cmd = ssh_input.readline().strip()
        if pipe_cmd == "close" and self.status_open:
          self._pulse_relay(self.RELAYS_LOCK, self.LOCK_PULSE_SECONDS)
          self.status_open = False
          self.update_api(True)
          self.set_led(self.RED)
        elif pipe_cmd == "open":
          self._pulse_relay(self.RELAYS_UNLOCK, self.UNLOCK_PULSE_SECONDS)
          # Only report the transition, not every repeated "open".
          if not self.status_open:
            self.update_api(False)
          self.status_open = True
          self.set_led(self.GREEN)
        # Throttle the loop while nothing is being written to the FIFO.
        time.sleep(0.1)
# Script entry point: build the daemon and hand control to its main loop.
if __name__ == "__main__":
  daemon = Foodoord()
  daemon.main()