#!/usr/bin/env python3 # vim: ts=2 sw=2 et import grp import json import os import signal import stat import subprocess import sys import threading import time from configparser import ConfigParser from dataclasses import dataclass import paho.mqtt.client as mqtt import pifacedigitalio class FoodoorMQTT: def __init__(self, area): self.area = area self.client = mqtt.Client() self.client.on_connect = self.on_connect self.client.on_message = self.on_message self._connect_lock = threading.Condition() def connect(self): try: self.client.connect("mqtt.chaospott.de") self.client.loop_start() with self._connect_lock: self._connect_lock.wait() except Exception as e: print(f"Verbindungsfehler zu MQTT-Server: {e}") def disconnect(self): self.client.loop_stop() def on_connect(self, client, userdata, flags, rc): with self._connect_lock: self._connect_lock.notify() def on_message(self, client, userdata, msg): print(f"MQTT-Server Message: {msg.topic} {msg.payload}") def send_state(self, locked: bool): self.client.publish(f"foobar/{self.area}/foodoor/status", { False: "open", True: "closed", }[locked], qos=0, retain=True) # Read config parser = ConfigParser() parser.read('/etc/foodoord.conf') @dataclass class API: location: str api_url: str consumer_key: str consumer_secret: str def update_state(self, locked): subprocess.check_call([ "/usr/bin/curl", "-XPOST", "--header", "Content-Type: application/json", "--data", json.dumps({"consumer_key": self.consumer_key, "consumer_secret": self.consumer_secret, self.location: locked}), self.api_url ]) APIv1 = API("aerie", parser.get('doorstatus', 'status_url'), parser.get('doorstatus', 'key'), parser.get('doorstatus', 'secret'), ) APIv2 = API("aerie", parser.get('doorstatusv2', 'status_url'), parser.get('doorstatusv2', 'key'), parser.get('doorstatusv2', 'secret'), ) class Foodoord: # Definitions for LED color RED = 0b1 GREEN = 0b10 ORANGE = RED | GREEN # Definitions for output LEDS = { RED: 6, GREEN: 7, } RELAYS_LOCK = 0 RELAYS_UNLOCK = 1 # Definitions for input DOOR_BELL = 0 CLOSE_BUTTON = 1 def __init__(self): self.status_open = False self.mqtt = FoodoorMQTT("oben") self.listener = pifacedigitalio.InputEventListener() self.listener.register(self.DOOR_BELL, pifacedigitalio.IODIR_RISING_EDGE, self.doorbell, settle_time=10) self.listener.register(self.CLOSE_BUTTON, pifacedigitalio.IODIR_RISING_EDGE, self.close_button, settle_time=5) def signal_handler(self, _signal, _frame): self.listener.deactivate() os.remove("/var/run/foodoord.pipe") self.update_api(True) self.set_led(self.RED) sys.exit(0) def update_api(self, locked): try: self.mqtt.send_state(locked) except: pass try: APIv1.update_state(locked) except: pass try: APIv2.update_state(locked) except: pass def set_led(self, color): for led, gpio in self.LEDS.items(): if color & led: pifacedigital.leds[gpio].turn_on() else: pifacedigital.leds[gpio].turn_off() def doorbell(self, event): if self.status_open: pifacedigital.relays[self.RELAYS_UNLOCK].toggle() time.sleep(2) pifacedigital.relays[self.RELAYS_UNLOCK].toggle() def close_button(self, _event): self.status_open = False self.update_api(True) self.set_led(self.RED) def main(self): self.mqtt.connect() self.listener.activate() pifacedigital = pifacedigitalio.PiFaceDigital() signal.signal(signal.SIGTERM, self.signal_handler) # Start settings self.set_led(self.RED) # Setting up FiFo to get sshd-output try: os.mkfifo("/var/run/foodoord.pipe") os.chown("/var/run/foodoord.pipe", -1, grp.getgrnam('foodoor')[2]) os.chmod("/var/run/foodoord.pipe", stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP) except OSError: pass ssh_input = open("/var/run/foodoord.pipe", "r") while True: # Read sshd-output from pipe pipe_cmd = ssh_input.readline().strip() if pipe_cmd == "close" and self.status_open: pifacedigital.relays[self.RELAYS_LOCK].toggle() time.sleep(1) pifacedigital.relays[self.RELAYS_LOCK].toggle() self.status_open = False self.update_api(True) self.set_led(self.RED) elif pipe_cmd == "open": pifacedigital.relays[self.RELAYS_UNLOCK].toggle() time.sleep(2) pifacedigital.relays[self.RELAYS_UNLOCK].toggle() if not self.status_open: self.update_api(False) self.status_open = True self.set_led(self.GREEN) time.sleep(0.1) if __name__ == "__main__": Foodoord().main()