# TGNetDiskLinkChecker.py

import json
import os
import logging
import re
import asyncio
import httpx
from telethon import TelegramClient
from telethon.sessions import StringSession
from telethon.errors import RPCError
from bs4 import BeautifulSoup


class TelegramLinkManager:
    def __init__(self, config):
        # Configure logging
        logging.basicConfig(
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            level=logging.INFO
        )
        self.logger = logging.getLogger(__name__)
        # Silence httpx INFO logs
        logging.getLogger("httpx").setLevel(logging.WARNING)
        self.config = config
        # Initialize the Telethon client
        self.client = TelegramClient(
            StringSession(config["STRING_SESSION"]),
            config["API_ID"],
            config["API_HASH"],
            proxy=config["PROXY"]
        )
        self.json_path_normal = config["JSON_PATH_NORMAL"]
        self.json_path_123 = config["JSON_PATH_123"]
        self.target_channel = config["TARGET_CHANNEL"]
        self.batch_size = config["BATCH_SIZE"]
        self.net_disk_domains = config["NET_DISK_DOMAINS"]

    # Extract net-disk links from a message
    def extract_links(self, message_text: str):
        """Extract net-disk share links from the message text."""
        if not message_text:
            self.logger.debug("Message text is empty, skipping extraction")
            return []
        url_pattern = r'https?://[^\s]+'
        urls = re.findall(url_pattern, message_text)
        net_disk_domains = self.net_disk_domains if self.net_disk_domains else [
            'aliyundrive.com', 'alipan.com',
            'pan.quark.cn',
            '115.com', '115cdn.com', 'anxia.com',
            'pan.baidu.com', 'yun.baidu.com',
            'mypikpak.com',
            '123684.com', '123685.com', '123912.com', '123pan.com', '123pan.cn', '123592.com',
            'cloud.189.cn',
            'drive.uc.cn'
        ]
        links = [url for url in urls if any(domain in url for domain in net_disk_domains)]
        return links

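    # A minimal usage sketch (the message text and share ID below are hypothetical),
    # assuming a domain list that includes pan.quark.cn; shown as a doctest-style comment:
    #   >>> manager.extract_links("Resource: https://pan.quark.cn/s/abc123 see also https://example.com")
    #   ['https://pan.quark.cn/s/abc123']
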
    # Read the JSON file asynchronously
    async def load_json_data(self, json_path: str):
        """Read the JSON file, creating a new one if it does not exist."""
        try:
            with open(json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if "messages" not in data:
                data["messages"] = []
            if "last_processed_id" not in data:
                data["last_processed_id"] = 0
            self.logger.info(f"Loaded JSON data: {json_path}, messages_count={len(data['messages'])}, last_processed_id={data['last_processed_id']}")
            return data
        except (FileNotFoundError, json.JSONDecodeError):
            self.logger.info(f"JSON file not found, creating a new one: {json_path}")
            data = {"messages": [], "last_processed_id": 0}
            await self.save_json_data(data, json_path)
            return data

    # Save the JSON file asynchronously
    async def save_json_data(self, data, json_path: str):
        """Save data to the JSON file."""
        try:
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            self.logger.info(f"JSON file saved: {json_path}, messages_count={len(data['messages'])}, last_processed_id={data['last_processed_id']}")
        except Exception as e:
            self.logger.error(f"Failed to save JSON: {e}, path: {json_path}")

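    # For reference, the persisted JSON files have this shape (all values illustrative):
    # {
    #   "messages": [
    #     {"message_id": 12345, "urls": ["https://pan.quark.cn/s/abc123"], "invalid_urls": []}
    #   ],
    #   "last_processed_id": 12345
    # }
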
    # Fetch and save all new messages (processed in batches)
    async def fetch_and_save_all_messages(self, limit=None):
        """Fetch all new messages in batches and save them to JSON."""
        data_normal = await self.load_json_data(self.json_path_normal)
        data_123 = await self.load_json_data(self.json_path_123)
        last_processed_id = max(data_normal.get("last_processed_id", 0),
                                data_123.get("last_processed_id", 0))
        offset_id = last_processed_id
        total_new_messages = 0
        while True:
            new_messages_normal = []
            new_messages_123 = []
            messages_fetched = 0
            try:
                async for message in self.client.iter_messages(
                    self.target_channel,
                    min_id=offset_id,
                    reverse=True,
                    limit=self.batch_size
                ):
                    if message is None:
                        break
                    text = message.text or ""
                    links = self.extract_links(text)
                    if links:
                        message_data = {
                            "message_id": message.id,
                            "urls": links,
                            "invalid_urls": []
                        }
                        # Substring match: any URL containing "123" is routed to the 123pan file
                        if any("123" in url for url in links):
                            new_messages_123.append(message_data)
                        else:
                            new_messages_normal.append(message_data)
                    offset_id = max(offset_id, message.id)
                    messages_fetched += 1
                    total_new_messages += 1
                    if limit and total_new_messages >= limit:
                        break
                if new_messages_normal or new_messages_123:
                    data_normal["messages"].extend(new_messages_normal)
                    data_123["messages"].extend(new_messages_123)
                    data_normal["last_processed_id"] = offset_id
                    data_123["last_processed_id"] = offset_id
                    await self.save_json_data(data_normal, self.json_path_normal)
                    await self.save_json_data(data_123, self.json_path_123)
                    self.logger.info(f"Saved {len(new_messages_normal) + len(new_messages_123)} messages in this batch, {total_new_messages} processed in total")
                if messages_fetched == 0 or (limit and total_new_messages >= limit):
                    break
            except Exception as e:
                self.logger.error(f"Failed to fetch messages: {e}")
                break
        self.logger.info(f"Finished saving new messages, {total_new_messages} processed in total")

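    # Incremental-fetch sketch (ids purely illustrative): with last_processed_id=1000 and
    # BATCH_SIZE=500, the first batch iterates channel messages 1001..1500 oldest-first
    # (reverse=True with min_id=offset_id); offset_id then advances to the highest id seen,
    # so the next batch resumes where the previous one stopped.
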
    # Extract the share ID
    def extract_share_id(self, url: str):
        """Extract the share ID from a link; multiple net-disk domains are supported."""
        net_disk_patterns = {
            'uc': {'domains': ['drive.uc.cn'], 'pattern': r"https?://drive\.uc\.cn/s/([a-zA-Z0-9]+)"},
            'aliyun': {'domains': ['aliyundrive.com', 'alipan.com'], 'pattern': r"https?://(?:www\.)?(?:aliyundrive|alipan)\.com/s/([a-zA-Z0-9]+)"},
            'quark': {'domains': ['pan.quark.cn'], 'pattern': r"https?://(?:www\.)?pan\.quark\.cn/s/([a-zA-Z0-9]+)"},
            '115': {'domains': ['115.com', '115cdn.com', 'anxia.com'], 'pattern': r"https?://(?:www\.)?(?:115|115cdn|anxia)\.com/s/([a-zA-Z0-9]+)"},
            'baidu': {'domains': ['pan.baidu.com', 'yun.baidu.com'], 'pattern': r"https?://(?:[a-z]+\.)?(?:pan|yun)\.baidu\.com/(?:s/|share/init\?surl=)([a-zA-Z0-9_-]+)(?:\?|$)"},
            'pikpak': {'domains': ['mypikpak.com'], 'pattern': r"https?://(?:www\.)?mypikpak\.com/s/([a-zA-Z0-9]+)"},
            # Each alternative carries its own TLD so that 123pan.cn links can also match
            '123': {'domains': ['123684.com', '123685.com', '123912.com', '123pan.com', '123pan.cn', '123592.com'], 'pattern': r"https?://(?:www\.)?(?:123684\.com|123685\.com|123912\.com|123pan\.com|123pan\.cn|123592\.com)/s/([a-zA-Z0-9-]+)"},
            'tianyi': {'domains': ['cloud.189.cn'], 'pattern': r"https?://cloud\.189\.cn/(?:t/|web/share\?code=)([a-zA-Z0-9]+)"}
        }
        for net_disk, config in net_disk_patterns.items():
            if any(domain in url for domain in config['domains']):
                match = re.search(config['pattern'], url)
                if match:
                    return match.group(1), net_disk
        return None, None

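    # Behavior sketch with a hypothetical share link:
    #   >>> manager.extract_share_id("https://pan.quark.cn/s/abc123")
    #   ('abc123', 'quark')
    # Unrecognized URLs yield (None, None).
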
    # Check net-disk link validity
    async def check_uc(self, share_id: str):
        url = f"https://drive.uc.cn/s/{share_id}"
        headers = {"User-Agent": "Mozilla/5.0 (Linux; Android 10; SM-G975F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.101 Mobile Safari/537.36"}
        timeout = httpx.Timeout(10.0, connect=5.0, read=5.0)
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                response = await client.get(url, headers=headers)
                response.raise_for_status()
                soup = BeautifulSoup(response.text, 'html.parser')
                page_text = soup.get_text(strip=True)
                # Page keywords that indicate an invalid share
                # ("expired", "does not exist", "violation", "deleted", "cancelled")
                error_keywords = ["失效", "不存在", "违规", "删除", "已过期", "被取消"]
                if any(keyword in page_text for keyword in error_keywords):
                    return False
                # "文件" (file) or "分享" (share) on the page suggests a live share
                if "文件" in page_text or "分享" in page_text:
                    return True
                return False
        except httpx.TimeoutException as e:
            self.logger.error(f"UC drive link {url} check timed out: {str(e)}")
            return True
        except httpx.HTTPStatusError as e:
            self.logger.error(f"UC drive link {url} HTTP error: {e.response.status_code}")
            return False
        except Exception as e:
            # Treat connection errors as inconclusive rather than invalid
            if isinstance(e, httpx.ConnectError):
                return True
            self.logger.error(f"UC drive link {url} check failed: {type(e).__name__}: {str(e)}")
            return False

    async def check_aliyun(self, share_id: str):
        api_url = "https://api.aliyundrive.com/adrive/v3/share_link/get_share_by_anonymous"
        headers = {"Content-Type": "application/json"}
        data = json.dumps({"share_id": share_id})
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(api_url, headers=headers, data=data)
                response_json = response.json()
                return bool(response_json.get('has_pwd') or response_json.get('file_infos'))
        except httpx.RequestError as e:
            self.logger.error(f"Failed to check Aliyun drive link: {e}")
            return True

    async def check_115(self, share_id: str):
        api_url = "https://webapi.115.com/share/snap"
        params = {"share_code": share_id, "receive_code": ""}
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(api_url, params=params)
                response_json = response.json()
                # "请输入访问码" means the share exists but requires an access code
                return bool(response_json.get('state') or '请输入访问码' in response_json.get('error', ''))
        except httpx.RequestError as e:
            self.logger.error(f"Failed to check 115 drive link: {e}")
            return True

    async def check_quark(self, share_id: str):
        api_url = "https://drive.quark.cn/1/clouddrive/share/sharepage/token"
        headers = {"Content-Type": "application/json"}
        data = json.dumps({"pwd_id": share_id, "passcode": ""})
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(api_url, headers=headers, data=data)
                response_json = response.json()
                # "需要提取码" means the share exists but requires an extraction code
                return response_json.get('message') == "ok" or response_json.get('message') == "需要提取码"
        except httpx.RequestError as e:
            self.logger.error(f"Failed to check Quark drive link: {e}")
            return True

    async def check_123(self, share_id: str):
        api_url = f"https://www.123pan.com/api/share/info?shareKey={share_id}"
        timeout = httpx.Timeout(10.0, connect=5.0, read=5.0)
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                response = await client.get(api_url, headers={"User-Agent": "Mozilla/5.0"})
                if response.status_code == 403:
                    return True
                response_json = response.json()
                return bool(response_json.get('data', {}).get('HasPwd', False) or response_json.get('code') == 0)
        except (httpx.RequestError, json.JSONDecodeError) as e:
            self.logger.error(f"Failed to check 123 drive link: {e}")
            return True

    async def check_baidu(self, share_id: str):
        url = f"https://pan.baidu.com/s/{share_id}"
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"}
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(url, headers=headers, follow_redirects=True)
                text = response.text
                if "need verify" in text:
                    return True
                # Page hints that the share was cancelled, has expired, or does not exist
                if any(x in text for x in ["分享的文件已经被取消", "分享已过期", "你访问的页面不存在"]):
                    return False
                # An extraction-code prompt, file list, or expiry info indicates a live share
                return bool("请输入提取码" in text or "提取文件" in text or "过期时间" in text)
        except httpx.RequestError as e:
            self.logger.error(f"Failed to check Baidu drive link: {e}")
            return False

    async def check_tianyi(self, share_id: str):
        api_url = "https://api.cloud.189.cn/open/share/getShareInfoByCodeV2.action"
        timeout = httpx.Timeout(10.0, connect=5.0, read=5.0)
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                response = await client.post(api_url, data={"shareCode": share_id})
                response.raise_for_status()
                text = response.text
                if any(x in text for x in ["ShareInfoNotFound", "ShareNotFound", "FileNotFound", "ShareExpiredError", "ShareAuditNotPass"]):
                    return False
                return True
        except httpx.TimeoutException as e:
            self.logger.error(f"Tianyi cloud link {share_id} check timed out: {str(e)}")
            return True
        except httpx.HTTPStatusError as e:
            self.logger.error(f"Tianyi cloud link {share_id} HTTP error: {e.response.status_code}")
            return False
        except Exception as e:
            # Treat connection errors as inconclusive rather than invalid
            if isinstance(e, httpx.ConnectError):
                return True
            self.logger.error(f"Tianyi cloud link {share_id} check failed: {type(e).__name__}: {str(e)}")
            return False

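    # Shared convention for the check_* coroutines above: True means the share looks valid
    # or the check was inconclusive (for example a timeout or an HTTP 403), False means the
    # share is treated as invalid. Only False results mark a link for removal downstream.
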
    # Check a single link's validity
    async def check_url(self, url: str, semaphore: asyncio.Semaphore):
        async with semaphore:
            # self.logger.info(f"Start checking link: {url}")
            share_id, service = self.extract_share_id(url)
            if not share_id or not service:
                self.logger.warning(f"Unrecognized link: {url}")
                return True
            check_functions = {
                "uc": self.check_uc, "aliyun": self.check_aliyun, "quark": self.check_quark,
                "115": self.check_115, "123": self.check_123, "baidu": self.check_baidu,
                "tianyi": self.check_tianyi
            }
            check_func = check_functions.get(service)
            if check_func is None:
                # No checker for this service (e.g. pikpak): treat the link as valid
                return True
            result = await check_func(share_id)
            if not result:
                self.logger.info(f"Link {url} checked, result: {result}")
            return result

    # Process messages (batch checking)
    async def process_messages(self, delete, concurrency=500):
        data_normal = await self.load_json_data(self.json_path_normal)
        data_123 = await self.load_json_data(self.json_path_123)
        if delete == 1 or delete == 2:
            all_urls_123 = []
            all_urls_normal = []
            url_to_message = {}
            for message in data_normal["messages"] + data_123["messages"]:
                for url in message["urls"]:
                    if "123" in url:
                        all_urls_123.append(url)
                    else:
                        all_urls_normal.append(url)
                    url_to_message[url] = message
            self.logger.info(f"{len(all_urls_123)} 123pan links and {len(all_urls_normal)} other net-disk links to check")
            semaphore_123 = asyncio.Semaphore(min(10, concurrency))
            semaphore_normal = asyncio.Semaphore(concurrency)

            async def check_with_semaphore(url, semaphore):
                return await self.check_url(url, semaphore)

            # Check 123pan links
            if all_urls_123:
                tasks_123 = [check_with_semaphore(url, semaphore_123) for url in all_urls_123]
                try:
                    results_123 = await asyncio.wait_for(
                        asyncio.gather(*tasks_123, return_exceptions=True),
                        timeout=max(120.0, len(all_urls_123) / 10 * 10)
                    )
                    for url, result in zip(all_urls_123, results_123):
                        if not result or isinstance(result, Exception):
                            if url not in url_to_message[url]["invalid_urls"]:  # avoid duplicates
                                url_to_message[url]["invalid_urls"].append(url)
                except asyncio.TimeoutError:
                    self.logger.error(f"Timed out checking 123pan links, total links: {len(all_urls_123)}")
                    for url in all_urls_123:
                        if url not in url_to_message[url]["invalid_urls"]:
                            url_to_message[url]["invalid_urls"].append(url)
            # Check other net-disk links
            if all_urls_normal:
                tasks_normal = [check_with_semaphore(url, semaphore_normal) for url in all_urls_normal]
                try:
                    results_normal = await asyncio.wait_for(
                        asyncio.gather(*tasks_normal, return_exceptions=True),
                        timeout=max(120.0, len(all_urls_normal) / concurrency * 10)
                    )
                    for url, result in zip(all_urls_normal, results_normal):
                        if not result or isinstance(result, Exception):
                            if url not in url_to_message[url]["invalid_urls"]:  # avoid duplicates
                                url_to_message[url]["invalid_urls"].append(url)
                except asyncio.TimeoutError:
                    self.logger.error(f"Timed out checking other net-disk links, total links: {len(all_urls_normal)}")
                    for url in all_urls_normal:
                        if url not in url_to_message[url]["invalid_urls"]:
                            url_to_message[url]["invalid_urls"].append(url)
        if delete == 1:
            for data in [data_normal]:  # only handle non-123 net disks here
                messages = data["messages"]
                for message in messages[:]:
                    if message["invalid_urls"]:
                        try:
                            await self.client.delete_messages(self.target_channel, message["message_id"])
                            self.logger.info(f"Deleted invalid message: {message['message_id']}")
                            messages.remove(message)
                        except RPCError as e:
                            self.logger.error(f"Failed to delete message: {e}")
            # For 123pan messages, delete only when every link in the message is invalid
            self.logger.warning("Note: 123pan checks are strict and may produce false positives; delete with care")
            messages_123 = data_123["messages"]
            for message in messages_123[:]:
                if message.get("invalid_urls") and len(message.get("invalid_urls", [])) == len(message.get("urls", [])):
                    try:
                        await self.client.delete_messages(self.target_channel, message["message_id"])
                        self.logger.info(f"Deleted invalid 123pan message: {message['message_id']}")
                        messages_123.remove(message)
                    except RPCError as e:
                        self.logger.error(f"Failed to delete 123pan message: {e}")
        elif delete == 3:
            # Handle non-123 net disks
            messages = data_normal["messages"]
            for message in messages[:]:
                if message.get("invalid_urls"):
                    try:
                        await self.client.delete_messages(self.target_channel, message["message_id"])
                        self.logger.info(f"Deleted invalid message: {message['message_id']}")
                        messages.remove(message)
                    except RPCError as e:
                        self.logger.error(f"Failed to delete message: {e}")
            # Handle 123pan messages: delete only when every link is invalid
            self.logger.warning("Note: 123pan checks are strict and may produce false positives; delete with care")
            messages_123 = data_123["messages"]
            for message in messages_123[:]:
                if message.get("invalid_urls") and len(message.get("invalid_urls", [])) == len(message.get("urls", [])):
                    try:
                        await self.client.delete_messages(self.target_channel, message["message_id"])
                        self.logger.info(f"Deleted invalid 123pan message: {message['message_id']}")
                        messages_123.remove(message)
                    except RPCError as e:
                        self.logger.error(f"Failed to delete 123pan message: {e}")
        await self.save_json_data(data_normal, self.json_path_normal)
        await self.save_json_data(data_123, self.json_path_123)

    # Recheck links previously marked invalid
    async def recheck_invalid_urls(self, concurrency=500):
        """Recheck all links marked as invalid and update the JSON files."""
        data_normal = await self.load_json_data(self.json_path_normal)
        data_123 = await self.load_json_data(self.json_path_123)
        invalid_urls_123 = []
        invalid_urls_normal = []
        url_to_message = {}
        # Collect all invalid links
        for message in data_normal["messages"] + data_123["messages"]:
            for url in message.get("invalid_urls", []):
                if "123" in url:
                    invalid_urls_123.append(url)
                else:
                    invalid_urls_normal.append(url)
                url_to_message[url] = message
        self.logger.info(f"{len(invalid_urls_123)} invalid 123pan links and {len(invalid_urls_normal)} invalid other net-disk links to recheck")
        semaphore_123 = asyncio.Semaphore(min(10, concurrency))
        semaphore_normal = asyncio.Semaphore(concurrency)

        async def check_with_semaphore(url, semaphore):
            return await self.check_url(url, semaphore)

        # Recheck 123pan links
        if invalid_urls_123:
            tasks_123 = [check_with_semaphore(url, semaphore_123) for url in invalid_urls_123]
            try:
                results_123 = await asyncio.wait_for(
                    asyncio.gather(*tasks_123, return_exceptions=True),
                    timeout=max(120.0, len(invalid_urls_123) / 10 * 10)
                )
                for url, result in zip(invalid_urls_123, results_123):
                    if result and not isinstance(result, Exception):
                        # Valid on recheck: remove the link from invalid_urls
                        url_to_message[url]["invalid_urls"] = [u for u in url_to_message[url]["invalid_urls"] if u != url]
                        self.logger.info(f"Link {url} is valid on recheck and was removed from the invalid list")
            except asyncio.TimeoutError:
                self.logger.error(f"Timed out rechecking invalid 123pan links, total links: {len(invalid_urls_123)}")
        # Recheck other net-disk links
        if invalid_urls_normal:
            tasks_normal = [check_with_semaphore(url, semaphore_normal) for url in invalid_urls_normal]
            try:
                results_normal = await asyncio.wait_for(
                    asyncio.gather(*tasks_normal, return_exceptions=True),
                    timeout=max(120.0, len(invalid_urls_normal) / concurrency * 10)
                )
                for url, result in zip(invalid_urls_normal, results_normal):
                    if result and not isinstance(result, Exception):
                        # Valid on recheck: remove the link from invalid_urls
                        url_to_message[url]["invalid_urls"] = [u for u in url_to_message[url]["invalid_urls"] if u != url]
                        self.logger.info(f"Link {url} is valid on recheck and was removed from the invalid list")
            except asyncio.TimeoutError:
                self.logger.error(f"Timed out rechecking invalid other net-disk links, total links: {len(invalid_urls_normal)}")
        await self.save_json_data(data_normal, self.json_path_normal)
        await self.save_json_data(data_123, self.json_path_123)

    # Main run logic
    async def run_async(self, delete, limit=None, concurrency=500, recheck=False):
        if delete in [1, 2]:
            await self.fetch_and_save_all_messages(limit)
            await self.process_messages(delete=2, concurrency=concurrency)  # always run in detect-only mode first
            if recheck:  # rechecking was requested
                await self.recheck_invalid_urls(concurrency)
            # After rechecking, perform deletions if the original mode was delete
            if delete == 1:
                # Handle the deletions explicitly
                data_normal = await self.load_json_data(self.json_path_normal)
                data_123 = await self.load_json_data(self.json_path_123)
                # Handle non-123 net-disk messages first
                messages = data_normal["messages"]
                for message in messages[:]:
                    if message.get("invalid_urls"):
                        try:
                            await self.client.delete_messages(self.target_channel, message["message_id"])
                            self.logger.info(f"Deleted invalid message: {message['message_id']}")
                            messages.remove(message)
                        except RPCError as e:
                            self.logger.error(f"Failed to delete message: {e}")
                # For 123pan messages, given how strict the check is, delete only when every link in the message is invalid
                self.logger.warning("Note: 123pan checks are strict and may produce false positives; delete with care")
                messages_123 = data_123["messages"]
                for message in messages_123[:]:
                    if message.get("invalid_urls") and len(message.get("invalid_urls", [])) == len(message.get("urls", [])):
                        try:
                            await self.client.delete_messages(self.target_channel, message["message_id"])
                            self.logger.info(f"Deleted invalid 123pan message: {message['message_id']}")
                            messages_123.remove(message)
                        except RPCError as e:
                            self.logger.error(f"Failed to delete 123pan message: {e}")
                await self.save_json_data(data_normal, self.json_path_normal)
                await self.save_json_data(data_123, self.json_path_123)
        else:
            await self.process_messages(delete, concurrency)

    def run(self, delete=None, limit=None, concurrency=None, recheck=None):
        # Fall back to the defaults from the config when arguments are not given
        delete = delete if delete is not None else self.config["DELETE_MODE"]
        limit = limit if limit is not None else self.config["LIMIT"]
        concurrency = concurrency if concurrency is not None else self.config["CONCURRENCY"]
        recheck = recheck if recheck is not None else self.config["RECHECK"]
        with self.client.start():
            # Use the client's own event loop to avoid the deprecated asyncio.get_event_loop()
            self.client.loop.run_until_complete(self.run_async(delete, limit, concurrency, recheck))

# Example usage
if __name__ == "__main__":
    logger = logging.getLogger(__name__)
    logger.info(f"Current working directory: {os.getcwd()}")
    # Configuration
    CONFIG = {
        # Telethon client settings
        "API_ID": 6627460,
        "API_HASH": "27a53a0965e486a2bc1b1fcde473b1c4",
        "STRING_SESSION": "xxx",
        "JSON_PATH_NORMAL": os.path.join(os.getcwd(), "messages.json"),
        "JSON_PATH_123": os.path.join(os.getcwd(), "messages_123.json"),
        "TARGET_CHANNEL": "tgsearchers",
        "PROXY": None,
        "BATCH_SIZE": 500,
        # Run settings
        "DELETE_MODE": 2,  # 1: check and delete (delete after the optional recheck), 2: check only, 3: delete messages already marked invalid
        "LIMIT": 1000,  # maximum number of messages to process per run
        "CONCURRENCY": 20,  # concurrency level
        "RECHECK": True,  # whether to recheck links marked as invalid
        "NET_DISK_DOMAINS": [
            'pan.quark.cn',
            # 'aliyundrive.com', 'alipan.com',
            # '115.com', '115cdn.com', 'anxia.com',
            # 'pan.baidu.com', 'yun.baidu.com',
            # 'mypikpak.com',
            # '123684.com', '123685.com', '123912.com', '123pan.com', '123pan.cn', '123592.com',
            # 'cloud.189.cn',
            # 'drive.uc.cn'
        ]
    }
    # Create the manager instance
    manager = TelegramLinkManager(CONFIG)
    # Run the main program with the defaults from CONFIG
    manager.run()
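    # A hypothetical alternative invocation that overrides the CONFIG defaults for a
    # smaller, detect-only pass without rechecking (sketch only, kept commented out):
    # manager.run(delete=2, limit=200, concurrency=10, recheck=False)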