#!/usr/bin/env python
"""An example of how one can partition a large file and process it in parallel.

The core function is `partition`. Also look at how multiprocessing's
starmap_async is used to get the result.
"""
# Copyright 2018 - Stefano Mazzucco <stefano - AT - curso - DOT - re
# License: Apache 2.0
# https://www.apache.org/licenses/LICENSE-2.0.txt

import mmap
import os


def partition(size, parts, overlap):
    """Return the offsets of the file given its size in bytes, the number of
    parts it should be split into and the overlap between the parts in bytes.
    """
    length = size // parts

    def partition(part):
        offset = part * (length - overlap + 1)
        return offset - (offset % mmap.ALLOCATIONGRANULARITY)

    return [partition(part) for part in range(parts)]
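
# For illustration (hypothetical numbers): on a system where
# mmap.ALLOCATIONGRANULARITY is 4096, partition(100_000, 4, 80) computes
# length = 25_000 and raw offsets 0, 24_921, 49_842 and 74_763, which are
# rounded down to [0, 24576, 49152, 73728] so that mmap accepts them as
# starting offsets.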


def process(fd, length, offset, access):
    """Process file descriptor `fd` for its `length`, starting at `offset` and
    using `access` permissions.
    """
    # mmap objects cannot be pickled, so the mapping is created in each
    # sub-process rather than passed in from the parent.
    m = mmap.mmap(
        fd,
        length=length,
        offset=offset,
        access=access
    )
    # Note: except for the first chunk of the file, there is no guarantee
    # that the first line read from the mapping is complete.
    if offset == 0:
        header = m.readline()  # Say this is a CSV file with headers.
        print('HEADER', header)
    line = m.readline()  # Just an example.
    # print('Processed', line)
    return line
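
# A quick way to try `process` without the pool (file name is hypothetical):
#
#     with open('data.csv', 'rb') as f:
#         print(process(f.fileno(), 0, 0, mmap.ACCESS_READ))
#
# With offset 0 and length 0 the whole file is mapped; the header is printed
# and the first data line is returned.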


def parse_args(argv=None):
    from argparse import ArgumentParser
    parser = ArgumentParser(
        description='Process file in parallel.'
    )
    parser.add_argument(
        'fname',
        help='File to be processed',
    )
    parser.add_argument(
        '--parts',
        help='How many parts to split the processing into. Defaults to 3.',
        default=3,
        metavar='N',
        type=int,
    )
    return parser.parse_args(argv)
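
# For example (hypothetical file name), parse_args(['data.csv', '--parts', '4'])
# returns a Namespace with fname='data.csv' and parts=4.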


if __name__ == '__main__':
    from multiprocessing import Pool, TimeoutError

    arguments = parse_args()
    input_file = arguments.fname
    parts = arguments.parts

    with open(input_file, 'rb') as f:
        # Twice the length of the first line should be a fair overlap.
        overlap = 2 * len(f.readline())
        fd = f.fileno()
        size = os.fstat(fd).st_size
        offsets = partition(size, parts, overlap)
        length = size // parts
        last = parts - 1
        args = [
            (
                fd,
                # The last part maps up to the end of the file
                # (an mmap length of 0 means "to EOF").
                0 if part == last else length,
                offset,
                mmap.ACCESS_READ
            )
            for part, offset in enumerate(offsets)
        ]
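
        # Each tuple above becomes one call to process(fd, length, offset,
        # access) in a worker. Consecutive chunks overlap by roughly `overlap`
        # bytes (plus whatever the rounding to the allocation granularity
        # adds), so a line that straddles a chunk boundary should still be
        # fully contained in the next chunk, assuming no line is longer than
        # `overlap`.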

        # One worker per CPU available to this process
        # (os.sched_getaffinity is Linux-specific), plus one.
        workers = len(os.sched_getaffinity(0)) + 1

        def callback(result):
            print('CALLBACK', result)

        def error_callback(error):
            print('ERROR CALLBACK', error)
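
        # Note: starmap_async calls `callback` once with the full list of
        # results when every task has finished, and `error_callback` with the
        # exception if any task raises.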

        with Pool(workers) as pool:
            result = pool.starmap_async(
                process,
                args,
                callback=callback,
                error_callback=error_callback
            )
            timeout = 2
            try:
                outcome = result.get(timeout=timeout)
                print('OUTCOME', outcome)
            except TimeoutError:
                print(f'Timed out after {timeout} seconds')
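
# Example run (file name and output are illustrative):
#
#     $ python this_script.py data.csv --parts 3
#     HEADER b'id,name,value\n'
#     CALLBACK [b'1,foo,10\n', b'...', b'...']
#     OUTCOME [b'1,foo,10\n', b'...', b'...']
#
# Each element of the result list is the line returned by one worker; workers
# other than the first may return a partial line, since a chunk boundary can
# fall in the middle of a line.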