#!/usr/bin/env python3
"""Bridge between I2C-attached relays/buttons and an MQTT broker.

Each relay is driven by pulsing one bit on an I2C output byte and is
observed through one bit of an I2C input byte.  State changes and button
events are published over MQTT, and commands received on the per-relay
``.../action`` topics are turned into relay pulses.
"""
import hashlib
import json
import threading
import time
from dataclasses import dataclass

import paho.mqtt.client as mqtt
import smbus


class BusWLock:
    """Pair an SMBus handle with the lock that serialises access to it."""

    def __init__(self, bus: smbus.SMBus):
        self.bus = bus
        self.lock = threading.Lock()


@dataclass
class Relay:
    """A relay that is toggled by a pulse and read back through an input bit.

    ``i2c_read_addr``/``i2c_read_bit`` locate the feedback bit, and
    ``i2c_write_addr``/``i2c_write_bit`` locate the bit that is pulsed to
    trigger the relay.  ``bus`` and the two ``on_*`` callbacks are filled in
    by ``PowerCTL.add_relays``.
    """

    name: str
    i2c_read_addr: int
    i2c_read_bit: int
    i2c_write_addr: int
    i2c_write_bit: int
    bus: BusWLock | None = None
    state: int = 0
    has_brightness: bool = False

    STATE_MIN = 0
    STATE_MAX = 255

    def on_state_change(self, relay):
        """Default state-change hook: do nothing and hand back ``relay``.

        Replaced per instance with a callable taking the relay as its only
        argument.
        """
        return relay

    def on_button_event(self, relay, ev):
        """Default button-event hook: do nothing.

        Replaced per instance with a callable taking ``(relay, ev)``.
        """
        return None

    def set_state(self, state):
        """Drive the relay towards ``state`` and record it.

        Nothing happens when ``state`` equals the tracked state.  A value
        strictly between ``STATE_MIN`` and ``STATE_MAX`` produces a pulse of
        ``4 * state / 100 + 1`` seconds; any other value uses the default
        pulse length of ``_trigger``.
        """
        if state == self.state:
            return
        if self.STATE_MIN < state < self.STATE_MAX:
            self._trigger(4 * state / 100 + 1)
        else:
            self._trigger()
        self.state = state
        print("set", self.name, self.state)
        self.on_state_change(self)

    def update_state(self, state):
        """Record an externally observed ``state`` without touching the bus."""
        if self.state == state:
            return
        self.state = state
        print("update", self.name, self.state)
        self.on_state_change(self)

    def _trigger(self, duration=0.5):
        """Pulse the relay's write bit for ``duration`` seconds.

        Writes a byte with only this relay's bit cleared, sleeps, then writes
        0xff again.  The bus lock is held for the whole pulse, so other bus
        users wait until the pulse has finished.
        """
        data_mask = 0xff ^ (1 << self.i2c_write_bit)
        with self.bus.lock:
            print("write", time.monotonic(), self.name, hex(self.i2c_write_addr), self.i2c_write_bit, hex(data_mask))
            self.bus.bus.write_byte(self.i2c_write_addr, data_mask)
            time.sleep(duration)
            self.bus.bus.write_byte(self.i2c_write_addr, 255)
            # NOTE(review): this logs data_mask although 255 was just written;
            # presumably it only marks the end of the pulse - confirm.
            print("write", time.monotonic(), self.name, hex(self.i2c_write_addr), self.i2c_write_bit, hex(data_mask))

    def process_event(self, active):
        """Handle one sample of the feedback bit.

        When the sampled state differs from the tracked one, a 1 followed by
        a 0 is sent to ``on_button_event`` and the new state is adopted.
        """
        new_state = self.STATE_MAX if active else self.STATE_MIN
        if self.state != new_state:
            self.on_button_event(self, 1)
            self.on_button_event(self, 0)
            self.update_state(new_state)

    def get_ha_components(self, prefix):
        """Yield two component dicts: a light, then a binary-sensor button.

        Topics are built from ``prefix`` and the relay name; the unique ids
        are MD5 hex digests of those identifiers.
        """
        title = self.name.replace("/", " ").title()
        yield {
            "name": title,
            "unique_id": hashlib.md5(f"{prefix}/{self.name}".encode()).hexdigest(),
            "platform": "light",
            "state_topic": f"{prefix}/{self.name}/status",
            "command_topic": f"{prefix}/{self.name}/action",
            "payload_on": "on",
            "payload_off": "off",
        }
        yield {
            "name": title + " Button",
            "unique_id": hashlib.md5(f"{prefix}/{self.name}#button".encode()).hexdigest(),
            "platform": "binary_sensor",
            "state_topic": f"{prefix}/{self.name}/input",
            "payload_on": "1",
            "payload_off": "0",
        }


