compress_graph.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. """
  2. Rather than storing addresses in DataFrames, store integers (much smaller).
  3. Then we can create a map to do the replacement later.
  4. """
  5. import numpy as np
  6. import pandas as pd
  7. from typing import Any, Iterable, Dict, Set, List
  8. from src.utils.utils import to_json
  9. def yield_transactions(
  10. transactions_csv: str, chunk_size: int = 10000) -> Iterable[pd.DataFrame]:
  11. """
  12. Load a segment at a time (otherwise too large).
  13. """
  14. for chunk in pd.read_csv(transactions_csv, chunksize = chunk_size):
  15. yield chunk
  16. def make_address_map(
  17. address_map: Dict[str, int],
  18. chunk: pd.DataFrame,
  19. min_index: int,
  20. ) -> Dict[str, Any]:
  21. from_set: Set[str] = set(chunk.from_address.to_numpy())
  22. to_set: Set[str] = set(chunk.to_address.to_numpy())
  23. # build mapping from unique addresses
  24. addresses: Set[str] = from_set.union(to_set)
  25. bank: Set[str] = set(list(address_map.keys()))
  26. # remove existing addresses
  27. addresses: Set[str] = addresses - bank
  28. addresses: List[str] = sorted(list(addresses))
  29. indices: List[int] = list(range(len(addresses)))
  30. indices: List[int] = [x + min_index for x in indices]
  31. return dict(zip(addresses, indices))
  32. def make_graph_dataframe(
  33. transactions_csv: str,
  34. out_csv: str,
  35. addr_json: str,
  36. chunk_size: int = 10000,
  37. ) -> pd.DataFrame:
  38. count: int = 0
  39. address_map: Dict[str, int] = {}
  40. print('processing txs', end = '', flush=True)
  41. for chunk in yield_transactions(transactions_csv, chunk_size):
  42. cur_map: Dict[str, int] = \
  43. make_address_map(address_map, chunk, len(address_map))
  44. address_map.update(cur_map)
  45. chunk.from_address = chunk.from_address.apply(lambda x: address_map[x])
  46. chunk.to_address = chunk.to_address.apply(lambda x: address_map[x])
  47. if count == 0:
  48. chunk.to_csv(out_csv, index=False)
  49. else:
  50. chunk.to_csv(out_csv, mode='a', header=False, index=False)
  51. del chunk # wipe memory
  52. print('.', end = '', flush=True)
  53. count += 1
  54. to_json(address_map, addr_json)
  55. def main(args: Any):
  56. make_graph_dataframe(
  57. args.transactions_csv, args.save_csv, args.addr_json, args.chunk_size)
  58. if __name__ == "__main__":
  59. from argparse import ArgumentParser
  60. parser: ArgumentParser = ArgumentParser()
  61. parser.add_argument('transactions_csv', type=str, help='path to transaction data')
  62. parser.add_argument('save_csv', type=str, help='path to save data')
  63. parser.add_argument('addr_json', type=str, help='path to save addr map')
  64. parser.add_argument('--chunk-size', type=int, default=1000000,
  65. help='Chunk size (default: 1000000)')
  66. args: Any = parser.parse_args()
  67. main(args)