import datetime
import ipaddress
import os
import re
import socket
from time import time
from urllib.parse import urlparse

from bs4 import BeautifulSoup

from utils.config import get_config, resource_path

config = get_config()


def get_pbar_remaining(pbar, start_time):
    """
    Estimate the remaining time of the progress bar from the average
    time per completed task
    """
    try:
        elapsed = time() - start_time
        completed_tasks = pbar.n
        if completed_tasks > 0:
            avg_time_per_task = elapsed / completed_tasks
            remaining_tasks = pbar.total - completed_tasks
            remaining_time = pbar.format_interval(avg_time_per_task * remaining_tasks)
        else:
            remaining_time = "unknown"
        return remaining_time
    except Exception as e:
        print(f"Error: {e}")
        return "unknown"
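

# Minimal usage sketch (assumes a tqdm progress bar, which provides the .n,
# .total and format_interval members used above; the numbers are invented):
def _example_pbar_remaining():
    from tqdm import tqdm

    start_time = time()
    pbar = tqdm(total=100)
    for _ in range(10):
        pbar.update(1)  # each update advances pbar.n
    # Remaining time is extrapolated from elapsed time per completed task
    print(get_pbar_remaining(pbar, start_time))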


def update_file(final_file, old_file):
    """
    Replace final_file with old_file if old_file exists
    """
    old_file_path = resource_path(old_file, persistent=True)
    final_file_path = resource_path(final_file, persistent=True)
    if os.path.exists(old_file_path):
        os.replace(old_file_path, final_file_path)


def filter_by_date(data):
    """
    Keep results dated within the recent_days window, topping up with
    older results until urls_limit is reached
    """
    default_recent_days = 30
    use_recent_days = getattr(config, "recent_days", default_recent_days)
    if not isinstance(use_recent_days, int) or use_recent_days <= 0:
        use_recent_days = default_recent_days
    start_date = datetime.datetime.now() - datetime.timedelta(days=use_recent_days)
    recent_data = []
    unrecent_data = []
    for (url, date, resolution), response_time in data:
        item = ((url, date, resolution), response_time)
        if date:
            date = datetime.datetime.strptime(date, "%m-%d-%Y")
            if date >= start_date:
                recent_data.append(item)
            else:
                unrecent_data.append(item)
        else:
            unrecent_data.append(item)
    recent_data_len = len(recent_data)
    if recent_data_len == 0:
        # No recent results at all: fall back to the older ones
        recent_data = unrecent_data
    elif recent_data_len < config.urls_limit:
        # Top up with older results to reach the configured limit
        recent_data.extend(unrecent_data[: config.urls_limit - recent_data_len])
    return recent_data
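

# Illustrative input for filter_by_date (a sketch; urls and values are
# invented): each entry is ((url, date, resolution), response_time), with
# the date formatted as "%m-%d-%Y" or None when unknown.
def _example_filter_by_date():
    data = [
        (("http://example.com/a.m3u8", "06-01-2024", "1920x1080"), 120),
        (("http://example.com/b.m3u8", None, None), 340),
    ]
    for (url, _, _), _ in filter_by_date(data):
        print(url)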


def get_soup(source):
    """
    Get a BeautifulSoup object from an HTML source string
    """
    # Strip HTML comments first so commented-out markup is not parsed
    source = re.sub(
        r"<!--.*?-->",
        "",
        source,
        flags=re.DOTALL,
    )
    soup = BeautifulSoup(source, "html.parser")
    return soup
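

# Quick sketch of get_soup on a snippet containing a commented-out tag
# (the markup is invented for illustration):
def _example_get_soup():
    html = "<ul><li>live</li><!-- <li>stale</li> --></ul>"
    print([li.text for li in get_soup(html).find_all("li")])  # ['live']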


def get_total_urls_from_info_list(infoList):
    """
    Get the deduplicated urls from the info list, capped at urls_limit
    """
    total_urls = [url for url, _, _ in infoList]
    # dict.fromkeys deduplicates while preserving the original order
    return list(dict.fromkeys(total_urls))[: int(config.urls_limit)]


def get_total_urls_from_sorted_data(data):
    """
    Get the total urls from sorted data, filtered by date and deduplicated
    """
    if len(data) > config.urls_limit:
        total_urls = [url for (url, _, _), _ in filter_by_date(data)]
    else:
        total_urls = [url for (url, _, _), _ in data]
    return list(dict.fromkeys(total_urls))[: config.urls_limit]


def is_ipv6(url):
    """
    Check if the url host is an ipv6 address
    """
    try:
        host = urlparse(url).hostname
        ipaddress.IPv6Address(host)
        return True
    except ValueError:
        return False
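

# Sketch: IPv6 hosts appear in brackets inside urls; urlparse strips the
# brackets, so the bare address reaches IPv6Address (addresses invented):
def _example_is_ipv6():
    print(is_ipv6("http://[2001:db8::1]:8080/live.m3u8"))  # True
    print(is_ipv6("http://203.0.113.7/live.m3u8"))  # False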


def check_url_ipv_type(url):
    """
    Check if the url is compatible with the ipv_type in the config
    """
    ipv_type = getattr(config, "ipv_type", "ipv4")
    if ipv_type == "ipv4":
        return not is_ipv6(url)
    elif ipv_type == "ipv6":
        return is_ipv6(url)
    else:
        # Any other value accepts both IPv4 and IPv6
        return True


def check_by_domain_blacklist(url):
    """
    Check that the url domain is not in the domain blacklist
    """
    # Blacklist entries may be bare domains or full urls; normalize to netloc
    domain_blacklist = [
        urlparse(domain).netloc if urlparse(domain).scheme else domain
        for domain in getattr(config, "domain_blacklist", [])
    ]
    return urlparse(url).netloc not in domain_blacklist


def check_by_url_keywords_blacklist(url):
    """
    Check by URL blacklist keywords
    """
    url_keywords_blacklist = getattr(config, "url_keywords_blacklist", [])
    return not any(keyword in url for keyword in url_keywords_blacklist)


def check_url_by_patterns(url):
    """
    Check the url by patterns
    """
    return (
        check_url_ipv_type(url)
        and check_by_domain_blacklist(url)
        and check_by_url_keywords_blacklist(url)
    )


def filter_urls_by_patterns(urls):
    """
    Filter urls by the same checks as check_url_by_patterns
    """
    return [url for url in urls if check_url_by_patterns(url)]
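

# Sketch of the combined filters under hypothetical config values
# (ipv_type "ipv4", domain_blacklist ["bad.example.com"], and
# url_keywords_blacklist ["/ads/"]; all urls are invented):
def _example_filter_urls():
    urls = [
        "http://bad.example.com/live.m3u8",  # dropped: blacklisted domain
        "http://cdn.example.com/ads/1.m3u8",  # dropped: blacklisted keyword
        "http://cdn.example.com/tv/1.m3u8",  # kept
    ]
    print(filter_urls_by_patterns(urls))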


def merge_objects(*objects):
    """
    Merge dictionaries, collecting the values of duplicate keys into lists
    """
    merged_dict = {}
    for obj in objects:
        if not isinstance(obj, dict):
            raise TypeError("All input objects must be dictionaries")
        for key, value in obj.items():
            if key not in merged_dict:
                merged_dict[key] = set()
            # Sets and lists are flattened; scalar values are added directly
            if isinstance(value, (set, list)):
                merged_dict[key].update(value)
            else:
                merged_dict[key].add(value)
    # Convert the collected sets back to lists
    for key, value in merged_dict.items():
        merged_dict[key] = list(value)
    return merged_dict
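

# Sketch: merging channel maps pools the urls of duplicate keys (names and
# urls invented; the merged order is unspecified, since values pass
# through a set):
def _example_merge_objects():
    a = {"CCTV-1": ["http://x.example/1.m3u8"]}
    b = {"CCTV-1": ["http://y.example/1.m3u8"], "CCTV-2": "http://z.example/2.m3u8"}
    print(merge_objects(a, b))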


def get_ip_address():
    """
    Get the local IP address, formatted as the service base url
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # Connecting a UDP socket sends no packets; it only selects the
        # outgoing interface, whose address is then read back
        s.connect(("10.255.255.255", 1))
        IP = s.getsockname()[0]
    except Exception:
        IP = "127.0.0.1"
    finally:
        s.close()
    return f"http://{IP}:8000"
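

# Sketch: the returned value is the base url where the generated playlist
# is served (the port 8000 is hardcoded above):
def _example_ip_address():
    print(get_ip_address())  # e.g. "http://192.168.1.20:8000"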