Asynchronous RabbitMQ consumer job library for PostgreSQL

Furkan Kalkan 66063a09e1 Await ack to complete. 2 سال پیش
.github 8d4599f8d2 Update python-publish.yml 3 سال پیش
bin 5a14f9f541 added logger and standalone run script for k8s 3 سال پیش
rabbitmq2psql_as_json 66063a09e1 Await ack to complete. 2 سال پیش
LICENSE f0a9573132 Codes added 3 سال پیش
README.md 46d39d6ecb Add dead letter queue feature and convert logging messages to f-strings 3 سال پیش
kube.yaml 46d39d6ecb Add dead letter queue feature and convert logging messages to f-strings 3 سال پیش
setup.py 57de5b0a7f Bump version 2 سال پیش

README.md

rabbitmq2psql-as-json

rabbitmq2psql-as-json is ready to use, basic asynchronous RabbitMQ consumer job library for PostgreSQL. It stops when queue is empty, so it can be useful for cron jobs, unit tests, CI/CD environments and production environments has slow datastream.

Installation

You can install this library easily with pip. pip install rabbitmq2psql-as-json

Usage

As a library

import os
import asyncio
import logging
from rabbitmq2psql_as_json import consume

if __name__ == '__main__':
    logger = logging.getLogger("rabbitmq2psql-as-json")
    logger.setLevel(os.environ.get('LOG_LEVEL', "DEBUG"))
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter(
            os.environ.get('LOG_FORMAT', "%(asctime)s [%(levelname)s] %(name)s: %(message)s")
        )
    )
    logger.addHandler(handler)

    config = {
      "mq_host": os.environ.get('MQ_HOST'),
	  "mq_port": int(os.environ.get('MQ_PORT')), 
	  "mq_vhost": os.environ.get('MQ_VHOST'),
	  "mq_user": os.environ.get('MQ_USER'),
	  "mq_pass": os.environ.get('MQ_PASS'),
	  "mq_queue": os.environ.get('MQ_QUEUE'),
      "mq_exchange": os.environ.get('MQ_EXCHANGE'),
      "mq_routing_key": os.environ.get('MQ_ROUTING_KEY'),
	  "db_host": os.environ.get('DB_HOST'),
	  "db_port": int(os.environ.get('DB_PORT')),
	  "db_user": os.environ.get('DB_USER'),
	  "db_pass": os.environ.get('DB_PASS'),
	  "db_database": os.environ.get('DB_DATABASE') 
    }
  
    sql_template = """insert into logs (body) values (%s);""" 
  
    loop = asyncio.get_event_loop()
    loop.run_until_complete(
        consume(
          loop=loop,
          consumer_pool_size=10,
          sql_template=sql_template,
          config=config
        )
    )

loop.close()

This library uses aio_pika and aiopg packages.

Standalone

You can also call this library as standalone consumer job command. Just set required environment variables and run rabbitmq2psql-as-json. This usecase perfectly fits when you need run it on cronjobs or kubernetes jobs.

Required environment variables:

  • MQ_HOST
  • MQ_PORT (optional)
  • MQ_VHOST
  • MQ_USER
  • MQ_PASS
  • MQ_QUEUE
  • MQ_QUEUE_DURABLE (optional, default value: True)
  • MQ_EXCHANGE (Exchange for dead letter queue, aka records with error queue)
  • MQ_ROUTING_KEY (Routing key for dead letter queue)
  • DB_HOST
  • DB_PORT (optional)
  • DB_USER
  • DB_PASS
  • DB_DATABASE
  • SQL_TEMPLATE (DB Insert query template. Ex: insert into logs (body) values (%s);)
  • CONSUMER_POOL_SIZE (optional, default value: 10)
  • LOG_LEVEL (Logging level. See: Python logging module docs)

Example Kubernetes job: You can see it to kube.yaml