partition.py

#!/usr/bin/env python3
"""An example of how one can partition a large file and process it in parallel.

The core function is `partition`. Also look at how multiprocessing's
starmap_async is used to get the result.
"""
# Copyright 2018 - Stefano Mazzucco <stefano - AT - curso - DOT - re>
# License: Apache 2.0
# https://www.apache.org/licenses/LICENSE-2.0.txt
import mmap
import os


def partition(size, parts, overlap):
    """Return the offsets of the file given its size in bytes, the number of
    parts it should be split into and the overlap between the parts in bytes.
    """
    length = size // parts

    def chunk_offset(part):
        offset = part * (length - overlap + 1)
        # mmap requires offsets to be multiples of ALLOCATIONGRANULARITY,
        # so round down to the nearest multiple.
        return offset - (offset % mmap.ALLOCATIONGRANULARITY)

    return [chunk_offset(part) for part in range(parts)]
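
# For illustration (assuming mmap.ALLOCATIONGRANULARITY == 4096, which is
# platform-dependent): partition(10_000_000, 4, 100) returns
# [0, 2498560, 4997120, 7495680]. Each offset is rounded down to a 4096-byte
# boundary, so consecutive chunks overlap slightly.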


def process(fd, length, offset, access):
    """Process file descriptor `fd` for its `length`, starting at `offset` and
    using `access` permissions.
    """
    # mmap cannot be pickled, so we create it in the sub-processes.
    m = mmap.mmap(
        fd,
        length=length,
        offset=offset,
        access=access,
    )
    # Note that we don't have any guarantee that the first line is complete,
    # excluding the first chunk of the file.
    if offset == 0:
        header = m.readline()  # Say this is a CSV file with headers.
        print('HEADER', header)
    line = m.readline()  # Just an example.
    # print('Processed', line)
    return line
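
# A minimal sketch of fuller per-chunk processing (not part of the original;
# `handle` is a hypothetical per-line function): instead of reading a single
# line, one could consume every line in the mapped chunk, e.g.
#
#     while m.tell() < len(m):
#         handle(m.readline())
#
# Lines that begin inside the overlap region would then need de-duplication
# across neighbouring chunks.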


def parse_args(argv=None):
    from argparse import ArgumentParser
    parser = ArgumentParser(
        description='Process file in parallel.'
    )
    parser.add_argument(
        'fname',
        help='File to be processed',
    )
    parser.add_argument(
        '--parts',
        help='How many parts to split the processing into. Defaults to 3.',
        default=3,
        metavar='N',
        type=int,
    )
    return parser.parse_args(argv)
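
# For example (hypothetical arguments):
# parse_args(['data.csv', '--parts', '4']) returns
# Namespace(fname='data.csv', parts=4).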


if __name__ == '__main__':
    from multiprocessing import Pool, TimeoutError

    arguments = parse_args()
    input_file = arguments.fname
    parts = arguments.parts

    with open(input_file, 'rb') as f:
        # Twice the length of the first line should be a fair overlap.
        overlap = 2 * len(f.readline())
        fd = f.fileno()
        size = os.fstat(fd).st_size
        offsets = partition(size, parts, overlap)
        length = size // parts
        last = parts - 1
        args = [
            (
                fd,
                # Last part maps to end of file.
                0 if part == last else length,
                offset,
                mmap.ACCESS_READ,
            )
            for part, offset in enumerate(offsets)
        ]
        # os.sched_getaffinity is Linux-only; on other platforms one could
        # fall back to os.cpu_count().
        workers = len(os.sched_getaffinity(0)) + 1

        def callback(result):
            print('CALLBACK', result)

        def error_callback(error):
            print('ERROR CALLBACK', error)

        with Pool(workers) as pool:
            result = pool.starmap_async(
                process,
                args,
                callback=callback,
                error_callback=error_callback,
            )
            timeout = 2
            try:
                outcome = result.get(timeout=timeout)
                print('OUTCOME', outcome)
            except TimeoutError:
                print(f'Timed out after {timeout} seconds')
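
# Example invocation (hypothetical file name):
#
#     python partition.py data.csv --parts 4
#
# The first chunk prints the CSV header; each worker returns one line from
# its chunk, and the callback prints the collected results.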