run_exact_match_heuristic.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
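"""
Lambda Class's Exact Match Heuristic.

Links each Tornado Cash withdraw to every earlier deposit that was sent from
the withdraw's recipient address (optionally also requiring the same pool),
then groups the linked transactions into clusters.
"""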
  4. import os
  5. import pandas as pd
  6. from pandas._config.config import describe_option
  7. from tqdm import tqdm
  8. import networkx as nx
  9. from typing import Any, Tuple, List, Set, Optional, Dict
  10. from src.utils.utils import to_json
  11. def main(args: Any):
  12. withdraw_df, deposit_df = load_data(args.data_dir)
  13. clusters, tx2addr = get_exact_matches(deposit_df, withdraw_df, by_pool=args.by_pool)
  14. tx2block, tx2ts = get_transaction_info(withdraw_df, deposit_df)
  15. if not os.path.isdir(args.save_dir): os.makedirs(args.save_dir)
  16. appendix: str = '_by_pool' if args.by_pool else ''
  17. # NOTE: we do not make a `db_file` here b/c we are guaranteed singleton clusters.
  18. clusters_file: str = os.path.join(
  19. args.save_dir, f'exact_match_clusters{appendix}.json')
  20. tx2addr_file: str = os.path.join(
  21. args.save_dir, f'exact_match_tx2addr{appendix}.json')
  22. tx2block_file: str = os.path.join(
  23. args.save_dir, f'exact_match_tx2block{appendix}.json')
  24. tx2ts_file: str = os.path.join(
  25. args.save_dir, f'exact_match_tx2ts{appendix}.json')
  26. to_json(clusters, clusters_file)
  27. to_json(tx2addr, tx2addr_file)
  28. to_json(tx2block, tx2block_file)
  29. to_json(tx2ts, tx2ts_file)
  30. def load_data(root) -> Tuple[pd.DataFrame, pd.DataFrame]:
  31. withdraw_df: pd.DataFrame = pd.read_csv(
  32. os.path.join(root, 'lighter_complete_withdraw_txs.csv'))
  33. # Change recipient_address to lowercase.
  34. withdraw_df['recipient_address'] = withdraw_df['recipient_address'].str.lower()
  35. # Change block_timestamp field to be a timestamp object.
  36. withdraw_df['block_timestamp'] = withdraw_df['block_timestamp'].apply(pd.Timestamp)
  37. deposit_df: pd.DataFrame = pd.read_csv(
  38. os.path.join(root, 'lighter_complete_deposit_txs.csv'))
  39. # Change block_timestamp field to be a timestamp object.
  40. deposit_df['block_timestamp'] = deposit_df['block_timestamp'].apply(pd.Timestamp)
  41. return withdraw_df, deposit_df
  42. def get_transaction_info(
  43. withdraw_df: pd.DataFrame,
  44. deposit_df: pd.DataFrame
  45. ) -> Tuple[Dict[str, int], Dict[str, Any]]:
  46. hashes: pd.DataFrame = pd.concat([withdraw_df.hash, deposit_df.hash])
  47. block_numbers: pd.DataFrame = \
  48. pd.concat([withdraw_df.block_number, deposit_df.block_number])
  49. block_timestamps: pd.DataFrame = \
  50. pd.concat([withdraw_df.block_timestamp, deposit_df.block_timestamp])
  51. tx2block = dict(zip(hashes, block_numbers))
  52. tx2ts = dict(zip(hashes, block_timestamps))
  53. return tx2block, tx2ts
  54. def get_exact_matches(
  55. deposit_df: pd.DataFrame,
  56. withdraw_df: pd.DataFrame,
  57. by_pool: bool = False,
  58. ) -> Tuple[List[Set[str]], Dict[str, str], Dict[str, Any]]:
  59. """
  60. Iterate over the withdraw transactions and apply heuristic one. For each
  61. withdraw with matching deposit transactions, a new element is added to
  62. the dictionary with the key as the withdraw hash and the values as all
  63. matching deposit hashes.
  64. It is possible for a transaction hash to appear more than once. As such,
  65. we compute weakly connected components to form clusters.
  66. """
  67. tx2addr: Dict[str, str] = {}
  68. graph: nx.DiGraph = nx.DiGraph()
  69. raw_links: Dict[str, str] = {}
  70. for withdraw_row in tqdm(withdraw_df.itertuples(), total=withdraw_df.shape[0]):
  71. heuristic_fn = \
  72. exact_match_heuristic_by_pool if by_pool else exact_match_heuristic
  73. results: Tuple[bool, List[pd.Series]] = \
  74. heuristic_fn(deposit_df, withdraw_row)
  75. if results[0]:
  76. deposit_rows: List[pd.Series] = results[1]
  77. for deposit_row in deposit_rows:
  78. raw_links[withdraw_row.hash] = deposit_row.hash
  79. graph.add_node(withdraw_row.hash)
  80. graph.add_node(deposit_row.hash)
  81. graph.add_edge(withdraw_row.hash, deposit_row.hash)
  82. # save transaction -> address map
  83. tx2addr[withdraw_row.hash] = withdraw_row.recipient_address
  84. tx2addr[deposit_row.hash] = deposit_row.from_address
  85. clusters: List[Set[str]] = [ # ignore singletons
  86. c for c in nx.weakly_connected_components(graph) if len(c) > 1]
  87. print(f'# links (graph): {len(clusters)}')
  88. print(f'# links (raw): {len(raw_links)}')
  89. return clusters, tx2addr
  90. def exact_match_heuristic(
  91. deposit_df: pd.DataFrame,
  92. withdraw_df: pd.DataFrame,
  93. ) -> Tuple[bool, Optional[List[pd.Series]]]:
  94. """
  95. matches: List[pd.Series] = []
  96. # iterate over each deposit transaction. When a matching deposit is found,
  97. # its hash is pushed to the list same_deposit_address_hashes.
  98. for deposit_row in deposit_df.itertuples():
  99. # check that addresses are the same and that the deposit
  100. # was done earlier than the withdraw.
  101. if ((deposit_row.from_address == withdraw_df.recipient_address) and
  102. (deposit_row.block_timestamp < withdraw_df.block_timestamp)):
  103. matches.append(deposit_row)
  104. """
  105. matches: pd.DataFrame = deposit_df[
  106. (deposit_df.from_address == withdraw_df.recipient_address) &
  107. (deposit_df.block_timestamp < withdraw_df.block_timestamp)
  108. ]
  109. matches: List[pd.Series] = [matches.iloc[i] for i in range(len(matches))]
  110. if len(matches) > 0:
  111. return (True, matches)
  112. return (False, None)
  113. def exact_match_heuristic_by_pool(
  114. deposit_df: pd.DataFrame,
  115. withdraw_df: pd.DataFrame,
  116. ) -> Tuple[bool, Optional[List[pd.Series]]]:
  117. matches: pd.DataFrame = deposit_df[
  118. (deposit_df.from_address == withdraw_df.recipient_address) &
  119. (deposit_df.block_timestamp < withdraw_df.block_timestamp) &
  120. (deposit_df.tornado_cash_address == withdraw_df.tornado_cash_address)
  121. ]
  122. matches: List[pd.Series] = [matches.iloc[i] for i in range(len(matches))]
  123. if len(matches) > 0:
  124. return (True, matches)
  125. return (False, None)
  126. if __name__ == "__main__":
  127. from argparse import ArgumentParser
  128. parser: ArgumentParser = ArgumentParser()
  129. parser.add_argument('data_dir', type=str, help='path to tornado cash data')
  130. parser.add_argument('save_dir', type=str, help='folder to save matches')
  131. parser.add_argument('--by-pool', action='store_true', default=False,
  132. help='prune by pool heuristic or not?')
  133. args: Any = parser.parse_args()
  134. main(args)