-- File: create_sensor_db.psql (5.5 KB)
  -- ---------------------------------------------------------------------------
  -- Bootstrap of the "sensor" schema (PostgreSQL): roles, tables, the
  -- topic-parsing configuration and the privileges of the participating roles.
  -- The script drops and recreates all tables, so every statement is written to
  -- be safely re-runnable.
  -- ---------------------------------------------------------------------------
  CREATE SCHEMA IF NOT EXISTS sensor;

  -- PostgreSQL has no CREATE ROLE IF NOT EXISTS, so guard the creation manually
  -- to keep the script re-runnable.
  -- NOTE(review): CREATE ROLE defaults to NOLOGIN and an empty password string
  -- is stored as "no password"; login credentials are presumably configured
  -- elsewhere -- confirm.
  DO $$
  BEGIN
      IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = 'sensordatain') THEN
          CREATE ROLE sensordatain CONNECTION LIMIT 10 PASSWORD '';
      END IF;
      IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = 'sensordataread') THEN
          CREATE ROLE sensordataread CONNECTION LIMIT 10 PASSWORD '';
      END IF;
  END
  $$;

  -- Start from a clean slate. IF EXISTS keeps the first run on an empty
  -- database from failing.
  DROP TABLE IF EXISTS sensor.datain;
  DROP TABLE IF EXISTS sensor.data_storage;
  DROP TABLE IF EXISTS sensor.last_data;
  DROP TABLE IF EXISTS sensor.cfg_topic;
  DROP TABLE IF EXISTS sensor.topic_list;
  DROP TABLE IF EXISTS sensor.topic_def;

  -- Raw input rows, keyed by the topic string.
  CREATE TABLE IF NOT EXISTS sensor.datain (
      timestamp bigint NOT NULL,
      topic     varchar(256),
      value     bigint NOT NULL
  );

  -- Stored measurements, keyed by the numeric topic id.
  CREATE TABLE IF NOT EXISTS sensor.data_storage (
      timestamp bigint NOT NULL,
      topicid   bigint NOT NULL,
      value     bigint NOT NULL
  );

  -- Most recent value per topic id.
  CREATE TABLE IF NOT EXISTS sensor.last_data (
      topicid bigint NOT NULL,
      value   bigint NOT NULL
  );

  -- Configuration describing which '/'-separated segment of a topic string
  -- feeds which field: pos is the 1-based segment index, a negative pos counts
  -- from the end; minlength (optional) is the minimum number of segments a
  -- topic must have for the rule to apply.
  CREATE TABLE IF NOT EXISTS sensor.cfg_topic (
      feld      varchar(256),
      pos       smallint NOT NULL,
      minlength smallint
  );

  -- Mapping topic id <-> topic string.
  CREATE TABLE IF NOT EXISTS sensor.topic_list (
      topicid bigint NOT NULL PRIMARY KEY,
      topic   varchar(256) NOT NULL UNIQUE
  );

  -- Field values extracted from a topic string, one row per (topic id, field).
  CREATE TABLE IF NOT EXISTS sensor.topic_def (
      topicid bigint NOT NULL,
      feld    varchar(256),
      inhalt  varchar(256)
  );

  -- Default topic-parsing rules.
  INSERT INTO sensor.cfg_topic (feld, pos, minlength)
  VALUES
      ('device',      2,  NULL),
      ('quantity',    -1, NULL),
      ('place',       3,  NULL),
      ('internal_id', 4,  5);

  -- Privileges of the writer role created above.
  -- NOTE(review): no GRANT USAGE ON SCHEMA sensor and no SELECT grant on
  -- sensor.cfg_topic are visible in this script; both are required for a
  -- non-owner role to run the procedures -- confirm they are granted elsewhere.
  GRANT SELECT, INSERT, UPDATE ON sensor.data_storage TO sensordatain WITH GRANT OPTION;
  GRANT INSERT, SELECT, UPDATE ON sensor.last_data TO sensordatain WITH GRANT OPTION;
  GRANT INSERT, SELECT ON sensor.topic_list TO sensordatain WITH GRANT OPTION;
  GRANT INSERT, SELECT ON sensor.topic_def TO sensordatain WITH GRANT OPTION;
  -- NOTE(review): ON ALL PROCEDURES only covers procedures that already exist
  -- when the statement runs; the procedures of this script are created further
  -- down, so on a fresh database this grant matches nothing.
  GRANT EXECUTE ON ALL PROCEDURES IN SCHEMA sensor TO sensordatain WITH GRANT OPTION;

  -- Privileges of role "sensor". This role is not created by this script and
  -- must already exist.
  GRANT SELECT, INSERT, UPDATE ON sensor.data_storage TO sensor WITH GRANT OPTION;
  GRANT INSERT, SELECT, UPDATE ON sensor.last_data TO sensor WITH GRANT OPTION;
  GRANT INSERT, SELECT ON sensor.topic_list TO sensor WITH GRANT OPTION;
  GRANT INSERT, SELECT ON sensor.topic_def TO sensor WITH GRANT OPTION;
  GRANT EXECUTE ON ALL PROCEDURES IN SCHEMA sensor TO sensor WITH GRANT OPTION;
  GRANT CONNECT ON DATABASE sensor TO sensor;

  -- Read-only access to the stored measurements.
  GRANT SELECT ON sensor.data_storage TO sensordataread WITH GRANT OPTION;
  -- Splits a '/'-separated topic string into the fields configured in
  -- sensor.cfg_topic and stores them in sensor.topic_def for the given topic id.
  --
  -- Parameters:
  --   vtopic  topic string, segments separated by '/'
  --   newid   topic id the extracted fields are stored under
  --
  -- Only rules whose minlength is NULL or not larger than the number of
  -- segments in vtopic are applied. A (topicid, feld) pair that already exists
  -- in sensor.topic_def is left untouched.
  --
  -- Raises an exception when newid is NULL (sensor.topic_def.topicid is NOT NULL).
  CREATE OR REPLACE PROCEDURE sensor.insert_topic(vtopic varchar(1024), newid bigint)
  LANGUAGE plpgsql
  AS $$
  DECLARE
      -- Number of '/'-separated segments in vtopic.
      segment_count int := length(vtopic) - length(replace(vtopic, '/', '')) + 1;
      cfg           record;
      -- text instead of a short varchar: a long segment must not fail on
      -- assignment before it ever reaches the varchar(256) target column.
      feldinhalt    text;
  BEGIN
      IF newid IS NULL THEN
          RAISE EXCEPTION 'Topic id is missing';
      END IF;

      FOR cfg IN
          SELECT cfgt.feld, cfgt.pos
          FROM sensor.cfg_topic AS cfgt
          WHERE cfgt.minlength IS NULL
             OR cfgt.minlength <= segment_count
      LOOP
          IF cfg.pos < 0 THEN
              -- Negative position: count segments from the end by splitting
              -- the reversed string and reversing the hit back.
              feldinhalt := reverse(split_part(reverse(vtopic), '/', abs(cfg.pos)));
          ELSE
              feldinhalt := split_part(vtopic, '/', cfg.pos);
          END IF;

          IF NOT EXISTS (
              SELECT 1
              FROM sensor.topic_def AS td
              WHERE td.topicid = newid
                AND td.feld = cfg.feld
          ) THEN
              INSERT INTO sensor.topic_def (topicid, feld, inhalt)
              VALUES (newid, cfg.feld, feldinhalt);
          END IF;
      END LOOP;
  END
  $$;
  -- Stores one measurement for a topic.
  --
  -- Parameters:
  --   intopic  topic string; its CRC-32 (function crc32) is used as topic id
  --   invalue  measured value
  --   tstamp   timestamp of the measurement
  --            (presumably epoch milliseconds, see the 5-minute window below
  --            -- confirm against the producers)
  --
  -- Behaviour:
  --   * unknown topic: register it (sensor.insert_topic, sensor.topic_list) and
  --     store the value in sensor.data_storage and sensor.last_data;
  --   * known topic, value differs from sensor.last_data: store the new value
  --     and update sensor.last_data;
  --   * known topic, value unchanged: if the (up to two) newest stored rows of
  --     the last 5 minutes all carry one value, only move the newest row's
  --     timestamp forward; otherwise insert a new row.
  --
  -- Raises an exception when any argument is NULL.
  CREATE OR REPLACE PROCEDURE sensor.insert_data(intopic varchar(256), invalue bigint, tstamp bigint)
  LANGUAGE plpgsql
  AS $$
  DECLARE
      crctopic bigint;
  BEGIN
      -- Plain RAISE EXCEPTION: giving both a format string and USING MESSAGE
      -- sets the message twice, which PL/pgSQL rejects at run time.
      IF intopic IS NULL THEN
          RAISE EXCEPTION 'Topic is missing';
      END IF;
      IF invalue IS NULL THEN
          RAISE EXCEPTION 'Value is missing';
      END IF;
      IF tstamp IS NULL THEN
          RAISE EXCEPTION 'Time is missing';
      END IF;

      crctopic := crc32(intopic);

      IF NOT EXISTS (
          SELECT 1
          FROM sensor.topic_list AS tl
          WHERE tl.topicid = crctopic
      ) THEN
          -- First sighting of this topic.
          CALL sensor.insert_topic(intopic, crctopic);
          INSERT INTO sensor.topic_list (topicid, topic)
          VALUES (crctopic, intopic);
          INSERT INTO sensor.data_storage (timestamp, topicid, value)
          VALUES (tstamp, crctopic, invalue);
          INSERT INTO sensor.last_data (topicid, value)
          VALUES (crctopic, invalue);

      ELSIF NOT EXISTS (
          SELECT 1
          FROM sensor.last_data AS ld
          WHERE ld.topicid = crctopic
            AND ld.value = invalue
      ) THEN
          -- Value changed since the last measurement.
          INSERT INTO sensor.data_storage (timestamp, topicid, value)
          VALUES (tstamp, crctopic, invalue);
          UPDATE sensor.last_data
          SET value = invalue
          WHERE topicid = crctopic;

      ELSIF (
          SELECT count(DISTINCT recent.value)
          FROM (
              SELECT ds.value
              FROM sensor.data_storage AS ds
              WHERE ds.topicid = crctopic
                -- only rows newer than "now - 300 s", scaled by 1000
                AND (extract(epoch FROM current_timestamp) - 300) * 1000 < ds.timestamp
              ORDER BY ds.timestamp DESC
              LIMIT 2
          ) AS recent
      ) = 1 THEN
          -- Unchanged value and the recent rows are constant: extend the
          -- newest row instead of adding another identical one.
          UPDATE sensor.data_storage AS tgt
          SET timestamp = tstamp
          WHERE tgt.topicid = crctopic
            AND tgt.timestamp = (
                SELECT max(ds.timestamp)
                FROM sensor.data_storage AS ds
                WHERE ds.topicid = crctopic
            );

      ELSE
          INSERT INTO sensor.data_storage (timestamp, topicid, value)
          VALUES (tstamp, crctopic, invalue);
      END IF;
  END
  $$;
  -- Computes the CRC-32 checksum (reflected polynomial 0xEDB88320, initial
  -- value and final XOR 0xFFFFFFFF) over the bytes of text_string.
  --
  -- Parameters:
  --   text_string  input text
  -- Returns:
  --   the checksum as a non-negative bigint; 0 for the empty string;
  --   NULL for NULL input (the function is STRICT).
  CREATE OR REPLACE FUNCTION crc32(text_string text) RETURNS bigint AS $$
  DECLARE
      crc           bigint := 4294967295;  -- 0xFFFFFFFF
      binary_string bytea;
  BEGIN
      IF text_string = '' THEN
          RETURN 0;
      END IF;

      -- decode(..., 'escape') treats a backslash as an escape introducer, so
      -- every single backslash is doubled first to get the text's raw bytes.
      binary_string := decode(replace(text_string, E'\\', E'\\\\'), 'escape');

      FOR i IN 0 .. length(binary_string) - 1 LOOP
          crc := crc # get_byte(binary_string, i);
          -- Process the byte bit by bit: shift right and XOR in the
          -- polynomial 3988292384 (0xEDB88320) whenever the low bit was set.
          FOR j IN 1 .. 8 LOOP
              crc := (crc >> 1) # (3988292384 * (crc & 1));
          END LOOP;
      END LOOP;

      RETURN crc # 4294967295;
  END
  $$ IMMUTABLE STRICT LANGUAGE plpgsql;