123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- """
- Lambda Class's Exact Match Heuristic.
- """
- import os
- import pandas as pd
- from pandas._config.config import describe_option
- from tqdm import tqdm
- import networkx as nx
- from typing import Any, Tuple, List, Set, Optional, Dict
- from src.utils.utils import to_json
def main(args: Any):
    """Run the exact-match heuristic end to end and write results as JSON.

    Loads deposit/withdraw data from ``args.data_dir``, links withdraws to
    deposits, and writes cluster/address/block/timestamp maps into
    ``args.save_dir`` (created if missing).
    """
    withdraw_df, deposit_df = load_data(args.data_dir)
    clusters, tx2addr = get_exact_matches(
        deposit_df, withdraw_df, by_pool=args.by_pool)
    tx2block, tx2ts = get_transaction_info(withdraw_df, deposit_df)

    if not os.path.isdir(args.save_dir):
        os.makedirs(args.save_dir)

    appendix: str = '_by_pool' if args.by_pool else ''
    # NOTE: we do not make a `db_file` here b/c we are guaranteed singleton clusters.
    outputs = {
        f'exact_match_clusters{appendix}.json': clusters,
        f'exact_match_tx2addr{appendix}.json': tx2addr,
        f'exact_match_tx2block{appendix}.json': tx2block,
        f'exact_match_tx2ts{appendix}.json': tx2ts,
    }
    for basename, payload in outputs.items():
        to_json(payload, os.path.join(args.save_dir, basename))
