123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- import socket,threading,SocketServer,json,time
- import pymysql
- from Queue import Queue
def sql_get_dict(mycursor, mytable, myvar):
    """Build a lookup from one column of a table to the row ids.

    Runs ``select * from <mytable>`` on ``mycursor`` and maps, for every row
    returned, the value found under ``myvar`` to the value found under
    ``'id'``.  Rows are indexed by column name, so the cursor has to hand
    back mapping-style rows.  When several rows share the same ``myvar``
    value, the id of the last one fetched is kept.

    Args:
        mycursor: cursor offering ``execute()`` and ``fetchall()``.
        mytable: table name; converted with ``str()`` and appended to the
            query text as-is.
        myvar: key of the column whose values become the dictionary keys.

    Returns:
        dict mapping ``row[myvar]`` to ``row['id']``.
    """
    query = "select * from " + str(mytable)
    mycursor.execute(query)
    return {row[myvar]: row['id'] for row in mycursor.fetchall()}
def _lookup_id(mydb, mycursor, cache, table, column, name):
    """Return the id of ``name`` in a lookup table, adding it when unknown.

    ``cache`` is the dictionary previously built by ``sql_get_dict`` for
    ``table``/``column``.  If ``name`` is missing from it, a row is inserted
    and committed, and ``cache`` is refreshed in place from the table.

    Raises:
        KeyError: if ``name`` is still absent after the refresh.
    """
    if name not in cache:
        # ``table`` and ``column`` are fixed identifiers chosen by
        # sql_insert.  ``name`` comes out of the queued record, so it is
        # handed to the driver as a bound parameter instead of being spliced
        # into the statement text.
        mycursor.execute(
            "insert into " + table + " (" + column + ") values (%s)",
            (str(name),),
        )
        mydb.commit()
        cache.clear()
        cache.update(sql_get_dict(mycursor, table, column))
    return cache[name]


def sql_insert(q):
    """Consume measurement records from ``q`` and store them in MySQL.

    Connects to the ``rasolar`` database with the settings read from
    ``~/.my.cnf``; if that fails, a message is printed and the function
    returns.  Otherwise it loops forever: each record taken from ``q`` has
    its device, sensor and variable names translated to ids (unknown names
    are inserted into ``device_id``, ``sensor_id`` and ``var_id`` first) and
    is then written to ``data_storage``.  A data insert that fails is
    appended to ``sql_missed.txt``, one statement per line.

    Args:
        q: queue of dicts with the keys ``time``, ``device``, ``varname``,
            ``sensor``, ``i2c`` and ``value``.  ``time``, ``i2c`` and
            ``value`` must be integers because they are rendered with the
            ``d`` format code.  ``None`` items are skipped.
    """
    try:
        mydb = pymysql.connect(read_default_file="~/.my.cnf", database="rasolar")
    except Exception as e:
        print("Could not connect to sql server! Quitting")
        print(e)
        return

    mycursor = mydb.cursor(pymysql.cursors.DictCursor)
    # Every placeholder uses the ``d`` code, so only integers can end up in
    # the statement text.
    sqlinsert = "insert into data_storage (time,device_id,var_id,sensor_id,i2c,value) values ({0:d},{1:d},{2:d},{3:d},{4:d},{5:d})"
    # Name -> id caches, loaded once and refreshed whenever a name is added.
    svar = sql_get_dict(mycursor, "var_id", "var")
    sdev = sql_get_dict(mycursor, "device_id", "device")
    ssens = sql_get_dict(mycursor, "sensor_id", "sensor")

    while True:
        # get() blocks until a record is available; no polling is needed.
        indata = q.get()
        try:
            if indata is None:
                continue
            try:
                dev_id = _lookup_id(mydb, mycursor, sdev, "device_id", "device", indata['device'])
                sens_id = _lookup_id(mydb, mycursor, ssens, "sensor_id", "sensor", indata['sensor'])
                var_id = _lookup_id(mydb, mycursor, svar, "var_id", "var", indata['varname'])
                txt_sql = sqlinsert.format(
                    indata['time'], dev_id, var_id, sens_id,
                    indata['i2c'], indata['value'],
                )
            except Exception as e:
                # One bad record must not terminate the worker thread,
                # otherwise everything queued afterwards would be lost.
                print("Error preparing sql insert")
                print(e)
                continue
            try:
                mycursor.execute(txt_sql)
            except Exception as e:
                print("Eror in execute sql insert")
                print(e)
                # Keep the statement so that it can be replayed later.
                with open("sql_missed.txt", "a") as errlog:
                    errlog.write(txt_sql + "\n")
            else:
                mydb.commit()
        finally:
            # Balance every get(), including skipped items, so that a
            # q.join() elsewhere can complete.
            q.task_done()
