123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- # File : encode.py
- # Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
- # Date : 2022/8/29
- import base64
- import math
- import re
- from urllib.parse import urljoin,quote,unquote
- from js2py.base import PyJsString
- import requests,warnings
- # 关闭警告
- warnings.filterwarnings("ignore")
- from requests.packages import urllib3
- urllib3.disable_warnings()
- import requests.utils
- import hashlib
- from time import sleep
- import os
- from utils.web import UC_UA,PC_UA
- from ast import literal_eval
- from utils.log import logger
- import quickjs
def getPreJs():
    """
    Read the bundled ``libs/pre.js`` script.

    The file is resolved against the parent of the directory that contains this module.

    :return: the whole content of ``libs/pre.js`` decoded as UTF-8
    """
    module_dir = os.path.dirname(__file__)
    project_root = os.path.dirname(os.path.abspath(module_dir))  # parent directory
    script_path = os.path.join(project_root, 'libs/pre.js')
    with open(script_path, encoding='utf-8') as handle:
        return handle.read()
def getCryptoJS():
    """
    Read the bundled ``libs/crypto-hiker.js`` script.

    The ``libs`` directory (under the parent of this module's directory) is created first
    when it does not exist yet.

    :return: the script text, or the placeholder string ``'undefiend'`` when the file is missing
    """
    project_root = os.path.dirname(os.path.abspath(os.path.dirname(__file__)))  # parent directory
    os.makedirs(os.path.join(project_root, 'libs'), exist_ok=True)
    script_path = os.path.join(project_root, 'libs/crypto-hiker.js')
    if os.path.exists(script_path):
        with open(script_path, encoding='utf-8') as handle:
            return handle.read()
    # The placeholder's spelling is reproduced exactly as the module has always returned it.
    return 'undefiend'
def md5(str):
    """
    Compute the MD5 digest of a text value.

    :param str: text to hash; it is encoded as UTF-8 before hashing
    :return: the digest as a hexadecimal string
    """
    hasher = hashlib.md5()
    hasher.update(str.encode(encoding='UTF-8'))
    return hasher.hexdigest()
def getLib(js):
    """
    Read a script from the ``libs`` directory.

    :param js: file name (relative to ``libs``) of the script to load
    :return: the file content decoded as UTF-8, or an empty string when the file does not exist
    """
    project_root = os.path.dirname(os.path.abspath(os.path.dirname(__file__)))  # parent directory
    target = os.path.join(project_root, f'libs/{js}')
    if os.path.exists(target):
        with open(target, encoding='utf-8') as handle:
            return handle.read()
    return ''
- # def atob(text):
- # if isinstance(text,PyJsString):
- # text = parseText(str(text))
- # qjs = quickjs.Context()
- # print(text)
- # js = getLib('atob.js')
- # print(js)
- # ret = qjs.eval(f'{js};atob("{text}")')
- # print(ret)
def atob(text):
    """
    Base64-decode text into a string holding one character per decoded byte.

    :param text: base64 text; a ``PyJsString`` is first converted through ``parseText``
    :return: the decoded bytes mapped to characters with the latin1 codec
    """
    if isinstance(text, PyJsString):
        text = parseText(str(text))
    decoded_bytes = base64.b64decode(text.encode("utf8"))
    return decoded_bytes.decode("latin1")
def btoa(text):
    """
    Base64-encode a string whose characters each fit in a single latin1 byte.

    :param text: text to encode; a ``PyJsString`` is first converted through ``parseText``
    :return: the base64 representation as a string
    """
    if isinstance(text, PyJsString):
        text = parseText(str(text))
    raw_bytes = text.encode("latin1")
    return base64.b64encode(raw_bytes).decode("utf8")
