tools.py

from time import time
import datetime
import os
import ipaddress
import re
import socket

from bs4 import BeautifulSoup

from utils.config import get_config, resource_path

config = get_config()


def get_pbar_remaining(pbar, start_time):
    """
    Get the remaining time of the progress bar
    """
    try:
        elapsed = time() - start_time
        completed_tasks = pbar.n
        if completed_tasks > 0:
            avg_time_per_task = elapsed / completed_tasks
            remaining_tasks = pbar.total - completed_tasks
            remaining_time = pbar.format_interval(avg_time_per_task * remaining_tasks)
        else:
            remaining_time = "unknown"
        return remaining_time
    except Exception as e:
        print(f"Error: {e}")
        return "unknown"
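
# Usage sketch (assumes pbar is a tqdm bar, which provides the .n, .total and
# .format_interval members used above; the printed value is illustrative):
#
#   from tqdm import tqdm
#   start_time = time()
#   pbar = tqdm(total=100)
#   pbar.update(25)
#   get_pbar_remaining(pbar, start_time)  # e.g. "00:03"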


def update_file(final_file, old_file):
    """
    Update the final file by moving old_file over it, if old_file exists
    """
    old_file_path = resource_path(old_file, persistent=True)
    final_file_path = resource_path(final_file, persistent=True)
    if os.path.exists(old_file_path):
        os.replace(old_file_path, final_file_path)
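
# Usage sketch (hypothetical file names, both resolved through resource_path):
#
#   update_file("output/result_final.txt", "output/result_cache.txt")
#   # moves result_cache.txt over result_final.txt, if the cache file exists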


def filter_by_date(data):
    """
    Filter by date and limit
    """
    default_recent_days = 30
    use_recent_days = getattr(config, "recent_days", default_recent_days)
    if not isinstance(use_recent_days, int) or use_recent_days <= 0:
        use_recent_days = default_recent_days
    start_date = datetime.datetime.now() - datetime.timedelta(days=use_recent_days)
    recent_data = []
    unrecent_data = []
    for (url, date, resolution), response_time in data:
        item = ((url, date, resolution), response_time)
        if date:
            date = datetime.datetime.strptime(date, "%m-%d-%Y")
            if date >= start_date:
                recent_data.append(item)
            else:
                unrecent_data.append(item)
        else:
            unrecent_data.append(item)
    recent_data_len = len(recent_data)
    if recent_data_len == 0:
        recent_data = unrecent_data
    elif recent_data_len < config.urls_limit:
        recent_data.extend(unrecent_data[: config.urls_limit - recent_data_len])
    return recent_data
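
# Input shape sketch (hypothetical values): items are
# ((url, date, resolution), response_time) tuples with "%m-%d-%Y" dates;
# undated entries only pad the result up to config.urls_limit.
#
#   data = [(("http://a/live", "01-15-2024", "1080p"), 0.3),
#           (("http://b/live", None, None), 0.5)]
#   filter_by_date(data)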


def get_soup(source):
    """
    Get soup from source, stripping HTML comments before parsing
    """
    source = re.sub(r"<!--.*?-->", "", source, flags=re.DOTALL)
    soup = BeautifulSoup(source, "html.parser")
    return soup
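
# Usage sketch: HTML comments are removed before parsing, so commented-out
# markup never reaches the soup.
#
#   soup = get_soup("<ul><!-- <li>stale</li> --><li>fresh</li></ul>")
#   [li.text for li in soup.find_all("li")]  # ["fresh"]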


def get_total_urls_from_info_list(infoList):
    """
    Get the total urls from info list
    """
    total_urls = [url for url, _, _ in infoList]
    return list(dict.fromkeys(total_urls))[: int(config.urls_limit)]
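
# Usage sketch (hypothetical tuples, assuming config.urls_limit >= 2):
# order-preserving dedup, capped at config.urls_limit.
#
#   get_total_urls_from_info_list(
#       [("http://a", None, None), ("http://a", None, None), ("http://b", None, None)]
#   )  # ["http://a", "http://b"]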


def get_total_urls_from_sorted_data(data):
    """
    Get the total urls from sorted data, filtered by date and deduplicated
    """
    if len(data) > config.urls_limit:
        total_urls = [url for (url, _, _), _ in filter_by_date(data)]
    else:
        total_urls = [url for (url, _, _), _ in data]
    return list(dict.fromkeys(total_urls))[: config.urls_limit]
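
# Note: the date filter only kicks in when the sorted data overflows
# config.urls_limit; below the limit every url is kept, deduplicated in order.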


def is_ipv6(url):
    """
    Check if the url host is an IPv6 address
    """
    try:
        host = urlparse(url).hostname
        ipaddress.IPv6Address(host)
        return True
    except ValueError:
        return False
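
# Usage sketch: the hostname is parsed out of the url, so bracketed IPv6
# literals are handled; anything that is not an IPv6 address returns False.
#
#   is_ipv6("http://[2001:db8::1]:8080/live")  # True
#   is_ipv6("http://192.168.1.1/live")         # False
#   is_ipv6("http://example.com/live")         # False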


def check_url_ipv_type(url):
    """
    Check if the url is compatible with the ipv type in the config
    """
    ipv_type = getattr(config, "ipv_type", "ipv4")
    if ipv_type == "ipv4":
        return not is_ipv6(url)
    elif ipv_type == "ipv6":
        return is_ipv6(url)
    else:
        return True


def check_by_domain_blacklist(url):
    """
    Check by domain blacklist
    """
    domain_blacklist = [
        urlparse(domain).netloc if urlparse(domain).scheme else domain
        for domain in getattr(config, "domain_blacklist", [])
    ]
    return urlparse(url).netloc not in domain_blacklist
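
# Usage sketch (hypothetical config): blacklist entries may be bare domains or
# full urls; the latter are normalized to a netloc before comparison.
#
#   config.domain_blacklist = ["bad.example.com", "http://evil.example.com"]
#   check_by_domain_blacklist("http://bad.example.com/live")   # False
#   check_by_domain_blacklist("http://good.example.com/live")  # True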


def check_by_url_keywords_blacklist(url):
    """
    Check by URL blacklist keywords
    """
    url_keywords_blacklist = getattr(config, "url_keywords_blacklist", [])
    return not any(keyword in url for keyword in url_keywords_blacklist)


def check_url_by_patterns(url):
    """
    Check the url by patterns
    """
    return (
        check_url_ipv_type(url)
        and check_by_domain_blacklist(url)
        and check_by_url_keywords_blacklist(url)
    )
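
# Usage sketch: a url survives only if it passes all three checks; with the
# default "ipv4" ipv_type and empty blacklists, a plain IPv4 url passes.
#
#   check_url_by_patterns("http://192.168.1.1:8080/live")  # True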


def filter_urls_by_patterns(urls):
    """
    Filter urls by patterns
    """
    return [url for url in urls if check_url_by_patterns(url)]


def merge_objects(*objects):
    """
    Merge dictionaries, pooling the values of each key into a deduplicated list
    """
    merged_dict = {}
    for obj in objects:
        if not isinstance(obj, dict):
            raise TypeError("All input objects must be dictionaries")
        for key, value in obj.items():
            merged_values = merged_dict.setdefault(key, set())
            if isinstance(value, (set, list)):
                merged_values.update(value)
            else:
                merged_values.add(value)
    return {key: list(value) for key, value in merged_dict.items()}
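
# Usage sketch (hypothetical channel maps): values of any shape are pooled
# into a deduplicated list per key; ordering within each list is not
# guaranteed, since values pass through a set.
#
#   merge_objects({"cctv1": ["http://a"]}, {"cctv1": {"http://b"}, "cctv2": "http://c"})
#   # {"cctv1": ["http://a", "http://b"], "cctv2": ["http://c"]}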


def get_ip_address():
    """
    Get the local IP address, returned as an http://<ip>:8000 base url
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect(("10.255.255.255", 1))
        ip = s.getsockname()[0]
    except Exception:
        ip = "127.0.0.1"
    finally:
        s.close()
    return f"http://{ip}:8000"
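
# Usage sketch: no packets are sent here; connecting a UDP socket merely
# selects the outbound interface, whose address is then read back.
#
#   get_ip_address()  # e.g. "http://192.168.1.23:8000"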