From e8395ae404eb4fd2e58076ff8a45005599c70339 Mon Sep 17 00:00:00 2001 From: T Date: Mon, 27 Jan 2025 14:15:03 +0100 Subject: [PATCH] Rewrite to make it readable --- power_mqtt1.py | 447 ++++++++++++++++++++----------------------------- test.py | 29 ++++ 2 files changed, 211 insertions(+), 265 deletions(-) create mode 100644 test.py diff --git a/power_mqtt1.py b/power_mqtt1.py index a38ad63..7d97679 100755 --- a/power_mqtt1.py +++ b/power_mqtt1.py @@ -1,312 +1,229 @@ #!/usr/bin/env python3 -import _thread +import threading import time -from datetime import datetime +from dataclasses import dataclass -import paho.mqtt.client as mqt +import paho.mqtt.client as mqtt import smbus -bus = smbus.SMBus(1) -mqtc = mqt.Client() -access = {} -# [ code : [last_access, penalty] ] - -bus_use = True +class BusWLock: + def __init__(self, bus: smbus.SMBus): + self.bus = bus + self.lock = threading.Lock() -def allowed(code): - print(access) - if code in access: - if millis_since(access[code][0]) > 3000 + access[code][1]: - access[code] = [datetime.now(), 0] - return True +@dataclass +class Relay: + name: str + i2c_read_addr: int + i2c_read_bit: int + i2c_write_addr: int + i2c_write_bit: int + + bus: BusWLock = None + state: int = 0 + has_brightness: bool = False + on_state_change = lambda s, _: _ + on_button_event = lambda s, _s, _ev: None + + STATE_MIN = 0 + STATE_MAX = 255 + + def set_state(self, state): + if state == self.state: + return + if self.STATE_MIN < state < self.STATE_MAX: + self._trigger(4 * state / 100 + 1) else: - access[code] = [datetime.now(), access[code][1] + 3000] - return False - else: - access[code] = [datetime.now(), 0] - return True + self._trigger() + self.state = state + print("set", self.name, self.state) + self.on_state_change(self) + + def update_state(self, state): + if self.state == state: + return + self.state = state + print("update", self.name, self.state) + self.on_state_change(self) + + def _trigger(self, duration=0.5): + data_mask = 0xff ^ (1 << self.i2c_write_bit) + with self.bus.lock: + print("write", time.monotonic(), self.name, hex(self.i2c_write_addr), self.i2c_write_bit, hex(data_mask)) + self.bus.bus.write_byte(self.i2c_write_addr, data_mask) + time.sleep(duration) + self.bus.bus.write_byte(self.i2c_write_addr, 255) + print("write", time.monotonic(), self.name, hex(self.i2c_write_addr), self.i2c_write_bit, hex(data_mask)) + + def process_event(self, active): + new_state = self.STATE_MAX if active else self.STATE_MIN + if self.state != new_state: + self.on_button_event(self, 1) + self.on_button_event(self, 0) + self.update_state(new_state) -def millis_since(start_time): - dt = datetime.now() - start_time - ms = (dt.days * 24 * 60 * 60 + dt.seconds) * 1000 + dt.microseconds / 1000.0 - return ms +@dataclass +class ButtonInput(Relay): + pressed_since: int | None = None + def button_pressed(self, ms: int): + ... -def on_connect(mosq, obj, foo, bar): - print("Connected") - - -def on_message(mosq, obj, msg): - # print( "Received on topic: " + msg.topic + " Message: "+str(msg.payload) ); - msgs(msg.payload, msg.topic) - - -def on_subscribe(mosq, obj, mid, granted_qos): - print("Subscribed OK") - - -# Funktion Setze Bit in Variable / Function Set Bit in byte -def set_bit(value, bit): - return value | (1 << bit) - - -# Funktion rücksetzte Bit in Variable / Function reset Bit in byte -def clear_bit(value, bit): - return value & ~(1 << bit) - - -def eval_time_diff(switch, start, end): - print("Switch: " + str(switch) + " was pressed for " + str(end - start) + "MS") - d = end - start - if d < 600 and switch == 7: - zentral_aus() - print("Zentral Aus gedrueckt") - return - if d < 600: # Single Switch - state[switch] ^= 1 - send_state() - - -def i2c_status_thread_new(): - global bus_use, state - state_timer = [0, 0, 0, 0, 0, 0, 0, 0] - while True: - while bus_use: - for pos, i2c_input in enumerate(inputs): - byte = bus.read_byte(i2c_input) - for c, i in i2c_inputs[pos].items(): - ns = 1 if byte | c == c else 0 # get new state - - # if i == 7: - # zentral_aus() - # print( "zentral-aus" ) - # continue - - if state[i] != ns and not i in [1, 2, 3, 6]: # Turned on - mqtc.publish("foobar/oben/" + names[i][0] + "/" + services[names[i][1]] + "/status", states[ns], qos=0, retain=False) - state[i] = ns - - if i in (1, 2, 3, 6, 7): - if ns == 0 and state_timer[i] > 0: - eval_time_diff(i, state_timer[i], int(time.time() * 1000)) - state_timer[i] = 0 - - if ns == 1 and state_timer[i] == 0: - state_timer[i] = int(time.time() * 1000) - time.sleep(0.1) # abtastrate für die schalter - - time.sleep(1) - - -# Buttonbefehle -def switch(i, speed=0.5): - global bus_use - # if allowed(i): - bus_use = False - print("Switched: " + str(i) + " Speed: " + str(speed)) - o = 0 - if 7 < i < 16: - o = set_bit(o, i - 8) - bus.write_byte(0x3f, 255 - o) - time.sleep(speed) - o = clear_bit(o, i - 8) - bus.write_byte(0x3f, 255 - o) - elif i < 8: - o = set_bit(o, i) - bus.write_byte(0x21, 255 - o) - time.sleep(speed) - o = clear_bit(o, i) - bus.write_byte(0x21, 255 - o) - bus_use = True - - -def strobo_switch(switch_list, speed=0.5): - global bus_use - bus_use = False - for i in switch_list: - o = 0 - if i > 7: - o = set_bit(o, i - 8) - bus.write_byte(0x3f, 255 - o) - time.sleep(speed) - o = clear_bit(o, i - 8) - bus.write_byte(0x3f, 255 - o) + def process_event(self, active: bool): + if active: + if self.pressed_since is None: + self.pressed_since = time.monotonic_ns() + self.on_button_event(self, 1) else: - o = set_bit(o, i) - bus.write_byte(0x21, 255 - o) - time.sleep(speed) - o = clear_bit(o, i) - bus.write_byte(0x21, 255 - o) - bus_use = True + if self.pressed_since is not None: + self.button_pressed((time.monotonic_ns() - self.pressed_since) // 1000000) # ms + self.on_button_event(self, 0) + self.pressed_since = None -services = ["strom", "licht"] -state = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] -inputs = (0x23, 0x3a) -states = {0: "off", 1: "on"} -commands = {"flur": 0, "baellebad": 1, - "lounge-front": 2, "lounge-back": 3, - "baellebad-ein": 4, "lounge-ein": 5, - "cantina-ein": 6, "zentral-aus": 7, - "cantina": 8} -i2c_inputs = [{0xFE: 4, 0xFD: 5, 0xFB: 8, 0xF7: 0, 0xEF: 2, 0xDF: 1, 0xBF: 3, 0x7F: 6}, {0xFE: 7}] +@dataclass +class DimmerRelay(ButtonInput): + def set_state(self, state): + super().set_state(self.STATE_MAX if state > self.STATE_MIN else self.STATE_MIN) -names = {7: ["zentral", 0], 4: ["baellebad", 0], - 5: ["lounge", 0], 6: ["cantina", 0], 0: ["flur", 1], - 1: ["baellebad", 1], 2: ["lounge-front", 1], - 3: ["lounge-back", 1], 8: ["cantina", 1]} -power = {"zentral": 7, "baellebad": 4, "lounge": 5, "cantina": 6} -light = {"flur": 0, "baellebad": 1, "lounge-front": 2, - "lounge-back": 3, "cantina": 8} + def __post_init__(self): + self.has_brightness = True + + def button_pressed(self, ms: int): + print(self.name, "pressed for", ms) + if ms < 600: + self.update_state(self.STATE_MIN if self.state else self.STATE_MAX) + else: + pass # TODO -# foobar/oben/lounge /licht/action -# cantina /strom/status -# flur -# baellebad -# zentral +@dataclass +class ZentralAus(ButtonInput): + relays: list[Relay] = None -def zentral_aus(): - for i in range(len(state)): - state[i] = 0 - send_state() - # for k,v in names.items(): - # mqtc.publish("foobar/oben/" + v[0] + "/" + services[v[1]] + "/status", "off" ) + def __post_init__(self): + self.state: None = None + + def set_state(self, state): + if state == self.STATE_MAX: + for relay in self.relays: + relay.set_state(relay.STATE_MIN) + + def update_state(self, state): + return False + + def button_pressed(self, ms: int): + self.set_state(self.STATE_MAX) -def switch_state(i, state_, speed=0.5): - if state[i] != state_: # changed - switch(i, speed=0.5) - state[i] = state_ - return state[i] +class PowerCTL: + PREFIX = "foobar/oben" + def __init__(self): + self.bus = BusWLock(smbus.SMBus(1)) + self.relays: dict[str, Relay] = {} # topic: relay -def switch_toggle(i, speed=0.5): - switch(i, speed=speed) - # power can only be switched on, and centrally shutdown - # if i in [ 4,5,6]: - # state[i] = 1 - # not anymore - if i == 7: - state[i] = 0 - elif i < 16: - state[i] = 0 if state[i] == 1 else 1 - mqtc.publish("foobar/oben/" + names[i][0] + "/" + services[names[i][1]] + "/status", states[state[i]]) + self.mqtc = mqtt.Client() + self.mqtc.connect("mqtt.chaospott.de", 1883, 60) + self.mqtc.on_message = self.on_message + self.mqtc.on_connect = self.on_connect + self.mqtc.on_subscribe = self.on_subscribe -def decode_topic(topic, state): - clist = topic.split('/') - if clist[3] == "strom" and clist[2] == "zentral": - zentral_aus() + def add_relays(self, *relays: Relay): + for relay in relays: + relay.bus = self.bus + relay.on_state_change = self.mqtt_send_state + relay.on_button_event = self.mqtt_send_event + topic = f"{self.PREFIX}/{relay.name}/action" + self.relays[topic] = relay + print("subscribe", topic) + self.mqtc.subscribe(topic, 0) - if clist[3] == "licht" and clist[2] in light: - ns = switch_state(light[clist[2]], state) - mqtc.publish(topic.replace("action", "status"), states[ns], qos=0, retain=False) + def mqtt_send_state(self, relay: Relay): + if relay.state is not None: + self.mqtc.publish(f"{self.PREFIX}/{relay.name}/status", {True: "on", False: "off"}[relay.state > relay.STATE_MIN], qos=0, retain=False) + if relay.has_brightness: + self.mqtc.publish(f"{self.PREFIX}/{relay.name}/brightness", relay.state, qos=0, retain=False) - if clist[3] == "strom" and clist[2] in power: - ns = switch_state(power[clist[2]], state) - mqtc.publish(topic.replace("action", "status"), states[ns], qos=0, retain=False) + def mqtt_send_event(self, relay: ButtonInput, ev): + self.mqtc.publish(f"{self.PREFIX}/{relay.name}/input", ev, qos=0, retain=False) + @staticmethod + def on_connect(_mosq, _obj, _foo, _bar): + print("Connected") -def msgs(inp, topic): - c = inp.decode("utf-8") - l = len(c) - # supporting number commands - if c == "on" or c == "off": - if c.find("on") >= 0: - decode_topic(topic, 1) - elif c.find("off") >= 0: - decode_topic(topic, 0) - return - elif l < 3: - try: - msg = int(inp) - switch_toggle(msg) - except ValueError: + @staticmethod + def on_subscribe(_mosq, _obj, _mid, _granted_qos): + print("Subscribed OK") + + def on_message(self, _mosq, _obj, msg): + relay = self.relays.get(msg.topic) + if relay is None: return - # supporting string commands with dimming parameters - else: - cmds = c.split(",") - if len(cmds) > 1: - command = 9001 - # error checking - if cmds[0] in commands: - command = commands[cmds[0]] - else: - return + payload = msg.payload.decode("utf-8") + if payload == "on": + state = relay.STATE_MAX + elif payload == "off": + state = relay.STATE_MIN + else: try: - arg = int(cmds[1]) + state = int(payload) except ValueError: return - # strobo - if (command == 100 or command == 99) and arg < 100: - for i in range(arg): - switch((100 - command) * 8, speed=0.05) - time.sleep(0.06) - elif command == 101 and arg < 100: - for i in range(arg): - strobo_switch([0, 8], speed=0.05) - time.sleep(0.01) - # command with parameter used for dimming - else: - if arg < 100: - switch(command, speed=4 * arg / 100 + 1) - if state[command] == 0: - state[command] = 1 - mqtc.publish("foobar/oben/" + names[command][0] + "/" + services[names[command][1]] + "/status", states[state[command]]) + relay.set_state(state) - # single string command without parameter - else: - print("One Command") - if commands[c] == 7: - zentral_aus() - if c in commands: - switch_toggle(commands[c]) - return + def i2c_status_thread_new(self): + while True: + bus_cache = {} + for relay in self.relays.values(): + if relay.i2c_read_addr not in bus_cache: + with self.bus.lock: + bus_cache[relay.i2c_read_addr] = self.bus.bus.read_byte(relay.i2c_read_addr) + i2c_data = bus_cache[relay.i2c_read_addr] + + active = False if i2c_data & (1 << relay.i2c_read_bit) else True + relay.process_event(active) + + time.sleep(0.1) # abtastrate für die schalter + + def send_state(self): + for relay in self.relays.values(): + self.mqtt_send_state(relay) -def send_state(): - for i in range(len(state)): - try: - mqtc.publish("foobar/oben/" + names[i][0] + "/" + services[names[i][1]] + "/status", states[state[i]]) - except KeyError: - pass +def main(): + powerctl = PowerCTL() + relays = [ + Relay("flur/licht", 0x23, 3, 0x21, 0), + Relay("cantina/licht", 0x23, 2, 0x3f, 0), + DimmerRelay("baellebad/licht", 0x23, 5, 0x21, 1), + DimmerRelay("lounge-front/licht", 0x23, 4, 0x21, 2), + DimmerRelay("lounge-back/licht", 0x23, 6, 0x21, 3), -def init_mqtt(): - mqtc.connect("mqtt.chaospott.de", 1883, 60) - mqtc.subscribe("foobar/oben/licht", 0) - mqtc.subscribe("foobar/oben/lounge-back/licht/action", 0) - mqtc.subscribe("foobar/oben/lounge-front/licht/action", 0) - mqtc.subscribe("foobar/oben/lounge/strom/action", 0) - mqtc.subscribe("foobar/oben/baellebad/licht/action", 0) - mqtc.subscribe("foobar/oben/baellebad/strom/action", 0) - mqtc.subscribe("foobar/oben/cantina/licht/action", 0) - mqtc.subscribe("foobar/oben/cantina/strom/action", 0) - mqtc.subscribe("foobar/oben/flur/licht/action", 0) - mqtc.subscribe("foobar/oben/strom/zentral/licht/action", 0) - mqtc.on_message = on_message - mqtc.on_connect = on_connect - mqtc.on_subscribe = on_subscribe + Relay("baellebad/strom", 0x23, 0, 0x21, 4), + Relay("lounge/strom", 0x23, 1, 0x21, 5), + Relay("cantina/strom", 0x23, 7, 0x21, 6), + ] + powerctl.add_relays( + *relays, + ZentralAus("zentralaus", 0x3a, 0, 0x21, 7, relays=relays), + ) - _thread.start_new_thread(i2c_status_thread_new, ()) - mqtc.loop_start() + threading.Thread(target=powerctl.i2c_status_thread_new, daemon=True).start() + powerctl.mqtc.loop_start() + time.sleep(10) while True: - # try: - # print("Err") - # mqtc.loop_forever() - # except: - # pass + powerctl.send_state() time.sleep(60) - send_state() -init_mqtt() - -# vim: noai:ts=4:sw=4 +if __name__ == '__main__': + main() diff --git a/test.py b/test.py new file mode 100644 index 0000000..922d442 --- /dev/null +++ b/test.py @@ -0,0 +1,29 @@ +import random +import time +from unittest.mock import patch + +import paho.mqtt.client as mqtt + +import power_mqtt1 + + +class SMBus: + def __init__(self, _bus): + pass + + def read_byte(self, _i2c_addr): + time.sleep(5) + return random.randbytes(1)[0] + + def write_byte(self, i2c_addr, data): + pass + + +class MqttClient(mqtt.Client): + def connect(self, host, *args, **kwargs): + return super().connect("127.0.0.1", *args, **kwargs) + + +with patch('smbus.SMBus', new=SMBus) as SMBus_mock, \ + patch("paho.mqtt.client.Client", new=MqttClient) as connect_mock: + power_mqtt1.main()