[F] basic mqtt support added
This commit is contained in:
		
							
								
								
									
										57
									
								
								pi/power.py
									
									
									
									
									
								
							
							
						
						
									
										57
									
								
								pi/power.py
									
									
									
									
									
								
							@@ -9,8 +9,9 @@ import queue
 | 
			
		||||
import requests
 | 
			
		||||
import configparser
 | 
			
		||||
import logging
 | 
			
		||||
import paho.mqtt.publish as mqtt
 | 
			
		||||
 | 
			
		||||
def trans():
 | 
			
		||||
def trans_http():
 | 
			
		||||
	global serverconf
 | 
			
		||||
	# build together url and headers
 | 
			
		||||
	uri = ""
 | 
			
		||||
@@ -31,11 +32,8 @@ def trans():
 | 
			
		||||
		if ((queuetime - queuelast) >= 5):
 | 
			
		||||
			# put payload together
 | 
			
		||||
			payload =  {"val": str(queuetime) + ";" + str(queueval)}
 | 
			
		||||
			log.debug('request {} => {}'.format(uri, repr(payload)))
 | 
			
		||||
			# send to webserver via http post
 | 
			
		||||
			#
 | 
			
		||||
			# Um den Krempel hier muss noch mal eine While Schleife rum, damit die Daten nicht verloren gehen.
 | 
			
		||||
			#
 | 
			
		||||
			log.debug('request {}:{}{} => {}'.format(mqttconf['host'], mqttconf['port'], mqttconf['topic'], repr(payload)))
 | 
			
		||||
			# send to http server
 | 
			
		||||
			try:
 | 
			
		||||
				if serverconf.getboolean("basicauth") == True:
 | 
			
		||||
					r = requests.post(uri, data=payload, verify=False, auth=(serverconf["user"], serverconf["password"]), timeout=10, headers={'connection':'close'})
 | 
			
		||||
@@ -50,6 +48,36 @@ def trans():
 | 
			
		||||
			del queuedata
 | 
			
		||||
			log.info('dropped queued element')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def trans_mqtt():
 | 
			
		||||
	global mqttconf
 | 
			
		||||
	queuelast = time.time()
 | 
			
		||||
	log.info('sending data via mqtt host: {}, port: {}, topic:{}'.format(mqttconf['host'], mqttconf['port'], mqttconf['topic']))
 | 
			
		||||
	while True:
 | 
			
		||||
		# get value from queue (blocking)
 | 
			
		||||
		queuedata = powerqueue.get()
 | 
			
		||||
		powerqueue.task_done()
 | 
			
		||||
		queuetime = int(queuedata[0])
 | 
			
		||||
		queueval = str(queuedata[1])
 | 
			
		||||
		# if more than five seconds passed since last event
 | 
			
		||||
		if ((queuetime - queuelast) >= 5):
 | 
			
		||||
			# put payload together
 | 
			
		||||
			payload =  {"val": str(queuetime) + ";" + str(queueval)}
 | 
			
		||||
			log.debug('mqtt {} => {}'.format(uri, repr(payload)))
 | 
			
		||||
			# send to mqtt broker
 | 
			
		||||
			try:
 | 
			
		||||
				mqtt.single(mqttconf['topic'], payload=payload, retain=True, hostname=mqttconf['host'], 
 | 
			
		||||
					port=mqttconf.getint('port'), client_id=mqttconf['clientid'], transport="tcp")
 | 
			
		||||
			except BaseException as e:
 | 
			
		||||
				log.error('error while publishing: {}'.format(e))
 | 
			
		||||
			queuelast = queuetime
 | 
			
		||||
		
 | 
			
		||||
		else:
 | 
			
		||||
			# drop data if less than five seconds passed since last event
 | 
			
		||||
			del queuedata
 | 
			
		||||
			log.info('dropped queued element')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def readgpio():
 | 
			
		||||
	global gpiopath
 | 
			
		||||
	global gpioconf
 | 
			
		||||
@@ -99,13 +127,15 @@ if __name__ == "__main__":
 | 
			
		||||
	conf = configparser.ConfigParser()
 | 
			
		||||
	conf.read("powerpi.conf")
 | 
			
		||||
	conf.sections()
 | 
			
		||||
	transconf = conf['transport']
 | 
			
		||||
	gpioconf = conf['gpio']
 | 
			
		||||
	serverconf = conf['server']
 | 
			
		||||
	smconf = conf['smartmeter']
 | 
			
		||||
	mqttconf = conf['mqtt']
 | 
			
		||||
	# configure logging
 | 
			
		||||
	log = logging.getLogger(__name__)
 | 
			
		||||
	if serverconf["url"] == "power.example.com":
 | 
			
		||||
		log.critical('Server -> url still default value')
 | 
			
		||||
	if serverconf["url"] == "power.example.com" and mqttconf['host'] == "power.example.com":
 | 
			
		||||
		log.critical('Server/Broker -> url still default value')
 | 
			
		||||
		exit(1)
 | 
			
		||||
	gpiopath = "/sys/class/gpio/gpio" + gpioconf["port"] + "/"
 | 
			
		||||
	# initialise gpio interfaces
 | 
			
		||||
@@ -135,7 +165,16 @@ if __name__ == "__main__":
 | 
			
		||||
	
 | 
			
		||||
	# initialising and starting threads
 | 
			
		||||
	th1 = threading.Thread(target=readgpio)
 | 
			
		||||
	th2 = threading.Thread(target=trans)
 | 
			
		||||
	if transconf['transport'] == 'mqtt':
 | 
			
		||||
		log.info('method of transfer: mqtt')
 | 
			
		||||
		th2 = threading.Thread(target=trans_mqtt)
 | 
			
		||||
	elif transconf['transport'] == 'http':
 | 
			
		||||
		log.info('method of transfer: http')
 | 
			
		||||
		th2 = threading.Thread(target=trans_http)
 | 
			
		||||
	else:
 | 
			
		||||
		log.error('transfer option {} is invalid. Exiting!'.format(transfer['transfer']))
 | 
			
		||||
		sys.exit(1)
 | 
			
		||||
 | 
			
		||||
	th1.setDaemon(True)
 | 
			
		||||
	th2.setDaemon(True)
 | 
			
		||||
	th1.start()
 | 
			
		||||
 
 | 
			
		||||
@@ -1,3 +1,6 @@
 | 
			
		||||
[transport]
 | 
			
		||||
transport: mqtt
 | 
			
		||||
 | 
			
		||||
[smartmeter]
 | 
			
		||||
impkwh: 2000
 | 
			
		||||
 | 
			
		||||
@@ -11,4 +14,10 @@ ssl: yes
 | 
			
		||||
basicauth: yes
 | 
			
		||||
user: basicUser
 | 
			
		||||
password: basicPass
 | 
			
		||||
sslself: yes
 | 
			
		||||
sslself: yes
 | 
			
		||||
 | 
			
		||||
[mqtt]
 | 
			
		||||
host: power.example.com
 | 
			
		||||
port: 1883
 | 
			
		||||
clientid: changeme
 | 
			
		||||
topic: /foo/bar/power
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user