123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- from time import sleep
- from config import *
- from datetime import datetime
- from ecdsa import SigningKey, SECP256k1
- from eth_account.messages import encode_defunct
- from random import choice
- from requests import request
- from sha3 import keccak_256
- from sys import exit
- from time import sleep
- from string import ascii_lowercase, digits
- from user_agent import generate_user_agent
- from web3.auto import w3
- from names import get_first_name, get_last_name
- from config import (
- SECMAIL_DOMAINS,
- WAIT_MAIL_ROUNDS,
- WAIT_MAIL_TIME,
- CHANGE_PROXY_URL,
- PROXY_URL,
- ALLOWED_PROXY_SCHEMES,
- )
- from log import logger
- def proxy_url_to_proxies(url):
- try:
- schema, url = url.split('://')
- auth, url = url.split('@')
- username, password = auth.split(':')
- host, port = url.split(':')
- except Exception as e:
- logger.error(f'Zjef5mTM4J3NWmHP; invalid proxy format: {url}')
- return {'status': 'error'}
- if not schema in ALLOWED_PROXY_SCHEMES:
- logger.error(f'2Hsm4eUf3YjktahK; {schema}')
- return {
- 'status': 'success',
- 'data': {
- 'proxies': {
- 'http': f'{schema}://{username}:{password}@{host}:{port}',
- 'https': f'{schema}://{username}:{password}@{host}:{port}',
- }
- }
- }
- def proxy_validation(proxies):
- sleep(5)
- response = request('GET', 'http://myexternalip.com/raw', proxies=proxies)
- if response.status_code != 200:
- logger.error(f'code: kvbMTudHNS8nkt8d; {response.status_code} {response.text}')
- return {'status': 'error'}
- if not response.text:
- logger.error(f'code: CBTc5NEGE75vuwCU; {response.status_code} {response.text}')
- return {'status': 'error'}
- return {
- 'status': 'success',
- 'data': {
- 'external_ip': response.text
- }
- }
- def change_proxy_with_url(url):
- response = request('GET', url)
- if response.status_code != 200:
- logger.error(f'code: wzuz3FPKaJmJtEJb; {response.status_code} {response.text}')
- return {'status': 'error'}
- logger.debug(response.text)
- sleep(10)
- return {'status': 'success'}
- def get_random_string(length=10):
- return ''.join([choice(ascii_lowercase+digits) for _ in range(length)])
- def checksum_encode(addr_str): # Takes a hex (string) address as input
- keccak = keccak_256()
- out = ''
- addr = addr_str.lower().replace('0x', '')
- keccak.update(addr.encode('ascii'))
- hash_addr = keccak.hexdigest()
- for i, c in enumerate(addr):
- if int(hash_addr[i], 16) >= 8:
- out += c.upper()
- else:
- out += c
- return '0x' + out
- def parse_1secmail_mail(email, _from, subject, proxies, without_mail_id=None):
- mail_id = None
- split_email = email.split('@')
- url = f'https://www.1secmail.com/api/v1/?action=getMessages&login={split_email[0]}&domain={split_email[1]}'
-
- all_break = False
- for _ in range(WAIT_MAIL_ROUNDS):
- if all_break == True: break
- response = request('GET', url, proxies=proxies)
- if not response.status_code == 200:
- sleep(WAIT_MAIL_TIME+1)
- continue
- if not response.json():
- sleep(WAIT_MAIL_TIME+1)
- continue
- for mail in response.json():
- if all_break == True: break
- if not _from in mail['from'] or not subject in mail['subject']:
- logger.debug(f'unknown mail: {mail}')
- continue
- else:
- mail_id = mail['id']
- all_break = True
- sleep(WAIT_MAIL_TIME+1)
- if not mail_id:
- logger.error('code: wd75Kpbn,cannnot get mail')
- return {'status': 'error'}
- if mail_id == without_mail_id:
- logger.debug('repetition mail, start parser again')
- return parse_mail(email, _from, subject, without_mail_id, proxies)
- url = f'https://www.1secmail.com/api/v1/?action=readMessage&login={split_email[0]}&domain={split_email[1]}&id={mail_id}'
- sleep(5)
- response = request('GET', url)
- if not response.status_code == 200:
- logger.error(f'code: zK997XAv, {response.status_code} {response.text}')
- return {'status': 'error'}
- if not response.text or response.text == 'Message not found':
- logger.error(f'code: VaPXmY7f, {response.text}')
- return {'status': 'error'}
- return {
- 'status': 'success',
- 'data': {
- 'id': mail_id,
- 'text': response.text,
- 'json': response.json()
- }
- }
- def get_random_alliance_name():
- return choice(ALLIANCES)
- def get_random_1secmail_email():
- return get_random_string() + '@' + choice(SECMAIL_DOMAINS)
- def get_random_first_name():
- return get_first_name()
- def get_random_last_name():
- return get_last_name()
- def get_random_password():
- return '!1Q' + get_random_string()
- def get_current_datetime():
- return str(datetime.now())
- def get_random_username():
- return get_random_string()
- def get_random_user_agent():
- return generate_user_agent()
- def wallet_sign_message(message, private_key):
- encode_message = encode_defunct(text=message)
- signed_message = w3.eth.account.sign_message(encode_message, private_key=private_key)
- return signed_message.signature.hex()
- def get_random_wallet():
- keccak = keccak_256()
- private_key = SigningKey.generate(curve=SECP256k1)
- public_key = private_key.get_verifying_key().to_string()
- keccak.update(public_key)
- address = keccak.hexdigest()[24:]
- return {
- 'address': checksum_encode(address),
- 'public_key': public_key.hex(),
- 'private_key': private_key.to_string().hex()
- }
- def get_random_tor_proxy_url(username=get_random_string(), password=get_random_string()):
- return f'socks5://{username}:{password}@127.0.0.1:9052'
- def get_proxy():
- if not PROXY_URL:
- return {
- 'type': 'none',
- 'external_ip': '',
- 'proxies': {
- 'http': '',
- 'https': '',
- },
- }
- elif not CHANGE_PROXY_URL:
- proxy_url = PROXY_URL
- else:
- change_proxy_with_url_result = change_proxy_with_url(CHANGE_PROXY_URL)
- if change_proxy_with_url_result['status'] != 'success':
- assert False, f'dqcjheHVm75qk6fH; {proxies}'
- proxy_url = PROXY_URL
- proxies = proxy_url_to_proxies(proxy_url)
- if proxies['status'] != 'success':
- assert False, 'xsnYjY6BwbLw8X9q' + str(proxies)
- proxy_validation_result = proxy_validation(proxies['data']['proxies'])
- if proxy_validation_result['status'] != 'success':
- assert False, f'Pk8sv9wHkCuPWNYv; {proxy_validation_result}'
- external_ip = proxy_validation_result['data']['external_ip']
- return {
- 'external_ip': external_ip,
- 'proxies': proxies['data']['proxies'],
- }
- def get_email():
- return get_random_1secmail_email()
|