run_exact_match_heuristic.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. """
  2. Lambda Class's Exact Match Heuristic.
  3. """
  4. import os
  5. import pandas as pd
  6. from pandas._config.config import describe_option
  7. from tqdm import tqdm
  8. import networkx as nx
  9. from typing import Any, Tuple, List, Set, Optional, Dict
  10. from src.utils.utils import to_json
  11. def main(args: Any):
  12. withdraw_df, deposit_df = load_data(args.data_dir)
  13. clusters, tx2addr = get_exact_matches(deposit_df, withdraw_df, by_pool=args.by_pool)
  14. tx2block, tx2ts = get_transaction_info(withdraw_df, deposit_df)
  15. if not os.path.isdir(args.save_dir): os.makedirs(args.save_dir)
  16. appendix: str = '_by_pool' if args.by_pool else ''
  17. # NOTE: we do not make a `db_file` here b/c we are guaranteed singleton clusters.
  18. clusters_file: str = os.path.join(
  19. args.save_dir, f'exact_match_clusters{appendix}.json')
  20. tx2addr_file: str = os.path.join(
  21. args.save_dir, f'exact_match_tx2addr{appendix}.json')
  22. tx2block_file: str = os.path.join(
  23. args.save_dir, f'exact_match_tx2block{appendix}.json')
  24. tx2ts_file: str = os.path.join(
  25. args.save_dir, f'exact_match_tx2ts{appendix}.json')
  26. to_json(clusters, clusters_file)
  27. to_json(tx2addr, tx2addr_file)
  28. to_json(tx2block, tx2block_file)
  29. to_json(tx2ts, tx2ts_file)
  30. def load_data(root) -> Tuple[pd.DataFrame, pd.DataFrame]:
  31. withdraw_df: pd.DataFrame = pd.read_csv(
  32. os.path.join(root, 'lighter_complete_withdraw_txs.csv'))
  33. # Change recipient_address to lowercase.
  34. withdraw_df['recipient_address'] = withdraw_df['recipient_address'].str.lower()
  35. # Change block_timestamp field to be a timestamp object.
  36. withdraw_df['block_timestamp'] = withdraw_df['block_timestamp'].apply(pd.Timestamp)
  37. deposit_df: pd.DataFrame = pd.read_csv(
  38. os.path.join(root, 'lighter_complete_deposit_txs.csv'))
  39. # Change block_timestamp field to be a timestamp object.
  40. deposit_df['block_timestamp'] = deposit_df['block_timestamp'].apply(pd.Timestamp)
  41. return withdraw_df, deposit_df
  42. def get_transaction_info(
  43. withdraw_df: pd.DataFrame,
  44. deposit_df: pd.DataFrame
  45. ) -> Tuple[Dict[str, int], Dict[str, Any]]:
  46. hashes: pd.DataFrame = pd.concat([withdraw_df.hash, deposit_df.hash])
  47. block_numbers: pd.DataFrame = \
  48. pd.concat([withdraw_df.block_number, deposit_df.block_number])
  49. block_timestamps: pd.DataFrame = \
  50. pd.concat([withdraw_df.block_timestamp, deposit_df.block_timestamp])
  51. block_timestamps: pd.DataFrame = block_timestamps.apply(pd.Timestamp)
  52. block_timestamps: pd.Series = \
  53. block_timestamps.apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
  54. tx2block = dict(zip(hashes, block_numbers))
  55. tx2ts = dict(zip(hashes, block_timestamps))
  56. return tx2block, tx2ts
  57. def get_exact_matches(
  58. deposit_df: pd.DataFrame,
  59. withdraw_df: pd.DataFrame,
  60. by_pool: bool = False,
  61. ) -> Tuple[List[Set[str]], Dict[str, str], Dict[str, Any]]:
  62. """
  63. Iterate over the withdraw transactions and apply heuristic one. For each
  64. withdraw with matching deposit transactions, a new element is added to
  65. the dictionary with the key as the withdraw hash and the values as all
  66. matching deposit hashes.
  67. It is possible for a transaction hash to appear more than once. As such,
  68. we compute weakly connected components to form clusters.
  69. """
  70. tx2addr: Dict[str, str] = {}
  71. graph: nx.DiGraph = nx.DiGraph()
  72. raw_links: Dict[str, str] = {}
  73. for withdraw_row in tqdm(withdraw_df.itertuples(), total=withdraw_df.shape[0]):
  74. heuristic_fn = \
  75. exact_match_heuristic_by_pool if by_pool else exact_match_heuristic
  76. results: Tuple[bool, List[pd.Series]] = \
  77. heuristic_fn(deposit_df, withdraw_row)
  78. if results[0]:
  79. deposit_rows: List[pd.Series] = results[1]
  80. for deposit_row in deposit_rows:
  81. raw_links[withdraw_row.hash] = deposit_row.hash
  82. graph.add_node(withdraw_row.hash)
  83. graph.add_node(deposit_row.hash)
  84. graph.add_edge(withdraw_row.hash, deposit_row.hash)
  85. # save transaction -> address map
  86. tx2addr[withdraw_row.hash] = withdraw_row.recipient_address
  87. tx2addr[deposit_row.hash] = deposit_row.from_address
  88. clusters: List[Set[str]] = [ # ignore singletons
  89. c for c in nx.weakly_connected_components(graph) if len(c) > 1]
  90. print(f'# links (graph): {len(clusters)}')
  91. print(f'# links (raw): {len(raw_links)}')
  92. return clusters, tx2addr
  93. def exact_match_heuristic(
  94. deposit_df: pd.DataFrame,
  95. withdraw_df: pd.DataFrame,
  96. ) -> Tuple[bool, Optional[List[pd.Series]]]:
  97. """
  98. matches: List[pd.Series] = []
  99. # iterate over each deposit transaction. When a matching deposit is found,
  100. # its hash is pushed to the list same_deposit_address_hashes.
  101. for deposit_row in deposit_df.itertuples():
  102. # check that addresses are the same and that the deposit
  103. # was done earlier than the withdraw.
  104. if ((deposit_row.from_address == withdraw_df.recipient_address) and
  105. (deposit_row.block_timestamp < withdraw_df.block_timestamp)):
  106. matches.append(deposit_row)
  107. """
  108. matches: pd.DataFrame = deposit_df[
  109. (deposit_df.from_address == withdraw_df.recipient_address) &
  110. (deposit_df.block_timestamp < withdraw_df.block_timestamp)
  111. ]
  112. matches: List[pd.Series] = [matches.iloc[i] for i in range(len(matches))]
  113. if len(matches) > 0:
  114. return (True, matches)
  115. return (False, None)
  116. def exact_match_heuristic_by_pool(
  117. deposit_df: pd.DataFrame,
  118. withdraw_df: pd.DataFrame,
  119. ) -> Tuple[bool, Optional[List[pd.Series]]]:
  120. matches: pd.DataFrame = deposit_df[
  121. (deposit_df.from_address == withdraw_df.recipient_address) &
  122. (deposit_df.block_timestamp < withdraw_df.block_timestamp) &
  123. (deposit_df.tornado_cash_address == withdraw_df.tornado_cash_address)
  124. ]
  125. matches: List[pd.Series] = [matches.iloc[i] for i in range(len(matches))]
  126. if len(matches) > 0:
  127. return (True, matches)
  128. return (False, None)
  129. if __name__ == "__main__":
  130. from argparse import ArgumentParser
  131. parser: ArgumentParser = ArgumentParser()
  132. parser.add_argument('data_dir', type=str, help='path to tornado cash data')
  133. parser.add_argument('save_dir', type=str, help='folder to save matches')
  134. parser.add_argument('--by-pool', action='store_true', default=False,
  135. help='prune by pool heuristic or not?')
  136. args: Any = parser.parse_args()
  137. main(args)