From d9aa59476507dd287a7ce4d0e0c1a078c38eabda Mon Sep 17 00:00:00 2001 From: telegnom <max@telegnom.org> Date: Mon, 2 Oct 2017 12:42:33 +0200 Subject: [PATCH] [F] basic mqtt support added --- pi/power.py | 57 +++++++++++++++++++++++++++++++++++++++++-------- pi/powerpi.conf | 11 +++++++++- 2 files changed, 58 insertions(+), 10 deletions(-) diff --git a/pi/power.py b/pi/power.py index bde20a1..067a7bc 100755 --- a/pi/power.py +++ b/pi/power.py @@ -9,8 +9,9 @@ import queue import requests import configparser import logging +import paho.mqtt.publish as mqtt -def trans(): +def trans_http(): global serverconf # build together url and headers uri = "" @@ -31,11 +32,8 @@ def trans(): if ((queuetime - queuelast) >= 5): # put payload together payload = {"val": str(queuetime) + ";" + str(queueval)} - log.debug('request {} => {}'.format(uri, repr(payload))) - # send to webserver via http post - # - # Um den Krempel hier muss noch mal eine While Schleife rum, damit die Daten nicht verloren gehen. - # + log.debug('request {}:{}{} => {}'.format(mqttconf['host'], mqttconf['port'], mqttconf['topic'], repr(payload))) + # send to http server try: if serverconf.getboolean("basicauth") == True: r = requests.post(uri, data=payload, verify=False, auth=(serverconf["user"], serverconf["password"]), timeout=10, headers={'connection':'close'}) @@ -50,6 +48,36 @@ def trans(): del queuedata log.info('dropped queued element') + +def trans_mqtt(): + global mqttconf + queuelast = time.time() + log.info('sending data via mqtt host: {}, port: {}, topic:{}'.format(mqttconf['host'], mqttconf['port'], mqttconf['topic'])) + while True: + # get value from queue (blocking) + queuedata = powerqueue.get() + powerqueue.task_done() + queuetime = int(queuedata[0]) + queueval = str(queuedata[1]) + # if more than five seconds passed since last event + if ((queuetime - queuelast) >= 5): + # put payload together + payload = {"val": str(queuetime) + ";" + str(queueval)} + log.debug('mqtt {} => {}'.format(uri, repr(payload))) + # send to mqtt broker + try: + mqtt.single(mqttconf['topic'], payload=payload, retain=True, hostname=mqttconf['host'], + port=mqttconf.getint('port'), client_id=mqttconf['clientid'], transport="tcp") + except BaseException as e: + log.error('error while publishing: {}'.format(e)) + queuelast = queuetime + + else: + # drop data if less than five seconds passed since last event + del queuedata + log.info('dropped queued element') + + def readgpio(): global gpiopath global gpioconf @@ -99,13 +127,15 @@ if __name__ == "__main__": conf = configparser.ConfigParser() conf.read("powerpi.conf") conf.sections() + transconf = conf['transport'] gpioconf = conf['gpio'] serverconf = conf['server'] smconf = conf['smartmeter'] + mqttconf = conf['mqtt'] # configure logging log = logging.getLogger(__name__) - if serverconf["url"] == "power.example.com": - log.critical('Server -> url still default value') + if serverconf["url"] == "power.example.com" and mqttconf['host'] == "power.example.com": + log.critical('Server/Broker -> url still default value') exit(1) gpiopath = "/sys/class/gpio/gpio" + gpioconf["port"] + "/" # initialise gpio interfaces @@ -135,7 +165,16 @@ if __name__ == "__main__": # initialising and starting threads th1 = threading.Thread(target=readgpio) - th2 = threading.Thread(target=trans) + if transconf['transport'] == 'mqtt': + log.info('method of transfer: mqtt') + th2 = threading.Thread(target=trans_mqtt) + elif transconf['transport'] == 'http': + log.info('method of transfer: http') + th2 = threading.Thread(target=trans_http) + else: + log.error('transfer option {} is invalid. Exiting!'.format(transfer['transfer'])) + sys.exit(1) + th1.setDaemon(True) th2.setDaemon(True) th1.start() diff --git a/pi/powerpi.conf b/pi/powerpi.conf index 48776b6..f1405ca 100644 --- a/pi/powerpi.conf +++ b/pi/powerpi.conf @@ -1,3 +1,6 @@ +[transport] +transport: mqtt + [smartmeter] impkwh: 2000 @@ -11,4 +14,10 @@ ssl: yes basicauth: yes user: basicUser password: basicPass -sslself: yes \ No newline at end of file +sslself: yes + +[mqtt] +host: power.example.com +port: 1883 +clientid: changeme +topic: /foo/bar/power