run_gas_price_heuristic.py

  1. """
  2. Lambda Class's Same Gas Price Heuristic.
  3. """
  4. import os, json
  5. from tqdm import tqdm
  6. import pandas as pd
  7. import networkx as nx
  8. from typing import Any, Tuple, Optional, Dict, List, Set, Any
  9. from src.utils.utils import to_json, Entity, Heuristic
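# Example invocation (a sketch; the data and output directories below are
# placeholders, not paths shipped with the repo):
#
#   python run_gas_price_heuristic.py ./data/tornado ./outputs --by-pool
#
# The positional arguments and the --by-pool flag are defined in the argparse
# block at the bottom of this file.
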
def main(args: Any):
    withdraw_df, deposit_df = load_data(args.data_dir)
    clusters, tx2addr = \
        get_same_gas_price_clusters(deposit_df, withdraw_df, by_pool=args.by_pool)
    tx2block, tx2ts = get_transaction_info(withdraw_df, deposit_df)

    address_sets: List[Set[str]] = get_address_sets(clusters, tx2addr)
    address_metadata: pd.DataFrame = get_metadata(address_sets)

    if not os.path.isdir(args.save_dir):
        os.makedirs(args.save_dir)

    appendix: str = '_by_pool' if args.by_pool else ''
    clusters_file: str = os.path.join(
        args.save_dir, f'gas_price_clusters{appendix}.json')
    tx2addr_file: str = os.path.join(
        args.save_dir, f'gas_price_tx2addr{appendix}.json')
    tx2block_file: str = os.path.join(
        args.save_dir, f'gas_price_tx2block{appendix}.json')
    tx2ts_file: str = os.path.join(
        args.save_dir, f'gas_price_tx2ts{appendix}.json')
    address_file: str = os.path.join(
        args.save_dir, f'gas_price_address_set{appendix}.json')
    metadata_file: str = os.path.join(
        args.save_dir, f'gas_price_metadata{appendix}.csv')

    to_json(clusters, clusters_file)
    to_json(tx2addr, tx2addr_file)
    to_json(tx2block, tx2block_file)
    to_json(tx2ts, tx2ts_file)
    to_json(address_sets, address_file)
    address_metadata.to_csv(metadata_file, index=False)

def load_data(root: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    withdraw_df: pd.DataFrame = pd.read_csv(
        os.path.join(root, 'lighter_complete_withdraw_txs.csv'))

    # Change recipient_address to lowercase.
    withdraw_df['recipient_address'] = withdraw_df['recipient_address'].str.lower()
    # Change block_timestamp field to be a timestamp object.
    withdraw_df['block_timestamp'] = withdraw_df['block_timestamp'].apply(pd.Timestamp)
    # Remove withdrawals made through relayer services. Assume that when the
    # recipient address is not the from_address, the withdrawal used a relayer.
    withdraw_df = withdraw_df[withdraw_df['from_address'] == withdraw_df['recipient_address']]

    deposit_df: pd.DataFrame = pd.read_csv(
        os.path.join(root, 'lighter_complete_deposit_txs.csv'))
    # Change block_timestamp field to be a timestamp object.
    deposit_df['block_timestamp'] = deposit_df['block_timestamp'].apply(pd.Timestamp)

    return withdraw_df, deposit_df

def get_transaction_info(
    withdraw_df: pd.DataFrame,
    deposit_df: pd.DataFrame,
) -> Tuple[Dict[str, int], Dict[str, Any]]:
    hashes: pd.Series = pd.concat([withdraw_df.hash, deposit_df.hash])
    block_numbers: pd.Series = \
        pd.concat([withdraw_df.block_number, deposit_df.block_number])
    block_timestamps: pd.Series = \
        pd.concat([withdraw_df.block_timestamp, deposit_df.block_timestamp])
    block_timestamps = \
        block_timestamps.apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
    tx2block = dict(zip(hashes, block_numbers))
    tx2ts = dict(zip(hashes, block_timestamps))
    return tx2block, tx2ts

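# Illustrative shape of the returned maps (hash and values are made up):
#   tx2block['0xabc...'] -> 13_000_000
#   tx2ts['0xabc...']    -> '2021-11-01 12:00:00'
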
def get_same_gas_price_clusters(
    deposit_df: pd.DataFrame,
    withdraw_df: pd.DataFrame,
    by_pool: bool = False,
) -> Tuple[List[Set[str]], Dict[str, str]]:
    # Get deposit transactions with unique gas prices.
    filter_fn = filter_by_unique_gas_price_by_pool if by_pool else filter_by_unique_gas_price
    unique_gas_deposit_df: pd.DataFrame = filter_fn(deposit_df)

    # Initialize an empty dictionary to store the linked transactions.
    tx2addr: Dict[str, str] = {}
    graph: nx.DiGraph = nx.DiGraph()
    raw_links: Dict[str, str] = {}  # store the non-graph version
    all_withdraws: List[str] = []
    all_deposits: List[str] = []

    # Iterate over the withdraw transactions.
    pbar = tqdm(total=len(withdraw_df))
    for _, withdraw_row in withdraw_df.iterrows():
        # Apply the heuristic for the given withdraw transaction.
        heuristic_fn = same_gas_price_heuristic_by_pool \
            if by_pool else same_gas_price_heuristic
        results: Tuple[bool, Optional[pd.Series]] = \
            heuristic_fn(withdraw_row, unique_gas_deposit_df)

        # When a deposit transaction matching the withdraw transaction's gas
        # price is found, add the linked transactions to the dictionary.
        if results[0]:
            deposit_row: pd.Series = results[1]
            raw_links[withdraw_row.hash] = deposit_row.hash

            graph.add_node(withdraw_row.hash)
            graph.add_node(deposit_row.hash)
            graph.add_edge(withdraw_row.hash, deposit_row.hash)

            tx2addr[withdraw_row.hash] = withdraw_row.recipient_address
            tx2addr[deposit_row.hash] = deposit_row.from_address

            all_withdraws.append(withdraw_row.hash)
            all_deposits.append(deposit_row.hash)

        pbar.update()
    pbar.close()

    clusters: List[Set[str]] = [  # ignore singletons
        c for c in nx.weakly_connected_components(graph) if len(c) > 1]

    print(f'# links (graph): {len(clusters)}')
    print(f'# links (raw): {len(raw_links)}')

    return clusters, tx2addr

def get_address_sets(
    tx_clusters: List[Set[str]],
    tx2addr: Dict[str, str],
) -> List[Set[str]]:
    """
    Stores pairs of addresses that are related to each other. Don't
    apply graphs on this because we are going to join this into the
    other clusters.
    """
    address_sets: List[Set[str]] = []

    for cluster in tx_clusters:
        addr_set: Set[str] = {tx2addr[tx] for tx in cluster}
        addr_list: List[str] = list(addr_set)

        if len(addr_list) > 1:  # make sure it is not a singleton
            for addr1 in addr_list:
                for addr2 in addr_list:
                    if addr1 != addr2:
                        address_sets.append({addr1, addr2})

    return address_sets

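# Worked example (hypothetical hashes and addresses): a cluster {'0xw1', '0xd1'}
# with tx2addr = {'0xw1': '0xaaa', '0xd1': '0xbbb'} yields
# [{'0xaaa', '0xbbb'}, {'0xaaa', '0xbbb'}] -- the same unordered pair is
# appended once per ordering of the two inner loops.
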
def get_metadata(address_sets: List[Set[str]]) -> pd.DataFrame:
    """
    Stores metadata about addresses to add to the db.
    """
    unique_addresses: Set[str] = set().union(*address_sets)

    address: List[str] = []
    entity: List[int] = []
    conf: List[float] = []
    meta_data: List[str] = []
    heuristic: List[int] = []

    for member in unique_addresses:
        address.append(member)
        entity.append(Entity.EOA.value)
        conf.append(1)
        meta_data.append(json.dumps({}))
        heuristic.append(Heuristic.GAS_PRICE.value)

    response: Dict[str, List[Any]] = dict(
        address = address,
        entity = entity,
        conf = conf,
        meta_data = meta_data,
        heuristic = heuristic,
    )
    response_df: pd.DataFrame = pd.DataFrame.from_dict(response)
    return response_df

def filter_by_unique_gas_price(transactions_df: pd.DataFrame) -> pd.DataFrame:
    # Count the appearances of each gas price in the transactions df.
    gas_prices_count: pd.Series = transactions_df['gas_price'].value_counts()
    # Keep only the gas prices that are unique, i.e., the ones with a count equal to 1.
    unique_gas_prices: pd.Index = gas_prices_count[gas_prices_count == 1].index
    return transactions_df[transactions_df['gas_price'].isin(unique_gas_prices)]

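# Illustrative example (made-up gas prices): for deposits with gas prices
# [100, 100, 51], only the 51-unit transaction survives the filter, since 100
# appears twice and is therefore not unique.
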
def filter_by_unique_gas_price_by_pool(transactions_df: pd.DataFrame) -> pd.DataFrame:
    """
    Unlike the non-pool version, we check for a unique gas price BY POOL (this
    is a weaker constraint).
    """
    gas_prices_count: pd.Series = \
        transactions_df[['gas_price', 'tornado_cash_address']].value_counts()
    unique_gas_prices: pd.DataFrame = pd.DataFrame(gas_prices_count[gas_prices_count == 1])

    # A set of (gas_price, tornado_cash_address) tuples is built to filter efficiently.
    tuple_set: Set[Any] = set(
        [(row.Index[0], row.Index[1]) for row in unique_gas_prices.itertuples()])

    output_df: pd.DataFrame = pd.DataFrame(
        filter(lambda iter_tuple:
               (iter_tuple.gas_price, iter_tuple.tornado_cash_address) in tuple_set,
               transactions_df.itertuples()))
    return output_df

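# Why this is weaker than the global filter (hypothetical values): a gas price
# used once in one pool and once in another pool is dropped by
# filter_by_unique_gas_price (overall count of 2) but kept here, because the
# price is unique within each pool.
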
def same_gas_price_heuristic(
    withdraw_row: pd.Series,
    unique_gas_deposit_df: pd.DataFrame,
) -> Tuple[bool, Optional[pd.Series]]:
    """
    Links a withdraw transaction to the first deposit transaction that has the
    same (unique) gas price and an earlier block timestamp. Returns
    (True, deposit_row) on a match and (False, None) otherwise.
    """
    searches: pd.DataFrame = unique_gas_deposit_df[
        (unique_gas_deposit_df.gas_price == withdraw_row.gas_price) &
        (unique_gas_deposit_df.block_timestamp < withdraw_row.block_timestamp)
    ]
    if len(searches) > 0:
        return (True, searches.iloc[0])
    return (False, None)

def same_gas_price_heuristic_by_pool(
    withdraw_row: pd.Series,
    unique_gas_deposit_df: pd.DataFrame,
) -> Tuple[bool, Optional[pd.Series]]:
    """
    This heuristic groups together transactions by pool. It applies a strictly
    stronger condition than `same_gas_price_heuristic`: the deposit must have
    the same gas price as the withdraw transaction, an earlier block timestamp,
    and the same tornado_cash_address (pool). Returns (True, deposit_row) on a
    match and (False, None) otherwise.
    """
    searches: pd.DataFrame = unique_gas_deposit_df[
        (unique_gas_deposit_df.gas_price == withdraw_row.gas_price) &
        (unique_gas_deposit_df.block_timestamp < withdraw_row.block_timestamp) &
        (unique_gas_deposit_df.tornado_cash_address == withdraw_row.tornado_cash_address)
    ]
    if len(searches) > 0:
        return (True, searches.iloc[0])
    return (False, None)

if __name__ == "__main__":
    from argparse import ArgumentParser

    parser: ArgumentParser = ArgumentParser()
    parser.add_argument('data_dir', type=str, help='path to tornado cash data')
    parser.add_argument('save_dir', type=str, help='folder to save clusters')
    parser.add_argument('--by-pool', action='store_true', default=False,
                        help='prune by pool heuristic or not?')
    args: Any = parser.parse_args()

    main(args)