@dataclass
class ButtonInput(Relay):
    """A relay whose input bit is a push button with measured press length."""

    pressed_since: int | None = None

    def button_pressed(self, ms: int):
        """Hook called on release with the press duration in milliseconds."""
        ...

    def process_event(self, active: bool):
        """Track press and release edges of the sampled input bit.

        The first active sample stores a monotonic timestamp and emits event
        1.  The first inactive sample after that calls ``button_pressed``
        with the elapsed milliseconds, emits event 0 and clears the
        timestamp.
        """
        if active:
            if self.pressed_since is None:
                self.pressed_since = time.monotonic_ns()
                self.on_button_event(self, 1)
        elif self.pressed_since is not None:
            held_ms = (time.monotonic_ns() - self.pressed_since) // 1000000
            self.button_pressed(held_ms)
            self.on_button_event(self, 0)
            self.pressed_since = None


@dataclass
class DimmerRelay(ButtonInput):
    """A button-driven relay that advertises brightness topics."""

    def set_state(self, state):
        """Switch fully on for any value above ``STATE_MIN``, else fully off."""
        super().set_state(self.STATE_MAX if state > self.STATE_MIN else self.STATE_MIN)

    def __post_init__(self):
        self.has_brightness = True

    def button_pressed(self, ms: int):
        """Toggle the tracked state on a press shorter than 600 ms.

        Only the tracked state changes; no pulse is written to the bus.
        Longer presses are currently ignored.
        """
        print(self.name, "pressed for", ms)
        if ms < 600:
            self.update_state(self.STATE_MIN if self.state else self.STATE_MAX)
        else:
            pass  # TODO

    def get_ha_components(self, prefix):
        """Yield the base components with brightness topics added to the light."""
        dev, btn = super().get_ha_components(prefix)
        dev.update({
            "brightness_state_topic": f"{prefix}/{self.name}/brightness",
            "brightness_command_topic": f"{prefix}/{self.name}/action",
        })
        yield dev
        yield btn


@dataclass
class ZentralAus(ButtonInput):
    """Central-off input that acts on a group of other relays."""

    relays: list[Relay] | None = None

    def __post_init__(self):
        # This input has no state of its own; None keeps it out of the
        # status messages published by PowerCTL.mqtt_send_state.
        self.state = None
        # Without this, an omitted ``relays`` argument would make set_state
        # and button_pressed fail when they iterate over None.
        if self.relays is None:
            self.relays = []

    def set_state(self, state):
        """Switch every grouped relay off when ``state`` is ``STATE_MAX``."""
        if state == self.STATE_MAX:
            for relay in self.relays:
                relay.set_state(relay.STATE_MIN)

    def update_state(self, state):
        """Ignore state updates; always returns False."""
        return False

    def button_pressed(self, ms: int):
        """Mark every grouped relay as off without writing to the bus."""
        for relay in self.relays:
            relay.update_state(relay.STATE_MIN)

    def get_ha_components(self, prefix):
        """Yield nothing: this input contributes no components."""
        yield from ()


