采集分类生成器.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File : 采集分类生成器.py
# Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
# Date : 2024/6/21
  6. import os
  7. import json
  8. import gzip
  9. import base64
  10. from urllib.parse import urljoin
  11. from concurrent.futures import ThreadPoolExecutor
  12. from pprint import pprint
  13. import time
  14. import requests
  15. import warnings
  16. # 关闭警告
  17. warnings.filterwarnings("ignore")
  18. requests.packages.urllib3.disable_warnings()
  19. pool = ThreadPoolExecutor(max_workers=20) # 初始化线程池内线程数量为20
  20. headers = {
  21. 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1',
  22. 'Connection': 'close' # 设置为关闭长连接
  23. }
  24. timeout = 5 # 5秒
  25. use_gzip = False
  26. def compress_and_encode(data: str):
  27. # 压缩数据
  28. compressed_data = gzip.compress(data.encode('utf-8'))
  29. # 对压缩数据进行Base64编码
  30. encoded_data = base64.b64encode(compressed_data).decode('utf-8')
  31. return encoded_data
  32. def decode_and_decompress(encoded_data: str):
  33. # 解码Base64数据
  34. decoded_data = base64.b64decode(encoded_data.encode('utf-8'))
  35. # 解压缩数据
  36. decompressed_data = gzip.decompress(decoded_data).decode('utf-8')
  37. return decompressed_data
  38. def get_classes(rec):
  39. classes = None
  40. if rec.get('url') and str(rec['url']).startswith('http'):
  41. _class_api = rec.get('api') or '/api.php/provide/vod/'
  42. _api = urljoin(str(rec['url']).rstrip('/'), _class_api)
  43. # _api = urljoin(rec['url'], '/api.php/provide/vod/at/json')
  44. print(_api)
  45. try:
  46. r = requests.get(_api, headers=headers, timeout=timeout, verify=False)
  47. ret = r.json()
  48. if rec.get('name') == '乐视资源':
  49. print('=======乐视=========')
  50. print(ret)
  51. # print(ret)
  52. classes = ret.get('class')
  53. except Exception as e:
  54. print(f'获取资源【{rec["name"]}】({_api})分类发生错误:{e}')
  55. return classes
  56. def convert_class(classes, name=None):
  57. """
  58. 获取的分类转静态分类格式
  59. @param classes:
  60. @return:
  61. """
  62. if name is None:
  63. name = ''
  64. if not classes:
  65. return {
  66. "name": "",
  67. "class_name": "",
  68. "class_url": "",
  69. }
  70. class_names = []
  71. class_urls = []
  72. for cls in classes:
  73. if cls.get('type_name') and cls.get('type_id'):
  74. class_urls.append(str(cls['type_id']))
  75. class_names.append(str(cls['type_name']))
  76. global use_gzip
  77. return {
  78. "name": name,
  79. "class_name": compress_and_encode('&'.join(class_names)) if use_gzip else '&'.join(class_names),
  80. "class_url": '&'.join(class_urls),
  81. }
  82. def get_convert_classes(rec):
  83. classes = get_classes(rec)
  84. classes = convert_class(classes, rec.get('name'))
  85. return classes
  86. def check_class(api, type_name, type_id, limit_count=6):
  87. _url = f'{api}?ac=detail&pg=1&t={type_id}'
  88. try:
  89. r = requests.get(_url, headers=headers, timeout=timeout, verify=False)
  90. ret = r.json()
  91. if not ret.get("list") or len(ret["list"]) < limit_count:
  92. print(f'获取资源 {api} 分类【{type_name}】数量为:{len(ret["list"])} 小于{limit_count}视为排除')
  93. return False
  94. except Exception as e:
  95. print(f'获取资源 {_url} 分类【{type_name}】发生错误:{e}')
  96. return True
  97. def check_active(api):
  98. try:
  99. r = requests.get(api, headers=headers, timeout=timeout, verify=False)
  100. ret = r.json()
  101. if not ret.get("class"):
  102. return False
  103. except Exception as e:
  104. print(f'检查api: {api} 存活发生错误:{e}')
  105. return False
  106. return True
  107. def main(fname='采集'):
  108. file_path = f'./{fname}.json'
  109. out_file_path = file_path.replace('.json', '静态.json')
  110. if not os.path.exists(file_path):
  111. exit(f'不存在采集文件路径:{file_path}')
  112. with open(file_path, encoding='utf-8') as f:
  113. data = f.read()
  114. records = json.loads(data)
  115. print(records)
  116. # for rec in records:
  117. # ret = get_convert_classes(rec)
  118. # pprint(ret)
  119. tasks = [pool.submit(get_convert_classes, rec) for rec in records] # 构造一个列表,循环向线程池内submit提交执行的方法
  120. pool.shutdown(wait=True) # 线程数等待所有线程结束,这里 卡住主线程
  121. results = [task.result() for task in tasks]
  122. print(results)
  123. new_records = []
  124. for record in records:
  125. rec_name = record["name"]
  126. if rec_name:
  127. has_name = [ret for ret in results if ret.get("name") == rec_name]
  128. if has_name:
  129. record.update(has_name[-1])
  130. new_records.append(record)
  131. pprint(new_records)
  132. print(f'转换静态数据成功记录数:{len(new_records)}')
  133. with open(out_file_path, mode='w+', encoding='utf-8') as f:
  134. f.write(json.dumps(new_records, ensure_ascii=False, indent=2))
  135. def main_exclude(fname='采集静态', max_workers=0):
  136. file_path = f'./{fname}.json'
  137. if not os.path.exists(file_path):
  138. exit(f'不存在采集文件路径:{file_path}')
  139. with open(file_path, encoding='utf-8') as f:
  140. data = f.read()
  141. records = json.loads(data)
  142. if len(records) < 1 or not records[0].get('class_name'):
  143. exit('输入数据有误,疑似不是静态数据')
  144. print(records)
  145. new_records = []
  146. for rec in records:
  147. new_rec = rec.copy()
  148. if rec.get('api'):
  149. api_url = urljoin(rec['url'], rec['api'])
  150. else:
  151. api_url = urljoin(rec['url'], '/api.php/provide/vod/')
  152. print(api_url)
  153. cate_excludes = []
  154. if not check_active(api_url):
  155. print(f'{rec["name"]} ({rec["url"]})视为不存活,跳过分类检测')
  156. else:
  157. class_names = decode_and_decompress(rec['class_name']).split('&')
  158. class_urls = rec['class_url'].split('&')
  159. rec_pool = ThreadPoolExecutor(max_workers=max_workers or len(class_names)) # 初始化线程池内线程数量为分类数量
  160. tasks = []
  161. for i in range(len(class_names)):
  162. type_name = class_names[i]
  163. type_id = class_urls[i]
  164. tasks.append(rec_pool.submit(check_class, api_url, type_name, type_id))
  165. rec_pool.shutdown(wait=True) # 线程数等待所有线程结束,这里 卡住主线程
  166. results = [task.result() for task in tasks]
  167. print(results)
  168. for i in range(len(class_names)):
  169. type_name = class_names[i]
  170. # type_id = class_urls[i]
  171. if not results[i]:
  172. cate_excludes.append(type_name)
  173. if len(cate_excludes) > 0:
  174. new_rec['cate_excludes'] = cate_excludes
  175. new_records.append(new_rec)
  176. with open(file_path, mode='w+', encoding='utf-8') as f:
  177. f.write(json.dumps(new_records, ensure_ascii=False, indent=2))
  178. if __name__ == '__main__':
  179. use_gzip = True
  180. fmode = str(input('请输入处理文件方式(0:生成分类 1:添加分类过滤),留空默认为生成静态分类:\n'))
  181. ftips = '采集静态' if fmode == '1' else '采集'
  182. fname = str(input(f'请输入文件名(q结束程序),留空默认为{ftips}:\n'))
  183. t1 = time.time()
  184. if fname == 'q':
  185. exit('已主动结束脚本')
  186. if not fmode or fmode == '0':
  187. fname = fname or '采集'
  188. main(fname)
  189. elif fmode == '1':
  190. fname = fname or '采集静态'
  191. main_exclude(fname, 10)
  192. else:
  193. exit(f'未知的处理类型:{fmode}')
  194. t2 = time.time()
  195. print(f'本次程序运行耗时:{round(t2 - t1, 2)}秒')