request.py

from utils.config import get_config
from tqdm.asyncio import tqdm_asyncio
from time import time
from requests import Session, exceptions
from utils.retry import retry_func
import re
from utils.channel import format_channel_name
from utils.tools import merge_objects, get_pbar_remaining
from concurrent.futures import ThreadPoolExecutor

config = get_config()
timeout = 30


async def get_channels_by_subscribe_urls(callback):
    """
    Get channels from the subscribe urls
    """
    subscribe_results = {}
    # Match "name,url" lines while skipping "name,#genre#" category headers
    pattern = r"^(.*?),(?!#genre#)(.*?)$"
    subscribe_urls_len = len(config.subscribe_urls)
    pbar = tqdm_asyncio(total=subscribe_urls_len, desc="Processing subscribe")
    start_time = time()
    # "Fetching subscribe source updates, {subscribe_urls_len} sources in total"
    callback(f"正在获取订阅源更新, 共{subscribe_urls_len}个订阅源", 0)
    session = Session()

    def process_subscribe_channels(subscribe_url):
        channels = {}
        try:
            response = None
            try:
                response = retry_func(
                    lambda: session.get(subscribe_url, timeout=timeout),
                    name=subscribe_url,
                )
            except exceptions.Timeout:
                print(f"Timeout on subscribe: {subscribe_url}")
            if response:
                content = response.text
                lines = content.split("\n")
                for line in lines:
                    matcher = re.match(pattern, line)
                    if matcher is not None:
                        key = matcher.group(1)
                        # An optional "_(...)"-style suffix in the name carries the resolution
                        resolution_match = re.search(r"_(\((.*?)\))", key)
                        resolution = (
                            resolution_match.group(2)
                            if resolution_match is not None
                            else None
                        )
                        url = matcher.group(2)
                        value = (url, None, resolution)
                        name = format_channel_name(key)
                        if name in channels:
                            if value not in channels[name]:
                                channels[name].append(value)
                        else:
                            channels[name] = [value]
        except Exception as e:
            print(f"Error on {subscribe_url}: {e}")
        finally:
            pbar.update()
            remain = subscribe_urls_len - pbar.n
            # "Fetching subscribe source updates, {remain} sources left,
            #  estimated time remaining: ..."
            callback(
                f"正在获取订阅源更新, 剩余{remain}个订阅源待获取, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
                int((pbar.n / subscribe_urls_len) * 100),
            )
            if config.open_online_search and pbar.n / subscribe_urls_len == 1:
                # "Fetching online search results, please wait patiently"
                callback("正在获取在线搜索结果, 请耐心等待", 0)
        return channels

    with ThreadPoolExecutor(max_workers=100) as executor:
        futures = [
            executor.submit(process_subscribe_channels, subscribe_url)
            for subscribe_url in config.subscribe_urls
        ]
        for future in futures:
            merge_objects(subscribe_results, future.result())
    session.close()
    pbar.close()
    return subscribe_results
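
A minimal usage sketch, not part of request.py: it assumes the module is importable as `request`, that `config.subscribe_urls` is a list of URL strings, and that the callback receives a status message plus an integer percentage (as the calls above imply). The names `progress_callback` and `main` are illustrative.

import asyncio

from request import get_channels_by_subscribe_urls  # assumed import path


def progress_callback(message: str, progress: int) -> None:
    # Print the status message and completion percentage reported by the coroutine.
    print(f"[{progress:3d}%] {message}")


async def main():
    # Results are presumably keyed by formatted channel name and map to lists of
    # (url, None, resolution) tuples merged across all subscribe sources.
    results = await get_channels_by_subscribe_urls(progress_callback)
    print(f"Collected {len(results)} channel names")


if __name__ == "__main__":
    asyncio.run(main())

Note that although the function is declared async, it never awaits: the HTTP fetches run in a ThreadPoolExecutor and block until all futures complete, so the event loop is occupied for the duration of the call.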