mqtt2sql.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. import paho.mqtt.client as mqtt
  2. import json,time
  3. import pymysql,dateutil
  4. import datetime
  5. from dateutil.parser import parse as date_parse
  6. import threading
  7. sqlinsert="insert into datain (time,sensortime,topic,value) values ({0:d},{1:d},'{2}',{3:d})"
  8. lastvalue={}
  9. # check values and insert into database
  10. def data2sql(acttime,msgtime,topic,value,mycursor):
  11. bsend=True
  12. iv=int(value)
  13. if topic in lastvalue:
  14. if iv == lastvalue[topic]:
  15. bsend=False
  16. if bsend:
  17. lastvalue[topic]=iv
  18. # print(sqlinsert.format(acttime,msgtime,topic,iv))
  19. try:
  20. sqlcheck=mycursor.execute(sqlinsert.format(acttime,msgtime,topic,iv))
  21. # print(sqlcheck)
  22. except Exception as e:
  23. print(sqlinsert.format(acttime,msgtime,topic,iv))
  24. print(e)
  25. # The callback for when the client receives a CONNACK response from the server.
  26. def on_connect(client, userdata, flags, rc):
  27. print("Connected with result code "+str(rc))
  28. # Subscribing in on_connect() means that if we lose the connection and
  29. # reconnect then subscriptions will be renewed.
  30. client.subscribe("tele/+/STATE")
  31. # The callback for when a PUBLISH message is received from the server.
  32. def on_message(client, userdata, msg):
  33. try:
  34. mydb=pymysql.connect(read_default_file="~/.my.cnf",database="Sensor")
  35. except:
  36. print("Could not connect to sql server! Quitting")
  37. else:
  38. mycursor=mydb.cursor(pymysql.cursors.DictCursor)
  39. acttime=int(1000*time.time())
  40. jpl=json.loads(msg.payload)
  41. ts="sp/"+msg.topic.split("/")[1]
  42. # print(ts+" "+str(jpl))
  43. str_to_dt=0
  44. if 'Time' in jpl:
  45. str_to_dt = int(1000*datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S').timestamp())
  46. if 'LoadAvg' in jpl:
  47. try:
  48. data2sql(acttime,str_to_dt,ts+'/LoadAvg',jpl['LoadAvg'],mycursor)
  49. except Exception as e:
  50. print(e)
  51. if 'POWER' in jpl:
  52. power=0
  53. if jpl['POWER']=='OFF':
  54. power=1
  55. try:
  56. data2sql(acttime,str_to_dt,ts+'/PowerStatus',power,mycursor)
  57. except Exception as e:
  58. print(e)
  59. if 'Wifi' in jpl:
  60. jplw=jpl['Wifi']
  61. if 'RSSI' in jplw:
  62. try:
  63. data2sql(acttime,str_to_dt,ts+'/RSSI',jplw['RSSI'],mycursor)
  64. except Exception as e:
  65. print(e)
  66. if 'Signal' in jplw:
  67. try:
  68. data2sql(acttime,str_to_dt,ts+'/Signal',jplw['Signal'],mycursor)
  69. except Exception as e:
  70. print(e)
  71. if 'Channel' in jplw:
  72. try:
  73. data2sql(acttime,str_to_dt,ts+'/Channel',jplw['Channel'],mycursor)
  74. except Exception as e:
  75. print(e)
  76. mycursor.close()
  77. mydb.commit()
  78. mydb.close()
  79. # The callback for when the client receives a CONNACK response from the server.
  80. def on_connect_sens(client, userdata, flags, rc):
  81. print("Connected with result code "+str(rc))
  82. # Subscribing in on_connect() means that if we lose the connection and
  83. # reconnect then subscriptions will be renewed.
  84. client.subscribe("tele/+/SENSOR")
  85. # The callback for when a PUBLISH message is received from the server.
  86. def on_message_sens(client, userdata, msg):
  87. try:
  88. mydb_sens=pymysql.connect(read_default_file="~/.my.cnf",database="Sensor")
  89. except:
  90. print("Could not connect to sql server! Quitting")
  91. else:
  92. mycursor_sens=mydb_sens.cursor(pymysql.cursors.DictCursor)
  93. acttime=int(1000*time.time())
  94. jpl=json.loads(msg.payload)
  95. ts="sp/"+msg.topic.split("/")[1]
  96. # print(ts+" "+str(jpl))
  97. str_to_dt=0
  98. if 'Time' in jpl:
  99. str_to_dt = int(1000*datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S').timestamp())
  100. if 'ENERGY' in jpl:
  101. jplw=jpl['ENERGY']
  102. if 'Power' in jplw:
  103. try:
  104. data2sql(acttime,str_to_dt,ts+'/Power',int(1000*jplw['Power']),mycursor_sens)
  105. except Exception as e:
  106. print(e)
  107. if 'ApparentPower' in jplw:
  108. try:
  109. data2sql(acttime,str_to_dt,ts+'/ApparentPower',int(1000*jplw['ApparentPower']),mycursor_sens)
  110. except Exception as e:
  111. print(e)
  112. if 'ReactivePower' in jplw:
  113. try:
  114. data2sql(acttime,str_to_dt,ts+'/ReactivePower',int(1000*jplw['ReactivePower']),mycursor_sens)
  115. except Exception as e:
  116. print(e)
  117. if 'Factor' in jplw:
  118. try:
  119. data2sql(acttime,str_to_dt,ts+'/PowerFactor',int(1000*jplw['Factor']),mycursor_sens)
  120. except Exception as e:
  121. print(e)
  122. if 'Voltage' in jplw:
  123. try:
  124. data2sql(acttime,str_to_dt,ts+'/Voltage',int(1000*jplw['Voltage']),mycursor_sens)
  125. except Exception as e:
  126. print(e)
  127. if 'Total' in jplw:
  128. try:
  129. data2sql(acttime,str_to_dt,ts+'/Total',int(1000*jplw['Total']),mycursor_sens)
  130. except Exception as e:
  131. print(e)
  132. if 'Current' in jplw:
  133. try:
  134. data2sql(acttime,str_to_dt,ts+'/Current',int(1000*jplw['Current']),mycursor_sens)
  135. except Exception as e:
  136. print(e)
  137. if 'Period' in jplw:
  138. try:
  139. data2sql(acttime,str_to_dt,ts+'/Period',int(1000*jplw['Period']),mycursor_sens)
  140. except Exception as e:
  141. print(e)
  142. mycursor_sens.close()
  143. mydb_sens.commit()
  144. mydb_sens.close()
  145. # The callback for when the client receives a CONNACK response from the server.
  146. def on_connect_sns(client, userdata, flags, rc):
  147. print("Connected with result code "+str(rc))
  148. # Subscribing in on_connect() means that if we lose the connection and
  149. # reconnect then subscriptions will be renewed.
  150. client.subscribe("tele/+/sens/#")
  151. # The callback for when a PUBLISH message is received from the server.
  152. def on_message_sns(client, userdata, msg):
  153. try:
  154. mydb_sns=pymysql.connect(read_default_file="~/.my.cnf",database="Sensor")
  155. except:
  156. print("Could not connect to sql server! Quitting")
  157. else:
  158. print(int(msg.payload.decode("utf-8")))
  159. mycursor_sns=mydb_sns.cursor(pymysql.cursors.DictCursor)
  160. acttime=int(1000*time.time())
  161. ts=msg.topic.replace("tele","sp")
  162. print(ts)
  163. try:
  164. data2sql(acttime,acttime,ts,int(msg.payload.decode("utf-8")),mycursor_sns)
  165. except Exception as e:
  166. print(e)
  167. mycursor_sns.close()
  168. mydb_sns.commit()
  169. mydb_sns.close()
  170. # The callback for when the client receives a CONNACK response from the server.
  171. def on_connect_net(client, userdata, flags, rc):
  172. print("Connected with result code "+str(rc))
  173. # Subscribing in on_connect() means that if we lose the connection and
  174. # reconnect then subscriptions will be renewed.
  175. client.subscribe("tele/+/net/#")
  176. # The callback for when a PUBLISH message is received from the server.
  177. def on_message_net(client, userdata, msg):
  178. try:
  179. mydb_net=pymysql.connect(read_default_file="~/.my.cnf",database="Sensor")
  180. except:
  181. print("Could not connect to sql server! Quitting")
  182. else:
  183. print(int(msg.payload.decode("utf-8")))
  184. mycursor_net=mydb_net.cursor(pymysql.cursors.DictCursor)
  185. acttime=int(1000*time.time())
  186. ts=msg.topic.replace("tele","sp")
  187. print(ts)
  188. try:
  189. data2sql(acttime,acttime,ts,int(msg.payload.decode("utf-8")),mycursor_net)
  190. except Exception as e:
  191. print(e)
  192. mycursor_net.close()
  193. mydb_net.commit()
  194. mydb_net.close()
  195. client = mqtt.Client()
  196. client.on_connect = on_connect
  197. client.on_message = on_message
  198. client_sens = mqtt.Client()
  199. client_sens.on_connect = on_connect_sens
  200. client_sens.on_message = on_message_sens
  201. #client_sns = mqtt.Client()
  202. #client_sns.on_connect = on_connect_sns
  203. #client_sns.on_message = on_message_sns
  204. client_net = mqtt.Client()
  205. client_net.on_connect = on_connect_net
  206. client_net.on_message = on_message_net
  207. client.connect("172.24.41.2", 1883, 60)
  208. client_sens.connect("172.24.41.2", 1883, 60)
  209. #client_sns.connect("172.24.41.2", 1883, 60)
  210. client_net.connect("172.24.41.2", 1883, 60)
  211. # Blocking call that processes network traffic, dispatches callbacks and
  212. # handles reconnecting.
  213. # Other loop*() functions are available that give a threaded interface and a
  214. # manual interface.
  215. mq_state=threading.Thread(target=client.loop_forever)
  216. mq_sens=threading.Thread(target=client_sens.loop_forever)
  217. #mq_sns=threading.Thread(target=client_sns.loop_forever)
  218. mq_net=threading.Thread(target=client_net.loop_forever)
  219. #client_sns.loop_forever()
  220. #client.loop_forever()
  221. mq_state.start()
  222. mq_sens.start()
  223. #mq_sns.start()
  224. mq_net.start()
  225. while True:
  226. time.sleep(1)