123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- from utils.config import get_config
- from tqdm.asyncio import tqdm_asyncio
- from time import time
- from requests import get
- from concurrent.futures import ThreadPoolExecutor
- import fofa_map
- from driver.setup import setup_driver
- import re
- from utils.retry import retry_func
- from utils.channel import format_channel_name
- from utils.tools import merge_objects, get_pbar_remaining
- from proxy import get_proxy, get_proxy_next
- from requests_custom.utils import get_source_requests, close_session
- config = get_config()
- timeout = 30
- def get_fofa_urls_from_region_list():
- """
- Get the FOFA url from region
- """
- region_list = getattr(config, "region_list", [])
- urls = []
- region_url = getattr(fofa_map, "region_url")
- if "all" in region_list:
- urls = [url for url in region_url.values() if url]
- else:
- for region in region_list:
- if region in region_url:
- urls.append(region_url[region])
- return urls
- async def get_channels_by_fofa(callback):
- """
- Get the channel by FOFA
- """
- fofa_urls = get_fofa_urls_from_region_list()
- fofa_urls_len = len(fofa_urls)
- pbar = tqdm_asyncio(total=fofa_urls_len, desc="Processing multicast")
- start_time = time()
- fofa_results = {}
- callback(f"正在获取组播源更新, 共{fofa_urls_len}个地区", 0)
- proxy = None
- if config.open_proxy:
- proxy = await get_proxy(fofa_urls[0], best=True, with_test=True)
- def process_fofa_channels(fofa_url):
- nonlocal proxy, fofa_urls_len
- results = {}
- try:
- if config.open_driver:
- driver = setup_driver(proxy)
- try:
- retry_func(lambda: driver.get(fofa_url), name=fofa_url)
- except Exception as e:
- if config.open_proxy:
- proxy = get_proxy_next()
- driver.close()
- driver.quit()
- driver = setup_driver(proxy)
- driver.get(fofa_url)
- page_source = driver.page_source
- else:
- page_source = retry_func(
- lambda: get_source_requests(fofa_url), name=fofa_url
- )
- fofa_source = re.sub(r"<!--.*?-->", "", page_source, flags=re.DOTALL)
- urls = set(re.findall(r"https?://[\w\.-]+:\d+", fofa_source))
- with ThreadPoolExecutor(max_workers=100) as executor:
- futures = [executor.submit(process_fofa_json_url, url) for url in urls]
- for future in futures:
- merge_objects(results, future.result())
- except Exception as e:
- print(e)
- finally:
- if config.open_driver:
- driver.close()
- driver.quit()
- pbar.update()
- remain = fofa_urls_len - pbar.n
- callback(
- f"正在获取组播源更新, 剩余{remain}个地区待获取, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
- int((pbar.n / fofa_urls_len) * 100),
- )
- if config.open_online_search and pbar.n / fofa_urls_len == 1:
- callback("正在获取在线搜索结果, 请耐心等待", 0)
- return results
- max_workers = 3 if config.open_driver else 10
- with ThreadPoolExecutor(max_workers=max_workers) as executor:
- futures = [
- executor.submit(process_fofa_channels, fofa_url) for fofa_url in fofa_urls
- ]
- for future in futures:
- merge_objects(fofa_results, future.result())
- if not config.open_driver:
- close_session()
- pbar.close()
- return fofa_results
- def process_fofa_json_url(url):
- """
- Process the FOFA json url
- """
- channels = {}
- try:
- final_url = url + "/iptv/live/1000.json?key=txiptv"
- # response = retry_func(
- # lambda: get(final_url, timeout=timeout),
- # name=final_url,
- # )
- response = get(final_url, timeout=timeout)
- try:
- json_data = response.json()
- if json_data["code"] == 0:
- try:
- for item in json_data["data"]:
- if isinstance(item, dict):
- item_name = format_channel_name(item.get("name"))
- item_url = item.get("url").strip()
- if item_name and item_url:
- total_url = url + item_url
- if item_name not in channels:
- channels[item_name] = [total_url]
- else:
- channels[item_name].append(total_url)
- except Exception as e:
- # print(f"Error on fofa: {e}")
- pass
- except Exception as e:
- # print(f"{url}: {e}")
- pass
- except Exception as e:
- # print(f"{url}: {e}")
- pass
- finally:
- return channels
|