def requireCache(lib:str):
    """
    Load a JavaScript dependency, caching remote ones on disk.

    A value that does not start with ``http`` is treated as a file name inside the ``libs``
    directory. Anything else is treated as a URL: it is downloaded once and stored as
    ``libs/<md5 of the url>.js``; later calls read that cached file.

    :param lib: file name relative to ``libs`` or an ``http``-prefixed URL
    :return: the script text, or the placeholder ``'undefiend'`` when it could not be loaded
    """
    base_path = os.path.dirname(os.path.abspath(os.path.dirname(__file__)))  # parent directory
    os.makedirs(os.path.join(base_path, 'libs'), exist_ok=True)
    logger.info(f'开始加载:{lib}')
    code = 'undefiend'
    if not lib.startswith('http'):
        lib_path = os.path.join(base_path, f'libs/{lib}')
        if os.path.exists(lib_path):
            with open(lib_path, encoding='utf-8') as f:
                code = f.read()
        return code

    lib_path = os.path.join(base_path, f'libs/{md5(lib)}.js')
    if os.path.exists(lib_path):
        with open(lib_path, encoding='utf-8') as f:
            code = f.read()
        return code

    try:
        r = requests.get(lib, headers={
            'Referer': lib,
            'User-Agent': UC_UA,
        }, timeout=5, verify=False)
        # Refuse to cache an HTTP error page: once written, the cache file would be
        # served as the library on every later call.
        r.raise_for_status()
        with open(lib_path, mode='wb') as f:
            f.write(r.content)
        code = r.text
    except Exception as e:
        print(f'获取远程依赖失败:{e}')
    return code
def getHome(url):
    """
    Return the leading ``scheme//host[:port]`` part of a URL.

    Example: ``http://www.baidu.com:9000/323`` -> ``http://www.baidu.com:9000``.

    :param url: URL string
    :return: everything up to (not including) the first ``/`` after the ``//`` separator;
        when the URL has no ``//`` at all, everything before its first ``/``
    """
    prefix, separator, remainder = url.partition('//')
    if not separator:
        # No scheme separator: fall back to the host-like leading segment instead of
        # failing with IndexError.
        return url.split('/')[0]
    return prefix + '//' + remainder.split('/')[0]
class OcrApi:
    """
    Small client that posts image bytes to an HTTP OCR endpoint and returns the response text.
    """

    def __init__(self, api, timeout=10):
        """
        :param api: URL of the OCR endpoint
        :param timeout: seconds to wait for the endpoint before giving up
        """
        self.api = api
        self.timeout = timeout

    def classification(self, img):
        """
        Send an image to the OCR endpoint.

        :param img: image payload, sent as the request body
        :return: the response body text, or an empty string when the request fails
        """
        try:
            # Without a timeout a stalled OCR server would block the caller forever.
            code = requests.post(
                self.api,
                data=img,
                headers={'user-agent': PC_UA},
                timeout=self.timeout,
                verify=False,
            ).text
        except Exception as e:
            print(f'ocr识别发生错误:{e}')
            code = ''
        return code
def verifyCode(url,headers,timeout=5,total_cnt=3,api=None):
    """
    Try to pass a site's search captcha and return the resulting session cookies.

    Each attempt downloads ``/index.php/verify/index.html`` from the site, sends the image to
    the OCR service and submits the recognised code to ``/index.php/ajax/verify_check``.

    :param url: any URL of the target site; only its scheme and host are used
    :param headers: request headers; a ``Referer`` entry is added to this dict in place
        when none is present
    :param timeout: per-request timeout in seconds
    :param total_cnt: maximum number of attempts
    :param api: OCR endpoint URL; a built-in default is used when empty
    :return: cookies formatted as ``k=v;k=v`` on success, otherwise an empty string
    """
    if not api:
        api = 'http://dm.mudery.com:10000'
    lower_keys = [name.lower() for name in headers]
    host = getHome(url)
    if 'referer' not in lower_keys:
        headers['Referer'] = host
    print(f'开始自动过验证,请求头:{headers}')
    ocr = OcrApi(api)
    for cnt in range(total_cnt):
        # The context manager closes the session's connections after every attempt.
        with requests.session() as s:
            try:
                img = s.get(url=f"{host}/index.php/verify/index.html", headers=headers,
                            timeout=timeout, verify=False).content
                code = ocr.classification(img)
                print(f'第{cnt+1}次验证码识别结果:{code}')
                # Same timeout / certificate settings as the image request above.
                res = s.post(
                    url=f"{host}/index.php/ajax/verify_check?type=search&verify={code}",
                    headers=headers, timeout=timeout, verify=False).json()
                if res["msg"] == "ok":
                    cookies_dict = requests.utils.dict_from_cookiejar(s.cookies)
                    return ';'.join(f'{k}={v}' for k, v in cookies_dict.items())
            except Exception as e:
                # Best effort: report the failure and move on to the next attempt.
                print(f'第{cnt+1}次验证码提交失败:{e}')
        sleep(1)
    return ''
