#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File : 采集分类生成器.py
# Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
# Date : 2024/6/21
import base64
import gzip
import json
import os
import time
import warnings
from concurrent.futures import ThreadPoolExecutor
from pprint import pprint
from urllib.parse import urljoin

import requests
# Settings shared by every HTTP request issued in this script.
timeout = 5  # request timeout, in seconds
use_gzip = False  # when true, convert_class stores class names gzip+Base64 encoded
headers = {
    'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1',
    'Connection': 'close',  # ask the server not to keep the connection alive
}

# Turn off warnings: Python's own and urllib3's (requests below use verify=False).
warnings.filterwarnings("ignore")
requests.packages.urllib3.disable_warnings()

# Thread pool created with 20 worker threads.
pool = ThreadPoolExecutor(max_workers=20)
def compress_and_encode(data: str):
    """
    Gzip-compress a string and return the result as Base64 text.

    @param data: text to pack; it is encoded as UTF-8 before compression
    @return: Base64 string (str) of the gzip-compressed bytes
    """
    raw_bytes = data.encode('utf-8')
    # gzip first, then Base64 so the result can be stored inside JSON text.
    return base64.b64encode(gzip.compress(raw_bytes)).decode('utf-8')
def decode_and_decompress(encoded_data: str):
    """
    Reverse compress_and_encode: Base64-decode, gunzip and return the text.

    @param encoded_data: Base64 string holding gzip-compressed UTF-8 text
    @return: the decompressed text (str)
    """
    gz_bytes = base64.b64decode(encoded_data.encode('utf-8'))
    return gzip.decompress(gz_bytes).decode('utf-8')
def get_classes(rec):
    """
    Fetch the category list of one collection source.

    The request goes to ``rec['url']`` joined with ``rec['api']`` (default
    ``/api.php/provide/vod/``) and the ``class`` entry of the JSON reply is
    returned.

    @param rec: source record (dict); reads the keys ``url``, ``api`` and ``name``
    @return: value of the ``class`` key of the response, or None when the
             record has no http(s) url, the request/JSON parsing fails, or the
             response has no ``class`` key
    """
    base_url = str(rec.get('url') or '')
    if not base_url.startswith('http'):
        return None

    _class_api = rec.get('api') or '/api.php/provide/vod/'
    _api = urljoin(base_url.rstrip('/'), _class_api)
    print(_api)
    try:
        r = requests.get(_api, headers=headers, timeout=timeout, verify=False)
        ret = r.json()
        # Extra debug dump kept for this one named source.
        if rec.get('name') == '乐视资源':
            print('=======乐视=========')
            print(ret)
        return ret.get('class')
    except Exception as e:
        # Best effort: a failing source must not abort the whole run.
        # rec.get() is used so a record without "name" cannot raise KeyError
        # from inside this handler.
        print(f'获取资源【{rec.get("name")}】({_api})分类发生错误:{e}')
    return None
def convert_class(classes, name=None):
    """
    Convert a fetched category list into the static-category format.

    @param classes: iterable of dicts; entries are used only when both
                    ``type_name`` and ``type_id`` are truthy
    @param name: source name copied into the result (None becomes '')
    @return: dict with ``name``, ``class_name`` and ``class_url``; names and
             ids are joined with '&'. When ``classes`` is empty all three
             values are '' (the name is not copied). ``class_name`` is
             gzip+Base64 encoded when the module flag ``use_gzip`` is true.
    """
    if not classes:
        return {"name": "", "class_name": "", "class_url": ""}

    usable = [item for item in classes if item.get('type_name') and item.get('type_id')]
    joined_names = '&'.join(str(item['type_name']) for item in usable)
    joined_ids = '&'.join(str(item['type_id']) for item in usable)
    if use_gzip:
        joined_names = compress_and_encode(joined_names)

    return {
        "name": '' if name is None else name,
        "class_name": joined_names,
        "class_url": joined_ids,
    }
def get_convert_classes(rec):
    """
    Fetch the categories of one source and return them in static format.

    @param rec: source record (dict), passed to get_classes; its ``name`` is
                forwarded to convert_class
    @return: the dict produced by convert_class
    """
    return convert_class(get_classes(rec), rec.get('name'))
def check_class(api, type_name, type_id, limit_count=6):
    """
    Check whether a category of a source returns enough items to be kept.

    Requests ``{api}?ac=detail&pg=1&t={type_id}`` and inspects the ``list``
    entry of the JSON reply.

    @param api: API url of the source
    @param type_name: category name, used only in log output
    @param type_id: category id sent as the ``t`` query parameter
    @param limit_count: minimum number of items required in ``list``
    @return: False when ``list`` is missing, empty or shorter than
             ``limit_count``; True otherwise, including when the request or
             JSON parsing fails (the error is printed and the category kept)
    """
    _url = f'{api}?ac=detail&pg=1&t={type_id}'
    try:
        r = requests.get(_url, headers=headers, timeout=timeout, verify=False)
        ret = r.json()
        # Normalise a missing/null "list" to [] so the log line below cannot
        # raise and fall into the except branch, which would wrongly keep the
        # category.
        items = ret.get("list") or []
        if not items or len(items) < limit_count:
            print(f'获取资源 {api} 分类【{type_name}】数量为:{len(items)} 小于{limit_count}视为排除')
            return False
    except Exception as e:
        # Best effort: on network/parse errors the category is not excluded.
        print(f'获取资源 {_url} 分类【{type_name}】发生错误:{e}')
    return True
