Asynchronous RabbitMQ consumer job library for PostgreSQL
rabbitmq2psql-as-json is a ready-to-use, basic asynchronous RabbitMQ consumer job library for PostgreSQL. It stops when the queue is empty, which makes it useful for cron jobs, unit tests, CI/CD pipelines, and production environments with a slow data stream.
You can install this library easily with pip:

```sh
pip install rabbitmq2psql-as-json
```
Example usage:

```python
import os
import asyncio
import logging

from rabbitmq2psql_as_json import consume

if __name__ == '__main__':
    logger = logging.getLogger("rabbitmq2psql-as-json")
    logger.setLevel(os.environ.get('LOG_LEVEL', "DEBUG"))
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter(
            os.environ.get('LOG_FORMAT', "%(asctime)s [%(levelname)s] %(name)s: %(message)s")
        )
    )
    logger.addHandler(handler)

    config = {
        "mq_host": os.environ.get('MQ_HOST'),
        "mq_port": int(os.environ.get('MQ_PORT')),
        "mq_vhost": os.environ.get('MQ_VHOST'),
        "mq_user": os.environ.get('MQ_USER'),
        "mq_pass": os.environ.get('MQ_PASS'),
        "mq_queue": os.environ.get('MQ_QUEUE'),
        "mq_exchange": os.environ.get('MQ_EXCHANGE'),
        "mq_routing_key": os.environ.get('MQ_ROUTING_KEY'),
        "db_host": os.environ.get('DB_HOST'),
        "db_port": int(os.environ.get('DB_PORT')),
        "db_user": os.environ.get('DB_USER'),
        "db_pass": os.environ.get('DB_PASS'),
        "db_database": os.environ.get('DB_DATABASE')
    }

    # Each consumed message body is bound to %s and inserted as one row.
    sql_template = """insert into logs (body) values (%s);"""

    loop = asyncio.get_event_loop()
    loop.run_until_complete(
        consume(
            loop=loop,
            consumer_pool_size=10,
            sql_template=sql_template,
            config=config
        )
    )
    loop.close()
```
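Here `consumer_pool_size` sets how many consumers read from the queue in parallel; once the queue is empty, `consume` returns and the process exits.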
This library uses the aio_pika and aiopg packages.
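If you are curious how such a drain-until-empty consumer fits together, the sketch below is a minimal standalone version built directly on aio_pika and aiopg. It is only an illustration under assumed connection settings, queue name, and `logs` table, not the library's actual implementation:

```python
import asyncio

import aio_pika
import aiopg

async def drain_queue():
    # Assumed local broker and database; replace with your own settings.
    mq = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    pool = await aiopg.create_pool("host=localhost dbname=test user=postgres password=secret")
    channel = await mq.channel()
    queue = await channel.declare_queue("logs", durable=True)

    while True:
        # With fail=False, get() returns None instead of raising when the queue is empty.
        message = await queue.get(fail=False, timeout=5)
        if message is None:
            break  # queue drained: stop, like a one-shot consumer job
        async with message.process():  # ack on success, reject on exception
            async with pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(
                        "insert into logs (body) values (%s);",
                        (message.body.decode(),),
                    )

    pool.close()
    await pool.wait_closed()
    await mq.close()

asyncio.run(drain_queue())
```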
You can also run this library as a standalone consumer job command: just set the required environment variables and run `rabbitmq2psql-as-json`. This use case fits perfectly when you need to run it as a cron job or Kubernetes job.
Required environment variables:

- `MQ_HOST`
- `MQ_PORT`
- `MQ_VHOST`
- `MQ_USER`
- `MQ_PASS`
- `MQ_QUEUE`
- `MQ_EXCHANGE`
- `MQ_ROUTING_KEY`
- `DB_HOST`
- `DB_PORT`
- `DB_USER`
- `DB_PASS`
- `DB_DATABASE`
- `SQL_TEMPLATE` (e.g. `insert into logs (body) values (%s);`)

As in the example above, `LOG_LEVEL` and `LOG_FORMAT` can also be set to control logging.
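For example, a one-shot standalone run might look like this (all values below are placeholders for your own broker and database):

```sh
export MQ_HOST=localhost MQ_PORT=5672 MQ_VHOST=/ MQ_USER=guest MQ_PASS=guest
export MQ_QUEUE=logs MQ_EXCHANGE=logs MQ_ROUTING_KEY=logs
export DB_HOST=localhost DB_PORT=5432 DB_USER=postgres DB_PASS=secret DB_DATABASE=test
export SQL_TEMPLATE="insert into logs (body) values (%s);"
rabbitmq2psql-as-json
```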
Example Kubernetes job: see kube.yaml in this repository.