# run_linked_tx_heuristic.py
  1. """
  2. Lambda Class's Linked Transactions Heuristic.
  3. """
import itertools
import json
import os
from collections import namedtuple
from typing import Any, Dict, List, Set, Tuple

import networkx as nx
import numpy as np
import pandas as pd
from tqdm import tqdm

from src.utils.utils import to_json, Entity, Heuristic
  12. def main(args: Any):
  13. root: str = args.data_dir
  14. with open(os.path.join(root, 'tornado_pools.json')) as json_file:
  15. tornado_addresses: Dict[str, str] = json.load(json_file)
  16. deposit_txs: pd.DataFrame = pd.read_csv(
  17. os.path.join(root, 'lighter_complete_deposit_txs.csv'))
  18. deposit_txs['tcash_pool'] = deposit_txs['tornado_cash_address']\
  19. .apply(lambda addr: tornado_addresses[addr])
  20. withdraw_txs: pd.DataFrame = pd.read_csv(
  21. os.path.join(root, 'lighter_complete_withdraw_txs.csv'))
  22. withdraw_txs['tcash_pool'] = withdraw_txs['tornado_cash_address']\
  23. .apply(lambda addr: tornado_addresses[addr])
  24. all_tx2addr: Dict[str, str] = {
  25. **dict(zip(deposit_txs.hash, deposit_txs.from_address)),
  26. **dict(zip(withdraw_txs.hash, withdraw_txs.recipient_address)),
  27. }
  28. addr_pool_to_deposit: Dict[Tuple[str, str], str] = \
  29. load_addresses_and_pools_to_deposits_json(
  30. os.path.join(root, 'addresses_and_pools_to_deposits.json'))
  31. address_and_withdraw: pd.DataFrame = pd.read_csv(
  32. os.path.join(root, 'transactions_between_deposit_and_withdraw_addresses.csv'))
  33. address_and_withdraw: pd.DataFrame = \
  34. address_and_withdraw[['from_address', 'to_address']]
  35. address_and_withdraw_counts: pd.DataFrame = \
  36. address_and_withdraw.groupby(
  37. ['from_address', 'to_address']).size().reset_index(name='size')
  38. address_and_withdraw: pd.DataFrame = \
  39. address_and_withdraw_counts[
  40. address_and_withdraw_counts['size'] >= args.min_interactions]
  41. address_and_withdraw: pd.DataFrame = dataframe_from_set_of_sets(
  42. filter(lambda x: len(x) == 2,
  43. filter_repeated_and_permuted(address_and_withdraw)))
  44. unique_deposits: Set[str] = set(deposit_txs['from_address'])
  45. unique_withdraws: Set[str] = set(withdraw_txs['recipient_address'])
  46. withdraw2deposit: Dict[str, str] = map_withdraw2deposit(
  47. address_and_withdraw, unique_deposits, unique_withdraws)
  48. links: Dict[str, List[str]] = apply_first_neighbors_heuristic(
  49. withdraw_txs, withdraw2deposit, addr_pool_to_deposit)
  50. # build a graph, then find clusters, build tx2addr
  51. clusters, tx2addr = build_clusters(links, all_tx2addr)
  52. tx2block, tx2ts = get_transaction_info(withdraw_txs, deposit_txs)
  53. address_sets: List[Set[str]] = get_address_sets(clusters, tx2addr)
  54. address_metadata: List[Dict[str, Any]] = get_metadata(address_sets)
  55. if not os.path.isdir(args.save_dir): os.makedirs(args.save_dir)
  56. clusters_file: str = os.path.join(args.save_dir, f'linked_tx_clusters_{args.min_interactions}intxs.json')
  57. tx2addr_file: str = os.path.join(args.save_dir, f'linked_tx_tx2addr_{args.min_interactions}intxs.json')
  58. tx2block_file: str = os.path.join(args.save_dir, f'linked_tx_tx2block_{args.min_interactions}intxs.json')
  59. tx2ts_file: str = os.path.join(args.save_dir, f'linked_tx_tx2ts_{args.min_interactions}intxs.json')
  60. address_file: str = os.path.join(args.save_dir, f'linked_tx_address_set_{args.min_interactions}intxs.json')
  61. metadata_file: str = os.path.join(args.save_dir, f'linked_tx_metadata_{args.min_interactions}intxs.csv')
  62. to_json(clusters, clusters_file)
  63. to_json(tx2addr, tx2addr_file)
  64. to_json(tx2block, tx2block_file)
  65. to_json(tx2ts, tx2ts_file)
  66. to_json(address_sets, address_file)
  67. address_metadata.to_csv(metadata_file, index=False)
  68. def build_clusters(
  69. links: Dict[str, List[str]],
  70. all_tx2addr: Dict[str, str]) -> Tuple[List[Set[str]], Dict[str, str]]:
  71. graph: nx.DiGraph = nx.DiGraph()
  72. tx2addr: Dict[str, str] = {}
  73. for withdraw, deposits in links.items():
  74. graph.add_node(withdraw)
  75. graph.add_nodes_from(deposits)
  76. for deposit in deposits:
  77. graph.add_edge(withdraw, deposit)
  78. tx2addr[withdraw] = all_tx2addr[withdraw]
  79. tx2addr[deposit] = all_tx2addr[deposit]
  80. clusters: List[Set[str]] = [ # ignore singletons
  81. c for c in nx.weakly_connected_components(graph) if len(c) > 1]
  82. return clusters, tx2addr
  83. def get_transaction_info(
  84. withdraw_df: pd.DataFrame,
  85. deposit_df: pd.DataFrame
  86. ) -> Tuple[Dict[str, int], Dict[str, Any]]:
  87. hashes: pd.DataFrame = pd.concat([withdraw_df.hash, deposit_df.hash])
  88. block_numbers: pd.DataFrame = \
  89. pd.concat([withdraw_df.block_number, deposit_df.block_number])
  90. block_timestamps: pd.DataFrame = \
  91. pd.concat([withdraw_df.block_timestamp, deposit_df.block_timestamp])
  92. block_timestamps: pd.DataFrame = block_timestamps.apply(pd.Timestamp)
  93. block_timestamps: pd.Series = \
  94. block_timestamps.apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
  95. tx2block = dict(zip(hashes, block_numbers))
  96. tx2ts = dict(zip(hashes, block_timestamps))
  97. return tx2block, tx2ts
  98. def get_address_sets(
  99. tx_clusters: List[Set[str]],
  100. tx2addr: Dict[str, str],
  101. ) -> List[Set[str]]:
  102. """
  103. Stores pairs of addresses that are related to each other. Don't
  104. apply graphs on this because we are going to join this into the
  105. other clusters.
  106. """
  107. address_sets: List[Set[str]] = []
  108. for cluster in tx_clusters:
  109. addr_set: Set[str] = set([tx2addr[tx] for tx in cluster])
  110. addr_set: List[str] = list(addr_set)
  111. if len(addr_set) > 1: # make sure not singleton
  112. for addr1 in addr_set:
  113. for addr2 in addr_set:
  114. if addr1 != addr2:
  115. address_sets.append({addr1, addr2})
  116. return address_sets
  117. def get_metadata(address_sets: List[Set[str]]) -> pd.DataFrame:
  118. """
  119. Stores metadata about addresses to add to db.
  120. """
  121. unique_addresses: Set[str] = set().union(*address_sets)
  122. address: List[str] = []
  123. entity: List[int] = []
  124. conf: List[float] = []
  125. meta_data: List[str] = []
  126. heuristic: List[int] = []
  127. for member in unique_addresses:
  128. address.append(member)
  129. entity.append(Entity.EOA.value)
  130. conf.append(1)
  131. meta_data.append(json.dumps({}))
  132. heuristic.append(Heuristic.LINKED_TX.value)
  133. response: Dict[str, List[Any]] = dict(
  134. address = address,
  135. entity = entity,
  136. conf = conf,
  137. meta_data = meta_data,
  138. heuristic = heuristic,
  139. )
  140. response: pd.DataFrame = pd.DataFrame.from_dict(response)
  141. return response
  142. def apply_first_neighbors_heuristic(
  143. withdraw_txs: pd.Series,
  144. withdraw2deposit: Dict[str, str],
  145. addr_pool_to_deposit: Dict[Tuple[str, str], str]) -> Dict[str, List[str]]:
  146. links: Dict[str, str] = {}
  147. for row in tqdm(withdraw_txs.itertuples(), total=len(withdraw_txs)):
  148. dic = first_neighbors_heuristic(row, withdraw2deposit, addr_pool_to_deposit)
  149. links.update(dic)
  150. return dict(filter(lambda elem: len(elem[1]) != 0, links.items()))
  151. def first_neighbors_heuristic(
  152. withdraw_tx: pd.Series,
  153. withdraw2deposit: Dict[str, str],
  154. addr_pool_to_deposit: Dict[Tuple[str, str], str]) -> dict:
  155. """
  156. Check that there has been a transaction between this address and some deposit
  157. address outside Tcash. If not, return an empty list for this particular withdraw.
  158. """
  159. address: str = withdraw_tx.recipient_address
  160. pool: str = withdraw_tx.tcash_pool
  161. AddressPool = namedtuple('AddressPool', ['address', 'pool'])
  162. if address in withdraw2deposit.keys():
  163. interacted_addresses = withdraw2deposit[address]
  164. linked_deposits = []
  165. for addr in interacted_addresses:
  166. if AddressPool(address=addr, pool=pool) in addr_pool_to_deposit.keys():
  167. for d in addr_pool_to_deposit[AddressPool(address=addr, pool=pool)]:
  168. if d.timestamp < withdraw_tx.block_timestamp:
  169. linked_deposits.append(d.deposit_hash)
  170. return {withdraw_tx.hash: linked_deposits}
  171. else:
  172. return {withdraw_tx.hash: []}
  173. def map_withdraw2deposit(
  174. address_and_withdraw: pd.DataFrame,
  175. deposits: Set[str],
  176. withdraws: Set[str]
  177. ) -> dict:
  178. """
  179. Map interactions between every withdraw address to every deposit address, outside TCash
  180. """
  181. deposit_and_withdraw = np.empty((0, 2), dtype=str)
  182. for row in tqdm(address_and_withdraw.itertuples(),
  183. total=len(address_and_withdraw),
  184. mininterval=0.5):
  185. if (is_D_W_tx(row.address_1, row.address_2, deposits, withdraws) or
  186. is_D_DW_tx(row.address_1, row.address_2, deposits, withdraws) or
  187. is_DW_W_tx(row.address_1, row.address_2, deposits, withdraws)):
  188. deposit_and_withdraw = np.append(
  189. deposit_and_withdraw, [[row.address_1, row.address_2]], axis=0)
  190. elif (is_W_D_tx(row.address_1, row.address_2, deposits, withdraws) or
  191. is_W_DW_tx(row.address_1, row.address_2, deposits, withdraws) or
  192. is_DW_D_tx(row.address_1, row.address_2, deposits, withdraws)):
  193. deposit_and_withdraw = np.append(
  194. deposit_and_withdraw, [[row.address_2, row.address_1]], axis=0)
  195. elif is_DW_DW_tx(row.address_1, row.address_2, deposits, withdraws):
  196. deposit_and_withdraw = np.append(
  197. deposit_and_withdraw, [[row.address_1, row.address_2]], axis=0)
  198. deposit_and_withdraw = np.append(
  199. deposit_and_withdraw, [[row.address_2, row.address_1]], axis=0)
  200. else:
  201. raise ValueError('Unknown type: D_W, W_D, D_DW, DW_D, W_DW, DW_W, DW_DW')
  202. D_W_df = pd.DataFrame(deposit_and_withdraw, columns=['deposit', 'withdraw'])
  203. return dict(D_W_df.groupby('withdraw')['deposit'].apply(list))
  204. # -- tx classification utilities --
  205. def is_D_type(address: str, deposits: Set[str], withdraws: Set[str]) -> bool:
  206. return (address in deposits) and (address not in withdraws)
  207. def is_W_type(address: str, deposits: Set[str], withdraws: Set[str]) -> bool:
  208. return (address not in deposits) and (address in withdraws)
  209. def is_DW_type(address: str, deposits: Set[str], withdraws: Set[str]) -> bool:
  210. return (address in deposits) and (address in withdraws)
  211. def is_D_W_tx(
  212. address1: str, address2: str,
  213. deposits: Set[str], withdraws: Set[str]) -> bool:
  214. return is_D_type(address1, deposits, withdraws) and \
  215. is_W_type(address2, deposits, withdraws)
  216. def is_W_D_tx(
  217. address1: str, address2: str,
  218. deposits: Set[str], withdraws: Set[str]) -> bool:
  219. return is_W_type(address1, deposits, withdraws) and \
  220. is_D_type(address2, deposits, withdraws)
  221. def is_D_DW_tx(
  222. address1: str, address2: str,
  223. deposits: Set[str], withdraws: Set[str]) -> bool:
  224. return is_D_type(address1, deposits, withdraws) and \
  225. is_DW_type(address2, deposits, withdraws)
  226. def is_DW_D_tx(
  227. address1: str, address2: str,
  228. deposits: Set[str], withdraws: Set[str]) -> bool:
  229. return is_DW_type(address1, deposits, withdraws) and \
  230. is_D_type(address2, deposits, withdraws)
  231. def is_W_DW_tx(
  232. address1: str, address2: str,
  233. deposits: Set[str], withdraws: Set[str]) -> bool:
  234. return is_W_type(address1, deposits, withdraws) and \
  235. is_DW_type(address2, deposits, withdraws)
  236. def is_DW_W_tx(
  237. address1: str, address2: str,
  238. deposits: Set[str], withdraws: Set[str]) -> bool:
  239. return is_DW_type(address1, deposits, withdraws) and \
  240. is_W_type(address2, deposits, withdraws)
  241. def is_DW_DW_tx(
  242. address1: str, address2: str,
  243. deposits: Set[str], withdraws: Set[str]) -> bool:
  244. return is_DW_type(address1, deposits, withdraws) and \
  245. is_DW_type(address2, deposits, withdraws)
  246. # -- data utilities --
  247. def filter_repeated_and_permuted(address_and_withdraw_df):
  248. filtered_addresses_set = set()
  249. for row in address_and_withdraw_df.itertuples():
  250. filtered_addresses_set.add(frozenset([row.from_address, row.to_address]))
  251. return filtered_addresses_set
  252. def dataframe_from_set_of_sets(set_of_sets):
  253. addresses_df = pd.DataFrame({'address_1':[], 'address_2':[]})
  254. for s in tqdm(set_of_sets):
  255. s_tuple = tuple(s)
  256. if len(s) == 2:
  257. addresses_df = addresses_df.append(
  258. {'address_1': s_tuple[0], 'address_2': s_tuple[1]},
  259. ignore_index=True)
  260. else:
  261. addresses_df = addresses_df.append(
  262. {'address_1': s_tuple[0], 'address_2': s_tuple[0]},
  263. ignore_index=True)
  264. return addresses_df
  265. def remap_keys(mapping):
  266. return [{'key': k,'value': v} for k, v in mapping.items()]
  267. def load_addresses_and_pools_to_deposits_json(filepath):
  268. with open(filepath) as json_file:
  269. raw_dict_list = json.load(json_file)
  270. addresses_and_pools_to_deposits: dict = {}
  271. HashTimestamp = namedtuple('HashTimestamp', ['deposit_hash', 'timestamp'])
  272. AddressPool = namedtuple('AddressPool', ['address', 'pool'])
  273. for dic in raw_dict_list:
  274. elem = {
  275. AddressPool(address=dic['key'][0], pool=dic['key'][1]): \
  276. [HashTimestamp(deposit_hash=l[0], timestamp=l[1]) for l in dic['value']]
  277. }
  278. addresses_and_pools_to_deposits.update(elem)
  279. return addresses_and_pools_to_deposits
  280. def _addr_pool_to_deposits(address: str, tcash_pool: str, deposit_txs) -> dict:
  281. """
  282. # Given an address and the TCash pool, give all the deposits that
  283. # address has done in that pool.
  284. """
  285. mask = (deposit_txs['from_address'] == address) & \
  286. (deposit_txs['tcash_pool'] == tcash_pool)
  287. addr_pool_deposits = deposit_txs[mask]
  288. HashTimestamp = namedtuple('HashTimestamp', ['deposit_hash', 'timestamp'])
  289. AddressPool = namedtuple('AddressPool', ['address', 'pool'])
  290. hashes_and_timestamps: list = [None] * len(addr_pool_deposits)
  291. for i, row in enumerate(addr_pool_deposits.itertuples()):
  292. hashes_and_timestamps[i] = HashTimestamp(
  293. deposit_hash=row.hash, timestamp=row.block_timestamp)
  294. return {AddressPool(address=address, pool=tcash_pool): hashes_and_timestamps}
  295. def addresses_and_pools_to_deposits(deposit_txs) -> dict:
  296. """
  297. Gives a dictionary with deposit addresses as keys and the
  298. deposit transactions each address made as values.
  299. """
  300. addresses_and_pools: dict = dict(
  301. deposit_txs.groupby('from_address')['tcash_pool'].apply(list))
  302. addresses_and_pools_to_deposits: dict = {}
  303. for addr in tqdm(addresses_and_pools.keys(), mininterval=3):
  304. for pool in addresses_and_pools[addr]:
  305. addresses_and_pools_to_deposits.update(
  306. _addr_pool_to_deposits(addr, pool, deposit_txs))
  307. return addresses_and_pools_to_deposits
  308. if __name__ == "__main__":
  309. from argparse import ArgumentParser
  310. parser: ArgumentParser = ArgumentParser()
  311. parser.add_argument('data_dir', type=str, help='path to tornado cash data')
  312. parser.add_argument('save_dir', type=str, help='folder to save clusters')
  313. parser.add_argument('--min-interactions', type=int, default=3,
  314. help='minimum number of interactions (default: 3)')
  315. args: Any = parser.parse_args()
  316. main(args)