230 lines
6.7 KiB
Python
Executable File
230 lines
6.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
|
|
import paho.mqtt.client as mqtt
|
|
import smbus
|
|
|
|
|
|
class BusWLock:
|
|
def __init__(self, bus: smbus.SMBus):
|
|
self.bus = bus
|
|
self.lock = threading.Lock()
|
|
|
|
|
|
@dataclass
|
|
class Relay:
|
|
name: str
|
|
i2c_read_addr: int
|
|
i2c_read_bit: int
|
|
i2c_write_addr: int
|
|
i2c_write_bit: int
|
|
|
|
bus: BusWLock = None
|
|
state: int = 0
|
|
has_brightness: bool = False
|
|
on_state_change = lambda s, _: _
|
|
on_button_event = lambda s, _s, _ev: None
|
|
|
|
STATE_MIN = 0
|
|
STATE_MAX = 255
|
|
|
|
def set_state(self, state):
|
|
if state == self.state:
|
|
return
|
|
if self.STATE_MIN < state < self.STATE_MAX:
|
|
self._trigger(4 * state / 100 + 1)
|
|
else:
|
|
self._trigger()
|
|
self.state = state
|
|
print("set", self.name, self.state)
|
|
self.on_state_change(self)
|
|
|
|
def update_state(self, state):
|
|
if self.state == state:
|
|
return
|
|
self.state = state
|
|
print("update", self.name, self.state)
|
|
self.on_state_change(self)
|
|
|
|
def _trigger(self, duration=0.5):
|
|
data_mask = 0xff ^ (1 << self.i2c_write_bit)
|
|
with self.bus.lock:
|
|
print("write", time.monotonic(), self.name, hex(self.i2c_write_addr), self.i2c_write_bit, hex(data_mask))
|
|
self.bus.bus.write_byte(self.i2c_write_addr, data_mask)
|
|
time.sleep(duration)
|
|
self.bus.bus.write_byte(self.i2c_write_addr, 255)
|
|
print("write", time.monotonic(), self.name, hex(self.i2c_write_addr), self.i2c_write_bit, hex(data_mask))
|
|
|
|
def process_event(self, active):
|
|
new_state = self.STATE_MAX if active else self.STATE_MIN
|
|
if self.state != new_state:
|
|
self.on_button_event(self, 1)
|
|
self.on_button_event(self, 0)
|
|
self.update_state(new_state)
|
|
|
|
|
|
@dataclass
|
|
class ButtonInput(Relay):
|
|
pressed_since: int | None = None
|
|
|
|
def button_pressed(self, ms: int):
|
|
...
|
|
|
|
def process_event(self, active: bool):
|
|
if active:
|
|
if self.pressed_since is None:
|
|
self.pressed_since = time.monotonic_ns()
|
|
self.on_button_event(self, 1)
|
|
else:
|
|
if self.pressed_since is not None:
|
|
self.button_pressed((time.monotonic_ns() - self.pressed_since) // 1000000) # ms
|
|
self.on_button_event(self, 0)
|
|
self.pressed_since = None
|
|
|
|
|
|
@dataclass
|
|
class DimmerRelay(ButtonInput):
|
|
def set_state(self, state):
|
|
super().set_state(self.STATE_MAX if state > self.STATE_MIN else self.STATE_MIN)
|
|
|
|
def __post_init__(self):
|
|
self.has_brightness = True
|
|
|
|
def button_pressed(self, ms: int):
|
|
print(self.name, "pressed for", ms)
|
|
if ms < 600:
|
|
self.update_state(self.STATE_MIN if self.state else self.STATE_MAX)
|
|
else:
|
|
pass # TODO
|
|
|
|
|
|
@dataclass
|
|
class ZentralAus(ButtonInput):
|
|
relays: list[Relay] = None
|
|
|
|
def __post_init__(self):
|
|
self.state: None = None
|
|
|
|
def set_state(self, state):
|
|
if state == self.STATE_MAX:
|
|
for relay in self.relays:
|
|
relay.set_state(relay.STATE_MIN)
|
|
|
|
def update_state(self, state):
|
|
return False
|
|
|
|
def button_pressed(self, ms: int):
|
|
self.set_state(self.STATE_MAX)
|
|
|
|
|
|
class PowerCTL:
|
|
PREFIX = "foobar/oben"
|
|
|
|
def __init__(self):
|
|
self.bus = BusWLock(smbus.SMBus(1))
|
|
self.relays: dict[str, Relay] = {} # topic: relay
|
|
|
|
self.mqtc = mqtt.Client()
|
|
self.mqtc.connect("mqtt.chaospott.de", 1883, 60)
|
|
|
|
self.mqtc.on_message = self.on_message
|
|
self.mqtc.on_connect = self.on_connect
|
|
self.mqtc.on_subscribe = self.on_subscribe
|
|
|
|
def add_relays(self, *relays: Relay):
|
|
for relay in relays:
|
|
relay.bus = self.bus
|
|
relay.on_state_change = self.mqtt_send_state
|
|
relay.on_button_event = self.mqtt_send_event
|
|
topic = f"{self.PREFIX}/{relay.name}/action"
|
|
self.relays[topic] = relay
|
|
print("subscribe", topic)
|
|
self.mqtc.subscribe(topic, 0)
|
|
|
|
def mqtt_send_state(self, relay: Relay):
|
|
if relay.state is not None:
|
|
self.mqtc.publish(f"{self.PREFIX}/{relay.name}/status", {True: "on", False: "off"}[relay.state > relay.STATE_MIN], qos=0, retain=False)
|
|
if relay.has_brightness:
|
|
self.mqtc.publish(f"{self.PREFIX}/{relay.name}/brightness", relay.state, qos=0, retain=False)
|
|
|
|
def mqtt_send_event(self, relay: ButtonInput, ev):
|
|
self.mqtc.publish(f"{self.PREFIX}/{relay.name}/input", ev, qos=0, retain=False)
|
|
|
|
@staticmethod
|
|
def on_connect(_mosq, _obj, _foo, _bar):
|
|
print("Connected")
|
|
|
|
@staticmethod
|
|
def on_subscribe(_mosq, _obj, _mid, _granted_qos):
|
|
print("Subscribed OK")
|
|
|
|
def on_message(self, _mosq, _obj, msg):
|
|
relay = self.relays.get(msg.topic)
|
|
if relay is None:
|
|
return
|
|
|
|
payload = msg.payload.decode("utf-8")
|
|
if payload == "on":
|
|
state = relay.STATE_MAX
|
|
elif payload == "off":
|
|
state = relay.STATE_MIN
|
|
else:
|
|
try:
|
|
state = int(payload)
|
|
except ValueError:
|
|
return
|
|
|
|
relay.set_state(state)
|
|
|
|
def i2c_status_thread_new(self):
|
|
while True:
|
|
bus_cache = {}
|
|
for relay in self.relays.values():
|
|
if relay.i2c_read_addr not in bus_cache:
|
|
with self.bus.lock:
|
|
bus_cache[relay.i2c_read_addr] = self.bus.bus.read_byte(relay.i2c_read_addr)
|
|
i2c_data = bus_cache[relay.i2c_read_addr]
|
|
|
|
active = False if i2c_data & (1 << relay.i2c_read_bit) else True
|
|
relay.process_event(active)
|
|
|
|
time.sleep(0.1) # abtastrate für die schalter
|
|
|
|
def send_state(self):
|
|
for relay in self.relays.values():
|
|
self.mqtt_send_state(relay)
|
|
|
|
|
|
def main():
|
|
powerctl = PowerCTL()
|
|
|
|
relays = [
|
|
Relay("flur/licht", 0x23, 3, 0x21, 0),
|
|
Relay("cantina/licht", 0x23, 2, 0x3f, 0),
|
|
DimmerRelay("baellebad/licht", 0x23, 5, 0x21, 1),
|
|
DimmerRelay("lounge-front/licht", 0x23, 4, 0x21, 2),
|
|
DimmerRelay("lounge-back/licht", 0x23, 6, 0x21, 3),
|
|
|
|
Relay("baellebad/strom", 0x23, 0, 0x21, 4),
|
|
Relay("lounge/strom", 0x23, 1, 0x21, 5),
|
|
Relay("cantina/strom", 0x23, 7, 0x21, 6),
|
|
]
|
|
powerctl.add_relays(
|
|
*relays,
|
|
ZentralAus("zentralaus", 0x3a, 0, 0x21, 7, relays=relays),
|
|
)
|
|
|
|
threading.Thread(target=powerctl.i2c_status_thread_new, daemon=True).start()
|
|
powerctl.mqtc.loop_start()
|
|
time.sleep(10)
|
|
while True:
|
|
powerctl.send_state()
|
|
time.sleep(60)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|