def load_data(root) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Load withdraw and deposit transaction CSVs from ``root``.

    Normalization applied:
    - withdraw ``recipient_address`` is lowercased;
    - ``block_timestamp`` on both frames is parsed into ``pd.Timestamp``.

    Returns:
        (withdraw_df, deposit_df)
    """
    withdraw_path: str = os.path.join(root, 'lighter_complete_withdraw_txs.csv')
    deposit_path: str = os.path.join(root, 'lighter_complete_deposit_txs.csv')

    withdraw_df: pd.DataFrame = pd.read_csv(withdraw_path)
    withdraw_df['recipient_address'] = withdraw_df['recipient_address'].str.lower()
    withdraw_df['block_timestamp'] = withdraw_df['block_timestamp'].apply(pd.Timestamp)

    deposit_df: pd.DataFrame = pd.read_csv(deposit_path)
    deposit_df['block_timestamp'] = deposit_df['block_timestamp'].apply(pd.Timestamp)

    return withdraw_df, deposit_df
def get_transaction_info(
    withdraw_df: pd.DataFrame,
    deposit_df: pd.DataFrame
) -> Tuple[Dict[str, int], Dict[str, str]]:
    """Build per-transaction metadata maps over withdraws and deposits.

    Args:
        withdraw_df: withdraw txs with `hash`, `block_number`, `block_timestamp`.
        deposit_df: deposit txs with the same three columns.

    Returns:
        tx2block: tx hash -> block number.
        tx2ts: tx hash -> timestamp formatted as 'YYYY-MM-DD HH:MM:SS'.
    """
    # FIX: these are pd.Series, not pd.DataFrame — the original annotated (and
    # repeatedly re-annotated) them with the wrong type.
    hashes: pd.Series = pd.concat([withdraw_df.hash, deposit_df.hash])
    block_numbers: pd.Series = \
        pd.concat([withdraw_df.block_number, deposit_df.block_number])
    block_timestamps: pd.Series = \
        pd.concat([withdraw_df.block_timestamp, deposit_df.block_timestamp])
    # Coerce to Timestamp (robust to raw-string timestamps) and format in a
    # single pass instead of two separate `apply` sweeps.
    block_timestamps = block_timestamps.apply(
        lambda x: pd.Timestamp(x).strftime('%Y-%m-%d %H:%M:%S'))
    tx2block: Dict[str, int] = dict(zip(hashes, block_numbers))
    tx2ts: Dict[str, str] = dict(zip(hashes, block_timestamps))
    return tx2block, tx2ts
def get_exact_matches(
    deposit_df: pd.DataFrame,
    withdraw_df: pd.DataFrame,
    by_pool: bool = False,
) -> Tuple[List[Set[str]], Dict[str, str]]:
    """
    Iterate over the withdraw transactions and apply heuristic one. For each
    withdraw with matching deposit transactions, a new element is added to
    the dictionary with the key as the withdraw hash and the values as all
    matching deposit hashes.

    It is possible for a transaction hash to appear more than once. As such,
    we compute weakly connected components to form clusters.

    Args:
        deposit_df: all deposit transactions.
        withdraw_df: all withdraw transactions.
        by_pool: if True, additionally require deposit and withdraw to use
            the same tornado cash pool.

    Returns:
        clusters: weakly connected components of size > 1.
        tx2addr: tx hash -> address (recipient for withdraws, sender for deposits).
    """
    # FIX: the return annotation previously declared a 3-tuple
    # (..., Dict[str, Any]) although the function returns exactly two values.
    tx2addr: Dict[str, str] = {}
    graph: nx.DiGraph = nx.DiGraph()
    raw_links: Dict[str, str] = {}

    # FIX: heuristic selection is loop-invariant; pick it once, not per row.
    heuristic_fn = \
        exact_match_heuristic_by_pool if by_pool else exact_match_heuristic

    for withdraw_row in tqdm(withdraw_df.itertuples(), total=withdraw_df.shape[0]):
        results: Tuple[bool, List[pd.Series]] = \
            heuristic_fn(deposit_df, withdraw_row)

        if results[0]:
            deposit_rows: List[pd.Series] = results[1]
            for deposit_row in deposit_rows:
                raw_links[withdraw_row.hash] = deposit_row.hash

                graph.add_node(withdraw_row.hash)
                graph.add_node(deposit_row.hash)
                graph.add_edge(withdraw_row.hash, deposit_row.hash)

                # save transaction -> address map
                tx2addr[withdraw_row.hash] = withdraw_row.recipient_address
                tx2addr[deposit_row.hash] = deposit_row.from_address

    clusters: List[Set[str]] = [  # ignore singletons
        c for c in nx.weakly_connected_components(graph) if len(c) > 1]

    print(f'# links (graph): {len(clusters)}')
    print(f'# links (raw): {len(raw_links)}')

    return clusters, tx2addr
def exact_match_heuristic(
    deposit_df: pd.DataFrame,
    withdraw_df: pd.DataFrame,
) -> Tuple[bool, Optional[List[pd.Series]]]:
    """Find deposits made by the withdraw's recipient before the withdraw.

    Args:
        deposit_df: all deposit transactions, with `from_address` and
            `block_timestamp` columns.
        withdraw_df: a single withdraw row (e.g. from `itertuples`) exposing
            `recipient_address` and `block_timestamp` attributes.

    Returns:
        (True, matching deposit rows) when at least one deposit shares the
        withdraw's address and precedes it in time; (False, None) otherwise.
    """
    # FIX: the original function carried its superseded loop implementation
    # inside a triple-quoted string in docstring position — dead code
    # masquerading as documentation. Replaced with a real docstring.
    matched: pd.DataFrame = deposit_df[
        (deposit_df.from_address == withdraw_df.recipient_address) &
        (deposit_df.block_timestamp < withdraw_df.block_timestamp)
    ]
    # Split the frame into one pd.Series per matching deposit row.
    matches: List[pd.Series] = [row for _, row in matched.iterrows()]
    if len(matches) > 0:
        return (True, matches)
    return (False, None)
def exact_match_heuristic_by_pool(
    deposit_df: pd.DataFrame,
    withdraw_df: pd.DataFrame,
) -> Tuple[bool, Optional[List[pd.Series]]]:
    """Pool-aware variant: same-address, earlier deposits in the same pool.

    Like `exact_match_heuristic`, but additionally requires the deposit's
    `tornado_cash_address` to match the withdraw's.

    Returns:
        (True, matching deposit rows) on any match; (False, None) otherwise.
    """
    mask = (
        (deposit_df.from_address == withdraw_df.recipient_address)
        & (deposit_df.block_timestamp < withdraw_df.block_timestamp)
        & (deposit_df.tornado_cash_address == withdraw_df.tornado_cash_address)
    )
    candidates: pd.DataFrame = deposit_df[mask]
    rows: List[pd.Series] = [candidates.iloc[idx]
                             for idx in range(candidates.shape[0])]
    if rows:
        return (True, rows)
    return (False, None)
- if __name__ == "__main__":
- from argparse import ArgumentParser
- parser: ArgumentParser = ArgumentParser()
- parser.add_argument('data_dir', type=str, help='path to tornado cash data')
- parser.add_argument('save_dir', type=str, help='folder to save matches')
- parser.add_argument('--by-pool', action='store_true', default=False,
- help='prune by pool heuristic or not?')
- args: Any = parser.parse_args()
- main(args)
|