youtube_live_chat.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. import json
  2. import time
  3. from .fragment import FragmentFD
  4. from ..networking.exceptions import HTTPError
  5. from ..utils import (
  6. RegexNotFoundError,
  7. RetryManager,
  8. dict_get,
  9. int_or_none,
  10. try_get,
  11. )
  12. from ..utils.networking import HTTPHeaderDict
  13. class YoutubeLiveChatFD(FragmentFD):
  14. """ Downloads YouTube live chats fragment by fragment """
  15. def real_download(self, filename, info_dict):
  16. video_id = info_dict['video_id']
  17. self.to_screen('[%s] Downloading live chat' % self.FD_NAME)
  18. if not self.params.get('skip_download') and info_dict['protocol'] == 'youtube_live_chat':
  19. self.report_warning('Live chat download runs until the livestream ends. '
  20. 'If you wish to download the video simultaneously, run a separate hypervideo instance')
  21. test = self.params.get('test', False)
  22. ctx = {
  23. 'filename': filename,
  24. 'live': True,
  25. 'total_frags': None,
  26. }
  27. from ..extractor.youtube import YoutubeBaseInfoExtractor
  28. ie = YoutubeBaseInfoExtractor(self.ydl)
  29. start_time = int(time.time() * 1000)
  30. def dl_fragment(url, data=None, headers=None):
  31. http_headers = HTTPHeaderDict(info_dict.get('http_headers'), headers)
  32. return self._download_fragment(ctx, url, info_dict, http_headers, data)
  33. def parse_actions_replay(live_chat_continuation):
  34. offset = continuation_id = click_tracking_params = None
  35. processed_fragment = bytearray()
  36. for action in live_chat_continuation.get('actions', []):
  37. if 'replayChatItemAction' in action:
  38. replay_chat_item_action = action['replayChatItemAction']
  39. offset = int(replay_chat_item_action['videoOffsetTimeMsec'])
  40. processed_fragment.extend(
  41. json.dumps(action, ensure_ascii=False).encode() + b'\n')
  42. if offset is not None:
  43. continuation = try_get(
  44. live_chat_continuation,
  45. lambda x: x['continuations'][0]['liveChatReplayContinuationData'], dict)
  46. if continuation:
  47. continuation_id = continuation.get('continuation')
  48. click_tracking_params = continuation.get('clickTrackingParams')
  49. self._append_fragment(ctx, processed_fragment)
  50. return continuation_id, offset, click_tracking_params
  51. def try_refresh_replay_beginning(live_chat_continuation):
  52. # choose the second option that contains the unfiltered live chat replay
  53. refresh_continuation = try_get(
  54. live_chat_continuation,
  55. lambda x: x['header']['liveChatHeaderRenderer']['viewSelector']['sortFilterSubMenuRenderer']['subMenuItems'][1]['continuation']['reloadContinuationData'], dict)
  56. if refresh_continuation:
  57. # no data yet but required to call _append_fragment
  58. self._append_fragment(ctx, b'')
  59. refresh_continuation_id = refresh_continuation.get('continuation')
  60. offset = 0
  61. click_tracking_params = refresh_continuation.get('trackingParams')
  62. return refresh_continuation_id, offset, click_tracking_params
  63. return parse_actions_replay(live_chat_continuation)
  64. live_offset = 0
  65. def parse_actions_live(live_chat_continuation):
  66. nonlocal live_offset
  67. continuation_id = click_tracking_params = None
  68. processed_fragment = bytearray()
  69. for action in live_chat_continuation.get('actions', []):
  70. timestamp = self.parse_live_timestamp(action)
  71. if timestamp is not None:
  72. live_offset = timestamp - start_time
  73. # compatibility with replay format
  74. pseudo_action = {
  75. 'replayChatItemAction': {'actions': [action]},
  76. 'videoOffsetTimeMsec': str(live_offset),
  77. 'isLive': True,
  78. }
  79. processed_fragment.extend(
  80. json.dumps(pseudo_action, ensure_ascii=False).encode() + b'\n')
  81. continuation_data_getters = [
  82. lambda x: x['continuations'][0]['invalidationContinuationData'],
  83. lambda x: x['continuations'][0]['timedContinuationData'],
  84. ]
  85. continuation_data = try_get(live_chat_continuation, continuation_data_getters, dict)
  86. if continuation_data:
  87. continuation_id = continuation_data.get('continuation')
  88. click_tracking_params = continuation_data.get('clickTrackingParams')
  89. timeout_ms = int_or_none(continuation_data.get('timeoutMs'))
  90. if timeout_ms is not None:
  91. time.sleep(timeout_ms / 1000)
  92. self._append_fragment(ctx, processed_fragment)
  93. return continuation_id, live_offset, click_tracking_params
  94. def download_and_parse_fragment(url, frag_index, request_data=None, headers=None):
  95. for retry in RetryManager(self.params.get('fragment_retries'), self.report_retry, frag_index=frag_index):
  96. try:
  97. success = dl_fragment(url, request_data, headers)
  98. if not success:
  99. return False, None, None, None
  100. raw_fragment = self._read_fragment(ctx)
  101. try:
  102. data = ie.extract_yt_initial_data(video_id, raw_fragment.decode('utf-8', 'replace'))
  103. except RegexNotFoundError:
  104. data = None
  105. if not data:
  106. data = json.loads(raw_fragment)
  107. live_chat_continuation = try_get(
  108. data,
  109. lambda x: x['continuationContents']['liveChatContinuation'], dict) or {}
  110. func = (info_dict['protocol'] == 'youtube_live_chat' and parse_actions_live
  111. or frag_index == 1 and try_refresh_replay_beginning
  112. or parse_actions_replay)
  113. return (True, *func(live_chat_continuation))
  114. except HTTPError as err:
  115. retry.error = err
  116. continue
  117. return False, None, None, None
  118. self._prepare_and_start_frag_download(ctx, info_dict)
  119. success = dl_fragment(info_dict['url'])
  120. if not success:
  121. return False
  122. raw_fragment = self._read_fragment(ctx)
  123. try:
  124. data = ie.extract_yt_initial_data(video_id, raw_fragment.decode('utf-8', 'replace'))
  125. except RegexNotFoundError:
  126. return False
  127. continuation_id = try_get(
  128. data,
  129. lambda x: x['contents']['twoColumnWatchNextResults']['conversationBar']['liveChatRenderer']['continuations'][0]['reloadContinuationData']['continuation'])
  130. # no data yet but required to call _append_fragment
  131. self._append_fragment(ctx, b'')
  132. ytcfg = ie.extract_ytcfg(video_id, raw_fragment.decode('utf-8', 'replace'))
  133. if not ytcfg:
  134. return False
  135. api_key = try_get(ytcfg, lambda x: x['INNERTUBE_API_KEY'])
  136. innertube_context = try_get(ytcfg, lambda x: x['INNERTUBE_CONTEXT'])
  137. if not api_key or not innertube_context:
  138. return False
  139. visitor_data = try_get(innertube_context, lambda x: x['client']['visitorData'], str)
  140. if info_dict['protocol'] == 'youtube_live_chat_replay':
  141. url = 'https://www.youtube.com/youtubei/v1/live_chat/get_live_chat_replay?key=' + api_key
  142. chat_page_url = 'https://www.youtube.com/live_chat_replay?continuation=' + continuation_id
  143. elif info_dict['protocol'] == 'youtube_live_chat':
  144. url = 'https://www.youtube.com/youtubei/v1/live_chat/get_live_chat?key=' + api_key
  145. chat_page_url = 'https://www.youtube.com/live_chat?continuation=' + continuation_id
  146. frag_index = offset = 0
  147. click_tracking_params = None
  148. while continuation_id is not None:
  149. frag_index += 1
  150. request_data = {
  151. 'context': innertube_context,
  152. 'continuation': continuation_id,
  153. }
  154. if frag_index > 1:
  155. request_data['currentPlayerState'] = {'playerOffsetMs': str(max(offset - 5000, 0))}
  156. if click_tracking_params:
  157. request_data['context']['clickTracking'] = {'clickTrackingParams': click_tracking_params}
  158. headers = ie.generate_api_headers(ytcfg=ytcfg, visitor_data=visitor_data)
  159. headers.update({'content-type': 'application/json'})
  160. fragment_request_data = json.dumps(request_data, ensure_ascii=False).encode() + b'\n'
  161. success, continuation_id, offset, click_tracking_params = download_and_parse_fragment(
  162. url, frag_index, fragment_request_data, headers)
  163. else:
  164. success, continuation_id, offset, click_tracking_params = download_and_parse_fragment(
  165. chat_page_url, frag_index)
  166. if not success:
  167. return False
  168. if test:
  169. break
  170. return self._finish_frag_download(ctx, info_dict)
  171. @staticmethod
  172. def parse_live_timestamp(action):
  173. action_content = dict_get(
  174. action,
  175. ['addChatItemAction', 'addLiveChatTickerItemAction', 'addBannerToLiveChatCommand'])
  176. if not isinstance(action_content, dict):
  177. return None
  178. item = dict_get(action_content, ['item', 'bannerRenderer'])
  179. if not isinstance(item, dict):
  180. return None
  181. renderer = dict_get(item, [
  182. # text
  183. 'liveChatTextMessageRenderer', 'liveChatPaidMessageRenderer',
  184. 'liveChatMembershipItemRenderer', 'liveChatPaidStickerRenderer',
  185. # ticker
  186. 'liveChatTickerPaidMessageItemRenderer',
  187. 'liveChatTickerSponsorItemRenderer',
  188. # banner
  189. 'liveChatBannerRenderer',
  190. ])
  191. if not isinstance(renderer, dict):
  192. return None
  193. parent_item_getters = [
  194. lambda x: x['showItemEndpoint']['showLiveChatItemEndpoint']['renderer'],
  195. lambda x: x['contents'],
  196. ]
  197. parent_item = try_get(renderer, parent_item_getters, dict)
  198. if parent_item:
  199. renderer = dict_get(parent_item, [
  200. 'liveChatTextMessageRenderer', 'liveChatPaidMessageRenderer',
  201. 'liveChatMembershipItemRenderer', 'liveChatPaidStickerRenderer',
  202. ])
  203. if not isinstance(renderer, dict):
  204. return None
  205. return int_or_none(renderer.get('timestampUsec'), 1000)