import os
import csv
import copy
import heapq
import shutil
import logging
import tempfile
import subprocess
import pandas as pd
from tqdm import tqdm
from typing import Tuple, List, Any
from os.path import join, dirname, realpath
from src.utils.bigquery import EthereumBigQuery
from src.utils.storage import EthereumStorage

LIVE_DIR: str = realpath(dirname(__file__))
ROOT_DIR: str = realpath(join(LIVE_DIR, '..'))
DATA_DIR: str = realpath(join(ROOT_DIR, 'data'))
STATIC_DIR: str = realpath(join(DATA_DIR, 'static'))
LOG_DIR: str = realpath(join(ROOT_DIR, 'logs'))
WEBAPP_DIR: str = realpath(join(ROOT_DIR, 'webapp'))
WEBAPP_DATA_DIR: str = realpath(join(WEBAPP_DIR, 'static/data'))

CONSTANTS = {
    'live_path': LIVE_DIR,
    'root_path': ROOT_DIR,
    'data_path': DATA_DIR,
    'static_path': STATIC_DIR,
    'log_path': LOG_DIR,
    'webapp_path': WEBAPP_DIR,
    'webapp_data_path': WEBAPP_DATA_DIR,
    'bigquery_project': 'lexical-theory-329617',
    'postgres_db': 'tornado',
    'postgres_user': 'postgres',
}

csv.field_size_limit(2**30)

def export_bigquery_table_to_cloud_bucket(
    src_dataset: str,
    src_table: str,
    dest_bucket: str,
) -> bool:
    """Export a BigQuery table to a Google Cloud Storage bucket."""
    handler: EthereumBigQuery = EthereumBigQuery(src_dataset)
    try:
        handler.export_to_bucket(src_table, dest_bucket)
        return True
    except Exception as e:
        # Python 3 exceptions have no `.message` attribute
        print(e, e.args)
        return False
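
# Example (illustrative only): exporting a table to a bucket. The dataset,
# table, and bucket names below are placeholders, not resources this project
# necessarily uses.
#
#   ok = export_bigquery_table_to_cloud_bucket(
#       'crypto_ethereum', 'transactions', 'my-export-bucket')
#   print('export succeeded' if ok else 'export failed')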

def delete_bucket_contents(bucket: str) -> bool:
    """Remove every object currently stored in the bucket."""
    handler: EthereumStorage = EthereumStorage()
    try:
        handler.empty_bucket(bucket)
        return True
    except Exception as e:
        print(e, e.args)
        return False

def export_cloud_bucket_to_csv(
    bucket: str, out_dir: str) -> Tuple[bool, List[str]]:
    """Download the contents of a bucket to CSV files in `out_dir`."""
    handler: EthereumStorage = EthereumStorage()
    try:
        files: List[str] = handler.export_to_csv(bucket, out_dir)
        return True, files
    except Exception as e:
        print(e, e.args)
        return False, []
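
# A possible end-to-end flow with the three cloud helpers above (a sketch;
# the table and bucket names are hypothetical and error handling is minimal):
#
#   if export_bigquery_table_to_cloud_bucket(
#           'crypto_ethereum', 'traces', 'tornado-staging-bucket'):
#       success, csv_files = export_cloud_bucket_to_csv(
#           'tornado-staging-bucket', CONSTANTS['data_path'])
#       if success:
#           delete_bucket_contents('tornado-staging-bucket')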

def execute_bash(cmd: str) -> bool:
    code: int = subprocess.call(cmd, shell=True)
    return code == 0

def get_logger(log_file: str) -> logging.Logger:
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    # create a file handler
    handler = logging.FileHandler(log_file)
    handler.setLevel(logging.INFO)
    # create a logging format
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    # add the file handler to the logger
    logger.addHandler(handler)
    return logger
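
# Usage sketch (the log file name is an arbitrary example):
#
#   logger = get_logger(join(CONSTANTS['log_path'], 'pipeline.log'))
#   logger.info('starting export')
#
# Note that calling get_logger repeatedly attaches another FileHandler to the
# same module-level logger, so each record is then emitted once per handler.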

def load_data_from_chunks(
    files: List[str], sort_column: str = 'block_number') -> pd.DataFrame:
    """
    Read CSV chunks into dataframes, concatenate them, and sort by `sort_column`.
    """
    chunks: List[pd.DataFrame] = []
    for file_ in files:
        chunk: pd.DataFrame = pd.read_csv(file_)
        chunks.append(chunk)
    data: pd.DataFrame = pd.concat(chunks)
    data.reset_index(inplace=True)
    data.sort_values(sort_column, inplace=True)
    return data
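
# Example (hypothetical file names), for data small enough to fit in memory:
#
#   chunk_files = ['traces-000.csv', 'traces-001.csv']
#   df = load_data_from_chunks(chunk_files, sort_column='block_number')
#
# For data that does not fit in memory, use the external-sort variant below.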

def load_data_from_chunks_low_memory(
    files: List[str],
    outfile: str,
    sort_column: str = 'block_number') -> None:
    """
    Runs an external merge sort over CSV chunks that may not fit in memory,
    writing the sorted result to `outfile`.
    """
    print('memory sorting')
    pbar = tqdm(total=len(files))
    for file_ in files:
        # sort each chunk in place
        memorysort(file_, file_, colname=sort_column)
        pbar.update()
    pbar.close()
    header: List[str] = get_header(files[0])
    merge_idx: int = header.index(sort_column)
    print('running merge sort')
    temp_filename: str = mergesort(files, nway=2, merge_idx=merge_idx)
    print('writing temp file to new file')
    count: int = 0
    with open(outfile, 'w', newline='') as fp:
        writer = csv.writer(fp, delimiter=',', quoting=csv.QUOTE_MINIMAL)
        writer.writerow(header)
        with open(temp_filename, newline='') as sfp:
            for row in csv.reader(sfp):
                writer.writerow(row)
                if count % 1000000 == 0:
                    print(f'Written {count} rows.')
                count += 1
    print('removing temp file')
    os.remove(temp_filename)
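
# Sketch of the low-memory path (file names are hypothetical; every chunk must
# share the same header and contain the sort column):
#
#   chunk_files = [join(CONSTANTS['data_path'], f'traces-{i:03d}.csv')
#                  for i in range(10)]
#   load_data_from_chunks_low_memory(
#       chunk_files, outfile=join(CONSTANTS['data_path'], 'traces-sorted.csv'))
#
# Each chunk is first sorted in memory, then merged pairwise on disk, so peak
# memory usage is bounded by the largest single chunk.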

# -- external sort utilities --

def memorysort(filename: str, outname: str, colname: str = 'block_number'):
    """Sort a single CSV file in memory on the given column."""
    df: pd.DataFrame = pd.read_csv(filename)
    df = df.sort_values(colname)
    df.to_csv(outname, index=False)  # overwriting
    del df  # wipe from memory

def mergesort(sorted_filenames: List[str], nway: int = 2, merge_idx: int = 5) -> str:
    """Iteratively merge the sorted CSV files, `nway` at a time, into one sorted file."""
    orig_filenames: List[str] = copy.deepcopy(sorted_filenames)
    merge_n: int = 0
    while len(sorted_filenames) > 1:
        # merge_filenames = current files to merge
        # sorted_filenames = remaining files to merge
        merge_filenames, sorted_filenames = \
            sorted_filenames[:nway], sorted_filenames[nway:]
        num_remaining: int = len(sorted_filenames)
        num_total: int = len(merge_filenames) + len(sorted_filenames)
        if merge_n % 10 == 0:
            print(f'{merge_n} merged | {num_remaining} remaining | {num_total} total')
        with tempfile.NamedTemporaryFile(delete=False, mode='w', newline='') as fp:
            writer = csv.writer(fp)
            merge_n += 1  # increment
            # original input files carry a header row; the intermediate temp
            # files written here do not, so only skip the first row of originals
            iterators: List[Any] = [
                make_iterator(filename, skip_header=(filename in orig_filenames))
                for filename in merge_filenames]
            # merge on the column at position `merge_idx`, compared as an integer
            for row in heapq.merge(*iterators, key=lambda x: int(x[merge_idx])):
                writer.writerow(row)
            sorted_filenames.append(fp.name)
        # clean up intermediate files (but never the original inputs)
        extra_filenames: List[str] = list(
            set(merge_filenames) - set(orig_filenames))
        for filename in extra_filenames:
            os.remove(filename)
    final_filename: str = sorted_filenames[0]
    return final_filename
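
# Illustration of the k-way merge step that `mergesort` builds on: heapq.merge
# lazily interleaves already-sorted iterables by a key, so only one row per
# input is held in memory at a time. (Toy rows; column 1 plays the role of
# `merge_idx`.)
#
#   import heapq
#   a = [['0xaa', '100'], ['0xbb', '300']]
#   b = [['0xcc', '200'], ['0xdd', '400']]
#   merged = list(heapq.merge(a, b, key=lambda row: int(row[1])))
#   # -> rows ordered 100, 200, 300, 400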

def make_iterator(filename: str, skip_header: bool = True):
    """Yield CSV rows from `filename`, optionally skipping a leading header row."""
    with open(filename, newline='') as fp:
        reader = csv.reader(fp)
        if skip_header:
            next(reader, None)  # skip the header row
        yield from reader

def get_header(filename: str) -> List[str]:
    with open(filename, newline='') as fp:
        reader = csv.reader(fp, delimiter=',')
        header: List[str] = next(reader)
    return header