-- PostgreSQL setup script for the "sensor" schema: roles, tables, topic-parsing configuration, grants, and the ingestion routines.
-- Bootstrap: create the schema and the application roles, then remove any
-- existing tables so the CREATE TABLE statements that follow start clean.
CREATE SCHEMA IF NOT EXISTS sensor;

-- CREATE ROLE has no IF NOT EXISTS clause, so guard each creation with a
-- catalog lookup to keep the script re-runnable.
-- NOTE(review): both roles are created without the LOGIN attribute and with an
-- empty password literal; confirm how real credentials are provisioned.
DO $$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = 'sensordatain') THEN
        CREATE ROLE sensordatain CONNECTION LIMIT 10 PASSWORD '';
    END IF;

    IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = 'sensordataread') THEN
        CREATE ROLE sensordataread CONNECTION LIMIT 10 PASSWORD '';
    END IF;
END
$$;

-- IF EXISTS keeps the first run on an empty database from failing.
-- WARNING: this discards all stored sensor data and topic definitions.
DROP TABLE IF EXISTS sensor.datain;
DROP TABLE IF EXISTS sensor.data_storage;
DROP TABLE IF EXISTS sensor.last_data;
DROP TABLE IF EXISTS sensor.cfg_topic;
DROP TABLE IF EXISTS sensor.topic_list;
DROP TABLE IF EXISTS sensor.topic_def;
-- Raw input rows: one (timestamp, topic, value) triple per row.
-- Not referenced by the procedures visible in this script.
CREATE TABLE IF NOT EXISTS sensor.datain (
    timestamp BIGINT NOT NULL,
    topic     VARCHAR(256),
    value     BIGINT NOT NULL
);

-- Value history per topic. sensor.insert_data compares "timestamp" against
-- epoch milliseconds, and fills "topicid" with crc32(topic).
CREATE TABLE IF NOT EXISTS sensor.data_storage (
    timestamp BIGINT NOT NULL,
    topicid   BIGINT NOT NULL,
    value     BIGINT NOT NULL
);

-- Most recently stored value per topic (maintained by sensor.insert_data).
CREATE TABLE IF NOT EXISTS sensor.last_data (
    topicid BIGINT NOT NULL,
    value   BIGINT NOT NULL
);

-- Rules for splitting a '/'-separated topic into named fields:
--   feld      : field name
--   pos       : 1-based segment position; negative values count from the end
--   minlength : rule applies only to topics with at least this many segments
--               (NULL = always applies)
CREATE TABLE IF NOT EXISTS sensor.cfg_topic (
    feld      VARCHAR(256),
    pos       SMALLINT NOT NULL,
    minlength SMALLINT
);

-- Known topics, keyed by their numeric id.
CREATE TABLE IF NOT EXISTS sensor.topic_list (
    topicid BIGINT NOT NULL PRIMARY KEY,
    topic   VARCHAR(256) NOT NULL UNIQUE
);

-- Field values extracted from each topic: (topic id, field name, content).
CREATE TABLE IF NOT EXISTS sensor.topic_def (
    topicid BIGINT NOT NULL,
    feld    VARCHAR(256),
    inhalt  VARCHAR(256)
);
-- Default topic-splitting rules (see sensor.insert_topic for how pos and
-- minlength are interpreted).
INSERT INTO sensor.cfg_topic (feld, pos, minlength)
VALUES
    ('device',       2, NULL),
    ('quantity',    -1, NULL),  -- last segment of the topic
    ('place',        3, NULL),
    ('internal_id',  4, 5);     -- only for topics with at least 5 segments
-- Privileges for the roles created by this script (sensordatain,
-- sensordataread) and for the pre-existing role "sensor".
-- NOTE(review): role "sensor" and database "sensor" are not created in the
-- visible script and must already exist.

-- Table privileges are unusable without USAGE on the containing schema.
GRANT USAGE ON SCHEMA sensor TO sensordatain, sensordataread;

