  1. """
  2. Find wallet addresses from transaction addresses.
  3. """
import re
import time
import urllib.request
from typing import Any, Dict, List, Optional, Set

import pandas as pd
from bs4 import BeautifulSoup
from bs4.element import ResultSet, Tag
from tqdm import tqdm

from src.utils.utils import to_json
  13. def main(args: Any):
  14. def process_tx(tx: str):
  15. page: Any = get_etherscan_page(tx)
  16. data: Dict[str, Any] = get_etherscan_data(page)
  17. return data['from']['addr']
  18. clusters: List[Set[str]] = []
  19. pbar = tqdm(total=get_length(args.csv_path))
  20. for deposit, withdraw in load_data(args.csv_path):
  21. deposit_wallet: str = process_tx(deposit)
  22. time.sleep(0.5)
  23. withdraw_wallet: str = process_tx(withdraw)
  24. time.sleep(0.5)
  25. cluster: Set[str] = {deposit_wallet, withdraw_wallet}
  26. clusters.append(cluster)
  27. pbar.update()
  28. pbar.close()
  29. to_json(clusters, args.out_json)
  30. def get_length(csv_path: str):
  31. return len(pd.read_csv(csv_path))
  32. def load_data(csv_path: str):
  33. df: pd.DataFrame = pd.read_csv(csv_path)
  34. for row in df.itertuples():
  35. deposit: str = row.deposit_tx
  36. withdraw: str = row.withdrawl_tx
  37. yield deposit, withdraw
  38. def get_etherscan_page(tx_hash: str) -> Optional[bytes]:
  39. # simulate mozilla browser to download webpage
  40. user_agent: str = \
  41. 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.7) Gecko/2009021910 Firefox/3.0.7'
  42. headers = {'User-Agent': user_agent}
  43. uri: str = f'https://etherscan.io/tx/{tx_hash}'
  44. request = urllib.request.Request(uri, None, headers) # headers are important!
  45. response = urllib.request.urlopen(request)
  46. if response.code == 200:
  47. data = response.read()
  48. else:
  49. data = None
  50. return data
  51. def get_etherscan_data(page_bytes: Optional[bytes]) -> Optional[str]:
  52. """
  53. Pull out all the data from Etherscan on what the transaction is doing.
  54. """
  55. if page_bytes is None: # garbage in, garbage out
  56. return None
  57. soup: BeautifulSoup = BeautifulSoup(page_bytes, 'html.parser')
  58. table_div: Tag = soup.find('div', {'id': 'myTabContent'})
  59. table_div: Tag = table_div.find('div', {'id': 'ContentPlaceHolder1_maintable'})
  60. children: ResultSet = table_div.findChildren('div', recursive=False)
  61. data: Dict[str, Any] = dict()
  62. for child_div in children:
  63. messy_text: str = child_div.text
  64. if 'From:' in messy_text:
  65. components: ResultSet = child_div.findChildren('div', recursive=False)
  66. assert len(components) == 2
  67. raw_text: str = components[1].text.strip()
  68. type_: str = 'contract' if 'Contract' in raw_text else 'address'
  69. addr: str = components[1].find('span', {'id': 'spanFromAdd'}).text
  70. match: Any = re.search(r'\((.*?)\)', raw_text)
  71. wallet: Optional[str] = match.group(1) if match is not None else ''
  72. data['from'] = dict(addr=addr, type=type_, wallet=wallet)
  73. elif 'To:' in messy_text:
  74. components: ResultSet = child_div.findChildren('div', recursive=False)
  75. assert len(components) == 2
  76. raw_text: str = components[1].text.strip()
  77. type_: str = 'contract' if 'Contract' in raw_text else 'address'
  78. addr: str = components[1].find('span', {'id': 'spanToAdd'}).text
  79. match: Any = re.search(r'\((.*?)\)', raw_text)
  80. wallet: Optional[str] = match.group(1) if match is not None else ''
  81. data['to'] = dict(addr=addr, type=type_, wallet=wallet)
  82. return data
  83. if __name__ == "__main__":
  84. from argparse import ArgumentParser
  85. parser: ArgumentParser = ArgumentParser()
  86. parser.add_argument('csv_path', type=str, help='path to CSV')
  87. parser.add_argument('out_json', type=str, help='where to save CSV')
  88. args: Any = parser.parse_args()
  89. main(args)