#!/usr/bin/env python3 import configparser import logging import os import queue import sys import threading import time from select import poll, POLLERR import paho.mqtt.publish as mqtt import requests def trans_http(): global serverconf # build together url and headers uri = "" if serverconf.getboolean("ssl"): uri = "https://" else: uri = "http://" uri = uri + serverconf["url"] + "/get.php" log.info('Remote URI: {}'.format(uri)) queuelast = time.time() while True: # get value from queue (blocking) queuedata = powerqueue.get() powerqueue.task_done() queuetime = int(queuedata[0]) queueval = str(queuedata[1]) # if more than five seconds passed since last event if (queuetime - queuelast) >= 5: # put payload together if serverconf.getboolean('timestamp'): payload = {"val": str(queuetime) + ";" + str(queueval)} else: payload = {"val": str(queueval)} log.debug('request {}:{}{} => {}'.format(mqttconf['host'], mqttconf['port'], mqttconf['topic'], repr(payload))) # send to http server try: if serverconf.getboolean("basicauth"): r = requests.post(uri, data=payload, verify=False, auth=(serverconf["user"], serverconf["password"]), timeout=10, headers={'connection': 'close'}) else: r = requests.post(uri, data=payload, verify=False, timeout=10, headers={'connection': 'close'}) except: log.error('server response: {}'.format(str(r.status_code))) queuelast = queuetime else: # drop data if less than five seconds passed since last event del queuedata log.info('dropped queued element') def trans_mqtt(): global mqttconf queuelast = time.time() log.info('sending data via mqtt host: {}, port: {}, topic:{}'.format(mqttconf['host'], mqttconf['port'], mqttconf['topic'])) while True: # get value from queue (blocking) queuedata = powerqueue.get() powerqueue.task_done() queuetime = int(queuedata[0]) queueval = str(queuedata[1]) # if more than five seconds passed since last event if (queuetime - queuelast) >= 5: # put payload together if mqttconf.getboolean('timestamp'): payload = str(queuetime) + ";" + str(queueval) else: payload = str(queueval) log.debug('mqtt {} => {}'.format(mqttconf['host'], repr(payload))) # send to mqtt broker try: mqtt.single(mqttconf['topic'], payload=payload, retain=True, hostname=mqttconf['host'], port=mqttconf.getint('port'), client_id=mqttconf['clientid'], transport="tcp") except BaseException as e: log.error('error while publishing: {}'.format(e)) queuelast = queuetime else: # drop data if less than five seconds passed since last event del queuedata log.info('dropped queued element') def readgpio(): global gpiopath global gpioconf global smconf # open gpio file handle gpio = open(gpiopath + "value", "r") # setup polling gpiopoll = poll() gpiopoll.register(gpio, POLLERR) # wait for two interrupts to have a "clean" starting point log.info('waiting for 2 interrupts to get a clean start') gpioevent = gpiopoll.poll() gpio.read() gpio.seek(0) log.info('waiting for 1 interrupt to get a clean start...') gpioevent = gpiopoll.poll() last = time.time() gpio.read() gpio.seek(0) # start readgpio mainloop log.info('start readgpio mainloop') while True: gpioevent = gpiopoll.poll() gpioval = gpio.read(1) gpio.seek(0) # check if interrupt was a falling edge (yes, really!) if gpioval == '0': now = time.time() # plausibility check if (now - last) >= gpioconf.getfloat("mintime"): # calculate current power consumption power = round((3600000 / smconf.getint("impkwh")) / (now - last), 2) log.info('Current power consumption: {} Watt'.format(str(power))) log.debug('GPIO state: {}'.format(gpioval)) # put measured value on queue powerqueue.put([now, power]) last = now # error if a tty is connected and a raising edge was triggered else: log.debug('Not a falling edge, ignoring') if __name__ == "__main__": # configure logging log = logging.getLogger(__name__) log.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.INFO) # read config dirname, filename = os.path.split(os.path.abspath(__file__)) conf = configparser.ConfigParser() try: conf.read(os.path.join(dirname, "powerpi.local.conf")) except: conf.read(os.path.join(dirname, "powerpi.conf")) log.warning('using global config. Use powerpi.local.conf instead') conf.sections() transconf = conf['transport'] gpioconf = conf['gpio'] serverconf = conf['server'] smconf = conf['smartmeter'] mqttconf = conf['mqtt'] log.info('Running in path {}'.format(dirname)) if serverconf["url"] == "power.example.com" and mqttconf['host'] == "power.example.com": log.critical('Server/Broker -> url still default value') exit(1) gpiopath = "/sys/class/gpio/gpio" + gpioconf["port"] + "/" # initialise gpio interfaces try: if not os.path.exists(gpiopath): # if not already exported, export gpio port gpioinit = open("/sys/class/gpio/export", "w") gpioinit.write(gpioconf["port"] + "\n") gpioinit.close() # set direction of gpio port to "IN" gpiopin = open(gpiopath + "direction", "w") gpiopin.write("in") gpiopin.close() # set trigger to falling edge gpiotype = open(gpiopath + "edge", "w") gpiotype.write("falling") gpiotype.close() except: log.critical('can not initialize gpio interface. aborting.') sys.exit(1) # defining queue for measured values powerqueue = queue.Queue() # disable ssl "no verification" warnings if serverconf.getboolean("sslself"): requests.packages.urllib3.disable_warnings() # initialising and starting threads th1 = threading.Thread(target=readgpio) if transconf['transport'] == 'mqtt': log.info('method of transfer: mqtt') th2 = threading.Thread(target=trans_mqtt) elif transconf['transport'] == 'http': log.info('method of transfer: http') th2 = threading.Thread(target=trans_http) else: log.error('transfer option {} is invalid. Exiting!'.format(transconf['transport'])) sys.exit(1) th1.setDaemon(True) th2.setDaemon(True) th1.start() th2.start() # busy loop while True: time.sleep(1)