-- Writer role used for ingesting data.
GRANT SELECT, INSERT, UPDATE ON sensor.data_storage TO sensordatain WITH GRANT OPTION;
GRANT INSERT, SELECT, UPDATE ON sensor.last_data TO sensordatain WITH GRANT OPTION;
GRANT INSERT, SELECT ON sensor.topic_list TO sensordatain WITH GRANT OPTION;
GRANT INSERT, SELECT ON sensor.topic_def TO sensordatain WITH GRANT OPTION;
-- Only covers procedures that already exist when this statement runs; the
-- procedures defined later in this file are included on a re-run only.
GRANT EXECUTE ON ALL PROCEDURES IN SCHEMA sensor TO sensordatain WITH GRANT OPTION;

-- Same privilege set for role "sensor", plus permission to connect.
GRANT SELECT, INSERT, UPDATE ON sensor.data_storage TO sensor WITH GRANT OPTION;
GRANT INSERT, SELECT, UPDATE ON sensor.last_data TO sensor WITH GRANT OPTION;
GRANT INSERT, SELECT ON sensor.topic_list TO sensor WITH GRANT OPTION;
GRANT INSERT, SELECT ON sensor.topic_def TO sensor WITH GRANT OPTION;
GRANT EXECUTE ON ALL PROCEDURES IN SCHEMA sensor TO sensor WITH GRANT OPTION;
GRANT CONNECT ON DATABASE sensor TO sensor;

-- Read-only role: history table only.
GRANT SELECT ON sensor.data_storage TO sensordataread WITH GRANT OPTION;
-- sensor.insert_topic: split a '/'-separated topic into named fields and store
-- them in sensor.topic_def.
--
-- Parameters:
--   vtopic : the topic string, e.g. 'a/b/c/d'
--   newid  : topic id under which the extracted fields are stored
--
-- For every rule in sensor.cfg_topic whose minlength is NULL or not greater
-- than the number of segments in vtopic, the segment at position "pos" is
-- taken (negative positions count from the end of the topic). A row is
-- inserted only when topic_def has no entry yet for (newid, field name).
CREATE OR REPLACE PROCEDURE sensor.insert_topic(vtopic varchar(1024), newid bigint)
LANGUAGE plpgsql
AS $$
DECLARE
    -- Number of '/'-separated segments = number of slashes + 1.
    segment_count int := length(vtopic) - length(replace(vtopic, '/', '')) + 1;
    cfg           record;
    -- Unbounded on purpose: the length limit is enforced by topic_def.inhalt.
    feldinhalt    text;
BEGIN
    FOR cfg IN
        SELECT
            cfgt.feld,
            cfgt.pos
        FROM sensor.cfg_topic AS cfgt
        WHERE cfgt.minlength IS NULL
           OR cfgt.minlength <= segment_count
    LOOP
        IF cfg.pos < 0 THEN
            -- Count from the end: reverse, take the |pos|-th part, reverse back.
            feldinhalt := reverse(split_part(reverse(vtopic), '/', abs(cfg.pos)));
        ELSE
            feldinhalt := split_part(vtopic, '/', cfg.pos);
        END IF;

        IF NOT EXISTS (
            SELECT 1
            FROM sensor.topic_def AS td
            WHERE td.topicid = newid
              AND td.feld = cfg.feld
        ) THEN
            INSERT INTO sensor.topic_def (topicid, feld, inhalt)
            VALUES (newid, cfg.feld, feldinhalt);
        END IF;
    END LOOP;
END
$$;
-- sensor.insert_data: store one measurement for a topic.
--
-- Parameters:
--   intopic : topic string; its crc32() is used as the topic id
--   invalue : measured value
--   tstamp  : measurement time (compared against epoch milliseconds below)
--
-- Raises an exception ('Topic is missing' / 'Value is missing' /
-- 'Time is missing') when the corresponding argument is NULL.
--
-- Behaviour:
--   * unknown topic          -> register it (topic_def, topic_list) and store
--                               the value in data_storage and last_data
--   * value differs from the
--     last stored value      -> append to data_storage, update last_data
--   * value unchanged        -> if the newest (up to two) history rows of the
--                               last 300 seconds hold a single distinct value,
--                               move the newest row's timestamp to tstamp
--                               instead of appending; otherwise append
CREATE OR REPLACE PROCEDURE sensor.insert_data(intopic varchar(256), invalue bigint, tstamp bigint)
LANGUAGE plpgsql
AS $$
DECLARE
    crctopic bigint;
