powerpi/pi/power.py
2017-10-02 16:14:05 +02:00

201 lines
6.1 KiB
Python
Executable File

#!/usr/bin/env python3
from select import poll, POLLPRI, POLLIN, POLLERR
import sys
import os
import threading
import time
import queue
import requests
import configparser
import logging
import paho.mqtt.publish as mqtt
def trans_http():
    """Forward queued power readings to the HTTP server.

    Runs forever. Blocks on the module-level ``powerqueue`` for
    ``[timestamp, power]`` items (put there by ``readgpio``) and POSTs each
    accepted reading as form field ``val`` to ``<scheme>://<url>/get.php``.
    A reading is only sent if at least five seconds have passed since the
    last transmitted one; everything else is dropped.

    Settings are taken from the ``server`` config section (``serverconf``):
    ``ssl``, ``url``, ``timestamp``, ``basicauth``, ``user``, ``password``.
    """
    # build together url
    scheme = "https://" if serverconf.getboolean("ssl") else "http://"
    uri = scheme + serverconf["url"] + "/get.php"
    log.info('Remote URI: {}'.format(uri))

    # these settings do not change at runtime, so read them only once
    with_timestamp = serverconf.getboolean('timestamp')
    auth = None
    if serverconf.getboolean("basicauth"):
        auth = (serverconf["user"], serverconf["password"])

    queuelast = time.time()
    while True:
        # get value from queue (blocking)
        queuedata = powerqueue.get()
        powerqueue.task_done()
        queuetime = int(queuedata[0])
        queueval = str(queuedata[1])

        # drop data if less than five seconds passed since last event
        if (queuetime - queuelast) < 5:
            log.info('dropped queued element')
            continue

        # put payload together
        if with_timestamp:
            payload = {"val": str(queuetime) + ";" + queueval}
        else:
            payload = {"val": queueval}
        log.debug('request {} => {}'.format(uri, repr(payload)))

        # send to http server
        # NOTE(review): certificate verification is disabled unconditionally
        # (verify=False), independent of the "sslself" option - confirm that
        # this is intended.
        try:
            r = requests.post(uri, data=payload, verify=False, auth=auth,
                              timeout=10, headers={'connection': 'close'})
        except requests.RequestException as err:
            # no response object exists here, so only the error can be logged
            log.error('request to {} failed: {}'.format(uri, err))
        else:
            if not r.ok:
                log.error('server response: {}'.format(str(r.status_code)))
        # rate limiting counts from the last attempt, successful or not
        queuelast = queuetime
def trans_mqtt():
    """Forward queued power readings to the MQTT broker.

    Runs forever. Blocks on the module-level ``powerqueue`` for
    ``[timestamp, power]`` items (put there by ``readgpio``) and publishes
    each accepted reading as a retained message. A reading is only published
    if at least five seconds have passed since the last published one;
    everything else is dropped.

    Settings are taken from the ``mqtt`` config section (``mqttconf``):
    ``host``, ``port``, ``topic``, ``clientid``, ``timestamp``.
    """
    # these settings do not change at runtime, so read them only once
    host = mqttconf['host']
    topic = mqttconf['topic']
    with_timestamp = mqttconf.getboolean('timestamp')

    queuelast = time.time()
    log.info('sending data via mqtt host: {}, port: {}, topic:{}'.format(host, mqttconf['port'], topic))
    while True:
        # get value from queue (blocking)
        queuedata = powerqueue.get()
        powerqueue.task_done()
        queuetime = int(queuedata[0])
        queueval = str(queuedata[1])

        # drop data if less than five seconds passed since last event
        if (queuetime - queuelast) < 5:
            log.info('dropped queued element')
            continue

        # put payload together
        if with_timestamp:
            payload = str(queuetime) + ";" + queueval
        else:
            payload = queueval
        log.debug('mqtt {} => {}'.format(host, repr(payload)))

        # send to mqtt broker
        try:
            mqtt.single(topic, payload=payload, retain=True, hostname=host,
                        port=mqttconf.getint('port'), client_id=mqttconf['clientid'],
                        transport="tcp")
        except Exception as e:
            # keep the transport thread alive on publish errors, but do not
            # swallow interpreter-exit exceptions such as SystemExit
            log.error('error while publishing: {}'.format(e))
        # rate limiting counts from the last attempt, successful or not
        queuelast = queuetime
