meas_data.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. import socket,numpy,time,_thread,json,requests,zlib,binascii,io,os,socket
  2. has_gnupg=False
  3. try:
  4. import gnupg
  5. except:
  6. has_gnupg=False
  7. gpg=[]
  8. else:
  9. has_gnupg=True
  10. gpg=gnupg.GPG()
  11. has_mqtt=False
  12. try:
  13. import paho.mqtt.client as mqtt
  14. import paho.mqtt.publish as publish
  15. except:
  16. has_mqtt=False
  17. else:
  18. has_mqtt=True
  19. from uuid import getnode
  20. _STRINGREMOVE=str.maketrans(dict.fromkeys(":;\\{}(){]%&"))
  21. class meas_data:
  22. def __init__(self,var_config=[]):
  23. # def __init__(self,var_name,ring_length=100,sigma=2,device=socket.gethostname(),sensor="CPU",i2c=0,store_dir="/home/pi/data",multiplicator=1,digits=5,check_last=0,mac=0,sql_min_wait=5,rsa_key="",check_last_shrinking=0.99,mean_count=5,store_each_cycle=False,sens_id={'varname':"",'sensor':"CPU",'i2c':0},id_conf=0,deviceid="FF"):
  24. if "var_name" in var_config:
  25. self.set_varname(var_config["var_name"])
  26. else:
  27. self.set_varname("dummy")
  28. self.value=[]
  29. self.act_value=0
  30. self.act_time=0
  31. self.mval=[]
  32. self.stat_std=0
  33. self.stat_mean=0
  34. self.stat_val_std=0
  35. self.stat_val_mean=0
  36. if "ring_length" in var_config:
  37. self.set_ring_length(var_config["ring_length"])
  38. else:
  39. self.set_ring_length(60)
  40. if "mean_count" in var_config:
  41. self.set_mean_count(var_config["mean_count"])
  42. if "store_each_cycle" in var_config:
  43. self.set_store_cycle(var_config["store_each_cycle"])
  44. self.sensebox=""
  45. self.senseid=""
  46. self.sense_url="https://ingress.opensensemap.org/"
  47. self.sense_intervall=300
  48. self.sense_last_time=0
  49. if "sql_min_wait" in var_config:
  50. self.sql_min_wait=var_config["sql_min_wait"]
  51. self.sqlhost=""
  52. self.sqlport=0
  53. self.sql_last_transmit=0
  54. self.bsense=False
  55. self.bsql=False
  56. self.bchecklast=False
  57. self.bchanged=False
  58. self.bfile=False
  59. self.bstoreeach=False
  60. if "sigma" in var_config:
  61. self.sigma=var_config["sigma"]
  62. else:
  63. self.sigma=2
  64. if "device" in var_config:
  65. self.set_device(var_config["device"])
  66. else:
  67. self.set_device(socket.gethostname())
  68. if "deviceid" in var_config:
  69. self.set_deviceid(var_config["deviceid"])
  70. else:
  71. self.set_device(format(getnode(),"x"))
  72. if "sensor" in var_config:
  73. self.set_sensor(var_config["sensor"])
  74. else:
  75. self.set_sensor("local")
  76. if "digits" in var_config:
  77. self.set_digits(var_config["digits"])
  78. else:
  79. self.set_digits(2)
  80. if ("check_last" in var_config) & ("check_last_shrinking" in var_config):
  81. self.set_check_last(var_config["check_last"],var_config["check_last_shrinking"])
  82. if "i2c" in var_config:
  83. self.set_i2c(var_config["i2c"])
  84. else:
  85. self.set_i2c(0)
  86. if "multiplicator" in var_config:
  87. self.set_multiplicator(var_config["multiplicator"])
  88. else:
  89. self.set_multiplicator(1)
  90. if "sens_id" in var_config:
  91. self.set_sensid(var_config["sens_id"])
  92. self.mqtt_broker=""
  93. self.mqtt_port=1883
  94. self.mqtt_topic=""
  95. self.mqtt_bool=False
  96. self.hash=0
  97. self.ids=""
  98. self.set_mac()
  99. # self.set_id(id_conf,deviceid)
  100. if "rsa_key" in var_config:
  101. self.set_rsa(var_config["rsa_key"])
  102. else:
  103. self.set_rsa("")
  104. self.sql_out=""
  105. self.set_id()
  106. self.json_out={"hash":self.hash,"signature":"","payload":{"device":self.device,"varname":self.var_name,"i2c":self.i2c,"sensor":self.sensor,"mac":self.mac,"measures":{}}}
  107. if "store_dir" in var_config:
  108. self.set_file_log(var_config["store_dir"])
  109. else:
  110. self.set_file_log(var_config[""])
  111. self.tcpsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  112. self.tcpsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  113. def set_sensid(self,sens_id):
  114. if "varname" in sens_id:
  115. self.set_varname(sens_id["varname"])
  116. if "sensor" in sens_id:
  117. self.set_sensor(sens_id["sensor"])
  118. if "i2c" in sens_id:
  119. self.set_i2c(sens_id["i2c"])
  120. if "channel" in sens_id:
  121. self.set_channel(sens_id["channel"])
  122. def set_store_cycle(self,store_each_cycle):
  123. self.bstoreeach=False
  124. if isinstance(store_each_cycle,bool):
  125. self.bstoreeach=True
  126. def sign(self):
  127. self.sql_out=self.json_out
  128. if self.brsa and not self.brsa_sql:
  129. self.sql_out={"signed_gpg":gpg.sign(json.dumps(self.json_out),keyid=self.rsa['keyid']).data.decode('utf-8'),"hash":self.hash}
  130. def set_rsa(self,rsa_key):
  131. self.brsa=False
  132. self.brsa_sql=False
  133. if len(rsa_key)>0:
  134. self.brsa=False
  135. try:
  136. gpgkey=gpg.list_keys(keys=rsa_key.translate(_STRINGREMOVE))
  137. except:
  138. self.brsa=False
  139. else:
  140. if len(gpgkey)>0:
  141. self.rsa=gpgkey[0]
  142. self.brsa=True
  143. else:
  144. self.brsa=False
  145. def set_id(self):
  146. idstring=self.device+str(self.i2c)+self.sensor+self.var_name
  147. self.ids=idstring
  148. self.hash=zlib.crc32(idstring.encode("utf-8")) % (1<<32)
  149. def get_id(self):
  150. print(self.hash)
  151. def set_mac(self):
  152. self.mac=format(getnode(),"x")
  153. def set_i2c(self,i2c):
  154. if (isinstance(i2c,(int,float)) and not isinstance(i2c,bool)):
  155. self.i2c=i2c
  156. else:
  157. self.i2c=0
  158. def set_channel(self,channel):
  159. if (isinstance(channel,(int,float)) and not isinstance(channel,bool)):
  160. self.channel=channel
  161. else:
  162. self.channel=0
  163. def get_varname(self):
  164. return self.var_name
  165. def set_varname(self,var_name):
  166. tvn=var_name.translate(_STRINGREMOVE)
  167. if len(tvn)==0:
  168. tvn="var"
  169. self.var_name=tvn
  170. def set_check_last(self,check_last,check_last_shrinking):
  171. if (isinstance(check_last,(int,float)) and not isinstance(check_last,bool))and(isinstance(check_last_shrinking,(int,float)) and not isinstance(check_last_shrinking,bool)):
  172. self.checklast_min=0
  173. self.checklast_max=0
  174. self.checklast_shrinking=1
  175. if check_last > 0:
  176. self.bchecklast=True
  177. self.checklast=check_last
  178. if (check_last_shrinking < 1) and (check_last_shrinking > 0):
  179. self.checklast_shrinking=check_last_shrinking
  180. else:
  181. self.bchecklast=False
  182. self.ckecklast=0
  183. def set_digits(self,digits):
  184. self.digits=1
  185. if (isinstance(digits,(int,float)) and not isinstance(digits,bool)):
  186. try:
  187. td=int(digits)-1
  188. except:
  189. print("could not set digits")
  190. else:
  191. if td<0:
  192. td=1
  193. if td>20:
  194. td=20
  195. self.digits=td
  196. def set_ring_length(self,ring_length):
  197. self.ring_length=60
  198. if (isinstance(ring_length,(int,float)) and not isinstance(ring_length,bool)):
  199. trl=int(ring_length)
  200. if trl<0:
  201. trl=(-1)*trl
  202. if trl==0:
  203. trl = 60
  204. self.ring_length=trl
  205. def set_mean_count(self,mean_count):
  206. self.mean_count=5
  207. if (isinstance(mean_count,(int,float)) and not isinstance(mean_count,bool)):
  208. tmc=int(mean_count)
  209. if tmc < 0:
  210. tmc=(-1)*tmc
  211. if tmc == 0:
  212. tmc=self.ring_length
  213. if mean_count>self.ring_length:
  214. mean_count=self.ring_length
  215. self.mean_count=mean_count
  216. def set_sensebox(self,sensebox,senseid,intervall=300):
  217. tsb=sensebox.translate(_STRINGREMOVE)
  218. tsb=tsb[0:24]
  219. self.sensebox=tsb
  220. tsi=senseid.translate(_STRINGREMOVE)
  221. tsi=tsi[0:24]
  222. self.senseid=tsi
  223. self.sense_intervall=int(intervall)
  224. self.bsense=True
  225. self.sense_url="https://ingress.opensensemap.org/boxes/%s/%s" % (tsb,tsi)
  226. print(self.sense_url)
  227. def set_sql(self,host,port=8080,min_wait=5):
  228. self.bsql=True
  229. th=host.translate(_STRINGREMOVE)
  230. self.sqlhost=th
  231. self.sqlport=8080
  232. if (isinstance(port,(int,float)) and not isinstance(port,bool)):
  233. self.sqlport=int(port)
  234. if self.sqlport<1024 or self.sqlport > 2**16:
  235. self.bsql=False
  236. self.sqlurl="http://"+self.sqlhost+":"+str(self.sqlport)+"/data/"+str(self.hash)
  237. if isinstance(min_wait,(int,float)) and not isinstance(min_wait,bool):
  238. self.sql_min_wait=min_wait
  239. else:
  240. self.sql_min_wait=0
  241. self.brsa_sql=False
  242. if self.brsa:
  243. skey=gpg.list_keys(keys="@"+th)
  244. if len(skey)>0:
  245. self.brsa_sql=True
  246. self.sql_rsa=skey[1]
  247. else:
  248. self.brsa_sql=False
  249. def set_mqtt(self,broker="localhost",port=1883,topic=""):
  250. mt=topic.translate(_STRINGREMOVE)
  251. if len(mt)==0:
  252. mt='tele/'+self.device+"/sens/"+self.sensor+"/"+self.var_name
  253. else:
  254. mt=mt.translate(":;\\{}(){]%&")
  255. mb=broker.translate(_STRINGREMOVE)
  256. mport=1883
  257. if (isinstance(port,(int,float)) and not isinstance(port,bool)):
  258. mport=int(port)
  259. self.mqtt_broker=mb
  260. self.mqtt_port=mport
  261. self.mqtt_topic=mt
  262. self.mqtt_bool=has_mqtt
  263. if has_mqtt:
  264. if mport < 1024:
  265. mport=1883
  266. try:
  267. mqtt.connect(host=mb,port=mport)
  268. except:
  269. print("no connection to broker")
  270. self.mqtt=False
  271. #else:
  272. else:
  273. self.mqtt_bool=False
  274. def set_file_log(self,store_dir="/home/pi/data/"):
  275. self.file_time=io.BytesIO()
  276. self.file_value=io.BytesIO()
  277. if len(store_dir)>1:
  278. self.store_dir=store_dir+str(self.hash)+'/'
  279. if os.path.exists(self.store_dir):
  280. self.bfile=True
  281. else:
  282. try:
  283. os.makedirs(self.store_dir,exist_ok=True)
  284. except:
  285. self.bfile=False
  286. else:
  287. self.bfile=True
  288. else:
  289. self.bfile=False
  290. def show_def(self):
  291. print(self.var_name)
  292. print(self.value)
  293. def get_device(self):
  294. return self.device
  295. def set_device(self,device):
  296. self.device=device.translate(_STRINGREMOVE)
  297. def get_deviceid(self):
  298. return self.deviceid
  299. def set_deviceid(self,deviceid):
  300. if (isinstance(deviceid,(int,float)) and not isinstance(deviceid,bool)):
  301. self.deviceid=deviceid
  302. else:
  303. self.deviceid=getnode()
  304. def get_sensor(self):
  305. return self.sensor
  306. def set_sensor(self,sensor):
  307. ts=sensor.translate(_STRINGREMOVE)
  308. self.sensor=ts
  309. def set_multiplicator(self,multiplicator):
  310. tmult=float(multiplicator)
  311. if tmult < 1:
  312. tmult=tmult*(-1)
  313. self.mult=tmult
  314. def append(self,value):
  315. if (isinstance(value,(int,float)) and not isinstance(value,bool)):
  316. tdif=-1
  317. if value>0:
  318. try:
  319. tdig=10**(round(numpy.log10(value))-self.digits)
  320. except:
  321. print("")
  322. else:
  323. tdig=1
  324. if tdig>=0:
  325. try:
  326. tv=round(value/tdig)*tdig
  327. except:
  328. tv=value
  329. btv=True
  330. if self.bchecklast and (len(self.value)>0):
  331. # check if new value is within range between min - max
  332. if (tv <= self.checklast_max) and (tv >= self.checklast_min):
  333. btv=False
  334. minmax=(self.checklast_max-self.checklast_min)*(1-self.checklast_shrinking)/2
  335. if minmax>0:
  336. self.checklast_max=self.checklast_max-minmax
  337. self.checklast_min=self.checklast_min+minmax
  338. if self.checklast_min>self.checklast_max:
  339. self.checklaft_min=self.checklast_max
  340. else:
  341. # check if new value is exact within last values
  342. testvalue=self.value[-1*min(self.checklast,len(self.value)):]
  343. if isinstance(testvalue,list):
  344. if tv in testvalue:
  345. btv=False
  346. else:
  347. if tv==testvalue:
  348. btv=False
  349. if btv:
  350. self.value.append(tv)
  351. self.act_value=tv
  352. self.act_time=int(1000*time.time())
  353. if self.bchecklast:
  354. # calc new min/max values
  355. checklast_amount=min(self.checklast,len(self.value))
  356. btv=True
  357. testvalue=[]
  358. startcheck=len(self.value)-1
  359. # get last unique values
  360. while btv:
  361. tv=self.value[startcheck]
  362. if not tv in testvalue:
  363. testvalue.append(tv)
  364. startcheck=startcheck-1
  365. if (startcheck<=0) or (len(testvalue)>self.checklast):
  366. btv=False
  367. if isinstance(testvalue,list):
  368. if len(testvalue)>=3: # simple check for outlier
  369. tvrmin=testvalue
  370. tvrmax=testvalue
  371. tvrmin.remove(min(tvrmin))
  372. tvrmax.remove(max(tvrmin))
  373. tvrminm=numpy.mean(tvrmin)
  374. tvrmaxm=numpy.mean(tvrmax)
  375. tvrmins=numpy.std(tvrmin)*3
  376. tvrmaxs=numpy.std(tvrmax)*3
  377. if min(testvalue)<(tvrminm-tvrmins):
  378. testvalue.remove(min(testvalue))
  379. if max(testvalue)>(tvrmaxm+tvrmaxs):
  380. testvalue.remove(max(testvalue))
  381. self.checklast_min=min(testvalue)
  382. self.checklast_max=max(testvalue)
  383. else:
  384. self.checklast_min=testvalue
  385. self.checklast_max=testvalue
  386. if self.mqtt_bool:
  387. _thread.start_new_thread(self.send_mqtt,(0,))
  388. if self.bsql:
  389. _thread.start_new_thread(self.send_sql,(0,))
  390. if self.bfile:
  391. _thread.start_new_thread(self.send_file,(0,))
  392. if (len(self.mval)==0) and (len(self.value)>1):
  393. self.stat_mean=numpy.mean(self.value)
  394. self.stat_std=numpy.std(self.value)/numpy.sqrt(len(self.value)-1)
  395. self.mval.append(value)
  396. if self.bsense:
  397. if len(self.mval)==self.mean_count:
  398. mmean=numpy.mean(self.mval)
  399. nstd=self.stat_std+numpy.std(self.mval)/numpy.sqrt(len(self.mval)-1)
  400. bsave=self.bstoreeach
  401. if abs(mmean-self.stat_mean) > (self.sigma*nstd):
  402. bsave=True
  403. if bsave:
  404. if self.stat_val_std==0:
  405. self.stat_val_std=abs(self.stat_val_mean/100)
  406. nstd=10**(round(numpy.log10(numpy.maximum(0.001,numpy.std(self.mval))))-1)
  407. mmean=round(mmean/nstd,0)*nstd
  408. self.act_value=int(self.mult*mmean)
  409. self.act_std=int(self.mult*numpy.std(self.mval))
  410. self.act_time=int(1000*time.time())
  411. if self.bfile:
  412. _thread.start_new_thread(self.send_file,(0,))
  413. if self.bsense:
  414. _thread.start_new_thread(self.upload_osm,(0,))
  415. self.mval=[]
  416. if len(self.value)>=self.ring_length:
  417. self.value=self.value[((-1)*self.ring_length):]
  418. def get_ring(self):
  419. return(self.value)
  420. def get_act_value(self):
  421. return(self.act_value)
  422. def send_file(self,trigger=0):
  423. self.file_time.write(int(self.act_time).to_bytes(8,byteorder="big",signed=False))
  424. self.file_value.write(int(self.act_value*self.mult).to_bytes(3,byteorder="big",signed=False))
  425. if(len(self.file_time.getvalue())>64):
  426. bstored=True
  427. try:
  428. ft=open(self.store_dir+'timestamp','ab+')
  429. fv=open(self.store_dir+'value','ab+')
  430. except:
  431. bstored=False
  432. else:
  433. ft.write(self.file_time.getvalue())
  434. fv.write(self.file_value.getvalue())
  435. ft.close()
  436. fv.close()
  437. self.file_time.close()
  438. self.file_value.close()
  439. self.file_time=io.BytesIO()
  440. self.file_value=io.BytesIO()
  441. def send_sql(self,trigger=0):
  442. sv=(-99)
  443. if len(self.value)>0:
  444. if len(self.value)>1:
  445. if self.value[-1]!=self.value[-2]:
  446. sv=self.value[-1]
  447. else:
  448. sv=self.value[-1]
  449. if sv >(-99):
  450. self.json_out['payload']['measures'][self.act_time]=int(sv*self.mult)
  451. if ((self.act_time-self.sql_last_transmit)>(self.sql_min_wait*1000)) and (len(self.json_out['payload']['measures'])>0):
  452. self.sign()
  453. # self.tcpsock.connect(("127.0.0.1",24048))
  454. # self.tcpsock.send(json.dumps(self.sql_out).encode("utf-8"))
  455. # self.tcpsock.close()
  456. # self.json_out['payload']['measures']={}
  457. try:
  458. self._r=requests.put(self.sqlurl,json=json.dumps(self.sql_out))
  459. # self._r=requests.post(self.sqlurl,data=self.sql_out)
  460. except:
  461. self._r={"status_code":404}
  462. else:
  463. if self._r.status_code==200:
  464. self.json_out['payload']['measures']={}
  465. self.sql_last_transmit=time.time()*1000
  466. def upload_osm(self,trigger=0):
  467. if (self.act_time-self.sense_last_time)>(self.sense_intervall*1000):
  468. r = requests.post(self.sense_url,json={'value': float(self.act_value)/self.mult})
  469. if (r.status_code != requests.codes.ok) & (r.status_code != 201):
  470. print("Error %d: %s" % (r.status_code,r.text))
  471. else:
  472. self.sense_last_time=self.act_time
  473. def send_mqtt(self,trigger=0):
  474. sv=(-99)
  475. if len(self.value)>0:
  476. if len(self.value)>1:
  477. if self.value[-1]!=self.value[-2]:
  478. sv=self.value[-1]
  479. else:
  480. sv=self.value[-1]
  481. if sv >(-99):
  482. try:
  483. publish.single(self.mqtt_topic,self.mult*sv,hostname=self.mqtt_broker,port=self.mqtt_port,retain=True)
  484. except:
  485. print("could not send mqtt")