pysqltq.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. import socket,threading,SocketServer,json,time
  2. import pymysql
  3. from Queue import Queue
  4. def sql_get_dict(mycursor,mytable,myvar):
  5. mycursor.execute("select * from "+str(mytable))
  6. tout={}
  7. tt=mycursor.fetchall()
  8. for tv in tt:
  9. tout[tv[myvar]]=tv['id']
  10. return(tout)
  11. def sql_insert(q):
  12. try:
  13. mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar")
  14. except:
  15. print("Could not connect to sql server! Quitting")
  16. else:
  17. mycursor=mydb.cursor(pymysql.cursors.DictCursor)
  18. sqlinsert="insert into data_storage (time,device_id,var_id,sensor_id,i2c,value) values ({0:d},{1:d},{2:d},{3:d},{4:d},{5:d})"
  19. # get variable id out of sql
  20. svar=sql_get_dict(mycursor,"var_id","var")
  21. sdev=sql_get_dict(mycursor,"device_id","device")
  22. ssens=sql_get_dict(mycursor,"sensor_id","sensor")
  23. sqldata=[]
  24. while True:
  25. if q.empty():
  26. time.sleep(0.1)
  27. else:
  28. try:
  29. indata=q.get()
  30. if indata is not None:
  31. q.task_done()
  32. except Exception as e:
  33. print("Error during queuing")
  34. print(e)
  35. else:
  36. if indata is not None:
  37. if indata['device'] not in sdev:
  38. mycursor.execute("insert into device_id (device) values('"+str(indata['device'])+"')")
  39. mydb.commit()
  40. sdev=sql_get_dict(mycursor,"device_id","device")
  41. if indata['sensor'] not in ssens:
  42. mycursor.execute("insert into sensor_id (sensor) values('"+str(indata['sensor'])+"')")
  43. mydb.commit()
  44. ssens=sql_get_dict(mycursor,"sensor_id","sensor")
  45. if indata['varname'] not in svar:
  46. mycursor.execute("insert into var_id (var) values('"+str(indata['varname'])+"')")
  47. mydb.commit()
  48. svar=sql_get_dict(mycursor,"var_id","var")
  49. txt_sql=sqlinsert.format(indata['time'],sdev[indata['device']],svar[indata['varname']],ssens[indata['sensor']],indata['i2c'],indata['value'])
  50. try:
  51. mycursor.execute(txt_sql)
  52. except:
  53. print("Eror in execute sql insert")
  54. errlog=open("sql_missed.txt","a")
  55. errlog.write(txt_sql)
  56. errlog.close()
  57. else:
  58. mydb.commit()
  59. class ThreadedTCPRequestHandler(SocketServer.StreamRequestHandler):
  60. def __init__(self, request, client_address, server):
  61. self.queue = server.queue
  62. socketserver.StreamRequestHandler.__init__(self, request, client_address, server)
  63. def handle(self):
  64. indata = str(self.request.recv(1024), 'ascii')
  65. cur_thread = threading.current_thread()
  66. #indata=self.data
  67. bjson=True
  68. # try if indata is in json format.
  69. # only process indata, if in json
  70. try:
  71. test=json.loads(indata)
  72. except:
  73. bjson=False
  74. else:
  75. # indata must have a payload entry
  76. if "payload" in test:
  77. # get credential for sql server and open connection
  78. # only process if sql connection could be open
  79. datasource=self.client_address[0]
  80. if "device" in test:
  81. datasource=test['device'].translate(' ./:;*|')
  82. datasource=datasource[:64]
  83. multi=1
  84. if "mult" in test:
  85. multi=int(test['mult'])
  86. payload=test['payload']
  87. for x,y in payload.items():
  88. # remove unwanted characters from variable name
  89. varx=x.translate(' ./:;*!')
  90. varx=varx[:64]
  91. value=0
  92. if "value" in y:
  93. value=int(y['value'])
  94. sensor=""
  95. if "time" in y:
  96. datatime=int(y['time'])
  97. else:
  98. datatime=int(1000*time.time())
  99. if "sensor" in y:
  100. sensor=y['sensor']
  101. sensor=sensor.translate(' ./:;*!')
  102. sensor=sensor[:32]
  103. i2c=0
  104. if "i2c" in y:
  105. i2c=int(y['i2c'])
  106. q.put({"time":datatime,"device":datasource,"varname":varx,"sensor":sensor,"i2c":i2c,"value":int(multi*value)},block=False)
  107. class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
  108. def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True,queue=None):
  109. self.queue = queue
  110. SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass,
  111. bind_and_activate=bind_and_activate)
  112. if __name__ == "__main__":
  113. q=Queue(maxsize=0)
  114. sql_worker=threading.Thread(target=sql_insert,args=(q,))
  115. sql_worker.setDaemon(True)
  116. sql_worker.start()
  117. # Port 0 means to select an arbitrary unused port
  118. HOST, PORT = "", 24048
  119. server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler,queue=q)
  120. server.timeout=None
  121. # Start a thread with the server -- that thread will then start one
  122. # more thread for each request
  123. server_thread = threading.Thread(target=server.serve_forever,args=(q,))
  124. # Exit the server thread when the main thread terminates
  125. server_thread.daemon = True
  126. server_thread.timeout=None
  127. server_thread.start()
  128. print("Server loop running in thread:", server_thread.name)
  129. server.serve_forever()
  130. server.shutdown()
  131. server.server_close()