123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- import paho.mqtt.client as mqtt
- import json,time
- import pymysql,dateutil
- import datetime
- from dateutil.parser import parse as date_parse
- import threading
# Legacy str.format() template.  Kept only so existing references keep working;
# it splices the topic text straight into the SQL and must not be used for
# statements built from MQTT input.
sqlinsert="insert into datain (time,sensortime,topic,value) values ({0:d},{1:d},'{2}',{3:d})"
# Same statement with driver-side placeholders (PyMySQL uses %s for every type).
_SQL_INSERT = "insert into datain (time,sensortime,topic,value) values (%s,%s,%s,%s)"
# Last value successfully handed to the database, keyed by topic.
lastvalue={}


# check values and insert into database
def data2sql(acttime, msgtime, topic, value, mycursor):
    """Insert one reading into ``datain`` unless it repeats the last stored one.

    The value is converted with ``int()`` and compared with the entry kept in
    ``lastvalue`` for the same topic; an unchanged value is skipped.  The
    insert uses a parameterized statement, so the topic is never interpolated
    into the SQL text.  A failed insert is printed and swallowed, and in that
    case ``lastvalue`` is left untouched so the next message retries.

    Args:
        acttime: Value written to the ``time`` column.
        msgtime: Value written to the ``sensortime`` column.
        topic: Value written to the ``topic`` column; also the dedup key.
        value: Reading to store; anything accepted by ``int()``.
        mycursor: Cursor whose ``execute(query, args)`` takes ``%s``
            placeholders.

    Raises:
        ValueError, TypeError: If ``value`` cannot be converted by ``int()``.
    """
    iv = int(value)
    if topic in lastvalue and lastvalue[topic] == iv:
        return
    params = (acttime, msgtime, topic, iv)
    try:
        mycursor.execute(_SQL_INSERT, params)
    except Exception as e:
        # Best effort: report the failing statement and carry on.
        print(_SQL_INSERT, params)
        print(e)
        return
    # Remember the value only once the insert went through.
    lastvalue[topic] = iv