class PowerCTL:
    """Own the I2C bus and the MQTT client, and wire relays to both."""

    PREFIX = "foobar/oben"

    def __init__(self):
        self.bus = BusWLock(smbus.SMBus(1))
        self.relays: dict[str, Relay] = {}  # topic: relay
        self.mqttc = mqtt.Client()
        self.mqttc.connect("mqtt.chaospott.de", 1883, 60)
        self.mqttc.on_message = self.on_message
        self.mqttc.on_connect = self.on_connect
        self.mqttc.on_subscribe = self.on_subscribe

    def add_relays(self, *relays: Relay):
        """Attach bus and callbacks to each relay and subscribe to its action topic."""
        for relay in relays:
            relay.bus = self.bus
            relay.on_state_change = self.mqtt_send_state
            relay.on_button_event = self.mqtt_send_event
            topic = f"{self.PREFIX}/{relay.name}/action"
            self.relays[topic] = relay
            print("subscribe", topic)
            self.mqttc.subscribe(topic, 0)

    def mqtt_send_state(self, relay: Relay):
        """Publish "on"/"off" status and, where supported, the brightness.

        Relays whose state is ``None`` are skipped.
        """
        if relay.state is None:
            return
        status = "on" if relay.state > relay.STATE_MIN else "off"
        self.mqttc.publish(f"{self.PREFIX}/{relay.name}/status", status, qos=0, retain=False)
        if relay.has_brightness:
            self.mqttc.publish(f"{self.PREFIX}/{relay.name}/brightness", relay.state, qos=0, retain=False)

    def mqtt_send_event(self, relay: ButtonInput, ev):
        """Publish a button event value on the relay's input topic."""
        self.mqttc.publish(f"{self.PREFIX}/{relay.name}/input", ev, qos=0, retain=False)

    def mqtt_register_ha(self):
        """Publish the retained device configuration as JSON."""
        self.mqttc.publish("homeassistant/device/powerctl/config",
                           json.dumps(self.get_ha_device_config()), retain=True)

    def on_connect(self, _mosq, _obj, connect_flags, _rc):
        """Re-subscribe to all action topics unless the session was kept."""
        print("Connected", connect_flags)
        if not connect_flags.get("session present", 0):
            print("Re-Subscribe")
            for topic in self.relays:
                self.mqttc.subscribe(topic, 0)

    def get_ha_device_config(self):
        """Build the device configuration dict containing every component.

        Components are keyed by their ``unique_id``.  Keying by relay name
        would let the button component overwrite the light component of the
        same relay, because both share that name.
        """
        components = {
            cmp["unique_id"]: cmp
            for relay in self.relays.values()
            for cmp in relay.get_ha_components(self.PREFIX)
        }
        return {
            "dev": {
                "ids": "powerctl",
                "name": "PowerCTL",
                "m": "foobar",
            },
            "origin": {
                "name": "PowerCTL",
            },
            "cmps": components,
        }

    @staticmethod
    def on_subscribe(_mosq, _obj, _mid, _granted_qos):
        print("Subscribed OK")

    def on_message(self, _mosq, _obj, msg):
        """Translate an action message into a ``set_state`` call.

        Accepts "on", "off" or an integer within ``STATE_MIN..STATE_MAX``.
        Messages for unknown topics, undecodable payloads, non-numeric
        payloads and out-of-range numbers are ignored.
        """
        relay = self.relays.get(msg.topic)
        if relay is None:
            return
        try:
            # UnicodeDecodeError is a ValueError, so a payload that is not
            # valid UTF-8 is rejected here as well.
            payload = msg.payload.decode("utf-8")
            if payload == "on":
                state = relay.STATE_MAX
            elif payload == "off":
                state = relay.STATE_MIN
            else:
                state = int(payload)
        except ValueError:
            print("ignore", msg.topic, msg.payload)
            return
        # An out-of-range value never equals the tracked state, so it would
        # always fire a pulse and leave an impossible state behind.
        if not relay.STATE_MIN <= state <= relay.STATE_MAX:
            print("ignore", msg.topic, msg.payload)
            return
        relay.set_state(state)

    def i2c_status_thread_new(self):
        """Poll the input bytes forever and feed each relay its own bit.

        Every address is read at most once per pass.  A cleared bit counts
        as active.
        """
        # NOTE(review): an exception from read_byte ends this loop, and with
        # it all input polling - confirm whether that is intended.
        while True:
            bus_cache = {}
            for relay in self.relays.values():
                if relay.i2c_read_addr not in bus_cache:
                    with self.bus.lock:
                        bus_cache[relay.i2c_read_addr] = self.bus.bus.read_byte(relay.i2c_read_addr)
                i2c_data = bus_cache[relay.i2c_read_addr]
                active = not i2c_data & (1 << relay.i2c_read_bit)
                relay.process_event(active)
            time.sleep(0.1)  # sampling rate for the switches

    def send_state(self):
        """Publish the current state of every registered relay."""
        for relay in self.relays.values():
            self.mqtt_send_state(relay)


def main():
    """Register all relays, start the polling and MQTT loops, then publish state every 60 s."""
    powerctl = PowerCTL()
    relays = [
        Relay("flur/licht", 0x23, 3, 0x21, 0),
        Relay("cantina/licht", 0x23, 2, 0x3f, 0),
        DimmerRelay("baellebad/licht", 0x23, 5, 0x21, 1),
        DimmerRelay("lounge-front/licht", 0x23, 4, 0x21, 2),
        DimmerRelay("lounge-back/licht", 0x23, 6, 0x21, 3),
        Relay("baellebad/strom", 0x23, 0, 0x21, 4),
        Relay("lounge/strom", 0x23, 1, 0x21, 5),
        Relay("cantina/strom", 0x23, 7, 0x21, 6),
    ]
    powerctl.add_relays(
        *relays,
        ZentralAus("zentralaus", 0x3a, 0, 0x21, 7, relays=relays),
    )
    # powerctl.mqtt_register_ha()
    threading.Thread(target=powerctl.i2c_status_thread_new, daemon=True).start()
    try:
        powerctl.mqttc.loop_start()
        time.sleep(10)
        while True:
            powerctl.send_state()
            time.sleep(60)
    except KeyboardInterrupt:
        powerctl.mqttc.loop_stop()


if __name__ == '__main__':
    main()