BEGIN
    IF intopic IS NULL THEN
        RAISE EXCEPTION 'Topic is missing';
    END IF;
    IF invalue IS NULL THEN
        RAISE EXCEPTION 'Value is missing';
    END IF;
    IF tstamp IS NULL THEN
        RAISE EXCEPTION 'Time is missing';
    END IF;

    crctopic := crc32(intopic);

    IF NOT EXISTS (
        SELECT 1
        FROM sensor.topic_list AS tl
        WHERE tl.topicid = crctopic
    ) THEN
        -- First sighting of this topic: register it, then store the value.
        CALL sensor.insert_topic(intopic, crctopic);
        INSERT INTO sensor.topic_list (topicid, topic)
        VALUES (crctopic, intopic);
        INSERT INTO sensor.data_storage (timestamp, topicid, value)
        VALUES (tstamp, crctopic, invalue);
        INSERT INTO sensor.last_data (topicid, value)
        VALUES (crctopic, invalue);

    ELSIF NOT EXISTS (
        SELECT 1
        FROM sensor.last_data AS ld
        WHERE ld.topicid = crctopic
          AND ld.value = invalue
    ) THEN
        -- Value changed since the last stored one.
        INSERT INTO sensor.data_storage (timestamp, topicid, value)
        VALUES (tstamp, crctopic, invalue);
        UPDATE sensor.last_data
        SET value = invalue
        WHERE topicid = crctopic;

    ELSIF (
        SELECT count(DISTINCT td.value)
        FROM (
            SELECT ds.value
            FROM sensor.data_storage AS ds
            WHERE ds.topicid = crctopic
              -- only rows from the last 300 seconds (timestamps in ms)
              AND (extract(epoch FROM current_timestamp) - 300) * 1000 < ds.timestamp
            ORDER BY ds.timestamp DESC
            LIMIT 2
        ) AS td
    ) = 1 THEN
        -- Unchanged value with recent history: move the newest row forward
        -- instead of adding another identical row.
        UPDATE sensor.data_storage
        SET timestamp = tstamp
        WHERE topicid = crctopic
          AND timestamp = (
              SELECT max(latest.timestamp)
              FROM sensor.data_storage AS latest
              WHERE latest.topicid = crctopic
          );

    ELSE
        INSERT INTO sensor.data_storage (timestamp, topicid, value)
        VALUES (tstamp, crctopic, invalue);
    END IF;
END
$$;
-- crc32: CRC-32 checksum (reflected polynomial 0xEDB88320, initial value and
-- final XOR 0xFFFFFFFF) over the bytes of text_string.
--
-- Parameters:
--   text_string : input text
-- Returns:
--   the checksum as a non-negative bigint; 0 for the empty string;
--   NULL for NULL input (the function is STRICT).
CREATE OR REPLACE FUNCTION crc32(text_string text) RETURNS bigint AS $$
DECLARE
    crc           bigint := 4294967295;  -- 0xFFFFFFFF
    binary_string bytea;
BEGIN
    -- decode(..., 'escape') interprets backslashes, so every backslash is
    -- doubled first to make the text decode to its literal bytes.
    binary_string := decode(replace(text_string, E'\\', E'\\\\'), 'escape');

    -- Iterate over the decoded bytes themselves so the bound always matches
    -- the data; an empty input skips the loop and yields 0.
    FOR i IN 0 .. length(binary_string) - 1 LOOP
        crc := crc # get_byte(binary_string, i);
        FOR j IN 1 .. 8 LOOP
            -- Shift right; XOR in the polynomial (0xEDB88320) when the bit
            -- shifted out was 1.
            crc := (crc >> 1) # (3988292384 * (crc & 1));
        END LOOP;
    END LOOP;

    RETURN crc # 4294967295;
END
$$ IMMUTABLE STRICT LANGUAGE plpgsql;
|