# sort_big_csv.py (8.7 KB)

import copy
import csv
import heapq
import os
import sys
import tempfile
from glob import glob
from typing import Any, List, final

import pandas as pd
from tqdm import tqdm
  11. csv.field_size_limit(2**30)
  12. def main(args: Any):
  13. filenames = glob(os.path.join(args.csv_dir, '*.csv'))
  14. if not args.merge_only:
  15. sort_dir: str = os.path.join(args.csv_dir, 'sorted')
  16. if not os.path.isdir(sort_dir): os.makedirs(sort_dir)
  17. # sort each filename independently
  18. pbar = tqdm(total=len(filenames))
  19. for filename in filenames:
  20. basename: str = os.path.basename(filename)
  21. outname: str = os.path.join(sort_dir, basename)
  22. memorysort(filename, outname, colname=args.sort_column)
  23. pbar.update()
  24. pbar.close()
  25. if args.sort_only:
  26. sys.exit(0)
  27. # get header
  28. header: List[str] = get_header(filenames[0])
  29. merge_idx: int = header.index(args.sort_column)
  30. # merge sorted files together slowly
  31. print('running merge sort...')
  32. temp_filename: str = mergesort(filenames, nway=2, merge_idx=merge_idx)
  33. processed_dir: str = os.path.join(args.csv_dir, 'processed')
  34. if not os.path.isdir(processed_dir): os.makedirs(processed_dir)
  35. out_filename: str = os.path.join(processed_dir, args.out_filename)
  36. count: int = 0
  37. with open(out_filename, 'w', newline='') as fp:
  38. writer: csv.writer = csv.writer(
  39. fp, delimiter=',', quoting=csv.QUOTE_MINIMAL)
  40. writer.writerow(header)
  41. with open(temp_filename, newline='') as sfp:
  42. for row in csv.reader(sfp):
  43. writer.writerow(row)
  44. if count % 1000000 == 0:
  45. print(f'Written {count} rows.')
  46. count += 1
  47. print('removing temp file...')
  48. os.remove(temp_filename)
  49. def memorysort(
  50. filename: str,
  51. outname: str,
  52. colname: str = 'block_number',
  53. ):
  54. """Sort this CSV file in memory on the given columns"""
  55. df: pd.DataFrame = pd.read_csv(filename)
  56. df: pd.DataFrame = df.sort_values(colname)
  57. df.to_csv(outname, index=False) # overwriting
  58. del df # wipe from memory
  59. def mergesort(sorted_filenames, nway=2, merge_idx=5):
  60. """Merge two sorted CSV files into a single output."""
  61. orig_filenames: List[str] = copy.deepcopy(sorted_filenames)
  62. merge_n: int = 0
  63. while len(sorted_filenames) > 1:
  64. # merge_filenames = current files to sort
  65. # sorted_filenames = remaining files to sort
  66. merge_filenames, sorted_filenames = \
  67. sorted_filenames[:nway], sorted_filenames[nway:]
  68. num_remaining = len(sorted_filenames)
  69. num_total = len(merge_filenames) + len(sorted_filenames)
  70. if merge_n % 10 == 0:
  71. print(f'{merge_n} merged | {num_remaining} remaining | {num_total} total')
  72. with tempfile.NamedTemporaryFile(delete=False, mode='w') as fp:
  73. writer: csv.writer = csv.writer(fp)
  74. merge_n += 1 # increment
  75. iterators: List[Any]= [
  76. make_iterator(filename) for filename in merge_filenames
  77. ]
  78. # `block_number` is the 4th column
  79. for row in heapq.merge(*iterators, key=lambda x: int(x[merge_idx])):
  80. writer.writerow(row)
  81. sorted_filenames.append(fp.name)
  82. # these are files to get rid of (don't remove original files)
  83. extra_filenames: List[str] = list(
  84. set(merge_filenames) - set(orig_filenames))
  85. for filename in extra_filenames:
  86. os.remove(filename)
  87. final_filename: str = sorted_filenames[0]
  88. return final_filename
  89. def make_iterator(filename):
  90. with open(filename, newline='') as fp:
  91. count = 0
  92. for row in csv.reader(fp):
  93. if count == 0:
  94. count = 1 # just make it not 0
  95. continue # skip header
  96. yield row
  97. def get_header(filename) -> List[str]:
  98. with open(filename, newline='') as fp:
  99. reader = csv.reader(fp, delimiter=',')
  100. header: List[str] = next(reader)
  101. return header
  102. import tempfile
  103. import pandas as pd
  104. from glob import glob
  105. from tqdm import tqdm
  106. from typing import Any, List, final
  107. csv.field_size_limit(2**30)
  108. def main(args: Any):
  109. filenames = glob(os.path.join(args.csv_dir, '*.csv'))
  110. if not args.merge_only:
  111. sort_dir: str = os.path.join(args.csv_dir, 'sorted')
  112. if not os.path.isdir(sort_dir): os.makedirs(sort_dir)
  113. # sort each filename independently
  114. pbar = tqdm(total=len(filenames))
  115. for filename in filenames:
  116. basename: str = os.path.basename(filename)
  117. outname: str = os.path.join(sort_dir, basename)
  118. memorysort(filename, outname, colname=args.sort_column)
  119. pbar.update()
  120. pbar.close()
  121. if args.sort_only:
  122. sys.exit(0)
  123. # get header
  124. header: List[str] = get_header(filenames[0])
  125. merge_idx: int = header.index(args.sort_column)
  126. # merge sorted files together slowly
  127. print('running merge sort...')
  128. temp_filename: str = mergesort(filenames, nway=2, merge_idx=merge_idx)
  129. processed_dir: str = os.path.join(args.csv_dir, 'processed')
  130. if not os.path.isdir(processed_dir): os.makedirs(processed_dir)
  131. out_filename: str = os.path.join(processed_dir, args.out_filename)
  132. count: int = 0
  133. with open(out_filename, 'w', newline='') as fp:
  134. writer: csv.writer = csv.writer(
  135. fp, delimiter=',', quoting=csv.QUOTE_MINIMAL)
  136. writer.writerow(header)
  137. with open(temp_filename, newline='') as sfp:
  138. for row in csv.reader(sfp):
  139. writer.writerow(row)
  140. if count % 1000000 == 0:
  141. print(f'Written {count} rows.')
  142. count += 1
  143. print('removing temp file...')
  144. os.remove(temp_filename)
  145. def memorysort(
  146. filename: str,
  147. outname: str,
  148. colname: str = 'block_number',
  149. ):
  150. """Sort this CSV file in memory on the given columns"""
  151. df: pd.DataFrame = pd.read_csv(filename)
  152. df: pd.DataFrame = df.sort_values(colname)
  153. df.to_csv(outname, index=False) # overwriting
  154. del df # wipe from memory
  155. def mergesort(sorted_filenames, nway=2, merge_idx=5):
  156. """Merge two sorted CSV files into a single output."""
  157. orig_filenames: List[str] = copy.deepcopy(sorted_filenames)
  158. merge_n: int = 0
  159. while len(sorted_filenames) > 1:
  160. # merge_filenames = current files to sort
  161. # sorted_filenames = remaining files to sort
  162. merge_filenames, sorted_filenames = \
  163. sorted_filenames[:nway], sorted_filenames[nway:]
  164. num_remaining = len(sorted_filenames)
  165. num_total = len(merge_filenames) + len(sorted_filenames)
  166. if merge_n % 10 == 0:
  167. print(f'{merge_n} merged | {num_remaining} remaining | {num_total} total')
  168. with tempfile.NamedTemporaryFile(delete=False, mode='w') as fp:
  169. writer: csv.writer = csv.writer(fp)
  170. merge_n += 1 # increment
  171. iterators: List[Any]= [
  172. make_iterator(filename) for filename in merge_filenames
  173. ]
  174. # `block_number` is the 4th column
  175. for row in heapq.merge(*iterators, key=lambda x: int(x[merge_idx])):
  176. writer.writerow(row)
  177. sorted_filenames.append(fp.name)
  178. # these are files to get rid of (don't remove original files)
  179. extra_filenames: List[str] = list(
  180. set(merge_filenames) - set(orig_filenames))
  181. for filename in extra_filenames:
  182. os.remove(filename)
  183. final_filename: str = sorted_filenames[0]
  184. return final_filename
  185. def make_iterator(filename):
  186. with open(filename, newline='') as fp:
  187. count = 0
  188. for row in csv.reader(fp):
  189. if count == 0:
  190. count = 1 # just make it not 0
  191. continue # skip header
  192. yield row
  193. def get_header(filename) -> List[str]:
  194. with open(filename, newline='') as fp:
  195. reader = csv.reader(fp, delimiter=',')
  196. header: List[str] = next(reader)
  197. return header
  198. if __name__ == "__main__":
  199. from argparse import ArgumentParser
  200. parser: ArgumentParser = ArgumentParser()
  201. parser.add_argument(
  202. 'csv_dir',
  203. type=str,
  204. help='path to directory of csvs',
  205. )
  206. parser.add_argument(
  207. '--merge-only',
  208. action='store_true',
  209. default=False,
  210. help='assume csv_dir contains sorted files'
  211. )
  212. parser.add_argument(
  213. '--sort-only',
  214. action='store_true',
  215. default=False,
  216. help='sort files only'
  217. )
  218. parser.add_argument(
  219. '--sort-column',
  220. type=str,
  221. default='block_number',
  222. )
  223. parser.add_argument(
  224. '--out-filename',
  225. type=str,
  226. default='transactions-sorted.csv',
  227. )
  228. args: Any = parser.parse_args()
  229. main(args)