WIP: config via mqtt

This commit is contained in:
T 2025-03-10 19:16:35 +01:00
parent 247d7712b9
commit a28def65b8
2 changed files with 75 additions and 18 deletions

View File

@ -1,5 +1,7 @@
#!/usr/bin/env python3
import hashlib
import json
import threading
import time
from dataclasses import dataclass
@ -65,6 +67,25 @@ class Relay:
self.on_button_event(self, 0)
self.update_state(new_state)
def get_ha_components(self, prefix):
yield {
"name": self.name.replace("/", " ").title(),
"unique_id": hashlib.md5(f"{prefix}/{self.name}".encode()).hexdigest(),
"platform": "light",
"state_topic": f"{prefix}/{self.name}/status",
"command_topic": f"{prefix}/{self.name}/action",
"payload_on": "on",
"payload_off": "off",
}
yield {
"name": self.name.replace("/", " ").title() + " Button",
"unique_id": hashlib.md5(f"{prefix}/{self.name}#button".encode()).hexdigest(),
"platform": "binary_sensor",
"state_topic": f"{prefix}/{self.name}/input",
"payload_on": "1",
"payload_off": "0",
}
@dataclass
class ButtonInput(Relay):
@ -100,6 +121,15 @@ class DimmerRelay(ButtonInput):
else:
pass # TODO
def get_ha_components(self, prefix):
dev, btn = super().get_ha_components(prefix)
dev.update({
"brightness_state_topic": f"{prefix}/{self.name}/brightness",
"brightness_command_topic": f"{prefix}/{self.name}/action",
})
yield dev
yield btn
@dataclass
class ZentralAus(ButtonInput):
@ -119,6 +149,10 @@ class ZentralAus(ButtonInput):
def button_pressed(self, ms: int):
self.set_state(self.STATE_MAX)
def get_ha_components(self, prefix):
return
yield
class PowerCTL:
PREFIX = "foobar/oben"
@ -127,12 +161,12 @@ class PowerCTL:
self.bus = BusWLock(smbus.SMBus(1))
self.relays: dict[str, Relay] = {} # topic: relay
self.mqtc = mqtt.Client()
self.mqtc.connect("mqtt.chaospott.de", 1883, 60)
self.mqttc = mqtt.Client()
self.mqttc.connect("mqtt.chaospott.de", 1883, 60)
self.mqtc.on_message = self.on_message
self.mqtc.on_connect = self.on_connect
self.mqtc.on_subscribe = self.on_subscribe
self.mqttc.on_message = self.on_message
self.mqttc.on_connect = self.on_connect
self.mqttc.on_subscribe = self.on_subscribe
def add_relays(self, *relays: Relay):
for relay in relays:
@ -142,23 +176,42 @@ class PowerCTL:
topic = f"{self.PREFIX}/{relay.name}/action"
self.relays[topic] = relay
print("subscribe", topic)
self.mqtc.subscribe(topic, 0)
self.mqttc.subscribe(topic, 0)
def mqtt_send_state(self, relay: Relay):
if relay.state is not None:
self.mqtc.publish(f"{self.PREFIX}/{relay.name}/status", {True: "on", False: "off"}[relay.state > relay.STATE_MIN], qos=0, retain=False)
self.mqttc.publish(f"{self.PREFIX}/{relay.name}/status", {True: "on", False: "off"}[relay.state > relay.STATE_MIN], qos=0, retain=False)
if relay.has_brightness:
self.mqtc.publish(f"{self.PREFIX}/{relay.name}/brightness", relay.state, qos=0, retain=False)
self.mqttc.publish(f"{self.PREFIX}/{relay.name}/brightness", relay.state, qos=0, retain=False)
def mqtt_send_event(self, relay: ButtonInput, ev):
self.mqtc.publish(f"{self.PREFIX}/{relay.name}/input", ev, qos=0, retain=False)
self.mqttc.publish(f"{self.PREFIX}/{relay.name}/input", ev, qos=0, retain=False)
def mqtt_register_ha(self):
self.mqttc.publish(f"homeassistant/device/powerctl/config", json.dumps(self.get_ha_device_config()), retain=True)
def on_connect(self, _mosq, _obj, connect_flags, _rc):
print("Connected", connect_flags)
if not connect_flags.get("session present", 0):
print("Re-Subscribe")
for topic in self.relays:
self.mqtc.subscribe(topic, 0)
self.mqttc.subscribe(topic, 0)
def get_ha_device_config(self):
return {
"dev": {
"ids": "powerctl",
"name": "PowerCTL",
"m": "foobar",
},
"origin": {
"name": "PowerCTL",
},
"cmps": {
r.name: cmp for r in self.relays.values()
for cmp in r.get_ha_components(self.PREFIX)
},
}
@staticmethod
def on_subscribe(_mosq, _obj, _mid, _granted_qos):
@ -219,13 +272,17 @@ def main():
*relays,
ZentralAus("zentralaus", 0x3a, 0, 0x21, 7, relays=relays),
)
# powerctl.mqtt_register_ha()
threading.Thread(target=powerctl.i2c_status_thread_new, daemon=True).start()
powerctl.mqtc.loop_start()
time.sleep(10)
while True:
powerctl.send_state()
time.sleep(60)
try:
powerctl.mqttc.loop_start()
time.sleep(10)
while True:
powerctl.send_state()
time.sleep(60)
except KeyboardInterrupt:
powerctl.mqttc.loop_stop()
if __name__ == '__main__':

View File

@ -1,4 +1,3 @@
import random
import time
from unittest.mock import patch
@ -25,8 +24,9 @@ class MqttClient(mqtt.Client):
return super().connect("127.0.0.1", *args, **kwargs)
# return super().connect(host, *args, **kwargs)
def publish(self, topic, payload=None, qos=0, retain=False, properties=None, ):
pass
def publish(self, topic, payload=None, qos=0, retain=False, properties=None):
print(f"publish topic={topic}, data={payload}")
super().publish(topic, payload, qos, retain, properties)
with patch('smbus.SMBus', new=SMBus) as SMBus_mock: