123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- CREATE DATABASE if not exists tsensor;
- create user if not exists 'sensor'@'localhost' identified by 'Whillmqq';
- create user if not exists 'sensordataread'@'localhost' identified by 'Whill';
- create table if not exists tsensor.datain (timestamp bigint unsigned not null comment 'Zeitstempel der Sensordaten',
- topic varchar(256) comment 'topic, unter den dieses Datum verwaltet wird',
- value bigint not null comment 'Sensordaten als Integer');
- create table if not exists tsensor.data_storage (timestamp bigint unsigned not null comment 'Zeitstempel der Sensordaten als epoch',
- topicid int unsigned not null comment 'numerische ID des Topics aus der Topic-Tabelle',
- value bigint not null comment 'Sensordaten als Integer', index (timestamp,topicid));
- create table if not exists tsensor.data_retent as select * from tsensor.data_storage;
- create table if not exists tsensor.last_data (topicid int unsigned not null comment 'numerische ID des Topics aus der Topic-Tabelle',
- value bigint not null comment 'Sensordaten als Integer', index (topicid));
- create table if not exists tsensor.cfg_topic (feld varchar(256) comment 'Bezeichnung des topic-Anteils',
- pos tinyint not null comment 'Position innerhalb des Topics',
- minlength tinyint comment 'Mindestanzahl an elementen, die das Topic haben muss, damit die Regel greift');
- create table if not exists tsensor.topic_list (topicid int unsigned not null primary key comment 'numerische ID des topics',
- topic varchar(256) not null unique comment 'Topic');
- create table if not exists tsensor.topic_def (topicid int unsigned not null comment 'Interne ID eines topics',
- feld varchar(256) comment 'Bezeichnung des Datums',
- inhalt varchar(256) comment 'Inhalt des Datums',index (topicid,feld));
- create or replace view tsensor.data_view as select utd.*,tl.topic,tdq.inhalt as quantity,tdd.inhalt as device,tdi.inhalt as internal_id from ((select ds.timestamp/1000 as time,ds.topicid,ds.value from tsensor.data_storage ds) union (select ds.timestamp/1000 as time,ds.topicid,ds.value from tsensor.data_retent ds)) utd
- join tsensor.topic_list tl on utd.topicid=tl.topicid
- left join tsensor.topic_def tdq on tdq.topicid=utd.topicid and tdq.feld='quantity'
- left join tsensor.topic_def tdd on tdd.topicid=utd.topicid and tdq.feld='device'
- left join tsensor.topic_def tdi on tdi.topicid=utd.topicid and tdq.feld='internal_id';
- grant select,insert,update on tsensor.data_storage to 'sensor'@'localhost' with grant option;
- grant insert,select,update on tsensor.last_data to 'sensor'@'localhost' with grant option;
- grant insert,select on tsensor.topic_list to 'sensor'@'localhost' with grant option;
- grant insert,select on tsensor.topic_def to 'sensor'@'localhost' with grant option;
- grant execute on tsensor.* to 'sensor'@'localhost' with grant option;
- grant select on tsensor.data_storage to 'sensordataread'@'localhost' with grant option;
- grant select on tsensor.data_view to 'sensordataread'@'localhost' with grant option;
- delimiter //
- create or replace procedure tsensor.retent_data()
- begin
- declare tid bigint;
- declare btopicdone int;
- declare topic_curs cursor for select distinct topic_list.topicid from topic_list;
- declare continue handler for not found set btopicdone = 1;
- open topic_curs;
- repeat
- fetch topic_curs into tid;
- select count(*) into @actcount from data_storage where topicid=tid;
- if (@actcount > 2) then
- select min(timestamp) into @mts from (select timestamp from data_storage where topicid=tid order by timestamp desc limit 2) td ;
- insert into data_retent select * from data_storage where topicid=tid and timestamp < @mts;
- delete from data_storage where topicid=tid and timestamp < @mts;
- commit;
- end if;
- until btopicdone end repeat;
- close topic_curs;
- end
- //
- delimiter ;
- DELIMITER //
- CREATE OR REPLACE PROCEDURE tsensor.insert_topic(vtopic varchar(1024),newid int unsigned)
- begin
- declare tpos tinyint;
- declare feldname varchar(1024);
- DECLARE bcfgDone INT;
- DECLARE cfg_curs CURSOR FOR SELECT cfg_topic.feld,cfg_topic.pos FROM cfg_topic where cfg_topic.minlength is null or cfg_topic.minlength <= (select (length(vtopic) - length(replace(vtopic,'/','')) +1));
- DECLARE CONTINUE HANDLER FOR NOT FOUND SET bcfgDone = 1;
- OPEN cfg_curs;
- REPEAT
- fetch cfg_curs into feldname,tpos;
- if (tpos < 0) then
- select substring_index(substring_index(vtopic,'/',tpos),'/',1) into @feldinhalt;
- else
- select substring_index(substring_index(vtopic,'/',tpos),'/',-1) into @feldinhalt;
- end if;
- if (select count(*) from topic_def where topic_def.topicid = newid and topic_def.feld=feldname) = 0 then
- insert into topic_def (topicid,feld,inhalt) values (newid,feldname,@feldinhalt);
- commit;
- end if;
- UNTIL bcfgDone END REPEAT;
- close cfg_curs;
- end
- //
- DELIMITER ;
- DELIMITER //
- CREATE OR REPLACE PROCEDURE tsensor.insert_data(IN intopic varchar(256),IN invalue bigint,IN tstamp bigint unsigned)
- BEGIN
- select crc32(intopic) into @crctopic;
- if (@crctopic is null) then
- signal sqlstate '45000' set mysql_errno=32100,message_text='Missing topic';
- end if;
- if (select invalue is null) then
- signal sqlstate '45000' set mysql_errno=32100,message_text='Missing value';
- end if;
- if (select tstamp is null) then
- signal sqlstate '45000' set mysql_errno=32100,message_text='Missing timestamp';
- end if;
- if (@crctopic in (select topicid from topic_list))=0 then
- # new topic means no value existent
- call insert_topic(intopic,@crctopic);
- insert into topic_list (topicid,topic) values (@crctopic,intopic);
- insert into data_storage (timestamp,topicid,value) values (tstamp,@crctopic,invalue);
- insert into last_data (topicid,value) values (@crctopic,invalue);
- else
- # topic exists and values should be available
- if (invalue not in (select value from last_data where topicid=@crctopic)) then
- insert into data_storage (timestamp,topicid,value) values (tstamp,@crctopic,invalue);
- update last_data set value=invalue where topicid=@crctopic;
- else
- if (select count(distinct td.value) from (select data_storage.value from data_storage where topicid=@crctopic and (data_storage.timestamp > (unix_timestamp(now())-300)*1000) order by timestamp desc limit 2) td) = 1 then
- update data_storage set timestamp=tstamp where topicid=@crctopic and timestamp=(select max(timestamp) from data_storage where topicid=@crctopic);
- else
- insert into data_storage (timestamp,topicid,value) values (tstamp,@crctopic,invalue);
- end if; # check if second last match
- end if; # compare with last value
- end if; # topicid is null
- commit;
- END
- //
- DELIMITER ;
- grant execute on tsensor.* to 'sensor'@'localhost' with grant option;
- set global event_scheduler=ON;
- create event if not exists tsensor.retent_old_data on schedule every 1 hour do call tsensor.retent_data();
- insert into tsensor.cfg_topic (feld,pos,minlength) values ('device',2,NULL);
- insert into tsensor.cfg_topic (feld,pos,minlength) values ('quantity',-1,NULL);
- insert into tsensor.cfg_topic (feld,pos,minlength) values ('place',3,NULL);
- insert into tsensor.cfg_topic (feld,pos,minlength) values ('internal_id',4,5);
- commit;
|