def base64Encode(text):
    """
    Base64-encode text using its UTF-8 bytes.

    :param text: text to encode; for a ``PyJsString`` every single and double quote
        character is removed from its string form first
    :return: the base64 representation as a string
    """
    if isinstance(text, PyJsString):
        text = str(text)
        for quote_char in ("'", '"'):
            text = text.replace(quote_char, "")
    encoded = base64.b64encode(text.encode("utf8"))
    return encoded.decode("utf-8")
def base64Decode(text):
    """
    Base64-decode text and interpret the result as UTF-8.

    :param text: base64 text; a ``PyJsString`` is first converted through ``parseText``
    :return: the decoded string
    """
    if isinstance(text, PyJsString):
        text = parseText(str(text))
    decoded_bytes = base64.b64decode(text)
    return decoded_bytes.decode("utf-8")
def encodeStr(input, encoding='GBK'):
    """
    Percent-encode a string using the given character encoding.

    Characters that cannot be represented in the target encoding are dropped.

    :param input: text to encode; a ``PyJsString`` is first converted through ``parseText``
    :param encoding: codec name; a ``PyJsString`` is first converted through ``parseText``
    :return: the percent-encoded string
    """
    if isinstance(input, PyJsString):
        input = parseText(str(input))
    if isinstance(encoding, PyJsString):
        # Parse the encoding argument itself (the previous code parsed ``input`` here,
        # which replaced the codec name with the text to encode).
        encoding = parseText(str(encoding))
    return quote(input.encode(encoding, 'ignore'))
def decodeStr(input, encoding='GBK'):
    """
    Decode a percent-encoded string using the given character encoding.

    :param input: percent-encoded text; a ``PyJsString`` is first converted through ``parseText``
    :param encoding: codec name; a ``PyJsString`` is first converted through ``parseText``
    :return: the decoded string
    """
    if isinstance(input, PyJsString):
        input = parseText(str(input))
    if isinstance(encoding, PyJsString):
        # Parse the encoding argument itself (the previous code parsed ``input`` here,
        # which replaced the codec name with the text to decode).
        encoding = parseText(str(encoding))
    return unquote(input, encoding)
def parseText(text:str):
    """
    Turn the textual form of a literal into the corresponding Python value.

    The text is first evaluated unchanged, so string contents such as ``'true story'`` are
    preserved. Only when that fails are the JavaScript-style keywords ``false`` / ``true`` /
    ``null`` rewritten to their Python spellings before a second attempt.

    :param text: literal source text
    :return: the evaluated value
    :raises ValueError: when the text is not a literal even after keyword rewriting
    :raises SyntaxError: when the text cannot be parsed
    """
    try:
        return literal_eval(text)
    except (ValueError, SyntaxError, TypeError):
        # Bare true/false/null are not Python literals; rewrite them and retry.
        rewritten = text.replace('false', 'False').replace('true', 'True').replace('null', 'None')
        return literal_eval(rewritten)
def setDetail(title:str,img:str,desc:str,content:str,tabs:list=None,lists:list=None):
    """
    Build a ``vod`` detail record.

    :param title: full title; the part before the first ``/n`` sequence becomes ``vod_name``
        and the whole value becomes ``type_name``
    :param img: value for ``vod_pic``
    :param desc: value for ``vod_remarks``
    :param content: value for ``vod_content``
    :param tabs: accepted but not used by this function
    :param lists: accepted but not used by this function
    :return: dict with the ``vod_*`` / ``type_name`` fields; year, area, actor and director
        are empty strings
    """
    # NOTE(review): the separator is the two-character text '/n', not a newline - confirm intent.
    name_part = title.split('/n')[0]
    return dict(
        vod_name=name_part,
        vod_pic=img,
        type_name=title,
        vod_year="",
        vod_area="",
        vod_remarks=desc,
        vod_actor="",
        vod_director="",
        vod_content=content,
    )
def urljoin2(a,b):
    """
    Join a base URL and a reference after removing every quote character from both.

    :param a: base URL (converted with ``str``)
    :param b: URL or path to resolve against the base (converted with ``str``)
    :return: the joined URL
    """
    base, reference = (
        str(part).replace("'", '').replace('"', '')
        for part in (a, b)
    )
    return urljoin(base, reference)
