__init__.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import aiohttp
  2. import aio_pika
  3. import asyncio
  4. import io
  5. import json
  6. import os
  7. import PyPDF2
  8. from aio_pika.pool import Pool
  9. from distutils.util import strtobool
  10. from functools import partial
  11. async def run(loop, logger=None, config=None, consumer_pool_size=10):
  12. async def _get_connection():
  13. return await aio_pika.connect(
  14. host=config.get("mq_host"),
  15. port=config.get("mq_port"),
  16. login=config.get("mq_user"),
  17. password=config.get("mq_pass"),
  18. virtualhost=config.get("mq_vhost"),
  19. loop=loop
  20. )
  21. async def _get_channel():
  22. async with connection_pool.acquire() as connection:
  23. return await connection.channel()
  24. def _parse_file(pdf_content):
  25. pdf_reader = PyPDF2.PdfFileReader(io.BytesIO(pdf_content))
  26. pages = []
  27. for page in range(pdf_reader.numPages):
  28. page_obj = pdf_reader.getPage(page)
  29. pages.append(page_obj.extractText())
  30. return "".join(pages)
  31. async def _publish(content, channel):
  32. exchange = await channel.get_exchange(config.get("mq_target_exchange"))
  33. await exchange.publish(
  34. aio_pika.Message(content.encode("utf-8")),
  35. config.get("mq_target_routing_key"),
  36. )
  37. async def _consume(consumer_id):
  38. async with channel_pool.acquire() as channel:
  39. queue = await channel.declare_queue(
  40. config.get("mq_source_queue"), durable=config.get("mq_queue_durable"), auto_delete=False
  41. )
  42. while True:
  43. try:
  44. m = await queue.get(timeout=300 * consumer_pool_size)
  45. message = m.body.decode('utf-8')
  46. if logger:
  47. logger.debug(message)
  48. try:
  49. j = json.loads(message)
  50. async with aiohttp.ClientSession() as session:
  51. async with session.get(j["url"]) as response:
  52. if response.status == 200:
  53. buffer = await response.read()
  54. pdf_text = await loop.run_in_executor(None, partial(_parse_file, buffer))
  55. await _publish(json.dumps({"id": j["id"], "pdf_text": pdf_text}), channel=channel)
  56. logger.debug("Consumer %s: pdf %s sent" % (consumer_id, j["id"]))
  57. else:
  58. logger.error("Http Error: %s returns %s" % (j["url"], response.status))
  59. except Exception as e:
  60. if logger:
  61. logger.error("PDF Parsing Error: %s" % (e,))
  62. raise e
  63. else:
  64. await m.ack()
  65. except aio_pika.exceptions.QueueEmpty:
  66. if logger:
  67. logger.info("Consumer %s: Queue empty. Stopping." % consumer_id)
  68. break
  69. if config is None:
  70. config = {
  71. "mq_host": os.environ.get('MQ_HOST'),
  72. "mq_port": int(os.environ.get('MQ_PORT', '5672')),
  73. "mq_vhost": os.environ.get('MQ_VHOST'),
  74. "mq_user": os.environ.get('MQ_USER'),
  75. "mq_pass": os.environ.get('MQ_PASS'),
  76. "mq_source_queue": os.environ.get('MQ_SOURCE_QUEUE'),
  77. "mq_target_exchange": os.environ.get('MQ_TARGET_EXCHANGE'),
  78. "mq_target_routing_key": os.environ.get("MQ_TARGET_ROUTING_KEY"),
  79. "mq_queue_durable": bool(strtobool(os.environ.get('MQ_QUEUE_DURABLE', 'True'))),
  80. "consumer_pool_size": os.environ.get("CONSUMER_POOL_SIZE"),
  81. }
  82. if "consumer_pool_size" in config:
  83. if config.get("consumer_pool_size"):
  84. try:
  85. consumer_pool_size = int(config.get("consumer_pool_size"))
  86. except TypeError as e:
  87. if logger:
  88. logger.error("Invalid pool size: %s" % (consumer_pool_size,))
  89. raise e
  90. connection_pool = Pool(_get_connection, max_size=consumer_pool_size, loop=loop)
  91. channel_pool = Pool(_get_channel, max_size=consumer_pool_size, loop=loop)
  92. async with connection_pool, channel_pool:
  93. consumer_pool = []
  94. if logger:
  95. logger.info("Consumers started")
  96. for i in range(consumer_pool_size):
  97. consumer_pool.append(_consume(consumer_id=i))
  98. await asyncio.gather(*consumer_pool)