- # The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    """CONNACK callback: print the result code and subscribe to STATE topics.

    Args:
        client: Client that connected; ``subscribe`` is called on it.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code, printed as text.
    """
    print("Connected with result code {!s}".format(rc))
    # Subscribing from this callback renews the subscription after every
    # reconnect, not just the first connect.
    state_topic = "tele/+/STATE"
    client.subscribe(state_topic)
- # The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    """Store the fields of one ``tele/<device>/STATE`` JSON message.

    The payload is decoded first; the second level of ``msg.topic`` becomes
    the topic prefix ``sp/<device>``.  LoadAvg, POWER (as ``PowerStatus``) and
    the Wifi RSSI/Signal/Channel values are then passed to ``data2sql`` over
    a database connection that is opened for this message and always closed
    again, even when something fails in between.

    Args:
        client: Client that received the message (unused).
        userdata: Unused.
        msg: Received message; ``topic`` and ``payload`` are read.
    """
    acttime = int(1000 * time.time())
    try:
        jpl = json.loads(msg.payload)
    except (ValueError, TypeError) as e:
        # Malformed payload: nothing to store, and no connection to clean up.
        print(e)
        return
    if not isinstance(jpl, dict):
        print("Unexpected payload on " + str(msg.topic))
        return
    ts = "sp/" + msg.topic.split("/")[1]

    # Sensor-side timestamp in milliseconds; stays 0 when absent or unparsable.
    # The string carries no zone, so timestamp() interprets it as local time.
    str_to_dt = 0
    if 'Time' in jpl:
        try:
            parsed = datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S')
            str_to_dt = int(1000 * parsed.timestamp())
        except (ValueError, TypeError, OverflowError, OSError) as e:
            print(e)

    # Collect (topic suffix, raw value) pairs in the order they are stored.
    readings = []
    if 'LoadAvg' in jpl:
        readings.append(('/LoadAvg', jpl['LoadAvg']))
    if 'POWER' in jpl:
        # NOTE(review): 'OFF' is stored as 1 and everything else as 0.  This
        # looks inverted but is kept because existing rows use this encoding
        # - confirm with whoever consumes PowerStatus.
        readings.append(('/PowerStatus', 1 if jpl['POWER'] == 'OFF' else 0))
    wifi = jpl.get('Wifi')
    if isinstance(wifi, dict):
        for key in ('RSSI', 'Signal', 'Channel'):
            if key in wifi:
                readings.append(('/' + key, wifi[key]))
    if not readings:
        return

    try:
        mydb = pymysql.connect(read_default_file="~/.my.cnf", database="Sensor")
    except Exception as e:
        # Despite the wording nothing quits: this message is dropped and the
        # next one tries to connect again.
        print("Could not connect to sql server! Quitting")
        print(e)
        return
    try:
        with mydb.cursor(pymysql.cursors.DictCursor) as mycursor:
            for suffix, value in readings:
                # One bad field must not prevent the others from being stored.
                try:
                    data2sql(acttime, str_to_dt, ts + suffix, value, mycursor)
                except Exception as e:
                    print(e)
        mydb.commit()
    finally:
        mydb.close()
- # The callback for when the client receives a CONNACK response from the server.
def on_connect_sens(client, userdata, flags, rc):
    """CONNACK callback: print the result code and subscribe to SENSOR topics.

    Args:
        client: Client that connected; ``subscribe`` is called on it.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code, printed as text.
    """
    banner = "Connected with result code "
    print(banner + str(rc))
    # Done here so that a reconnect re-establishes the subscription.
    client.subscribe("tele/+/SENSOR")
- # The callback for when a PUBLISH message is received from the server.
def on_message_sens(client, userdata, msg):
    """Store the ENERGY fields of one ``tele/<device>/SENSOR`` JSON message.

    The payload is decoded first; the second level of ``msg.topic`` becomes
    the topic prefix ``sp/<device>``.  Each known key of the ``ENERGY`` object
    is multiplied by 1000, rounded to the nearest integer and passed to
    ``data2sql`` over a database connection that is opened for this message
    and always closed again.

    Args:
        client: Client that received the message (unused).
        userdata: Unused.
        msg: Received message; ``topic`` and ``payload`` are read.
    """
    acttime = int(1000 * time.time())
    try:
        jpl = json.loads(msg.payload)
    except (ValueError, TypeError) as e:
        # Malformed payload: nothing to store, and no connection to clean up.
        print(e)
        return
    if not isinstance(jpl, dict):
        print("Unexpected payload on " + str(msg.topic))
        return
    ts = "sp/" + msg.topic.split("/")[1]

    # Sensor-side timestamp in milliseconds; stays 0 when absent or unparsable.
    # The string carries no zone, so timestamp() interprets it as local time.
    str_to_dt = 0
    if 'Time' in jpl:
        try:
            parsed = datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S')
            str_to_dt = int(1000 * parsed.timestamp())
        except (ValueError, TypeError, OverflowError, OSError) as e:
            print(e)

    # (key inside ENERGY, stored topic suffix), in storage order.
    energy_fields = (
        ('Power', '/Power'),
        ('ApparentPower', '/ApparentPower'),
        ('ReactivePower', '/ReactivePower'),
        ('Factor', '/PowerFactor'),
        ('Voltage', '/Voltage'),
        ('Total', '/Total'),
        ('Current', '/Current'),
        ('Period', '/Period'),
    )
    energy = jpl.get('ENERGY')
    if not isinstance(energy, dict):
        return
    readings = [(suffix, energy[key]) for key, suffix in energy_fields if key in energy]
    if not readings:
        return

    try:
        mydb_sens = pymysql.connect(read_default_file="~/.my.cnf", database="Sensor")
    except Exception as e:
        # Despite the wording nothing quits: this message is dropped and the
        # next one tries to connect again.
        print("Could not connect to sql server! Quitting")
        print(e)
        return
    try:
        with mydb_sens.cursor(pymysql.cursors.DictCursor) as mycursor_sens:
            for suffix, raw in readings:
                # The scaling stays inside the try so that a non-numeric field
                # is reported and skipped without affecting the others.
                try:
                    # round() instead of truncation: 1000 * 1.005 evaluates to
                    # 1004.9999999999999 and must be stored as 1005.
                    scaled = int(round(1000 * raw))
                    data2sql(acttime, str_to_dt, ts + suffix, scaled, mycursor_sens)
                except Exception as e:
                    print(e)
        mydb_sens.commit()
    finally:
        mydb_sens.close()
- # The callback for when the client receives a CONNACK response from the server.
def on_connect_sns(client, userdata, flags, rc):
    """CONNACK callback: print the result code and subscribe to ``sens`` topics.

    Args:
        client: Client that connected; ``subscribe`` is called on it.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code, printed as text.
    """
    print("".join(("Connected with result code ", str(rc))))
    # Subscribing on every CONNACK keeps the subscription alive across
    # reconnects.
    sens_filter = "tele/+/sens/#"
    client.subscribe(sens_filter)
- # The callback for when a PUBLISH message is received from the server.
def on_message_sns(client, userdata, msg):
    """Store one integer reading received on a ``tele/<device>/sens/...`` topic.

    The payload is decoded as UTF-8 text and parsed as an integer before any
    database work starts.  The stored topic is ``msg.topic`` with its leading
    ``tele`` replaced by ``sp``; the receive time is used for both time
    columns.  The per-message database connection is always closed again.

    Args:
        client: Client that received the message (unused).
        userdata: Unused.
        msg: Received message; ``topic`` and ``payload`` are read.
    """
    try:
        reading = int(msg.payload.decode("utf-8"))
    except ValueError as e:
        # Covers both undecodable bytes and non-integer text.
        print(e)
        return

    try:
        mydb_sns = pymysql.connect(read_default_file="~/.my.cnf", database="Sensor")
    except Exception as e:
        # Despite the wording nothing quits: this message is dropped and the
        # next one tries to connect again.
        print("Could not connect to sql server! Quitting")
        print(e)
        return
    try:
        print(reading)
        acttime = int(1000 * time.time())
        # Replace only the first "tele" so a device or sensor name that
        # happens to contain "tele" is left intact.
        ts = msg.topic.replace("tele", "sp", 1)
        print(ts)
        with mydb_sns.cursor(pymysql.cursors.DictCursor) as mycursor_sns:
            try:
                data2sql(acttime, acttime, ts, reading, mycursor_sns)
            except Exception as e:
                print(e)
        mydb_sns.commit()
    finally:
        mydb_sns.close()
- # The callback for when the client receives a CONNACK response from the server.
def on_connect_net(client, userdata, flags, rc):
    """CONNACK callback: print the result code and subscribe to ``net`` topics.

    Args:
        client: Client that connected; ``subscribe`` is called on it.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code, printed as text.
    """
    result_text = str(rc)
    print("Connected with result code " + result_text)
    # Re-issued on each (re)connect so the subscription is never lost.
    client.subscribe("tele/+/net/#")
- # The callback for when a PUBLISH message is received from the server.
def on_message_net(client, userdata, msg):
    """Store one integer reading received on a ``tele/<device>/net/...`` topic.

    The payload is decoded as UTF-8 text and parsed as an integer before any
    database work starts.  The stored topic is ``msg.topic`` with its leading
    ``tele`` replaced by ``sp``; the receive time is used for both time
    columns.  The per-message database connection is always closed again.

    Args:
        client: Client that received the message (unused).
        userdata: Unused.
        msg: Received message; ``topic`` and ``payload`` are read.
    """
    try:
        reading = int(msg.payload.decode("utf-8"))
    except ValueError as e:
        # Covers both undecodable bytes and non-integer text.
        print(e)
        return

    try:
        mydb_net = pymysql.connect(read_default_file="~/.my.cnf", database="Sensor")
    except Exception as e:
        # Despite the wording nothing quits: this message is dropped and the
        # next one tries to connect again.
        print("Could not connect to sql server! Quitting")
        print(e)
        return
    try:
        print(reading)
        acttime = int(1000 * time.time())
        # Replace only the first "tele" so a device or interface name that
        # happens to contain "tele" is left intact.
        ts = msg.topic.replace("tele", "sp", 1)
        print(ts)
        with mydb_net.cursor(pymysql.cursors.DictCursor) as mycursor_net:
            try:
                data2sql(acttime, acttime, ts, reading, mycursor_net)
            except Exception as e:
                print(e)
        mydb_net.commit()
    finally:
        mydb_net.close()
# One MQTT client per subscription, each with its own callbacks.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client_sens = mqtt.Client()
client_sens.on_connect = on_connect_sens
client_sens.on_message = on_message_sens

# The "tele/+/sens/#" client is currently disabled; its callbacks remain
# defined so it can be switched back on here and in the sections below.
#client_sns = mqtt.Client()
#client_sns.on_connect = on_connect_sns
#client_sns.on_message = on_message_sns

client_net = mqtt.Client()
client_net.on_connect = on_connect_net
client_net.on_message = on_message_net

client.connect("172.24.41.2", 1883, 60)
client_sens.connect("172.24.41.2", 1883, 60)
#client_sns.connect("172.24.41.2", 1883, 60)
client_net.connect("172.24.41.2", 1883, 60)

# loop_forever() blocks while it processes network traffic, dispatches
# callbacks and handles reconnecting, so every client gets its own thread.
# The threads are daemons so they can never keep the process alive on their
# own once the main thread has finished.
mq_state = threading.Thread(target=client.loop_forever, daemon=True)
mq_sens = threading.Thread(target=client_sens.loop_forever, daemon=True)
#mq_sns = threading.Thread(target=client_sns.loop_forever, daemon=True)
mq_net = threading.Thread(target=client_net.loop_forever, daemon=True)

mq_state.start()
mq_sens.start()
#mq_sns.start()
mq_net.start()

# Keep the main thread alive until interrupted, then shut down cleanly:
# disconnect() makes each loop_forever() return so the threads can be joined.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    for running_client in (client, client_sens, client_net):
        running_client.disconnect()
    for worker in (mq_state, mq_sens, mq_net):
        # Bounded wait: a worker stuck in a callback must not block exit.
        worker.join(timeout=5)
|