# sort_big_csv.py — externally sort a directory of large CSV files by one
# column: sort each file in memory, then k-way merge the sorted pieces.
  1. import os
  2. import sys
  3. import csv
  4. import copy
  5. import heapq
  6. import tempfile
  7. import pandas as pd
  8. from glob import glob
  9. from tqdm import tqdm
  10. from typing import Any, List, final
  11. csv.field_size_limit(2**30)
  12. def main(args: Any):
  13. filenames = glob(os.path.join(args.csv_dir, '*.csv'))
  14. if not args.merge_only:
  15. sort_dir: str = os.path.join(args.csv_dir, 'sorted')
  16. if not os.path.isdir(sort_dir): os.makedirs(sort_dir)
  17. # sort each filename independently
  18. pbar = tqdm(total=len(filenames))
  19. for filename in filenames:
  20. basename: str = os.path.basename(filename)
  21. outname: str = os.path.join(sort_dir, basename)
  22. memorysort(filename, outname, colname=args.sort_column)
  23. pbar.update()
  24. pbar.close()
  25. if args.sort_only:
  26. sys.exit(0)
  27. # get header
  28. header: List[str] = get_header(filenames[0])
  29. merge_idx: int = header.index(args.sort_column)
  30. # merge sorted files together slowly
  31. print('running merge sort...')
  32. temp_filename: str = mergesort(filenames, nway=2, merge_idx=merge_idx)
  33. processed_dir: str = os.path.join(args.csv_dir, 'processed')
  34. if not os.path.isdir(processed_dir): os.makedirs(processed_dir)
  35. out_filename: str = os.path.join(processed_dir, args.out_filename)
  36. count: int = 0
  37. with open(out_filename, 'w', newline='') as fp:
  38. writer: csv.writer = csv.writer(
  39. fp, delimiter=',', quoting=csv.QUOTE_MINIMAL)
  40. writer.writerow(header)
  41. with open(temp_filename, newline='') as sfp:
  42. for row in csv.reader(sfp):
  43. writer.writerow(row)
  44. if count % 1000000 == 0:
  45. print(f'Written {count} rows.')
  46. count += 1
  47. print('removing temp file...')
  48. os.remove(temp_filename)
  49. def memorysort(
  50. filename: str,
  51. outname: str,
  52. colname: str = 'block_number',
  53. ):
  54. """Sort this CSV file in memory on the given columns"""
  55. df: pd.DataFrame = pd.read_csv(filename)
  56. df: pd.DataFrame = df.sort_values(colname)
  57. df.to_csv(outname, index=False) # overwriting
  58. del df # wipe from memory
  59. def mergesort(sorted_filenames, nway=2, merge_idx=5):
  60. """Merge two sorted CSV files into a single output."""
  61. orig_filenames: List[str] = copy.deepcopy(sorted_filenames)
  62. merge_n: int = 0
  63. while len(sorted_filenames) > 1:
  64. # merge_filenames = current files to sort
  65. # sorted_filenames = remaining files to sort
  66. merge_filenames, sorted_filenames = \
  67. sorted_filenames[:nway], sorted_filenames[nway:]
  68. num_remaining = len(sorted_filenames)
  69. num_total = len(merge_filenames) + len(sorted_filenames)
  70. if merge_n % 10 == 0:
  71. print(f'{merge_n} merged | {num_remaining} remaining | {num_total} total')
  72. with tempfile.NamedTemporaryFile(delete=False, mode='w') as fp:
  73. writer: csv.writer = csv.writer(fp)
  74. merge_n += 1 # increment
  75. iterators: List[Any]= [
  76. make_iterator(filename) for filename in merge_filenames
  77. ]
  78. # `block_number` is the 4th column
  79. for row in heapq.merge(*iterators, key=lambda x: int(x[merge_idx])):
  80. writer.writerow(row)
  81. sorted_filenames.append(fp.name)
  82. # these are files to get rid of (don't remove original files)
  83. extra_filenames: List[str] = list(
  84. set(merge_filenames) - set(orig_filenames))
  85. for filename in extra_filenames:
  86. os.remove(filename)
  87. final_filename: str = sorted_filenames[0]
  88. return final_filename
  89. def make_iterator(filename):
  90. with open(filename, newline='') as fp:
  91. count = 0
  92. for row in csv.reader(fp):
  93. if count == 0:
  94. count = 1 # just make it not 0
  95. continue # skip header
  96. yield row
  97. def get_header(filename) -> List[str]:
  98. with open(filename, newline='') as fp:
  99. reader = csv.reader(fp, delimiter=',')
  100. header: List[str] = next(reader)
  101. return header
  102. if __name__ == "__main__":
  103. from argparse import ArgumentParser
  104. parser: ArgumentParser = ArgumentParser()
  105. parser.add_argument(
  106. 'csv_dir',
  107. type=str,
  108. help='path to directory of csvs',
  109. )
  110. parser.add_argument(
  111. '--merge-only',
  112. action='store_true',
  113. default=False,
  114. help='assume csv_dir contains sorted files'
  115. )
  116. parser.add_argument(
  117. '--sort-only',
  118. action='store_true',
  119. default=False,
  120. help='sort files only'
  121. )
  122. parser.add_argument(
  123. '--sort-column',
  124. type=str,
  125. default='block_number',
  126. )
  127. parser.add_argument(
  128. '--out-filename',
  129. type=str,
  130. default='transactions-sorted.csv',
  131. )
  132. args: Any = parser.parse_args()
  133. main(args)