api.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. # coding: utf-8
  2. import json
  3. import uuid
  4. import asyncio
  5. import logging
  6. import functools
  7. import aioredis
  8. from aiohttp import web
  9. from prettyconf import config
  10. from echo.utils import (encode_json, decode_json, generate_key, normalize_path,
  11. request_to_dict, hash_dict, compare_hash,
  12. unused_tcp_port)
  13. log = logging.getLogger(__name__)
  14. app = None
  15. redis_pool = None
  16. @asyncio.coroutine
  17. def get_mock(request):
  18. data = json.dumps({'mocks': request.app['mock_db']})
  19. return web.Response(text=data, content_type='application/json')
  20. @asyncio.coroutine
  21. def put_mock(request):
  22. data = yield from request.json()
  23. uuid_hex = uuid.uuid4().hex
  24. path = yield from normalize_path('/mock/{}/{}/'.format(uuid_hex,
  25. data['path']))
  26. request.app.router.add_route(data['method'], path, mock)
  27. request.app['mock_db'][path] = data
  28. return web.Response(text=json.dumps({'path': path}),
  29. status=201,
  30. content_type='application/json')
  31. @asyncio.coroutine
  32. def get_proxy(request):
  33. data = json.dumps({'proxies': []})
  34. return web.Response(text=data, content_type='application/json')
  35. @asyncio.coroutine
  36. def put_proxy(request):
  37. log.info('put proxy')
  38. data = yield from request.json()
  39. # protocol = data['protocol']
  40. backend = data['backend']
  41. from .protocol import HTTPProxy
  42. ProxyServer = functools.partial(HTTPProxy, backend=backend)
  43. host = request.app['host']
  44. port = unused_tcp_port()
  45. proxy_server = yield from request.app.loop.create_server(ProxyServer,
  46. host,
  47. port)
  48. address = proxy_server.sockets[0].getsockname()
  49. path = 'http://{}:{}/'.format(*address)
  50. return web.Response(text=json.dumps({'path': path}),
  51. status=201,
  52. content_type='application/json')
  53. @asyncio.coroutine
  54. def all_callback(request):
  55. app = request.match_info['app']
  56. queue = request.match_info['queue']
  57. key = yield from generate_key(app, queue)
  58. with (yield from request.app['redis_pool']) as redis:
  59. requests = yield from redis.lrange(key, 0, -1)
  60. requests = [decode_json(r) for r in requests]
  61. redis.delete(key)
  62. data = json.dumps({'requests': requests})
  63. return web.Response(text=data, content_type='application/json')
  64. @asyncio.coroutine
  65. def first_callback(request):
  66. app = request.match_info['app']
  67. queue = request.match_info['queue']
  68. key = yield from generate_key(app, queue)
  69. with (yield from request.app['redis_pool']) as redis:
  70. request_ = yield from redis.lpop(key)
  71. request_ = decode_json(request_)
  72. data = json.dumps({'request': request_})
  73. return web.Response(text=data, content_type='application/json')
  74. @asyncio.coroutine
  75. def last_callback(request):
  76. app = request.match_info['app']
  77. queue = request.match_info['queue']
  78. key = yield from generate_key(app, queue)
  79. with (yield from request.app['redis_pool']) as redis:
  80. request_ = yield from redis.rpop(key)
  81. request_ = decode_json(request_)
  82. data = json.dumps({'request': request_})
  83. return web.Response(text=data, content_type='application/json')
  84. @asyncio.coroutine
  85. def clean_callback(request):
  86. app = request.match_info['app']
  87. queue = request.match_info['queue']
  88. key = yield from generate_key(app, queue)
  89. with (yield from request.app['redis_pool']) as redis:
  90. redis.delete(key)
  91. return web.Response(text=json.dumps({}),
  92. content_type='application/json')
  93. @asyncio.coroutine
  94. def callback(request):
  95. # TODO: Add support for addtional_url
  96. app = request.match_info['app']
  97. queue = request.match_info['queue']
  98. key = yield from generate_key(app, queue)
  99. request_ = yield from request_to_dict(request)
  100. json_ = yield from encode_json(request_)
  101. with (yield from request.app['redis_pool']) as redis:
  102. redis.rpush(key, json_)
  103. return web.Response(text=json.dumps({'request': request_}),
  104. content_type='application/json')
  105. @asyncio.coroutine
  106. def mock(request):
  107. received = yield from request.json()
  108. # Check/Validate for KeyError
  109. config = request.app['mock_db'][request.path]
  110. response = config['response']['body']
  111. expected = config['request']['body']
  112. status = config['response']['status_code']
  113. content_type = config['response']['headers']['content_type']
  114. expected_hash = yield from hash_dict(expected)
  115. received_hash = yield from hash_dict(received)
  116. if not compare_hash(expected_hash, received_hash):
  117. status = 400
  118. response = {
  119. 'message': 'Received request data did not match with expected',
  120. 'received_data': received,
  121. 'expected_data': expected,
  122. }
  123. return web.Response(text=json.dumps(response),
  124. status=status,
  125. content_type=content_type)
  126. @asyncio.coroutine
  127. def health(request):
  128. return web.Response(text=json.dumps({'status': 'ok'}),
  129. content_type='application/json')
  130. @asyncio.coroutine
  131. def start(loop, api_host='127.0.0.1', api_port=9876):
  132. app = web.Application(loop=loop)
  133. app['mock_db'] = {}
  134. app['host'] = api_host
  135. redis_address = (config('ECHO_REDIS_HOST', default='127.0.0.1'),
  136. config('ECHO_REDIS_PORT', default=6379))
  137. redis_db = config('ECHO_REDIS_DB', default=0)
  138. redis_pool = yield from aioredis.create_pool(redis_address, db=redis_db,
  139. minsize=5, maxsize=10,
  140. encoding='utf-8', loop=loop)
  141. app['redis_pool'] = redis_pool
  142. # Mock
  143. app.router.add_route('GET', '/mocks/', get_mock)
  144. app.router.add_route('PUT', '/mocks/', put_mock)
  145. # Proxies
  146. app.router.add_route('GET', '/proxies/', get_proxy)
  147. app.router.add_route('PUT', '/proxies/', put_proxy)
  148. # Callbacks
  149. app.router.add_route('*', '/callbacks/{app}/{queue}/', callback)
  150. app.router.add_route('GET', '/callbacks/_all/{app}/{queue}/', all_callback)
  151. app.router.add_route('GET', '/callbacks/_first/{app}/{queue}/',
  152. first_callback)
  153. app.router.add_route('GET', '/callbacks/_last/{app}/{queue}/',
  154. last_callback)
  155. app.router.add_route('GET', '/callbacks/_clean/{app}/{queue}/',
  156. clean_callback)
  157. # Health
  158. app.router.add_route('GET', '/health/', health)
  159. handler = app.make_handler()
  160. server = yield from loop.create_server(handler, api_host, api_port)
  161. address = server.sockets[0].getsockname()
  162. log.info('API started at http://{}:{}/'.format(*address))
  163. return server, handler, redis_pool
  164. def stop(loop):
  165. if app:
  166. loop.run_until_complete(app.finish())