request.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. from utils.config import get_config
  2. from tqdm.asyncio import tqdm_asyncio
  3. from time import time
  4. from requests import get
  5. from concurrent.futures import ThreadPoolExecutor
  6. import fofa_map
  7. from driver.setup import setup_driver
  8. import re
  9. from utils.retry import retry_func
  10. from utils.channel import format_channel_name
  11. from utils.tools import merge_objects, get_pbar_remaining
  12. from proxy import get_proxy, get_proxy_next
  13. from requests_custom.utils import get_source_requests, close_session
# Project configuration, loaded once when this module is imported. The
# functions below read region_list, open_proxy, open_driver and
# open_online_search from it.
config = get_config()
# Timeout handed to requests.get() in process_fofa_json_url
# (requests interprets this value as seconds).
timeout = 30
  16. def get_fofa_urls_from_region_list():
  17. """
  18. Get the FOFA url from region
  19. """
  20. region_list = getattr(config, "region_list", [])
  21. urls = []
  22. region_url = getattr(fofa_map, "region_url")
  23. if "all" in region_list:
  24. urls = [url for url in region_url.values() if url]
  25. else:
  26. for region in region_list:
  27. if region in region_url:
  28. urls.append(region_url[region])
  29. return urls
  30. async def get_channels_by_fofa(callback):
  31. """
  32. Get the channel by FOFA
  33. """
  34. fofa_urls = get_fofa_urls_from_region_list()
  35. fofa_urls_len = len(fofa_urls)
  36. pbar = tqdm_asyncio(total=fofa_urls_len, desc="Processing multicast")
  37. start_time = time()
  38. fofa_results = {}
  39. callback(f"正在获取组播源更新, 共{fofa_urls_len}个地区", 0)
  40. proxy = None
  41. if config.open_proxy:
  42. proxy = await get_proxy(fofa_urls[0], best=True, with_test=True)
  43. def process_fofa_channels(fofa_url):
  44. nonlocal proxy, fofa_urls_len
  45. results = {}
  46. try:
  47. if config.open_driver:
  48. driver = setup_driver(proxy)
  49. try:
  50. retry_func(lambda: driver.get(fofa_url), name=fofa_url)
  51. except Exception as e:
  52. if config.open_proxy:
  53. proxy = get_proxy_next()
  54. driver.close()
  55. driver.quit()
  56. driver = setup_driver(proxy)
  57. driver.get(fofa_url)
  58. page_source = driver.page_source
  59. else:
  60. page_source = retry_func(
  61. lambda: get_source_requests(fofa_url), name=fofa_url
  62. )
  63. fofa_source = re.sub(r"<!--.*?-->", "", page_source, flags=re.DOTALL)
  64. urls = set(re.findall(r"https?://[\w\.-]+:\d+", fofa_source))
  65. with ThreadPoolExecutor(max_workers=100) as executor:
  66. futures = [executor.submit(process_fofa_json_url, url) for url in urls]
  67. for future in futures:
  68. merge_objects(results, future.result())
  69. except Exception as e:
  70. print(e)
  71. finally:
  72. if config.open_driver:
  73. driver.close()
  74. driver.quit()
  75. pbar.update()
  76. remain = fofa_urls_len - pbar.n
  77. callback(
  78. f"正在获取组播源更新, 剩余{remain}个地区待获取, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
  79. int((pbar.n / fofa_urls_len) * 100),
  80. )
  81. if config.open_online_search and pbar.n / fofa_urls_len == 1:
  82. callback("正在获取在线搜索结果, 请耐心等待", 0)
  83. return results
  84. max_workers = 3 if config.open_driver else 10
  85. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  86. futures = [
  87. executor.submit(process_fofa_channels, fofa_url) for fofa_url in fofa_urls
  88. ]
  89. for future in futures:
  90. merge_objects(fofa_results, future.result())
  91. if not config.open_driver:
  92. close_session()
  93. pbar.close()
  94. return fofa_results
  95. def process_fofa_json_url(url):
  96. """
  97. Process the FOFA json url
  98. """
  99. channels = {}
  100. try:
  101. final_url = url + "/iptv/live/1000.json?key=txiptv"
  102. # response = retry_func(
  103. # lambda: get(final_url, timeout=timeout),
  104. # name=final_url,
  105. # )
  106. response = get(final_url, timeout=timeout)
  107. try:
  108. json_data = response.json()
  109. if json_data["code"] == 0:
  110. try:
  111. for item in json_data["data"]:
  112. if isinstance(item, dict):
  113. item_name = format_channel_name(item.get("name"))
  114. item_url = item.get("url").strip()
  115. if item_name and item_url:
  116. total_url = url + item_url
  117. if item_name not in channels:
  118. channels[item_name] = [total_url]
  119. else:
  120. channels[item_name].append(total_url)
  121. except Exception as e:
  122. # print(f"Error on fofa: {e}")
  123. pass
  124. except Exception as e:
  125. # print(f"{url}: {e}")
  126. pass
  127. except Exception as e:
  128. # print(f"{url}: {e}")
  129. pass
  130. finally:
  131. return channels