def join(lists,string):
    """
    Join the items of a list-like object with a separator (marked as unusable by its author).

    Intermediate values are printed for debugging.

    :param lists: object exposing ``to_list()``
    :param string: separator, converted with ``str``
    :return: the joined string, or an empty string when joining fails
    """
    items = lists.to_list()
    separator = str(string)
    print(type(items), items)
    print(type(separator), separator)
    try:
        joined = separator.join(items)
        print(joined)
    except Exception as e:
        print(e)
        joined = ''
    return joined
def dealObj(obj=None):
    """
    Normalise a request-options object into a plain dict of Python values.

    Single quotes are stripped from the string form of every scalar option, header and
    body entry. A ``PyJsString`` body is parsed as ``k=v&k=v`` form data.

    :param obj: mapping-like options (``encoding``, ``method``, ``withHeaders``, ``headers``,
        ``timeout``, ``body``); a truthy ``timeout`` must expose ``to_int()``
    :return: dict with keys ``encoding``, ``headers``, ``timeout``, ``body``, ``method``,
        ``withHeaders``
    """
    if not obj:
        obj = {}
    encoding = str(obj.get('encoding') or 'utf-8').replace("'", "")
    method = str(obj.get('method') or 'get').replace("'", "")
    withHeaders = str(obj.get('withHeaders') or '').replace("'", "")

    headers = obj.get('headers') if obj.get('headers') else {}
    new_headers = {}
    for name in headers:
        new_headers[str(name).replace("'", "")] = str(headers[name]).replace("'", "")

    timeout = float(obj.get('timeout').to_int()) if obj.get('timeout') else None

    body = obj.get('body') if obj.get('body') else {}
    if isinstance(body, PyJsString):
        body = parseText(str(body))
        form = {}
        for pair in body.split('&'):
            if not pair:
                # Tolerate empty segments such as a trailing '&'.
                continue
            # Split on the first '=' only: values may themselves contain '=' (e.g. base64
            # padding) and a key without '=' maps to an empty value instead of raising.
            field, _, value = pair.partition('=')
            form[field] = value
        body = form
    new_body = {}
    for name in body:
        new_body[str(name).replace("'", "")] = str(body[name]).replace("'", "")

    return {
        'encoding': encoding,
        'headers': new_headers,
        'timeout': timeout,
        'body': new_body,
        'method': method,
        'withHeaders': withHeaders
    }
def coverDict2form(data:dict):
    """
    Serialise a dict as ``k=v`` pairs joined by ``&`` (no escaping is applied).

    :param data: mapping to serialise
    :return: the form string; empty for an empty mapping
    """
    return '&'.join(f'{field}={value}' for field, value in data.items())
def base_request(url,obj):
    """
    Perform an HTTP request described by a normalised options dict.

    Certificate verification is disabled. ``get`` sends ``obj['body']`` as query parameters;
    any other method is sent as a POST with ``obj['body']`` as form data.

    :param url: target URL; single quotes are stripped from its string form
    :param obj: options with ``headers``, ``body``, ``timeout``, ``encoding`` and optionally
        ``method`` / ``withHeaders``; a missing method is filled in as ``'get'``
    :return: the response text, or a dict with ``url`` / ``body`` / ``headers`` when
        ``withHeaders`` is truthy; on failure ``''`` (or ``{}`` with ``withHeaders``)
    """
    url = str(url).replace("'", "")
    method = obj.get('method') or ''
    withHeaders = obj.get('withHeaders') or ''
    if not method:
        method = 'get'
        # Record the effective method (previously the literal text 'method' was stored).
        obj['method'] = method
    print(f"{method}:{url}:{obj['headers']}:{obj.get('body','')},请求超时:{obj['timeout']}")
    try:
        if method.lower() == 'get':
            r = requests.get(url, headers=obj['headers'], params=obj['body'],
                             timeout=obj['timeout'], verify=False)
        else:
            # A dict body is passed straight through as form data; no manual conversion needed.
            r = requests.post(url, headers=obj['headers'], data=obj['body'],
                              timeout=obj['timeout'], verify=False)
        r.encoding = obj['encoding']
        if withHeaders:
            return {
                'url': r.url,
                'body': r.text,
                'headers': r.headers
            }
        return r.text
    except Exception as e:
        print(f'{method}请求发生错误:{e}')
        return {} if withHeaders else ''
