|
@@ -0,0 +1,93 @@
|
|
|
import datetime
import os

import requests
from bs4 import BeautifulSoup as bs

from helpers.logger import Logger
|
|
|
+
|
|
|
+
|
|
|
class NotYoutube:
    """Scraper for notyoutube.org channel pages.

    Resolved channel names are persisted to a ``url;name``-per-line file
    cache so each channel is only scraped once across runs; fetched page
    sources are additionally kept in an in-memory cache with a short TTL
    so ``get_channel_name`` and ``get_channel_video`` can share one request.
    """

    # Seconds a cached page source stays fresh before it is re-fetched.
    _SOURCE_CACHE_TTL = 300
    # Never issue an unbounded HTTP request; without a timeout requests.get
    # can hang forever (see requests docs: "timeouts" advisory).
    _REQUEST_TIMEOUT = 30

    def __init__(self, cache_dir: str):
        """Set up caches rooted at *cache_dir* (created files live there)."""
        # os.path.join tolerates cache_dir with or without a trailing
        # separator; plain concatenation silently built a wrong path
        # ("cachenot_youtube.cache") when the separator was missing.
        self.cache_file = os.path.join(cache_dir, "not_youtube.cache")
        # channel_url -> {"src": requests.Response, "tms": float unix ts}
        self.source_cache = {}
        # channel_url -> channel name, mirrored in self.cache_file
        self.list = self._restore_from_cache()

    def get_channel_name(self, channel_url: str) -> str:
        """Return the channel name for *channel_url*, or "" on any failure.

        Successful lookups are written through to the persistent cache.
        """
        if not NotYoutube._is_not_youtube_url(channel_url):
            Logger.log_info("link is not for NotYoutube", channel_url)
            return ""

        Logger.log_info("try get info about channel", channel_url)
        cache_name = self._check_in_cache(channel_url)
        if cache_name != "":
            return cache_name

        page_source = requests.get(channel_url, timeout=self._REQUEST_TIMEOUT)
        if page_source.status_code != 200:
            Logger.log_error("can't load info about channel", channel_url)
            return ""

        # put request in cache, will reuse later on channels load
        self.source_cache[channel_url] = {"src": page_source, "tms": datetime.datetime.now().timestamp()}
        page_source.encoding = 'utf-8'
        pre_soup = bs(page_source.text, 'html.parser')
        pre_name = pre_soup.select("span")
        # The name is taken from the page's second <span>; guard the index
        # itself — the original only rejected an empty result and raised
        # IndexError when exactly one span was present.
        if len(pre_name) < 2:
            Logger.log_error("can't get channel name", channel_url)
            return ""
        channel_name = pre_name[1].text
        self._put_in_cache(channel_url, channel_name)
        return channel_name

    def get_channel_video(self, channel_url: str) -> dict:
        """Return {video title: relative "/watch?..." link} for the channel.

        Reuses a recently fetched page source when one is available.
        Titles are kept verbatim (unstripped) to match historical keys.
        """
        source = self._get_page_source(channel_url)
        source.encoding = 'utf-8'
        soup = bs(source.text, 'html.parser')
        video_list = {}
        for link in soup.select("div.h-box p a"):
            # .get() instead of ['href']: an <a> without href must be
            # skipped, not raise KeyError.
            href = link.get('href', '')
            if not href.startswith('/watch?'):
                continue
            if link.text.strip() == "":
                continue
            video_list[link.text] = href
        return video_list

    def _get_page_source(self, url: str):
        """Fetch *url*, reusing a cached response younger than the TTL.

        Returns the requests.Response even on a non-200 status (callers
        inspect it); only successful responses are cached.
        """
        current_tms = datetime.datetime.now().timestamp()
        cached = self.source_cache.get(url)
        if cached is not None and current_tms - cached['tms'] < self._SOURCE_CACHE_TTL:
            # fresh cache hit (younger than 5 minutes) — skip the request
            return cached['src']
        page_source = requests.get(url, timeout=self._REQUEST_TIMEOUT)
        if page_source.status_code == 200:
            self.source_cache[url] = {"src": page_source, "tms": current_tms}
        return page_source

    def _restore_from_cache(self) -> dict:
        """Load the url->name map from the cache file; {} when missing/unreadable."""
        result = {}
        try:
            # Context manager closes the handle even when a read fails;
            # the original leaked the descriptor on error.  Plain 'r' is
            # enough — nothing is written here.
            with open(self.cache_file, 'r') as channels_list_file:
                for line in channels_list_file:
                    # One "url;name" record per line.  rstrip the newline
                    # (the original stored names with a trailing "\n") and
                    # split only on the first ';' so names containing ';'
                    # survive the round trip.
                    ch_data = line.rstrip("\n").split(";", 1)
                    if len(ch_data) != 2:
                        Logger.log_error("invalid cache data, skip it", line)
                        continue
                    result[ch_data[0]] = ch_data[1]
        except OSError as e:
            # Missing cache on first run lands here — treated as empty.
            Logger.log_error("error read from cache", str(e))
        return result

    def _check_in_cache(self, channel_url: str) -> str:
        """Return the cached name for *channel_url*, or "" when unknown."""
        return self.list.get(channel_url, "")

    def _put_in_cache(self, channel_url: str, channel_name: str):
        """Remember the resolved name in memory and append it to the file."""
        self.list[channel_url] = channel_name
        with open(self.cache_file, "a") as cache_file:
            # The trailing newline is essential: without it consecutive
            # records fuse onto one line and _restore_from_cache discards
            # them as malformed, so the cache never survived a restart.
            cache_file.write("{0};{1}\n".format(channel_url, channel_name))

    @staticmethod
    def _is_not_youtube_url(channel_url: str) -> bool:
        """True when *channel_url* points at notyoutube.org."""
        return "notyoutube.org" in channel_url
|