  1. """
  2. Lambda Class's "Same # of Transactions" Heuristic.
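
Usage sketch (argument names come from the argparse at the bottom of this
file; the paths are placeholders):

    python run_same_num_txs_heuristic.py <data_dir> <tornado_csv> <save_dir> \
        --max-num-days 1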
  3. """
import os, json
import itertools
import pandas as pd
from tqdm import tqdm
from collections import defaultdict
from typing import Any, Tuple, List, Set, Dict, Optional

from pandas import Timestamp, Timedelta

from src.utils.utils import to_json, from_json, from_pickle, to_pickle
from src.utils.utils import Entity, Heuristic

# Silence SettingWithCopyWarning: we assign columns on row subsets below.
pd.options.mode.chained_assignment = None


def main(args: Any):
    if not os.path.isdir(args.save_dir):
        os.makedirs(args.save_dir)

    appendix: str = f'_exact_{args.max_num_days}days'
    clusters_file: str = os.path.join(args.save_dir, f'same_num_txs_clusters{appendix}.json')
    tx2addr_file: str = os.path.join(args.save_dir, f'same_num_txs_tx2addr{appendix}.json')
    tx2block_file: str = os.path.join(args.save_dir, f'same_num_txs_tx2block{appendix}.json')
    tx2ts_file: str = os.path.join(args.save_dir, f'same_num_txs_tx2ts{appendix}.json')
    addr2conf_file: str = os.path.join(args.save_dir, f'same_num_txs_addr2conf{appendix}.pickle')
    address_file: str = os.path.join(args.save_dir, f'same_num_txs_address_sets{appendix}.json')
    metadata_file: str = os.path.join(args.save_dir, f'same_num_txs_metadata{appendix}.csv')

    withdraw_df, deposit_df, tornado_df = load_data(args.data_dir, args.tornado_csv)
    clusters, address_sets, tx2addr, addr2conf = get_same_num_transactions_clusters(
        deposit_df, withdraw_df, tornado_df, args.max_num_days, args.data_dir)
    tx2block, tx2ts = get_transaction_info(withdraw_df, deposit_df)

    # save some stuff before continuing
    to_json(clusters, clusters_file)
    to_json(tx2addr, tx2addr_file)
    to_json(tx2block, tx2block_file)
    to_json(tx2ts, tx2ts_file)
    to_pickle(addr2conf, addr2conf_file)
    del clusters, tx2addr, deposit_df, withdraw_df, tornado_df  # free memory

    to_json(address_sets, address_file)
    # To resume from previously saved outputs instead of recomputing:
    # address_sets = from_json(address_file)
    # addr2conf = from_pickle(addr2conf_file)

    address_metadata = get_metadata(address_sets, addr2conf)
    address_metadata.to_csv(metadata_file, index=False)


def load_data(root: str, tornado_csv: str) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    withdraw_df: pd.DataFrame = pd.read_csv(
        os.path.join(root, 'lighter_complete_withdraw_txs.csv'))

    # Change recipient_address to lowercase.
    withdraw_df['recipient_address'] = withdraw_df['recipient_address'].str.lower()
    # Change block_timestamp field to be a timestamp object.
    withdraw_df['block_timestamp'] = withdraw_df['block_timestamp'].apply(pd.Timestamp)

    deposit_df: pd.DataFrame = pd.read_csv(
        os.path.join(root, 'lighter_complete_deposit_txs.csv'))
    # Change block_timestamp field to be a timestamp object.
    deposit_df['block_timestamp'] = deposit_df['block_timestamp'].apply(pd.Timestamp)

    # Load Tornado pool data (address -> tag, e.g. '0.1 ETH').
    tornado_df: pd.DataFrame = pd.read_csv(tornado_csv)

    return withdraw_df, deposit_df, tornado_df


def get_transaction_info(
    withdraw_df: pd.DataFrame,
    deposit_df: pd.DataFrame,
) -> Tuple[Dict[str, int], Dict[str, Any]]:
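    """
    Build two maps over both withdraws and deposits: tx hash -> block
    number, and tx hash -> timestamp string ('%Y-%m-%d %H:%M:%S').
    """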
    hashes: pd.Series = pd.concat([withdraw_df.hash, deposit_df.hash])
    block_numbers: pd.Series = \
        pd.concat([withdraw_df.block_number, deposit_df.block_number])
    block_timestamps: pd.Series = \
        pd.concat([withdraw_df.block_timestamp, deposit_df.block_timestamp])
    block_timestamps = block_timestamps.apply(pd.Timestamp)
    block_timestamps = \
        block_timestamps.apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
    tx2block = dict(zip(hashes, block_numbers))
    tx2ts = dict(zip(hashes, block_timestamps))
    return tx2block, tx2ts


def get_same_num_transactions_clusters(
    deposit_df: pd.DataFrame,
    withdraw_df: pd.DataFrame,
    tornado_df: pd.DataFrame,
    max_num_days: int,
    data_dir: str,
):
    """
    Same Number of Transactions Heuristic.

    If there are multiple (say 12) deposit transactions coming from
    a deposit address and later there are 12 withdraw transactions
    to the same withdraw address, *then we can link all these deposit
    transactions to the withdraw transactions*.
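
    Illustrative (hypothetical) example: if address D deposits the
    portfolio {'0.1 ETH': 2, '1 ETH': 1} within the time window, and
    recipient W later withdraws the identical multi-pool portfolio
    {'0.1 ETH': 2, '1 ETH': 1}, then all of D's deposits are linked to
    W's withdraws.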
  84. """
    tornado_addresses: Dict[str, str] = \
        dict(zip(tornado_df.address, tornado_df.tags))
    tornado_tags: List[str] = tornado_df.tags.to_list()

    tx_clusters: List[Set[str]] = []
    tx2addr: Dict[str, str] = {}
    address_sets: List[List[str]] = []
    addr2conf: Dict[Tuple[str, str], float] = {}

    cache_window_file: str = os.path.join(
        data_dir, f'deposit_windows_{max_num_days}days.pickle')
    cache_portfolio_file: str = os.path.join(
        data_dir, f'deposit_portfolio_{max_num_days}days.csv')
    if os.path.isfile(cache_window_file):
        print('Loading deposit windows')
        deposit_windows: pd.DataFrame = from_pickle(cache_window_file)
        raw_portfolios: pd.DataFrame = pd.read_csv(cache_portfolio_file)
    else:
        print('Precomputing deposit windows')
        time_window: Timedelta = Timedelta(max_num_days, 'days')
        deposit_df['tornado_pool'] = deposit_df.tornado_cash_address.map(
            lambda x: tornado_addresses[x])
        deposit_windows: pd.Series = deposit_df.apply(lambda x: deposit_df[
            # find all deposits made before the current one
            (deposit_df.block_timestamp <= x.block_timestamp) &
            # ...but at most `max_num_days` days before it
            (deposit_df.block_timestamp >= (x.block_timestamp - time_window)) &
            # only consider those from the same address as the current one
            (deposit_df.from_address == x.from_address) &
            # exclude the current deposit itself
            (deposit_df.hash != x.hash)
        ], axis=1)
        deposit_windows: pd.DataFrame = pd.DataFrame(deposit_windows)
        to_pickle(deposit_windows, cache_window_file)

        raw_portfolios: pd.DataFrame = deposit_windows.apply(
            lambda x: x.iloc[0].groupby('tornado_pool').count()['hash'].to_dict(), axis=1)
        raw_portfolios.to_csv(cache_portfolio_file, index=False)
        # Reload from CSV so both branches hand make_portfolio_df the same
        # stringified format (its '0' column is eval'ed back into dicts).
        raw_portfolios = pd.read_csv(cache_portfolio_file)

    deposit_portfolios: pd.DataFrame = make_portfolio_df(raw_portfolios, tornado_tags)

    print('Processing withdraws')
    pbar = tqdm(total=len(withdraw_df))
    for withdraw_row in withdraw_df.itertuples():
        results = same_num_of_transactions_heuristic(
            withdraw_row, withdraw_df, deposit_windows, deposit_portfolios,
            tornado_addresses, max_num_days)

        if results[0]:
            response_dict = results[1]

            # populate graph with known transactions
            withdraw_txs: List[str] = response_dict['withdraw_txs']
            deposit_txs: List[str] = response_dict['deposit_txs']
            withdraw_tx2addr: Dict[str, str] = response_dict['withdraw_tx2addr']
            deposit_tx2addr: Dict[str, str] = response_dict['deposit_tx2addr']
            tx_cluster: Set[str] = set(withdraw_txs + deposit_txs)

            withdraw_addr: str = response_dict['withdraw_addr']
            deposit_addrs: List[str] = response_dict['deposit_addrs']
            deposit_confs: List[float] = response_dict['deposit_confs']

            for deposit_addr, deposit_conf in zip(deposit_addrs, deposit_confs):
                if withdraw_addr != deposit_addr:
                    address_sets.append([withdraw_addr, deposit_addr])
                    addr2conf[(withdraw_addr, deposit_addr)] = deposit_conf

            tx2addr.update(withdraw_tx2addr)
            tx2addr.update(deposit_tx2addr)
            tx_clusters.append(tx_cluster)

        pbar.update()
    pbar.close()

    print(f'# clusters: {len(tx_clusters)}')
    return tx_clusters, address_sets, tx2addr, addr2conf


def get_metadata(
    address_sets: List[List[str]],
    addr2conf: Dict[Tuple[str, str], float],
) -> pd.DataFrame:
    """
    Stores metadata about addresses to add to the db.
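
    The resulting frame has one row per address with columns: address,
    entity (Entity.EOA), conf, meta_data (empty JSON), and heuristic
    (Heuristic.SAME_NUM_TX); only the highest-confidence row per address
    is kept.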
  155. """
    address: List[str] = []
    entity: List[int] = []
    conf: List[float] = []
    meta_data: List[str] = []
    heuristic: List[int] = []

    pbar = tqdm(total=len(address_sets))
    for cluster in address_sets:
        cluster: List[str] = list(cluster)
        assert len(cluster) == 2
        node_a, node_b = cluster
        conf_ab: float = addr2conf[(node_a, node_b)]

        address.append(node_a)
        entity.append(Entity.EOA.value)
        conf.append(conf_ab)
        meta_data.append(json.dumps({}))
        heuristic.append(Heuristic.SAME_NUM_TX.value)

        address.append(node_b)
        entity.append(Entity.EOA.value)
        conf.append(conf_ab)
        meta_data.append(json.dumps({}))
        heuristic.append(Heuristic.SAME_NUM_TX.value)

        pbar.update()
    pbar.close()

    response: Dict[str, List[Any]] = dict(
        address=address,
        entity=entity,
        conf=conf,
        meta_data=meta_data,
        heuristic=heuristic,
    )
    response: pd.DataFrame = pd.DataFrame.from_dict(response)
    # For each address, keep only the row with the highest confidence.
    response: pd.DataFrame = response.loc[response.groupby('address')['conf'].idxmax()]
    return response


def same_num_of_transactions_heuristic(
    withdraw_tx: pd.Series,
    withdraw_df: pd.DataFrame,
    deposit_windows: pd.DataFrame,
    deposit_portfolios: pd.DataFrame,
    tornado_addresses: Dict[str, str],
    max_num_days: int,
) -> Tuple[bool, Optional[Dict[str, Any]]]:
    # Calculate the number of withdraws the recipient address made in
    # each pool, starting from the withdraw_tx given as input.
    withdraw_counts, withdraw_set = get_num_of_withdraws(
        withdraw_tx, withdraw_df, tornado_addresses, max_num_days=max_num_days)

    # remove entries that only touch one pool: we consider
    # multi-denomination deposits only
    if len(withdraw_counts) == 1:
        return (False, None)

    # if there are fewer than two withdraws in total, ignore
    if sum(withdraw_counts.values()) < 2:
        return (False, None)

    withdraw_addr: str = withdraw_tx.recipient_address  # who receives the withdrawn funds
    withdraw_txs: List[str] = list(itertools.chain(*list(withdraw_set.values())))
    withdraw_tx2addr = dict(zip(withdraw_txs,
                                [withdraw_addr for _ in range(len(withdraw_txs))]))

    matched_deposits: List[pd.DataFrame] = get_same_num_of_deposits(
        withdraw_counts, deposit_windows, deposit_portfolios)

    if len(matched_deposits) == 0:  # no deposits matched by the heuristic
        return (False, None)

    deposit_addrs: List[str] = []
    deposit_txs: List[str] = []
    deposit_confs: List[float] = []
    deposit_tx2addr: Dict[str, str] = {}

    for match in matched_deposits:
        deposit_addrs.append(match.from_address.iloc[0])
        txs: List[str] = match.hash.to_list()
        deposit_txs.extend(txs)
        deposit_confs.extend([1.0] * len(txs))
        deposit_tx2addr.update(dict(zip(match.hash, match.from_address)))

    deposit_addrs: List[str] = list(set(deposit_addrs))
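
    # With N candidate deposit matches, a uniform guess over them finds
    # the true depositor with probability 1/N, so the score below grows
    # toward 1 as N grows; a unique match (N = 1) scores 0.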
    privacy_score: float = 1. - 1. / len(matched_deposits)

    response_dict: Dict[str, Any] = dict(
        withdraw_txs=withdraw_txs,
        deposit_txs=deposit_txs,
        deposit_confs=deposit_confs,
        withdraw_addr=withdraw_addr,
        deposit_addrs=deposit_addrs,
        withdraw_tx2addr=withdraw_tx2addr,
        deposit_tx2addr=deposit_tx2addr,
        privacy_score=privacy_score,
    )
    return (True, response_dict)


def get_same_num_of_deposits(
    withdraw_counts: Dict[str, int],
    deposit_windows: pd.DataFrame,
    deposit_portfolios: pd.DataFrame,
) -> List[pd.DataFrame]:
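    """
    Return the deposit windows whose pool portfolio matches the withdraw
    portfolio exactly: the same set of non-zero pools with the same count
    in each. Example (hypothetical counts): withdraw_counts of
    {'0.1 ETH': 2, '1 ETH': 1} matches only rows of deposit_portfolios
    with exactly two non-zero pools, where '0.1 ETH' == 2 and '1 ETH' == 1.
    """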
    # simple assertion that the number of non-zero currencies must be the same
    mask: pd.Series = \
        (deposit_portfolios > 0).sum(axis=1) == len(withdraw_counts)
    # ...and the count in each pool must match exactly
    for k, v in withdraw_counts.items():
        mask = mask & (deposit_portfolios[k] == v)
    return [x[0] for x in deposit_windows[mask].values]


def make_portfolio_df(raw_portfolios: pd.DataFrame, pools: List[str]) -> pd.DataFrame:
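    """
    Expand the cached per-deposit pool-count dicts (stored as strings in
    the CSV's '0' column) into a dense frame with one column per pool.
    Example (hypothetical): ["{'0.1 ETH': 2}"] with pools
    ['0.1 ETH', '1 ETH'] becomes {'0.1 ETH': [2], '1 ETH': [0]}.
    """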
    raw_portfolios: List[Dict[str, int]] = \
        [eval(x) for x in raw_portfolios['0'].values]
    deposit_portfolios: Dict[str, List[int]] = defaultdict(lambda: [])
    for portfolio in raw_portfolios:
        for k in pools:
            if k in portfolio:
                deposit_portfolios[k].append(portfolio[k])
            else:
                deposit_portfolios[k].append(0)
    deposit_portfolios: Dict[str, List[int]] = dict(deposit_portfolios)
    return pd.DataFrame.from_dict(deposit_portfolios)


def make_deposit_df(
    deposits: Dict[str, List[str]],
    hash2time: Dict[str, Timestamp],
) -> pd.DataFrame:
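    """
    Flatten one address's pool -> transactions map into a frame with
    columns transaction, pool, and timestamp (looked up via hash2time).
    """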
    transactions: List[str] = []
    pools: List[str] = []
    timestamps: List[Timestamp] = []
    for pool, txs in deposits.items():
        transactions.extend(txs)
        pools.extend([pool] * len(txs))
        timestamps.extend([hash2time[tx] for tx in txs])
    out: Dict[str, Any] = {
        'transaction': transactions,
        'pool': pools,
        'timestamp': timestamps,
    }
    out: pd.DataFrame = pd.DataFrame.from_dict(out)
    return out


def make_address_deposit_df(
    addr2deposit: Dict[str, Dict[str, List[str]]],
    hash2time: Dict[str, Timestamp],
) -> pd.DataFrame:
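    """
    Concatenate make_deposit_df frames across all addresses, adding an
    'address' column to each.
    """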
    addr_deposit_df: List[pd.DataFrame] = []
    for address, deposits in addr2deposit.items():
        deposit_df: pd.DataFrame = make_deposit_df(deposits, hash2time)
        deposit_df['address'] = address
        addr_deposit_df.append(deposit_df)
    addr_deposit_df: pd.DataFrame = pd.concat(addr_deposit_df)
    addr_deposit_df: pd.DataFrame = addr_deposit_df.reset_index()
    return addr_deposit_df


def get_num_of_withdraws(
    withdraw_tx: pd.Series,
    withdraw_df: pd.DataFrame,
    tornado_addresses: Dict[str, str],
    max_num_days: int,
) -> Tuple[Dict[str, int], Dict[str, List[str]]]:
    """
    Given a particular withdraw transaction and the withdraw transactions
    DataFrame, gets the total withdraws the address made in each pool
    within the time window. Returns two dictionaries keyed by pool: the
    number of withdraws, and the list of withdraw transaction hashes.
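
    Example return value (hypothetical hashes):

        ({'0.1 ETH': 2, '1 ETH': 1},
         {'0.1 ETH': ['0xabc...', '0xdef...'], '1 ETH': ['0x123...']})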
  306. """
    cur_withdraw_pool: str = tornado_addresses[withdraw_tx.tornado_cash_address]

    time_window: Timedelta = Timedelta(max_num_days, 'days')
    subset_df: pd.DataFrame = withdraw_df[
        # ignore txs made by others
        (withdraw_df.recipient_address == withdraw_tx.recipient_address) &
        # ignore future transactions
        (withdraw_df.block_timestamp <= withdraw_tx.block_timestamp) &
        # ignore withdraw transactions outside the `max_num_days` window
        (withdraw_df.block_timestamp >= (withdraw_tx.block_timestamp - time_window)) &
        # ignore the query row itself
        (withdraw_df.hash != withdraw_tx.hash)
    ]
    subset_df['tornado_pool'] = subset_df.tornado_cash_address.map(
        lambda x: tornado_addresses[x])

    withdraw_count: pd.Series = subset_df.groupby('tornado_pool').size()
    withdraw_count: Dict[str, int] = withdraw_count.to_dict()

    withdraw_txs: pd.Series = subset_df.groupby('tornado_pool')['hash'].apply(list)
    withdraw_txs: Dict[str, List[str]] = withdraw_txs.to_dict()

    # add 1 for the query transaction itself
    if cur_withdraw_pool in withdraw_count:
        withdraw_count[cur_withdraw_pool] += 1
        withdraw_txs[cur_withdraw_pool].append(withdraw_tx.hash)
    else:
        withdraw_count[cur_withdraw_pool] = 1
        withdraw_txs[cur_withdraw_pool] = [withdraw_tx.hash]

    return withdraw_count, withdraw_txs


def get_address_deposits(
    deposit_df: pd.DataFrame,
    tornado_addresses: Dict[str, str],
) -> Dict[str, Dict[str, List[str]]]:
    """
    Given the deposit transactions DataFrame, returns a
    dictionary mapping every address to the transactions it
    deposited, grouped by pool.

    Example:
    {
        '0x16e54b35d789832440ab47ae765e6a8098280676': {
            '0.1 ETH': [...],
            '100 USDT': [...],
        },
        '0x35dd029618f4e1835008da21fd98850c776453f0': {
            '0.1 ETH': [...],
        },
        '0xe906442c11b85acbc58eccb253b9a55a20b80a56': {
            '0.1 ETH': [...],
        },
        '0xaf301de836c81deb8dff9dc22745e23c476155b2': {
            '1 ETH': [...],
            '0.1 ETH': [...],
            '10 ETH': [...],
        },
    }
    """
    counts_df: pd.DataFrame = pd.DataFrame(
        deposit_df[['from_address', 'tornado_cash_address']].value_counts()
    ).rename(columns={0: "count"})

    addr2deposit: Dict[str, Dict[str, Set[str]]] = {}
    print('building map from address to deposits made by address...')
    pbar = tqdm(total=len(counts_df))
    for row in counts_df.itertuples():
        deposit_set: pd.Series = deposit_df[
            (deposit_df.from_address == row.Index[0]) &
            (deposit_df.tornado_cash_address == row.Index[1])
        ].hash
        deposit_set: Set[str] = set(deposit_set)
        if row.Index[0] in addr2deposit.keys():
            addr2deposit[row.Index[0]][
                tornado_addresses[row.Index[1]]] = deposit_set
        else:
            addr2deposit[row.Index[0]] = {
                tornado_addresses[row.Index[1]]: deposit_set}
        pbar.update()
    pbar.close()

    return addr2deposit


def get_max_time_diff(times: List[Timestamp]) -> float:
    diffs: List[float] = []
    # compare every ordered pair; abs() makes order irrelevant
    for t1, t2 in itertools.product(times, repeat=2):
        diffs.append(abs((t1 - t2).total_seconds()))
    return max(diffs)


if __name__ == "__main__":
    from argparse import ArgumentParser

    parser: ArgumentParser = ArgumentParser()
    parser.add_argument('data_dir', type=str, help='path to tornado cash data')
    parser.add_argument('tornado_csv', type=str, help='path to tornado cash pool addresses')
    parser.add_argument('save_dir', type=str, help='folder to save matches')
    parser.add_argument('--max-num-days', type=int, default=1,
                        help='number of maximum days (default: 1)')
    args: Any = parser.parse_args()

    main(args)