class ThreadedTCPRequestHandler(SocketServer.StreamRequestHandler):
    """Turn one JSON message per connection into queued measurement records.

    Expected message layout (JSON object)::

        {"device": <str, optional>, "mult": <int, optional>,
         "payload": {<varname>: {"value": .., "time": .., "sensor": ..,
                                 "i2c": ..}, ...}}

    For every entry of ``payload`` a dict with the keys ``time``,
    ``device``, ``varname``, ``sensor``, ``i2c`` and ``value`` is put on the
    queue taken from the server object.
    """

    def __init__(self, request, client_address, server):
        # The base-class constructor already runs handle(), so the queue has
        # to be attached before delegating to it.
        self.queue = server.queue
        SocketServer.StreamRequestHandler.__init__(self, request, client_address, server)

    @staticmethod
    def _clean(text, unwanted, maxlen):
        """Return ``text`` without the characters in ``unwanted``, cut to ``maxlen``."""
        return "".join(ch for ch in text if ch not in unwanted)[:maxlen]

    def handle(self):
        """Read one message from the socket and enqueue its readings.

        Input that is not ASCII or not valid JSON, and messages without a
        ``payload`` entry, are ignored silently.  Malformed fields are
        reported on stdout and the affected message or entry is skipped.
        """
        # Only a single recv() of up to 1024 bytes is parsed; a message that
        # is longer or arrives in several pieces fails to parse and is
        # dropped.
        try:
            message = json.loads(self.request.recv(1024).decode('ascii'))
        except ValueError:
            # Covers both undecodable bytes and invalid JSON.
            return
        if not isinstance(message, dict) or "payload" not in message:
            return

        try:
            # Fall back to the peer address when no device name was sent.
            datasource = self.client_address[0]
            if "device" in message:
                datasource = self._clean(message['device'], ' ./:;*|', 64)
            multi = int(message['mult']) if "mult" in message else 1
            entries = list(message['payload'].items())
        except (AttributeError, TypeError, ValueError) as e:
            print("Ignoring malformed message")
            print(e)
            return

        for name, entry in entries:
            if not isinstance(entry, dict):
                print("Ignoring malformed payload entry")
                continue
            try:
                # Remove unwanted characters from the variable name.
                varx = self._clean(name, ' ./:;*!', 64)
                value = int(entry['value']) if "value" in entry else 0
                if "time" in entry:
                    datatime = int(entry['time'])
                else:
                    # No timestamp supplied: use the current time in ms.
                    datatime = int(1000 * time.time())
                sensor = ""
                if "sensor" in entry:
                    sensor = self._clean(entry['sensor'], ' ./:;*!', 32)
                i2c = int(entry['i2c']) if "i2c" in entry else 0
            except (TypeError, ValueError) as e:
                print("Ignoring malformed payload entry")
                print(e)
                continue
            self.queue.put({"time": datatime, "device": datasource, "varname": varx, "sensor": sensor, "i2c": i2c, "value": int(multi * value)}, block=False)
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server combined with ``ThreadingMixIn`` that carries a queue.

    The extra ``queue`` keyword is stored as ``self.queue``; everything else
    is forwarded unchanged to ``TCPServer``.
    """

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, queue=None):
        # Set before the base constructor runs, so the attribute exists as
        # soon as the server object is usable.
        self.queue = queue
        SocketServer.TCPServer.__init__(
            self, server_address, RequestHandlerClass, bind_and_activate
        )
if __name__ == "__main__":
    # Script entry point: start the database writer thread, then serve TCP
    # clients from the main thread until interrupted.

    # Unbounded queue shared by the request handlers (producers) and the
    # SQL worker (consumer).  It stays a module-level name called ``q``.
    q = Queue(maxsize=0)
    sql_worker = threading.Thread(target=sql_insert, args=(q,))
    # Daemon thread: it must not keep the process alive after the server
    # has stopped.
    sql_worker.daemon = True
    sql_worker.start()

    # Listen on all interfaces on the fixed port 24048.
    HOST, PORT = "", 24048
    server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler, queue=q)

    # The accept loop runs here in the main thread; ThreadingMixIn starts
    # one more thread for each request.  serve_forever() takes only an
    # optional poll interval, so the queue must not be passed to it.
    print("Server loop running in thread:", threading.current_thread().name)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket however the loop ended.
        server.server_close()
|