main.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. #!/usr/bin/env python3
  2. # - built in - #
  3. import sys
  4. import json
  5. import asyncio
  6. import logging
  7. from pathlib import Path
  8. from typing import Optional
  9. from datetime import datetime
  10. from dataclasses import dataclass
  11. # - pypi - #
  12. from telethon import hints # type: ignore
  13. from telethon import TelegramClient
  14. from telethon.tl.types import ( # type: ignore
  15. Message,
  16. PeerChannel,
  17. ChannelParticipantsAdmins as AdmFilter
  18. )
  19. # - local -#
  20. import config
  21. import invidious
  22. from database import Database
  23. # logging.basicConfig(level='DEBUG')
@dataclass
class YTNotifyData:
    """Plain data holder for everything one channel watcher needs.

    The field order defines the positional order of the generated
    ``__init__``, so it must not be rearranged.
    """
    # Storage used to read and update the last announced live id.
    db: Database
    # Telegram client used to resolve entities and send messages.
    bot: TelegramClient
    # Invidious client queried for the channel's latest video.
    api: invidious.Invidious
    # Destination chat; resolved through ``bot.get_entity`` before sending.
    chat: PeerChannel
    # Pause between two polls; passed straight to ``asyncio.sleep`` (seconds).
    delay: int
    # YouTube channel id; also used to build the youtube.com/channel/<id> link.
    channel: str
    # Optional human-readable channel name; the id is used when it is unset.
    _channel_name: Optional[str] = None
  33. class YTNotify(YTNotifyData):
  34. _NAME = 'YTNotify'
  35. # - log - #
  36. __handler = logging.StreamHandler()
  37. __handler.setLevel(config.LOGLEVEL)
  38. __handler.setFormatter(logging.Formatter(config.LOGFMT, config.TIMEFMT))
  39. _log = logging.getLogger(_NAME)
  40. _log.addHandler(__handler)
  41. _log.setLevel(config.LOGLEVEL)
  42. @property
  43. def channel_name(self) -> str:
  44. return self._channel_name or self.channel
  45. @staticmethod
  46. def is_live(info: invidious.VideoInfo) -> bool:
  47. return info.lengthSeconds == 0
  48. @staticmethod
  49. def utc_now() -> int:
  50. return int(datetime.utcnow().timestamp())
  51. async def _send_message(self, text: str,
  52. chat: hints.EntityLike = None) -> Message:
  53. if not chat:
  54. chat = await self.bot.get_entity(self.chat)
  55. return await self.bot.send_message(chat, text)
  56. async def _get_last_live(self) -> Optional[str]:
  57. return await self.db.get_last_live(self.channel)
  58. async def _update_last_live(self, video_id: str) -> int:
  59. return await self.db.update_last_live(self.channel, video_id)
  60. async def _warn_admins(self, attempts: int) -> None:
  61. text = ('cannot get latest video from '
  62. '[%s](https://youtube.com/channel/%s)\n\n'
  63. 'attempts: %d' % (self.channel_name, self.channel, attempts))
  64. chat = await self.bot.get_entity(self.chat)
  65. async for adm in self.bot.iter_participants(chat, filter=AdmFilter):
  66. if not adm.bot:
  67. await self._send_message(text, chat=adm)
  68. async def _video_handler(self, video: invidious.VideoInfo) -> None:
  69. last_id = await self._get_last_live()
  70. if video == invidious.NO_VIDEOS:
  71. self._log.debug(
  72. '%s: looks like there are no videos for', self.channel)
  73. return
  74. if self.is_live(video):
  75. if last_id == video.videoId:
  76. return
  77. self._log.info('%s: new live found: %s',
  78. self.channel, video.videoId)
  79. await asyncio.wait([
  80. self._update_last_live(video.videoId),
  81. self._send_message('https://youtu.be/%s' % video.videoId)
  82. ])
  83. elif not last_id:
  84. self._log.debug('%s: last stream id not found in db, '
  85. 'adding 0.' % self.channel)
  86. await self._update_last_live('0')
  87. async def run(self) -> None:
  88. attempts: int = int()
  89. refreshed: bool = False
  90. while True:
  91. info = await self.api.get_latest_video(self.channel)
  92. if info is not None:
  93. if not self.channel_name:
  94. self._channel_name = info.author
  95. await self._video_handler(info)
  96. attempts = 0
  97. refreshed = False
  98. else:
  99. self._log.warning('%s: cannot get latest video', self.channel)
  100. if (attempts := attempts + 1) >= config.ATEMPTS:
  101. if refreshed:
  102. await self._warn_admins(attempts)
  103. else:
  104. await self.api.find_instance()
  105. refreshed = True
  106. self._log.debug('%s: sleeping for %d seconds',
  107. self.channel, self.delay)
  108. await asyncio.sleep(self.delay)
  109. async def main() -> int:
  110. loop = asyncio.get_event_loop()
  111. db = Database(Path(config.DB_PATH))
  112. bot = TelegramClient('bot', config.API_ID, config.API_HASH)
  113. api = invidious.Invidious()
  114. if config.INSTANCE:
  115. api.set_instance(config.INSTANCE)
  116. else:
  117. if not await api.find_instance():
  118. return 1
  119. await bot.start(bot_token=config.TOKEN)
  120. with Path(config.CHANNELS_PATH).open() as fptr:
  121. channels = json.load(fptr)
  122. for channel in channels:
  123. loop.create_task(
  124. YTNotify(
  125. db, bot, api,
  126. chat=channel.get('chat', config.CHAT_ID),
  127. delay=channel.get('timeout', 10 * 60),
  128. channel=channel['id'],
  129. ).run()
  130. )
  131. await bot.run_until_disconnected()
  132. return 0
  133. if __name__ == '__main__':
  134. try:
  135. sys.exit(
  136. asyncio
  137. .get_event_loop_policy()
  138. .get_event_loop()
  139. .run_until_complete(main())
  140. )
  141. except KeyboardInterrupt:
  142. pass