main.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. import re
  2. import requests
  3. import logging
  4. from collections import OrderedDict
  5. from datetime import datetime
  6. import config
  7. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.FileHandler("function.log", "w", encoding="utf-8"), logging.StreamHandler()])
  8. def parse_template(template_file):
  9. template_channels = OrderedDict()
  10. current_category = None
  11. with open(template_file, "r", encoding="utf-8") as f:
  12. for line in f:
  13. line = line.strip()
  14. if line and not line.startswith("#"):
  15. if "#genre#" in line:
  16. current_category = line.split(",")[0].strip()
  17. template_channels[current_category] = []
  18. elif current_category:
  19. channel_name = line.split(",")[0].strip()
  20. template_channels[current_category].append(channel_name)
  21. return template_channels
  22. def fetch_channels(url):
  23. channels = OrderedDict()
  24. try:
  25. response = requests.get(url)
  26. response.raise_for_status()
  27. response.encoding = 'utf-8'
  28. lines = response.text.split("\n")
  29. current_category = None
  30. is_m3u = any("#EXTINF" in line for line in lines[:15])
  31. source_type = "m3u" if is_m3u else "txt"
  32. logging.info(f"url: {url} 获取成功,判断为{source_type}格式")
  33. if is_m3u:
  34. for line in lines:
  35. line = line.strip()
  36. if line.startswith("#EXTINF"):
  37. match = re.search(r'group-title="(.*?)",(.*)', line)
  38. if match:
  39. current_category = match.group(1).strip()
  40. channel_name = match.group(2).strip()
  41. if current_category not in channels:
  42. channels[current_category] = []
  43. elif line and not line.startswith("#"):
  44. channel_url = line.strip()
  45. if current_category and channel_name:
  46. channels[current_category].append((channel_name, channel_url))
  47. else:
  48. for line in lines:
  49. line = line.strip()
  50. if "#genre#" in line:
  51. current_category = line.split(",")[0].strip()
  52. channels[current_category] = []
  53. elif current_category:
  54. match = re.match(r"^(.*?),(.*?)$", line)
  55. if match:
  56. channel_name = match.group(1).strip()
  57. channel_url = match.group(2).strip()
  58. channels[current_category].append((channel_name, channel_url))
  59. elif line:
  60. channels[current_category].append((line, ''))
  61. if channels:
  62. categories = ", ".join(channels.keys())
  63. logging.info(f"url: {url} 爬取成功✅,包含频道分类: {categories}")
  64. except requests.RequestException as e:
  65. logging.error(f"url: {url} 爬取失败❌, Error: {e}")
  66. return channels
  67. def match_channels(template_channels, all_channels):
  68. matched_channels = OrderedDict()
  69. for category, channel_list in template_channels.items():
  70. matched_channels[category] = OrderedDict()
  71. for channel_name in channel_list:
  72. for online_category, online_channel_list in all_channels.items():
  73. for online_channel_name, online_channel_url in online_channel_list:
  74. if channel_name == online_channel_name:
  75. matched_channels[category].setdefault(channel_name, []).append(online_channel_url)
  76. return matched_channels
  77. def filter_source_urls(template_file):
  78. template_channels = parse_template(template_file)
  79. source_urls = config.source_urls
  80. all_channels = OrderedDict()
  81. for url in source_urls:
  82. fetched_channels = fetch_channels(url)
  83. for category, channel_list in fetched_channels.items():
  84. if category in all_channels:
  85. all_channels[category].extend(channel_list)
  86. else:
  87. all_channels[category] = channel_list
  88. matched_channels = match_channels(template_channels, all_channels)
  89. return matched_channels, template_channels
  90. def is_ipv6(url):
  91. return re.match(r'^http:\/\/\[[0-9a-fA-F:]+\]', url) is not None
  92. def updateChannelUrlsM3U(channels, template_channels):
  93. written_urls = set()
  94. current_date = datetime.now().strftime("%Y-%m-%d")
  95. for group in config.announcements:
  96. for announcement in group['entries']:
  97. if announcement['name'] is None:
  98. announcement['name'] = current_date
  99. with open("live.m3u", "w", encoding="utf-8") as f_m3u:
  100. f_m3u.write(f"""#EXTM3U x-tvg-url={",".join(f'"{epg_url}"' for epg_url in config.epg_urls)}\n""")
  101. with open("live.txt", "w", encoding="utf-8") as f_txt:
  102. for group in config.announcements:
  103. f_txt.write(f"{group['channel']},#genre#\n")
  104. for announcement in group['entries']:
  105. f_m3u.write(f"""#EXTINF:-1 tvg-id="1" tvg-name="{announcement['name']}" tvg-logo="{announcement['logo']}" group-title="{group['channel']}",{announcement['name']}\n""")
  106. f_m3u.write(f"{announcement['url']}\n")
  107. f_txt.write(f"{announcement['name']},{announcement['url']}\n")
  108. for category, channel_list in template_channels.items():
  109. f_txt.write(f"{category},#genre#\n")
  110. if category in channels:
  111. for channel_name in channel_list:
  112. if channel_name in channels[category]:
  113. sorted_urls = sorted(channels[category][channel_name], key=lambda url: not is_ipv6(url) if config.ip_version_priority == "ipv6" else is_ipv6(url))
  114. filtered_urls = []
  115. for url in sorted_urls:
  116. if url and url not in written_urls and not any(blacklist in url for blacklist in config.url_blacklist):
  117. filtered_urls.append(url)
  118. written_urls.add(url)
  119. total_urls = len(filtered_urls)
  120. for index, url in enumerate(filtered_urls, start=1):
  121. if is_ipv6(url):
  122. url_suffix = f"$LR•IPV6" if total_urls == 1 else f"$LR•IPV6『线路{index}』"
  123. else:
  124. url_suffix = f"$LR•IPV4" if total_urls == 1 else f"$LR•IPV4『线路{index}』"
  125. if '$' in url:
  126. base_url = url.split('$', 1)[0]
  127. else:
  128. base_url = url
  129. new_url = f"{base_url}{url_suffix}"
  130. f_m3u.write(f"#EXTINF:-1 tvg-id=\"{index}\" tvg-name=\"{channel_name}\" tvg-logo=\"https://gcore.jsdelivr.net/gh/yuanzl77/TVlogo@master/png/{channel_name}.png\" group-title=\"{category}\",{channel_name}\n")
  131. f_m3u.write(new_url + "\n")
  132. f_txt.write(f"{channel_name},{new_url}\n")
  133. f_txt.write("\n")
  134. if __name__ == "__main__":
  135. template_file = "demo.txt"
  136. channels, template_channels = filter_source_urls(template_file)
  137. updateChannelUrlsM3U(channels, template_channels)