def readgpio():
    """Turn GPIO interrupts into power readings and put them on the queue.

    Runs forever. Polls the sysfs ``value`` file below ``gpiopath`` for
    interrupts. For every interrupt that reads back as ``'0'`` and that lies
    at least ``mintime`` seconds (``gpio`` config section) after the last
    accepted one, the power is computed as
    ``(3600000 / impkwh) / seconds_since_last_pulse`` and put on the
    module-level ``powerqueue`` as ``[timestamp, power]``.

    ``impkwh`` comes from the ``smartmeter`` config section; presumably it is
    the meter's impulses per kWh, which makes the result a value in watts.
    """
    # these settings do not change at runtime, so parse them only once
    mintime = gpioconf.getfloat("mintime")
    pulse_energy = 3600000 / smconf.getint("impkwh")

    # open gpio filehandle (closed again if the loop ever fails)
    with open(gpiopath + "value", "r") as gpio:
        # setup polling; sysfs gpio interrupts are signalled as POLLPRI|POLLERR
        gpiopoll = poll()
        gpiopoll.register(gpio, POLLPRI | POLLERR)

        # wait for two interrupts to have a "clean" starting point
        log.info('waiting for 2 interrupts to get a clean start')
        gpiopoll.poll()
        gpio.read()
        gpio.seek(0)
        log.info('waiting for 1 interrupt to get a clean start...')
        gpiopoll.poll()
        last = time.time()
        gpio.read()
        gpio.seek(0)

        # start readgpio mainloop
        log.info('start readgpio mainloop')
        while True:
            gpiopoll.poll()
            gpioval = gpio.read(1)
            # rewind so the next read returns the current pin state again
            gpio.seek(0)

            # check if interrupt was a falling edge (yes, really!)
            if gpioval != '0':
                log.debug('Not a falling edge, ignoring')
                continue

            now = time.time()
            elapsed = now - last
            # plausibility check: pulses closer together than mintime are ignored
            if elapsed >= mintime:
                # calculate current power consumption
                power = round(pulse_energy / elapsed, 2)
                log.info('Current power consumption: {} Watt'.format(str(power)))
                log.debug('GPIO state: {}'.format(gpioval))
                # put measured value on queue
                powerqueue.put([now, power])
                last = now
if __name__ == "__main__":
    # Script entry point: set up logging, load the config, prepare the sysfs
    # gpio pin and start the reader and transport threads.
    dirname, filename = os.path.split(os.path.abspath(__file__))

    # configure logging first, so that config problems can be reported
    log = logging.getLogger(__name__)
    log.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    # without attaching the handler, INFO messages would never be emitted
    log.addHandler(ch)
    log.info('Running in path {}'.format(dirname))

    # read config; ConfigParser.read() silently skips missing files and
    # returns the list of files it could read, so check that list
    conf = configparser.ConfigParser()
    try:
        loaded = conf.read(os.path.join(dirname, "powerpi.local.conf"))
    except configparser.Error as err:
        log.warning('can not parse powerpi.local.conf: {}'.format(err))
        loaded = []
    if not loaded:
        if not conf.read(os.path.join(dirname, "powerpi.conf")):
            log.critical('no config file found in {}. aborting.'.format(dirname))
            sys.exit(1)
        log.warning('using global config. Use powerpi.local.conf instead')

    try:
        transconf = conf['transport']
        gpioconf = conf['gpio']
        serverconf = conf['server']
        smconf = conf['smartmeter']
        mqttconf = conf['mqtt']
    except KeyError as err:
        log.critical('config section {} is missing. aborting.'.format(err))
        sys.exit(1)

    # only abort if neither the http server nor the mqtt broker was configured
    if serverconf["url"] == "power.example.com" and mqttconf['host'] == "power.example.com":
        log.critical('Server/Broker -> url still default value')
        sys.exit(1)

    gpiopath = "/sys/class/gpio/gpio" + gpioconf["port"] + "/"

    # initialise gpio interfaces
    try:
        if not os.path.exists(gpiopath):
            # if not already exported, export gpio port
            with open("/sys/class/gpio/export", "w") as gpioinit:
                gpioinit.write(gpioconf["port"] + "\n")
        # set direction of gpio port to "IN"
        with open(gpiopath + "direction", "w") as gpiopin:
            gpiopin.write("in")
        # set trigger to falling edge
        with open(gpiopath + "edge", "w") as gpiotype:
            gpiotype.write("falling")
    except OSError as err:
        log.critical('can not initialize gpio interface. aborting. ({})'.format(err))
        sys.exit(1)

    # defining queue for measured values
    powerqueue = queue.Queue()

    # disable ssl "no verification" warnings
    if serverconf.getboolean("sslself"):
        requests.packages.urllib3.disable_warnings()

    # initialising and starting threads; daemon threads end with the main thread
    th1 = threading.Thread(target=readgpio, daemon=True)
    transport = transconf['transport']
    if transport == 'mqtt':
        log.info('method of transfer: mqtt')
        th2 = threading.Thread(target=trans_mqtt, daemon=True)
    elif transport == 'http':
        log.info('method of transfer: http')
        th2 = threading.Thread(target=trans_http, daemon=True)
    else:
        log.error('transfer option {} is invalid. Exiting!'.format(transport))
        sys.exit(1)
    th1.start()
    th2.start()

    # keep the main thread alive; the work happens in the daemon threads
    while True:
        time.sleep(1)