def check_active(api):
    """
    Report whether a source API answers with a non-empty ``class`` entry.

    @param api: API url to request
    @return: True when the JSON reply has a truthy ``class`` value; False
             when it does not or when the request/parsing raises (the error
             is printed)
    """
    try:
        reply = requests.get(api, headers=headers, timeout=timeout, verify=False)
        payload = reply.json()
        alive = bool(payload.get("class"))
    except Exception as e:
        print(f'检查api: {api} 存活发生错误:{e}')
        alive = False
    return alive
def main(fname='采集'):
    """
    Generate the static-category file for a list of collection sources.

    Reads ``./{fname}.json``, fetches the categories of every record
    concurrently via get_convert_classes, merges each result into the record
    with the same ``name`` and writes the merged records to
    ``./{fname}静态.json``. Records without a name or without a matching
    result are left out of the output.

    @param fname: input file name without the ``.json`` extension
    @raise SystemExit: when the input file does not exist
    """
    file_path = f'./{fname}.json'
    # Built from fname directly: str.replace on the whole path would also
    # rewrite any other ".json" occurrence inside the name.
    out_file_path = f'./{fname}静态.json'
    if not os.path.exists(file_path):
        raise SystemExit(f'不存在采集文件路径:{file_path}')
    with open(file_path, encoding='utf-8') as f:
        records = json.load(f)
    print(records)

    # A dedicated executor is created and closed per call; shutting down the
    # module-level pool here would make a second call to main() fail.
    with ThreadPoolExecutor(max_workers=20) as executor:
        results = list(executor.map(get_convert_classes, records))
    print(results)

    new_records = []
    for record in records:
        rec_name = record.get("name")
        if not rec_name:
            continue
        matches = [ret for ret in results if ret.get("name") == rec_name]
        if matches:
            # With duplicated names the last result wins.
            record.update(matches[-1])
            new_records.append(record)

    pprint(new_records)
    print(f'转换静态数据成功记录数:{len(new_records)}')
    with open(out_file_path, mode='w', encoding='utf-8') as f:
        json.dump(new_records, f, ensure_ascii=False, indent=2)
def _find_excluded_classes(api_url, class_names, class_urls, max_workers=0):
    """Return the category names for which check_class reports False."""
    # zip() pairs names with ids and stops at the shorter list, so a
    # name/id count mismatch cannot raise IndexError.
    pairs = list(zip(class_names, class_urls))
    if not pairs:
        return []
    # One worker per category unless an explicit limit is given.
    with ThreadPoolExecutor(max_workers=max_workers or len(pairs)) as rec_pool:
        tasks = [rec_pool.submit(check_class, api_url, type_name, type_id)
                 for type_name, type_id in pairs]
    results = [task.result() for task in tasks]
    print(results)
    return [type_name for (type_name, _), keep in zip(pairs, results) if not keep]


def main_exclude(fname='采集静态', max_workers=0):
    """
    Add category filters (``cate_excludes``) to a static-category file.

    Reads ``./{fname}.json``; for every record whose API passes check_active,
    each category is probed with check_class and the names of rejected
    categories are stored under ``cate_excludes``. All records are written
    back to the same file.

    ``class_name`` is decoded with decode_and_decompress, i.e. it is expected
    to be gzip+Base64 encoded.

    @param fname: file name without the ``.json`` extension
    @param max_workers: thread limit per source; 0 means one thread per category
    @raise SystemExit: when the file does not exist or its first record has
                       no ``class_name``
    """
    file_path = f'./{fname}.json'
    if not os.path.exists(file_path):
        raise SystemExit(f'不存在采集文件路径:{file_path}')
    with open(file_path, encoding='utf-8') as f:
        records = json.load(f)
    if len(records) < 1 or not records[0].get('class_name'):
        raise SystemExit('输入数据有误,疑似不是静态数据')
    print(records)

    new_records = []
    for rec in records:
        new_rec = rec.copy()
        # Same url/api joining as get_classes (trailing "/" stripped), so the
        # endpoint probed here is the one the categories were fetched from.
        base_url = str(rec.get('url') or '')
        api_url = urljoin(base_url.rstrip('/'), rec.get('api') or '/api.php/provide/vod/')
        print(api_url)

        cate_excludes = []
        if not check_active(api_url):
            print(f'{rec.get("name")} ({rec.get("url")})视为不存活,跳过分类检测')
        elif rec.get('class_name'):
            class_names = decode_and_decompress(rec['class_name']).split('&')
            class_urls = str(rec.get('class_url') or '').split('&')
            cate_excludes = _find_excluded_classes(api_url, class_names, class_urls, max_workers)

        if cate_excludes:
            new_rec['cate_excludes'] = cate_excludes
        new_records.append(new_rec)

    with open(file_path, mode='w', encoding='utf-8') as f:
        json.dump(new_records, f, ensure_ascii=False, indent=2)
if __name__ == '__main__':
    # Interactive entry point: ask for the mode and file name, run the
    # selected step and report the elapsed time.
    use_gzip = True  # class names are stored compressed when run as a script
    fmode = str(input('请输入处理文件方式(0:生成分类 1:添加分类过滤),留空默认为生成静态分类:\n'))
    ftips = '采集' if fmode != '1' else '采集静态'
    fname = str(input(f'请输入文件名(q结束程序),留空默认为{ftips}:\n'))
    t1 = time.time()
    if fname == 'q':
        exit('已主动结束脚本')
    if fmode in ('', '0'):
        # Empty input or "0": generate the static categories.
        fname = fname or '采集'
        main(fname)
    elif fmode == '1':
        # "1": add category filters, at most 10 threads per source.
        fname = fname or '采集静态'
        main_exclude(fname, 10)
    else:
        exit(f'未知的处理类型:{fmode}')
    t2 = time.time()
    print(f'本次程序运行耗时:{round(t2 - t1, 2)}秒')