123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490 |
# Standard library
import _thread
import binascii
import io
import json
import os
import socket
import time
import zlib
from uuid import getnode

# Third party (required)
import numpy
import requests

# Optional dependency: GnuPG signing.  `gpg` stays an empty placeholder and
# `has_gnupg` is False when the package is not installed or the GPG wrapper
# cannot be created.
try:
    import gnupg
except ImportError:
    has_gnupg = False
    gpg = []
else:
    try:
        gpg = gnupg.GPG()
    except (OSError, ValueError):
        # NOTE(review): python-gnupg is expected to raise here when the gpg
        # executable cannot be run; treat that like "gnupg not installed"
        # instead of aborting the import of this module.
        has_gnupg = False
        gpg = []
    else:
        has_gnupg = True

# Optional dependency: MQTT publishing via paho.
try:
    import paho.mqtt.client as mqtt
    import paho.mqtt.publish as publish
except ImportError:
    has_mqtt = False
else:
    has_mqtt = True
# Translation table for str.translate() that deletes characters which are not
# allowed in names, hosts and topics.  The original set listed "{" twice and
# omitted "[", so "[" slipped through; both square brackets are removed now.
_STRINGREMOVE = str.maketrans("", "", ":;\\{}()[]%&")
class meas_data:
    """Ring buffer for one measured variable with optional forwarding.

    Values handed to :meth:`append` are rounded, optionally filtered against
    the most recent values and kept in a list of at most ``ring_length``
    entries.  Every stored value can be forwarded to an MQTT broker, to an
    HTTP endpoint (the "sql" target) and to a local binary log; window means
    can be uploaded to an openSenseMap box.
    """

    def __init__(self, var_config=None):
        """Create the buffer from a configuration mapping.

        Args:
            var_config: mapping with any of the optional keys ``var_name``,
                ``ring_length``, ``mean_count``, ``store_each_cycle``,
                ``sql_min_wait``, ``sigma``, ``device``, ``deviceid``,
                ``sensor``, ``digits``, ``check_last`` together with
                ``check_last_shrinking``, ``i2c``, ``multiplicator``,
                ``sens_id``, ``rsa_key`` and ``store_dir``.  ``None`` or an
                empty container selects the defaults.
        """
        cfg = var_config if var_config else {}

        self.set_varname(cfg.get("var_name", "dummy"))
        self.value = []
        self.act_value = 0
        self.act_time = 0
        self.mval = []
        self.stat_std = 0
        self.stat_mean = 0
        self.stat_val_std = 0
        self.stat_val_mean = 0

        # Feature flags are reset before any setter runs so that a configured
        # value (e.g. store_each_cycle) is not overwritten afterwards.
        self.bsense = False
        self.bsql = False
        self.bchecklast = False
        self.bchanged = False
        self.bfile = False
        self.bstoreeach = False

        self.set_ring_length(cfg.get("ring_length", 60))
        self.set_mean_count(cfg.get("mean_count", 5))
        if "store_each_cycle" in cfg:
            self.set_store_cycle(cfg["store_each_cycle"])

        self.sensebox = ""
        self.senseid = ""
        self.sense_url = "https://ingress.opensensemap.org/"
        self.sense_intervall = 300
        self.sense_last_time = 0

        self.sql_min_wait = cfg.get("sql_min_wait", 5)
        self.sqlhost = ""
        self.sqlport = 0
        self.sql_last_transmit = 0

        self.sigma = cfg.get("sigma", 2)
        self.set_device(cfg.get("device", socket.gethostname()))
        # A missing "deviceid" used to overwrite the device *name* with the
        # MAC address; fall back to the node id for the id instead.
        self.set_deviceid(cfg.get("deviceid", getnode()))
        self.set_sensor(cfg.get("sensor", "local"))
        self.set_digits(cfg.get("digits", 2))

        self.set_check_last(0, 1)  # start with the repeat filter disabled
        if "check_last" in cfg and "check_last_shrinking" in cfg:
            self.set_check_last(cfg["check_last"], cfg["check_last_shrinking"])

        self.channel = 0
        self.set_i2c(cfg.get("i2c", 0))
        self.set_multiplicator(cfg.get("multiplicator", 1))
        if "sens_id" in cfg:
            self.set_sensid(cfg["sens_id"])

        self.mqtt_broker = ""
        self.mqtt_port = 1883
        self.mqtt_topic = ""
        self.mqtt_bool = False

        self.hash = 0
        self.ids = ""
        self.set_mac()
        self.set_rsa(cfg.get("rsa_key", ""))
        self.sql_out = ""
        self.set_id()
        self.json_out = {
            "hash": self.hash,
            "signature": "",
            "payload": {
                "device": self.device,
                "varname": self.var_name,
                "i2c": self.i2c,
                "sensor": self.sensor,
                "mac": self.mac,
                "measures": {},
            },
        }

        # Without "store_dir" the original indexed var_config[""] and raised;
        # file logging is simply left disabled in that case.
        self.set_file_log(cfg.get("store_dir", ""))

        # NOTE(review): this socket is created but not used by any method in
        # this class; kept because external code may rely on the attribute.
        self.tcpsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.tcpsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # ------------------------------------------------------------------
    # small private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_number(candidate):
        """Return True for int/float values, excluding bool."""
        return isinstance(candidate, (int, float)) and not isinstance(candidate, bool)

    @classmethod
    def _as_int(cls, candidate, default):
        """Return ``int(candidate)`` for numbers, else ``default``."""
        if not cls._is_number(candidate):
            return default
        try:
            return int(candidate)
        except (ValueError, OverflowError):  # nan / inf
            return default

    def _latest_changed_value(self):
        """Return the newest ring value, or None if it repeats its predecessor."""
        if not self.value:
            return None
        if len(self.value) > 1 and self.value[-1] == self.value[-2]:
            return None
        return self.value[-1]

    # ------------------------------------------------------------------
    # configuration
    # ------------------------------------------------------------------
    def set_sensid(self, sens_id):
        """Apply the optional keys ``varname``, ``sensor``, ``i2c``, ``channel``."""
        if "varname" in sens_id:
            self.set_varname(sens_id["varname"])
        if "sensor" in sens_id:
            self.set_sensor(sens_id["sensor"])
        if "i2c" in sens_id:
            self.set_i2c(sens_id["i2c"])
        if "channel" in sens_id:
            self.set_channel(sens_id["channel"])

    def set_store_cycle(self, store_each_cycle):
        """Enable storing every averaging window; non-bool input disables it."""
        # The original set the flag to True for *any* bool, including False.
        self.bstoreeach = store_each_cycle if isinstance(store_each_cycle, bool) else False

    def sign(self):
        """Prepare ``sql_out``: the plain payload, or a GPG-signed wrapper."""
        self.sql_out = self.json_out
        if self.brsa and not self.brsa_sql:
            signed = gpg.sign(json.dumps(self.json_out), keyid=self.rsa['keyid'])
            self.sql_out = {"signed_gpg": signed.data.decode('utf-8'), "hash": self.hash}

    def set_rsa(self, rsa_key):
        """Look up ``rsa_key`` in the GPG keyring and enable signing if found."""
        self.brsa = False
        self.brsa_sql = False
        if not rsa_key:
            return
        try:
            found = gpg.list_keys(keys=rsa_key.translate(_STRINGREMOVE))
        except Exception:
            # gnupg missing (placeholder object) or key lookup failed:
            # signing stays disabled.
            return
        if len(found) > 0:
            self.rsa = found[0]
            self.brsa = True

    def set_id(self):
        """Derive ``ids`` and the CRC32 ``hash`` from device, i2c, sensor, name."""
        idstring = self.device + str(self.i2c) + self.sensor + self.var_name
        self.ids = idstring
        self.hash = zlib.crc32(idstring.encode("utf-8")) % (1 << 32)

    def get_id(self):
        """Print the identity hash and return it."""
        print(self.hash)
        return self.hash

    def set_mac(self):
        """Store the node id from ``uuid.getnode`` as a hex string."""
        self.mac = format(getnode(), "x")

    def set_i2c(self, i2c):
        """Store the i2c address; non-numeric input falls back to 0."""
        self.i2c = i2c if self._is_number(i2c) else 0

    def set_channel(self, channel):
        """Store the channel; non-numeric input falls back to 0."""
        self.channel = channel if self._is_number(channel) else 0

    def get_varname(self):
        """Return the sanitised variable name."""
        return self.var_name

    def set_varname(self, var_name):
        """Store the sanitised variable name; an empty result becomes "var"."""
        cleaned = var_name.translate(_STRINGREMOVE)
        self.var_name = cleaned if cleaned else "var"

    def set_check_last(self, check_last, check_last_shrinking):
        """Configure the repeat filter used by :meth:`append`.

        Args:
            check_last: number of recent values to compare against; values
                that truncate to 0 or less, or non-numeric input, disable
                the filter.
            check_last_shrinking: factor in (0, 1) by which the accepted
                min/max band shrinks on every suppressed value; anything
                else means no shrinking.
        """
        self.bchecklast = False
        self.checklast = 0
        self.checklast_min = 0
        self.checklast_max = 0
        self.checklast_shrinking = 1
        if not (self._is_number(check_last) and self._is_number(check_last_shrinking)):
            return
        # The window is used as a slice bound, so it has to be an int.
        window = self._as_int(check_last, 0)
        if window > 0:
            self.bchecklast = True
            self.checklast = window
            if 0 < check_last_shrinking < 1:
                self.checklast_shrinking = check_last_shrinking

    def set_digits(self, digits):
        """Set the rounding precision used by :meth:`append`.

        Stores ``int(digits) - 1`` limited to at most 20; a negative result
        is replaced by 1.  Non-numeric input leaves the default of 1.
        """
        self.digits = 1
        if not self._is_number(digits):
            return
        try:
            td = int(digits) - 1
        except (ValueError, OverflowError):
            print("could not set digits")
            return
        if td < 0:
            td = 1
        self.digits = min(td, 20)

    def set_ring_length(self, ring_length):
        """Set the ring size (absolute value); 0 or invalid input gives 60."""
        length = abs(self._as_int(ring_length, 60))
        self.ring_length = length if length else 60

    def set_mean_count(self, mean_count):
        """Set the averaging window size.

        Uses the absolute integer value, replaces 0 by the ring length and
        never exceeds the ring length.  Non-numeric input gives 5.
        """
        # The original normalised into a temporary but stored the raw input.
        count = abs(self._as_int(mean_count, 5))
        if count == 0:
            count = self.ring_length
        self.mean_count = min(count, self.ring_length)

    def set_sensebox(self, sensebox, senseid, intervall=300):
        """Enable openSenseMap uploads for the given box and sensor id."""
        tsb = sensebox.translate(_STRINGREMOVE)[0:24]
        tsi = senseid.translate(_STRINGREMOVE)[0:24]
        self.sensebox = tsb
        self.senseid = tsi
        self.sense_intervall = int(intervall)
        self.bsense = True
        self.sense_url = "https://ingress.opensensemap.org/boxes/%s/%s" % (tsb, tsi)
        print(self.sense_url)

    def set_sql(self, host, port=8080, min_wait=5):
        """Configure the HTTP target used by :meth:`send_sql`.

        Args:
            host: host name; characters in ``_STRINGREMOVE`` are stripped.
            port: TCP port; values outside 1024..65535 disable the target.
            min_wait: minimum seconds between transmissions (0 if invalid).
        """
        self.bsql = True
        th = host.translate(_STRINGREMOVE)
        self.sqlhost = th
        self.sqlport = self._as_int(port, 8080)
        if self.sqlport < 1024 or self.sqlport > 65535:
            self.bsql = False
        self.sqlurl = "http://" + self.sqlhost + ":" + str(self.sqlport) + "/data/" + str(self.hash)
        self.sql_min_wait = min_wait if self._is_number(min_wait) else 0

        self.brsa_sql = False
        if self.brsa:
            try:
                skey = gpg.list_keys(keys="@" + th)
            except Exception:
                skey = []
            if len(skey) > 0:
                self.brsa_sql = True
                # First match; index 1 raised IndexError for a single hit.
                self.sql_rsa = skey[0]

    def set_mqtt(self, broker="localhost", port=1883, topic=""):
        """Configure MQTT publishing.

        An empty topic defaults to ``tele/<device>/sens/<sensor>/<var_name>``.
        Ports outside 1024..65535 are replaced by 1883.  Publishing is
        enabled whenever the paho package is available; an unreachable
        broker only produces a warning because every publish connects anew.
        """
        mt = topic.translate(_STRINGREMOVE)
        if not mt:
            mt = 'tele/' + self.device + "/sens/" + self.sensor + "/" + self.var_name
        mb = broker.translate(_STRINGREMOVE)
        mport = self._as_int(port, 1883)
        if mport < 1024 or mport > 65535:
            mport = 1883
        self.mqtt_broker = mb
        self.mqtt_port = mport
        self.mqtt_topic = mt
        self.mqtt_bool = has_mqtt
        if has_mqtt:
            # Plain TCP reachability probe; the former mqtt.connect(...) call
            # does not exist at module level and therefore always failed.
            try:
                socket.create_connection((mb, mport), timeout=3).close()
            except OSError:
                print("no connection to broker")

    def set_file_log(self, store_dir="/home/pi/data/"):
        """Enable the binary file log below ``store_dir/<hash>/``.

        A ``store_dir`` of length 0 or 1 disables file logging.  The target
        directory is created if needed; failure to do so disables logging.
        """
        self.file_time = io.BytesIO()
        self.file_value = io.BytesIO()
        self.bfile = False
        if len(store_dir) <= 1:
            return
        # os.path.join inserts the separator the original concatenation
        # lacked; the trailing "" keeps the path ending in a separator.
        self.store_dir = os.path.join(store_dir, str(self.hash), "")
        try:
            os.makedirs(self.store_dir, exist_ok=True)
        except OSError:
            return
        self.bfile = True

    def show_def(self):
        """Print the variable name and the current ring contents."""
        print(self.var_name)
        print(self.value)

    def get_device(self):
        """Return the sanitised device name."""
        return self.device

    def set_device(self, device):
        """Store the sanitised device name."""
        self.device = device.translate(_STRINGREMOVE)

    def get_deviceid(self):
        """Return the device id."""
        return self.deviceid

    def set_deviceid(self, deviceid):
        """Store a numeric device id; anything else falls back to the node id."""
        self.deviceid = deviceid if self._is_number(deviceid) else getnode()

    def get_sensor(self):
        """Return the sanitised sensor name."""
        return self.sensor

    def set_sensor(self, sensor):
        """Store the sanitised sensor name."""
        self.sensor = sensor.translate(_STRINGREMOVE)

    def set_multiplicator(self, multiplicator):
        """Store the positive scale factor applied before integer conversion.

        The sign is dropped (the original flipped the sign of every factor
        below 1, turning e.g. 0.5 into -0.5).  A factor of 0 is replaced by
        1 because the factor is also used as a divisor in :meth:`upload_osm`.
        """
        factor = abs(float(multiplicator))
        self.mult = factor if factor != 0 else 1.0

    # ------------------------------------------------------------------
    # data path
    # ------------------------------------------------------------------
    def _round_significant(self, value):
        """Round ``value`` to a step derived from its magnitude and ``digits``."""
        step = 1
        if value != 0:
            try:
                # abs() keeps the rounding symmetric for negative values,
                # which were previously rounded to whole integers.
                step = 10 ** (round(numpy.log10(abs(value))) - self.digits)
            except (TypeError, ValueError, OverflowError, AttributeError):
                step = 1
        try:
            return round(value / step) * step
        except (ValueError, OverflowError, ZeroDivisionError):
            return value

    def _is_repeat(self, rounded):
        """Return True if the repeat filter suppresses ``rounded``.

        A value inside the current min/max band is suppressed and shrinks the
        band; a value outside the band is suppressed only when it equals one
        of the last ``checklast`` ring entries.
        """
        if not (self.bchecklast and self.value):
            return False
        if self.checklast_min <= rounded <= self.checklast_max:
            shrink = (self.checklast_max - self.checklast_min) * (1 - self.checklast_shrinking) / 2
            if shrink > 0:
                self.checklast_max = self.checklast_max - shrink
                self.checklast_min = self.checklast_min + shrink
                if self.checklast_min > self.checklast_max:
                    self.checklast_min = self.checklast_max
            return True
        recent = self.value[-min(self.checklast, len(self.value)):]
        return rounded in recent

    def _update_checklast_range(self):
        """Recompute the min/max band from the newest distinct ring values."""
        uniques = []
        idx = len(self.value) - 1
        # Walk backwards through the ring, index 0 included (the original
        # stopped one element early).
        while idx >= 0 and len(uniques) <= self.checklast:
            candidate = self.value[idx]
            if candidate not in uniques:
                uniques.append(candidate)
            idx -= 1
        if len(uniques) >= 3:
            # Simple outlier check: an extreme is dropped when it lies more
            # than three standard deviations from the mean of the others.
            # Copies are required; the original aliased one list three times.
            lowest, highest = min(uniques), max(uniques)
            without_lowest = [v for v in uniques if v != lowest]
            without_highest = [v for v in uniques if v != highest]
            if lowest < numpy.mean(without_lowest) - 3 * numpy.std(without_lowest):
                uniques.remove(lowest)
            if highest > numpy.mean(without_highest) + 3 * numpy.std(without_highest):
                uniques.remove(highest)
        self.checklast_min = min(uniques)
        self.checklast_max = max(uniques)

    def _evaluate_window(self):
        """Compare the finished averaging window with the ring statistics.

        When the window mean deviates by more than ``sigma`` combined
        standard errors (or ``bstoreeach`` is set) the rounded, scaled mean
        becomes ``act_value`` and is handed to the file log and the
        openSenseMap upload.
        """
        window_mean = numpy.mean(self.mval)
        window_std = numpy.std(self.mval)
        # max(..., 1) avoids a division by zero for a window of one sample.
        nstd = self.stat_std + window_std / numpy.sqrt(max(len(self.mval) - 1, 1))
        bsave = self.bstoreeach or abs(window_mean - self.stat_mean) > (self.sigma * nstd)
        if not bsave:
            return
        if self.stat_val_std == 0:
            self.stat_val_std = abs(self.stat_val_mean / 100)
        step = 10 ** (round(numpy.log10(numpy.maximum(0.001, window_std))) - 1)
        window_mean = round(window_mean / step, 0) * step
        # NOTE(review): act_value is stored already multiplied by `mult`
        # here, while send_file multiplies act_value by `mult` again.
        self.act_value = int(self.mult * window_mean)
        self.act_std = int(self.mult * window_std)
        self.act_time = int(1000 * time.time())
        if self.bfile:
            _thread.start_new_thread(self.send_file, (0,))
        if self.bsense:
            _thread.start_new_thread(self.upload_osm, (0,))

    def append(self, value):
        """Add one measurement; non-numeric input (and bool) is ignored.

        The rounded value is stored in the ring unless the repeat filter
        suppresses it; stored values are forwarded to the enabled targets
        in background threads.  The raw value is also collected into the
        averaging window, which is evaluated once ``mean_count`` samples
        have been gathered.
        """
        if not self._is_number(value):
            return

        rounded = self._round_significant(value)
        if not self._is_repeat(rounded):
            self.value.append(rounded)
            self.act_value = rounded
            self.act_time = int(1000 * time.time())
            if self.bchecklast:
                self._update_checklast_range()
            if self.mqtt_bool:
                _thread.start_new_thread(self.send_mqtt, (0,))
            if self.bsql:
                _thread.start_new_thread(self.send_sql, (0,))
            if self.bfile:
                _thread.start_new_thread(self.send_file, (0,))

        # Refresh the baseline statistics at the start of each window.
        if not self.mval and len(self.value) > 1:
            self.stat_mean = numpy.mean(self.value)
            self.stat_std = numpy.std(self.value) / numpy.sqrt(len(self.value) - 1)

        self.mval.append(value)
        # The window is reset even when the upload is disabled; before, it
        # grew without bound in that case.
        if len(self.mval) >= self.mean_count:
            if self.bsense:
                self._evaluate_window()
            self.mval = []

        if len(self.value) > self.ring_length:
            self.value = self.value[-self.ring_length:]

    def get_ring(self):
        """Return the list of stored values."""
        return self.value

    def get_act_value(self):
        """Return the most recent stored (or averaged) value."""
        return self.act_value

    # ------------------------------------------------------------------
    # output targets (run in background threads; `trigger` is unused)
    # ------------------------------------------------------------------
    def send_file(self, trigger=0):
        """Buffer one record and flush the buffers to disk when large enough.

        A record is an 8-byte big-endian timestamp plus a 3-byte big-endian
        unsigned ``int(act_value * mult)``.  Once the timestamp buffer
        exceeds 64 bytes both buffers are appended to the files
        ``timestamp`` and ``value`` in ``store_dir``.
        """
        # Encode both fields before writing either, so a value that does not
        # fit cannot leave the two buffers out of step.
        try:
            stamp = int(self.act_time).to_bytes(8, byteorder="big", signed=False)
            sample = int(self.act_value * self.mult).to_bytes(3, byteorder="big", signed=False)
        except (OverflowError, ValueError):
            print("could not store value")
            return
        self.file_time.write(stamp)
        self.file_value.write(sample)
        if len(self.file_time.getvalue()) <= 64:
            return
        try:
            with open(self.store_dir + 'timestamp', 'ab+') as ft, \
                    open(self.store_dir + 'value', 'ab+') as fv:
                ft.write(self.file_time.getvalue())
                fv.write(self.file_value.getvalue())
        except OSError:
            # Keep the buffers; the flush is retried on the next call.
            return
        self.file_time.close()
        self.file_value.close()
        self.file_time = io.BytesIO()
        self.file_value = io.BytesIO()

    def send_sql(self, trigger=0):
        """Queue the newest changed value and transmit the queue via HTTP PUT.

        Transmission happens only when ``sql_min_wait`` seconds have passed
        since the last successful one; the queue is cleared on status 200.
        """
        sv = self._latest_changed_value()
        if sv is not None:
            self.json_out['payload']['measures'][self.act_time] = int(sv * self.mult)
        waited = (self.act_time - self.sql_last_transmit) > (self.sql_min_wait * 1000)
        if waited and len(self.json_out['payload']['measures']) > 0:
            self.sign()
            try:
                # NOTE(review): json= combined with json.dumps() encodes the
                # payload twice; kept because the receiver may expect it.
                self._r = requests.put(self.sqlurl, json=json.dumps(self.sql_out), timeout=10)
            except Exception:
                # Best effort: the measures stay queued for the next attempt.
                self._r = {"status_code": 404}
            else:
                if self._r.status_code == 200:
                    self.json_out['payload']['measures'] = {}
                    self.sql_last_transmit = time.time() * 1000

    def upload_osm(self, trigger=0):
        """Post ``act_value / mult`` to the openSenseMap URL, rate limited."""
        if (self.act_time - self.sense_last_time) <= (self.sense_intervall * 1000):
            return
        try:
            r = requests.post(self.sense_url,
                              json={'value': float(self.act_value) / self.mult},
                              timeout=10)
        except Exception as err:
            print("could not upload to opensensemap: %s" % err)
            return
        if r.status_code != requests.codes.ok and r.status_code != 201:
            print("Error %d: %s" % (r.status_code, r.text))
        else:
            self.sense_last_time = self.act_time

    def send_mqtt(self, trigger=0):
        """Publish the newest changed value (scaled by ``mult``), retained."""
        sv = self._latest_changed_value()
        if sv is None:
            return
        try:
            publish.single(self.mqtt_topic, self.mult * sv,
                           hostname=self.mqtt_broker, port=self.mqtt_port, retain=True)
        except Exception:
            print("could not send mqtt")
|