mqtt2call_sql.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. import paho.mqtt.client as mqtt
  2. import json,time
  3. import pymysql,dateutil
  4. import datetime
  5. from dateutil.parser import parse as date_parse
  6. import threading
  7. #sqlinsert="insert into datain (time,sensortime,topic,value) values ({0:d},{1:d},'{2}',{3:d})"
  8. sqlinsert="call insert_data('{2}',{3:d},{1:d})"
  9. lastvalue={}
  10. # check values and insert into database
  11. def data2sql(acttime,msgtime,topic,value,mycursor):
  12. bsend=True
  13. iv=int(value)
  14. if topic in lastvalue:
  15. if iv == lastvalue[topic]:
  16. bsend=False
  17. if bsend:
  18. lastvalue[topic]=iv
  19. # print(sqlinsert.format(acttime,msgtime,topic,iv))
  20. try:
  21. sqlcheck=mycursor.execute(sqlinsert.format(acttime,msgtime,topic,iv))
  22. # print(sqlcheck)
  23. except Exception as e:
  24. print(sqlinsert.format(acttime,msgtime,topic,iv))
  25. print(e)
  26. # The callback for when the client receives a CONNACK response from the server.
  27. def on_connect(client, userdata, flags, rc):
  28. print("Connected with result code "+str(rc))
  29. # Subscribing in on_connect() means that if we lose the connection and
  30. # reconnect then subscriptions will be renewed.
  31. client.subscribe("tele/+/STATE")
  32. # The callback for when a PUBLISH message is received from the server.
  33. def on_message(client, userdata, msg):
  34. try:
  35. mydb=pymysql.connect(read_default_file="~/.my.cnf",database="tsensor")
  36. except:
  37. print("Could not connect to sql server! Quitting")
  38. else:
  39. mycursor=mydb.cursor(pymysql.cursors.DictCursor)
  40. acttime=int(1000*time.time())
  41. jpl=json.loads(msg.payload)
  42. ts="sp/"+msg.topic.split("/")[1]
  43. # print(ts+" "+str(jpl))
  44. str_to_dt=0
  45. if 'Time' in jpl:
  46. str_to_dt = int(1000*datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S').timestamp())
  47. if 'LoadAvg' in jpl:
  48. try:
  49. data2sql(acttime,str_to_dt,ts+'/LoadAvg',jpl['LoadAvg'],mycursor)
  50. except Exception as e:
  51. print(e)
  52. if 'POWER' in jpl:
  53. power=0
  54. if jpl['POWER']=='OFF':
  55. power=1
  56. try:
  57. data2sql(acttime,str_to_dt,ts+'/PowerStatus',power,mycursor)
  58. except Exception as e:
  59. print(e)
  60. if 'Wifi' in jpl:
  61. jplw=jpl['Wifi']
  62. if 'RSSI' in jplw:
  63. try:
  64. data2sql(acttime,str_to_dt,ts+'/RSSI',jplw['RSSI'],mycursor)
  65. except Exception as e:
  66. print(e)
  67. if 'Signal' in jplw:
  68. try:
  69. data2sql(acttime,str_to_dt,ts+'/Signal',jplw['Signal'],mycursor)
  70. except Exception as e:
  71. print(e)
  72. if 'Channel' in jplw:
  73. try:
  74. data2sql(acttime,str_to_dt,ts+'/Channel',jplw['Channel'],mycursor)
  75. except Exception as e:
  76. print(e)
  77. mycursor.close()
  78. mydb.commit()
  79. mydb.close()
  80. # The callback for when the client receives a CONNACK response from the server.
  81. def on_connect_sens(client, userdata, flags, rc):
  82. print("Connected with result code "+str(rc))
  83. # Subscribing in on_connect() means that if we lose the connection and
  84. # reconnect then subscriptions will be renewed.
  85. client.subscribe("tele/+/SENSOR")
  86. # The callback for when a PUBLISH message is received from the server.
  87. def on_message_sens(client, userdata, msg):
  88. try:
  89. mydb_sens=pymysql.connect(read_default_file="~/.my.cnf",database="tsensor")
  90. except:
  91. print("Could not connect to sql server! Quitting")
  92. else:
  93. mycursor_sens=mydb_sens.cursor(pymysql.cursors.DictCursor)
  94. acttime=int(1000*time.time())
  95. jpl=json.loads(msg.payload)
  96. ts="sp/"+msg.topic.split("/")[1]
  97. # print(ts+" "+str(jpl))
  98. str_to_dt=0
  99. if 'Time' in jpl:
  100. str_to_dt = int(1000*datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S').timestamp())
  101. if 'ENERGY' in jpl:
  102. jplw=jpl['ENERGY']
  103. if 'Power' in jplw:
  104. try:
  105. data2sql(acttime,str_to_dt,ts+'/Power',int(1000*jplw['Power']),mycursor_sens)
  106. except Exception as e:
  107. print(e)
  108. if 'ApparentPower' in jplw:
  109. try:
  110. data2sql(acttime,str_to_dt,ts+'/ApparentPower',int(1000*jplw['ApparentPower']),mycursor_sens)
  111. except Exception as e:
  112. print(e)
  113. if 'ReactivePower' in jplw:
  114. try:
  115. data2sql(acttime,str_to_dt,ts+'/ReactivePower',int(1000*jplw['ReactivePower']),mycursor_sens)
  116. except Exception as e:
  117. print(e)
  118. if 'Factor' in jplw:
  119. try:
  120. data2sql(acttime,str_to_dt,ts+'/PowerFactor',int(1000*jplw['Factor']),mycursor_sens)
  121. except Exception as e:
  122. print(e)
  123. if 'Voltage' in jplw:
  124. try:
  125. data2sql(acttime,str_to_dt,ts+'/Voltage',int(1000*jplw['Voltage']),mycursor_sens)
  126. except Exception as e:
  127. print(e)
  128. if 'Total' in jplw:
  129. try:
  130. data2sql(acttime,str_to_dt,ts+'/Total',int(1000*jplw['Total']),mycursor_sens)
  131. except Exception as e:
  132. print(e)
  133. if 'Current' in jplw:
  134. try:
  135. data2sql(acttime,str_to_dt,ts+'/Current',int(1000*jplw['Current']),mycursor_sens)
  136. except Exception as e:
  137. print(e)
  138. if 'Period' in jplw:
  139. try:
  140. data2sql(acttime,str_to_dt,ts+'/Period',int(1000*jplw['Period']),mycursor_sens)
  141. except Exception as e:
  142. print(e)
  143. mycursor_sens.close()
  144. mydb_sens.commit()
  145. mydb_sens.close()
  146. # The callback for when the client receives a CONNACK response from the server.
  147. def on_connect_sns(client, userdata, flags, rc):
  148. print("Connected with result code "+str(rc))
  149. # Subscribing in on_connect() means that if we lose the connection and
  150. # reconnect then subscriptions will be renewed.
  151. client.subscribe("tele/+/sens/#")
  152. # The callback for when a PUBLISH message is received from the server.
  153. def on_message_sns(client, userdata, msg):
  154. try:
  155. mydb_sns=pymysql.connect(read_default_file="~/.my.cnf",database="tsensor")
  156. except:
  157. print("Could not connect to sql server! Quitting")
  158. else:
  159. print(int(msg.payload.decode("utf-8")))
  160. mycursor_sns=mydb_sns.cursor(pymysql.cursors.DictCursor)
  161. acttime=int(1000*time.time())
  162. ts=msg.topic.replace("tele","sp")
  163. print(ts)
  164. try:
  165. data2sql(acttime,acttime,ts,int(msg.payload.decode("utf-8")),mycursor_sns)
  166. except Exception as e:
  167. print(e)
  168. mycursor_sns.close()
  169. mydb_sns.commit()
  170. mydb_sns.close()
  171. # The callback for when the client receives a CONNACK response from the server.
  172. def on_connect_net(client, userdata, flags, rc):
  173. print("Connected with result code "+str(rc))
  174. # Subscribing in on_connect() means that if we lose the connection and
  175. # reconnect then subscriptions will be renewed.
  176. client.subscribe("tele/+/net/#")
  177. # The callback for when a PUBLISH message is received from the server.
  178. def on_message_net(client, userdata, msg):
  179. try:
  180. mydb_net=pymysql.connect(read_default_file="~/.my.cnf",database="tsensor")
  181. except:
  182. print("Could not connect to sql server! Quitting")
  183. else:
  184. print(int(msg.payload.decode("utf-8")))
  185. mycursor_net=mydb_net.cursor(pymysql.cursors.DictCursor)
  186. acttime=int(1000*time.time())
  187. ts=msg.topic.replace("tele","sp")
  188. print(ts)
  189. try:
  190. data2sql(acttime,acttime,ts,int(msg.payload.decode("utf-8")),mycursor_net)
  191. except Exception as e:
  192. print(e)
  193. mycursor_net.close()
  194. mydb_net.commit()
  195. mydb_net.close()
  196. client = mqtt.Client()
  197. client.on_connect = on_connect
  198. client.on_message = on_message
  199. client_sens = mqtt.Client()
  200. client_sens.on_connect = on_connect_sens
  201. client_sens.on_message = on_message_sens
  202. #client_sns = mqtt.Client()
  203. #client_sns.on_connect = on_connect_sns
  204. #client_sns.on_message = on_message_sns
  205. client_net = mqtt.Client()
  206. client_net.on_connect = on_connect_net
  207. client_net.on_message = on_message_net
  208. client.connect("172.24.41.2", 1883, 60)
  209. client_sens.connect("172.24.41.2", 1883, 60)
  210. #client_sns.connect("172.24.41.2", 1883, 60)
  211. client_net.connect("172.24.41.2", 1883, 60)
  212. # Blocking call that processes network traffic, dispatches callbacks and
  213. # handles reconnecting.
  214. # Other loop*() functions are available that give a threaded interface and a
  215. # manual interface.
  216. mq_state=threading.Thread(target=client.loop_forever)
  217. mq_sens=threading.Thread(target=client_sens.loop_forever)
  218. #mq_sns=threading.Thread(target=client_sns.loop_forever)
  219. mq_net=threading.Thread(target=client_net.loop_forever)
  220. #client_sns.loop_forever()
  221. #client.loop_forever()
  222. mq_state.start()
  223. mq_sens.start()
  224. #mq_sns.start()
  225. mq_net.start()
  226. while True:
  227. time.sleep(1)