6.1 KB

  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. """
  3. Youtube (Videos)
  4. """
  5. from functools import reduce
  6. from json import loads, dumps
  7. from urllib.parse import quote_plus
  8. from random import random
  9. # about
  10. about = {
  11. "website": '',
  12. "wikidata_id": 'Q866',
  13. "official_api_documentation": '',
  14. "use_official_api": False,
  15. "require_api_key": False,
  16. "results": 'HTML',
  17. }
  18. # engine dependent config
  19. categories = ['videos', 'music']
  20. paging = True
  21. language_support = False
  22. time_range_support = True
  23. # search-url
  24. base_url = ''
  25. search_url = base_url + '?search_query={query}&page={page}&ucbcb=1'
  26. time_range_url = '&sp=EgII{time_range}%253D%253D'
  27. # the key seems to be constant
  28. next_page_url = ''
  29. time_range_dict = {'day': 'Ag',
  30. 'week': 'Aw',
  31. 'month': 'BA',
  32. 'year': 'BQ'}
  33. embedded_url = '<iframe width="540" height="304" ' +\
  34. 'data-src="{videoid}" ' +\
  35. 'frameborder="0" allowfullscreen></iframe>'
  36. base_youtube_url = ''
  37. # do search-request
  38. def request(query, params):
  39. params['cookies']['CONSENT'] = "PENDING+" + str(random() * 100)
  40. if not params['engine_data'].get('next_page_token'):
  41. params['url'] = search_url.format(query=quote_plus(query), page=params['pageno'])
  42. if params['time_range'] in time_range_dict:
  43. params['url'] += time_range_url.format(time_range=time_range_dict[params['time_range']])
  44. else:
  45. params['url'] = next_page_url
  46. params['method'] = 'POST'
  47. params['data'] = dumps({
  48. 'context': {"client": {"clientName": "WEB", "clientVersion": "2.20210310.12.01"}},
  49. 'continuation': params['engine_data']['next_page_token'],
  50. })
  51. params['headers']['Content-Type'] = 'application/json'
  52. return params
  53. # get response from search-request
  54. def response(resp):
  55. if resp.search_params.get('engine_data'):
  56. return parse_next_page_response(resp.text)
  57. return parse_first_page_response(resp.text)
  58. def parse_next_page_response(response_text):
  59. results = []
  60. result_json = loads(response_text)
  61. for section in (result_json['onResponseReceivedCommands'][0]
  62. .get('appendContinuationItemsAction')['continuationItems'][0]
  63. .get('itemSectionRenderer')['contents']):
  64. if 'videoRenderer' not in section:
  65. continue
  66. section = section['videoRenderer']
  67. content = "-"
  68. if 'descriptionSnippet' in section:
  69. content = ' '.join(x['text'] for x in section['descriptionSnippet']['runs'])
  70. results.append({
  71. 'url': base_youtube_url + section['videoId'],
  72. 'title': ' '.join(x['text'] for x in section['title']['runs']),
  73. 'content': content,
  74. 'author': section['ownerText']['runs'][0]['text'],
  75. 'length': section['lengthText']['simpleText'],
  76. 'template': 'videos.html',
  77. 'embedded': embedded_url.format(videoid=section['videoId']),
  78. 'thumbnail': section['thumbnail']['thumbnails'][-1]['url'],
  79. })
  80. try:
  81. token = result_json['onResponseReceivedCommands'][0]\
  82. .get('appendContinuationItemsAction')['continuationItems'][1]\
  83. .get('continuationItemRenderer')['continuationEndpoint']\
  84. .get('continuationCommand')['token']
  85. results.append({
  86. "engine_data": token,
  87. "key": "next_page_token",
  88. })
  89. except:
  90. pass
  91. return results
  92. def parse_first_page_response(response_text):
  93. results = []
  94. results_data = response_text[response_text.find('ytInitialData'):]
  95. results_data = results_data[results_data.find('{'):results_data.find(';</script>')]
  96. results_json = loads(results_data) if results_data else {}
  97. sections = results_json.get('contents', {})\
  98. .get('twoColumnSearchResultsRenderer', {})\
  99. .get('primaryContents', {})\
  100. .get('sectionListRenderer', {})\
  101. .get('contents', [])
  102. for section in sections:
  103. if "continuationItemRenderer" in section:
  104. next_page_token = section["continuationItemRenderer"]\
  105. .get("continuationEndpoint", {})\
  106. .get("continuationCommand", {})\
  107. .get("token", "")
  108. if next_page_token:
  109. results.append({
  110. "engine_data": next_page_token,
  111. "key": "next_page_token",
  112. })
  113. for video_container in section.get('itemSectionRenderer', {}).get('contents', []):
  114. video = video_container.get('videoRenderer', {})
  115. videoid = video.get('videoId')
  116. if videoid is not None:
  117. url = base_youtube_url + videoid
  118. thumbnail = '' + videoid + '/hqdefault.jpg'
  119. title = get_text_from_json(video.get('title', {}))
  120. content = get_text_from_json(video.get('descriptionSnippet', {}))
  121. embedded = embedded_url.format(videoid=videoid)
  122. author = get_text_from_json(video.get('ownerText', {}))
  123. length = get_text_from_json(video.get('lengthText', {}))
  124. # append result
  125. results.append({'url': url,
  126. 'title': title,
  127. 'content': content,
  128. 'author': author,
  129. 'length': length,
  130. 'template': 'videos.html',
  131. 'embedded': embedded,
  132. 'thumbnail': thumbnail})
  133. # return results
  134. return results
  135. def get_text_from_json(element):
  136. if 'runs' in element:
  137. return reduce(lambda a, b: a + b.get('text', ''), element.get('runs'), '')
  138. else:
  139. return element.get('simpleText', '')