def fetch(url,obj):
    """
    Issue a request with ``PC_UA`` as the default User-Agent.

    :param url: target URL
    :param obj: raw request options, normalised through ``dealObj``
    :return: whatever ``base_request`` returns for the normalised options
    """
    obj = dealObj(obj)
    current = obj.get('headers')
    has_agent = bool(current) and any([current.get('User-Agent'), current.get('user-agent')])
    if not has_agent:
        obj['headers']['User-Agent'] = obj['headers'].get('user-agent', PC_UA)
    return base_request(url, obj)
def post(url,obj):
    """
    Issue a request with the method forced to ``post``.

    :param url: target URL
    :param obj: raw request options, normalised through ``dealObj``
    :return: whatever ``base_request`` returns for the normalised options
    """
    options = dealObj(obj)
    options.update(method='post')
    return base_request(url, options)
def request(url,obj):
    """
    Issue a request with ``UC_UA`` as the default User-Agent.

    :param url: target URL
    :param obj: raw request options, normalised through ``dealObj``
    :return: whatever ``base_request`` returns for the normalised options
    """
    obj = dealObj(obj)
    current = obj.get('headers')
    has_agent = bool(current) and any([current.get('User-Agent'), current.get('user-agent')])
    if not has_agent:
        obj['headers']['User-Agent'] = obj['headers'].get('user-agent', UC_UA)
    return base_request(url, obj)
def redx(text):
    """
    Undo the surrounding single quotes that js2py adds to strings crossing into Python.

    :param text: any value; it is converted with ``str``
    :return: the string without its first and last character when it both starts and ends
        with a single quote, otherwise the string unchanged
    """
    text = str(text)
    wrapped = text.startswith("'") and text.endswith("'")
    return text[1:-1] if wrapped else text
def buildUrl(url,obj=None):
    """
    Append query parameters to a URL.

    Surrounding single quotes are removed from the URL and from every key and value through
    ``redx``. No percent-encoding is applied.

    :param url: base URL; a ``?`` is appended when it contains none
    :param obj: mapping of parameters to append; may be empty or ``None``
    :return: the URL followed by ``k=v`` pairs joined with ``&``
    """
    url = redx(url)
    params = {redx(name): redx(obj[name]) for name in obj} if obj else {}
    if '?' not in str(url):
        url = str(url) + '?'
    query = '&'.join(f'{name}={value}' for name, value in params.items())
    # An existing query string needs a separator before the new pairs.
    if params and not str(url).endswith('?'):
        url += '&'
    return url + query
def forceOrder(lists:list,key:str=None,option=None):
    """
    Force a list into ascending order by sampling two adjacent items near its middle.

    The first run of digits in each sampled item is compared; when the earlier item carries
    the larger number the list is reversed in place. Lists with fewer than two items, or
    whose sampled items contain no digits, are returned untouched.

    :param lists: list to check; it is reversed in place when needed
    :param key: optional key/index used to pull the comparable value out of each sampled item
    :param option: optional callable applied to each sampled value before the comparison
    :return: the same list object
    """
    size = len(lists)
    if size < 2:
        return lists
    # Same pair as floor(size / 2) and the following index for size >= 3; for exactly two
    # items this clamps to (0, 1) so a reversed pair is still detected.
    start = min(size // 2, size - 2)
    end = start + 1
    first = lists[start]
    second = lists[end]
    if key:
        try:
            first = first[key]
            second = second[key]
        except Exception:
            # Best effort: items without the key are compared as they are.
            pass
    if option and hasattr(option, '__call__'):
        try:
            first = option(first)
            second = option(second)
        except Exception as e:
            print(f'强制排序执行option发生了错误:{e}')
    first_match = re.search(r'(\d+)', str(first))
    second_match = re.search(r'(\d+)', str(second))
    if first_match and second_match:
        if int(first_match.group(1)) > int(second_match.group(1)):
            lists.reverse()
    return lists
def base64ToImage(image_base64:str):
    """
    Decode a base64 image string, with or without a ``prefix,`` header, into raw bytes.

    :param image_base64: base64 text; a ``PyJsString`` is first converted through ``parseText``.
        When the text contains a comma, only the segment after the first comma is decoded.
    :return: the decoded bytes
    """
    if isinstance(image_base64, PyJsString):
        image_base64 = parseText(str(image_base64))
    payload = image_base64.split(',')[1] if ',' in image_base64 else image_base64
    return base64.b64decode(payload)