db_client.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. import psycopg2
  2. import psycopg2.extensions
  3. class db_message_log_client():
  4. def __init__(self, connstr):
  5. self._conn = None
  6. self._connstr = connstr
  7. def __del__(self):
  8. self._conn.close()
  9. def connect(self) -> bool:
  10. try:
  11. self._conn = psycopg2.connect(self._connstr)
  12. except Exception as e:
  13. print(f'Failed connect to db: {e}.')
  14. return False
  15. return True
  16. def insert_message(self, message, author_id, author_name, channel, timestamp):
  17. self._check_connection()
  18. self._check_user_exist(channel.id, channel.name)
  19. self._check_user_exist(author_id, author_name)
  20. try:
  21. #Добавляю сообщение в таблицу сообщений
  22. self._conn.cursor().execute("INSERT INTO messages (timestamp, channel_id, author_id, message) VALUES (%s, %s, %s, %s)", (timestamp, channel.id, author_id, message))
  23. self._conn.commit()
  24. except Exception as e:
  25. print(f'Failed insert to db: {e}.')
  26. def get_channel_author_last_activity(self, id):
  27. self._check_connection()
  28. try:
  29. cur = self._conn.cursor()
  30. cur.execute("""
  31. SELECT timestamp FROM messages
  32. WHERE author_id = %s AND
  33. channel_id = %s
  34. ORDER BY timestamp DESC
  35. LIMIT 1;
  36. """, (id, id))
  37. res = cur.fetchall()
  38. if res:
  39. return res[0][0]
  40. else:
  41. return None
  42. except Exception as e:
  43. print(f'Failed get user last activity in db: {e}.')
  44. return None
  45. def get_last_active_users(self, chanel_id):
  46. try:
  47. cur = self._conn.cursor()
  48. cur.execute("""
  49. SELECT u.name
  50. FROM messages AS m
  51. JOIN users AS u ON u.id=m.author_id
  52. WHERE m.timestamp >= date_trunc('second', now()) - INTERVAL '43200 second'
  53. AND m.timestamp < (date_trunc('second', now()))
  54. AND m.channel_id = '%s'
  55. GROUP BY u.id
  56. ORDER BY MAX(m.timestamp) DESC
  57. LIMIT 20
  58. """,
  59. [chanel_id])
  60. res = cur.fetchall()
  61. if res:
  62. return res
  63. except Exception as e:
  64. print(f'Failed check last active users in db: {e}.')
  65. return None
  66. def get_random_message_by_user(self, user_id):
  67. try:
  68. cur = self._conn.cursor()
  69. cur.execute("""
  70. SELECT message FROM public.messages AS m
  71. WHERE author_id='%s'
  72. ORDER BY random()
  73. LIMIT 1
  74. """,
  75. [user_id])
  76. res = cur.fetchall()
  77. if res:
  78. return res[0][0]
  79. except Exception as e:
  80. print(f'Failed check last active users in db: {e}.')
  81. return None
  82. def get_random_user_by_last_n_hours(self, channel_id, hours):
  83. try:
  84. cur = self._conn.cursor()
  85. cur.execute("""
  86. SELECT author_id FROM public.messages
  87. WHERE timestamp BETWEEN now() - interval '%s hours' AND now()
  88. AND channel_id = '%s'
  89. GROUP BY author_id
  90. ORDER BY random()
  91. LIMIT 1
  92. """,
  93. (hours, channel_id))
  94. res = cur.fetchall()
  95. if res:
  96. return res[0][0]
  97. except Exception as e:
  98. print(f'Failed GetRandomUserByLastNHours in db: {e}.')
  99. return None
  100. def update_ogey(self, channel_id, ogey_id):
  101. try:
  102. cur = self._conn.cursor()
  103. cur.execute("""
  104. INSERT INTO ogeyofday (channel_id, ogey_id, timestamp)
  105. VALUES (%s, %s, now())
  106. ON CONFLICT (channel_id) DO UPDATE
  107. SET ogey_id = %s,
  108. timestamp = now();
  109. """,
  110. (channel_id, ogey_id, ogey_id))
  111. self._conn.commit()
  112. except Exception as e:
  113. print(f'Failed UpdateOgey in db: {e}.')
  114. return False
  115. return True
  116. def get_ogey(self, channel_id):
  117. try:
  118. cur = self._conn.cursor()
  119. cur.execute("""
  120. SELECT u.name FROM ogeyofday AS o
  121. JOIN users AS u ON u.id = o.ogey_id
  122. WHERE o.channel_id = '%s'
  123. """,
  124. [channel_id])
  125. res = cur.fetchall()
  126. if res:
  127. return res[0][0]
  128. except Exception as e:
  129. print(f'Failed GetOgey in db: {e}.')
  130. return None
  131. def get_top_of_month_users(self, channel_id):
  132. try:
  133. cur = self._conn.cursor()
  134. cur.execute("""
  135. SELECT ua.name, COUNT(*) FROM messages AS m
  136. JOIN users AS ua ON ua.id = m.author_id
  137. WHERE m.timestamp >= date_trunc('month', now())
  138. AND m.timestamp < date_trunc('day' , now()) + interval '1 day'
  139. AND m.channel_id = '%s'
  140. GROUP BY ua.id
  141. ORDER BY COUNT(m.author_id) DESC
  142. LIMIT 15
  143. """,
  144. [channel_id])
  145. res = cur.fetchall()
  146. if res:
  147. return res
  148. except Exception as e:
  149. print(f'Failed get top of month users in db: {e}.')
  150. return None
  151. def get_users_message_count_for_mounth_by_name(self, channel_id, user_name):
  152. try:
  153. cur = self._conn.cursor()
  154. cur.execute("""
  155. SELECT COUNT(*) FROM messages AS m
  156. JOIN users AS ua ON ua.id = m.author_id
  157. WHERE m.timestamp >= date_trunc('month', now())
  158. AND m.timestamp < date_trunc('day' , now()) + interval '1 day'
  159. AND m.channel_id = %s
  160. AND ua.name = %s
  161. """,
  162. (channel_id, user_name))
  163. res = cur.fetchall()
  164. if res:
  165. return res[0][0]
  166. except Exception as e:
  167. print(f'Failed get message count of month for user {user_name} in db: {e}.')
  168. return None
  169. def get_all_users_message_count_for_mounth(self, channel_id):
  170. try:
  171. cur = self._conn.cursor()
  172. cur.execute("""
  173. SELECT COUNT(*) FROM messages AS m
  174. JOIN users AS ua ON ua.id = m.author_id
  175. WHERE m.timestamp >= date_trunc('month', now())
  176. AND m.timestamp < date_trunc('day' , now()) + interval '1 day'
  177. AND m.channel_id = %s
  178. """,
  179. [channel_id])
  180. res = cur.fetchall()
  181. if res:
  182. return res[0][0]
  183. except Exception as e:
  184. print(f'Failed get message count of month for channel {channel_id} in db: {e}.')
  185. return None
  186. def get_malenia_in_channel(self, channel_id):
  187. try:
  188. cur = self._conn.cursor()
  189. cur.execute("""
  190. SELECT COUNT(*) FROM messages AS m
  191. WHERE m.channel_id = '%s'
  192. AND (m.message ILIKE '%%мален%%' OR
  193. m.message ILIKE '%%маслени%%' OR
  194. m.message ILIKE '%%мелани%%' OR
  195. m.message ILIKE '%%милани%%'
  196. )
  197. """,
  198. [channel_id])
  199. res = cur.fetchall()
  200. if res:
  201. return res[0][0]
  202. except Exception as e:
  203. print(f'Failed GetMaleniaInChannel {channel_id} in db: {e}.')
  204. return None
  205. def _check_user_exist(self, id, name):
  206. try:
  207. #Добавляю пользователя если его нет в таблицу пользователей
  208. #TODO При подключении считывать таблицу пользователей в память и искать Id в памяти и потом пытаться записать пользователя в БД
  209. self._conn.cursor().execute("INSERT INTO users (id, name) VALUES (%s, %s) ON CONFLICT DO NOTHING", (id, name))
  210. self._conn.commit()
  211. except Exception as e:
  212. print(f'Failed check user in db: {e}.')
  213. def _check_connection(self):
  214. try:
  215. self._conn.cursor() #Через какоето время _conn теряется, поэтому проверяю таким способом его наличие
  216. except AttributeError as e:
  217. self.connect()