utils.py

import os
import csv
import copy
import heapq
import shutil
import logging
import tempfile
import subprocess
import pandas as pd
from tqdm import tqdm
from typing import Tuple, List, Any
from os.path import join, dirname, realpath
from src.utils.bigquery import EthereumBigQuery
from src.utils.storage import EthereumStorage

LIVE_DIR: str = realpath(dirname(__file__))
ROOT_DIR: str = realpath(join(LIVE_DIR, '..'))
DATA_DIR: str = realpath(join(ROOT_DIR, 'data'))
STATIC_DIR: str = realpath(join(DATA_DIR, 'static'))
LOG_DIR: str = realpath(join(ROOT_DIR, 'logs'))
WEBAPP_DIR: str = realpath(join(ROOT_DIR, 'webapp'))
WEBAPP_DATA_DIR: str = realpath(join(WEBAPP_DIR, 'static/data'))

CONSTANTS = {
    'live_path': LIVE_DIR,
    'root_path': ROOT_DIR,
    'data_path': DATA_DIR,
    'static_path': STATIC_DIR,
    'log_path': LOG_DIR,
    'webapp_path': WEBAPP_DIR,
    'webapp_data_path': WEBAPP_DATA_DIR,
    'bigquery_project': 'lexical-theory-329617',
    'postgres_db': 'tornado',
    'postgres_user': 'postgres',
}

# allow very large CSV fields when reading exported chunks
csv.field_size_limit(2**30)


def export_bigquery_table_to_cloud_bucket(
    src_dataset: str,
    src_table: str,
    dest_bucket: str,
) -> bool:
    """Export a BigQuery table to a Google Cloud Storage bucket."""
    handler: EthereumBigQuery = EthereumBigQuery(src_dataset)
    try:
        handler.export_to_bucket(src_table, dest_bucket)
        return True
    except Exception as e:
        print(e, e.args)
        return False


def delete_bucket_contents(bucket: str) -> bool:
    """Delete every object in a Google Cloud Storage bucket."""
    handler: EthereumStorage = EthereumStorage()
    try:
        handler.empty_bucket(bucket)
        return True
    except Exception as e:
        print(e, e.args)
        return False


def export_cloud_bucket_to_csv(
    bucket: str, out_dir: str) -> Tuple[bool, List[str]]:
    """Download a bucket's contents as CSV files into `out_dir`."""
    handler: EthereumStorage = EthereumStorage()
    try:
        files: List[str] = handler.export_to_csv(bucket, out_dir)
        return True, files
    except Exception as e:
        print(e, e.args)
        return False, []
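

# Hedged usage sketch (not from the original module): the three helpers above
# can be chained into a simple export pipeline. The dataset, table, bucket,
# and output directory names below are hypothetical placeholders.
#
#   if export_bigquery_table_to_cloud_bucket('my_dataset', 'transactions', 'my-export-bucket'):
#       success, csv_files = export_cloud_bucket_to_csv('my-export-bucket', '/tmp/exports')
#       if success:
#           delete_bucket_contents('my-export-bucket')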


def execute_bash(cmd: str) -> bool:
    """Run a shell command, returning True on a zero exit code."""
    code: int = subprocess.call(cmd, shell=True)
    return code == 0


def get_logger(log_file: str) -> logging.Logger:
    """Return a module-level logger that writes INFO records to `log_file`."""
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    # create a file handler
    handler = logging.FileHandler(log_file)
    handler.setLevel(logging.INFO)
    # create a logging format
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    # add the file handler to the logger
    logger.addHandler(handler)
    return logger
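

# Hedged example (not from the original module): writing INFO records to a log
# file under LOG_DIR; the 'export.log' file name is a hypothetical placeholder.
#
#   logger = get_logger(join(CONSTANTS['log_path'], 'export.log'))
#   logger.info('starting export')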


def load_data_from_chunks(
    files: List[str], sort_column: str = 'block_number') -> pd.DataFrame:
    """
    Read dataframes from files and sort by block number.
    """
    chunks: List[pd.DataFrame] = []
    for file_ in files:
        chunk: pd.DataFrame = pd.read_csv(file_)
        chunks.append(chunk)
    data: pd.DataFrame = pd.concat(chunks)
    data.reset_index(inplace=True)
    data.sort_values(sort_column, inplace=True)
    return data
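

# Hedged example (not from the original module): concatenating exported CSV
# chunks that share a `block_number` column; the chunk file names are
# hypothetical placeholders.
#
#   df = load_data_from_chunks(['chunk-000.csv', 'chunk-001.csv'])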


def load_data_from_chunks_low_memory(
    files: List[str],
    outfile: str,
    sort_column: str = 'block_number') -> None:
    """
    Runs an external merge sort over potentially large files, writing the
    sorted result to `outfile` instead of returning a dataframe.
    """
    print('memory sorting')
    pbar = tqdm(total=len(files))
    for file_ in files:
        # sort each chunk in place
        memorysort(file_, file_, colname=sort_column)
        pbar.update()
    pbar.close()
    header: List[str] = get_header(files[0])
    merge_idx: int = header.index(sort_column)
    print('running merge sort')
    temp_filename: str = mergesort(files, nway=2, merge_idx=merge_idx)
    print('writing temp file to new file')
    count: int = 0
    with open(outfile, 'w', newline='') as fp:
        writer = csv.writer(fp, delimiter=',', quoting=csv.QUOTE_MINIMAL)
        writer.writerow(header)
        with open(temp_filename, newline='') as sfp:
            for row in csv.reader(sfp):
                writer.writerow(row)
                if count % 1000000 == 0:
                    print(f'Written {count} rows.')
                count += 1
    print('removing temp file')
    os.remove(temp_filename)
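

# Hedged example (not from the original module): same ordering as
# load_data_from_chunks, but the sorted rows are streamed to disk rather than
# held in memory. The file names below are hypothetical placeholders.
#
#   load_data_from_chunks_low_memory(
#       ['chunk-000.csv', 'chunk-001.csv'], outfile='sorted.csv')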


# -- external sort utilities --

def memorysort(filename: str, outname: str, colname: str = 'block_number'):
    """Sort this CSV file in memory on the given column."""
    df: pd.DataFrame = pd.read_csv(filename)
    df = df.sort_values(colname)
    df.to_csv(outname, index=False)  # overwrite the input file
    del df  # free the dataframe from memory


def mergesort(sorted_filenames: List[str], nway: int = 2, merge_idx: int = 5) -> str:
    """Repeatedly merge `nway` sorted CSV files until one sorted file remains."""
    orig_filenames: List[str] = copy.deepcopy(sorted_filenames)
    merge_n: int = 0
    while len(sorted_filenames) > 1:
        # merge_filenames = current files to merge
        # sorted_filenames = remaining files to merge
        merge_filenames, sorted_filenames = \
            sorted_filenames[:nway], sorted_filenames[nway:]
        num_remaining: int = len(sorted_filenames)
        num_total: int = len(merge_filenames) + len(sorted_filenames)
        if merge_n % 10 == 0:
            print(f'{merge_n} merged | {num_remaining} remaining | {num_total} total')
        with tempfile.NamedTemporaryFile(delete=False, mode='w', newline='') as fp:
            writer = csv.writer(fp)
            merge_n += 1  # increment
            iterators: List[Any] = [
                make_iterator(filename) for filename in merge_filenames]
            # merge on the sort column, whose position is given by `merge_idx`
            for row in heapq.merge(*iterators, key=lambda x: int(x[merge_idx])):
                writer.writerow(row)
        sorted_filenames.append(fp.name)
        # these are files to get rid of (don't remove original files)
        extra_filenames: List[str] = list(
            set(merge_filenames) - set(orig_filenames))
        for filename in extra_filenames:
            os.remove(filename)
    final_filename: str = sorted_filenames[0]
    return final_filename


def make_iterator(filename: str):
    """Yield rows from a CSV file, skipping its header line."""
    with open(filename, newline='') as fp:
        count: int = 0
        for row in csv.reader(fp):
            if count == 0:
                count = 1  # mark the header as consumed
                continue  # skip header
            yield row


def get_header(filename: str) -> List[str]:
    """Return the header row of a CSV file as a list of column names."""
    with open(filename, newline='') as fp:
        reader = csv.reader(fp, delimiter=',')
        header: List[str] = next(reader)
        return header