pagure-stream-server.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. #!/usr/bin/env python
  2. """
  3. (c) 2015 - Copyright Red Hat Inc
  4. Authors:
  5. Pierre-Yves Chibon <pingou@pingoured.fr>
  6. Streaming server for pagure's eventsource feature
  7. This server takes messages sent to redis and publish them at the specified
  8. endpoint
  9. To test, run this script and in another terminal
  10. nc localhost 8080
  11. HELLO
  12. GET /test/issue/26?foo=bar HTTP/1.1
  13. """
  14. import datetime
  15. import logging
  16. import os
  17. import urlparse
  18. import trollius
  19. import trollius_redis
  20. log = logging.getLogger(__name__)
  21. if 'PAGURE_CONFIG' not in os.environ \
  22. and os.path.exists('/etc/pagure/pagure.cfg'):
  23. print 'Using configuration file `/etc/pagure/pagure.cfg`'
  24. os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
  25. import pagure
  26. import pagure.lib
  27. from pagure.exceptions import PagureEvException
  28. SERVER = None
  29. def get_obj_from_path(path):
  30. """ Return the Ticket or Request object based on the path provided.
  31. """
  32. username = None
  33. try:
  34. if path.startswith('/fork'):
  35. username, repo, obj, objid = path.split('/')[2:6]
  36. else:
  37. repo, obj, objid = path.split('/')[1:4]
  38. except:
  39. raise PagureEvException("Invalid URL: %s" % path)
  40. repo = pagure.lib.get_project(pagure.SESSION, repo, user=username)
  41. if repo is None:
  42. raise PagureEvException("Project '%s' not found" % repo)
  43. output = None
  44. if obj == 'issue':
  45. if not repo.settings.get('issue_tracker', True):
  46. raise PagureEvException("No issue tracker found for this project")
  47. output = pagure.lib.search_issues(
  48. pagure.SESSION, repo, issueid=objid)
  49. if output is None or output.project != repo:
  50. raise PagureEvException("Issue '%s' not found" % objid)
  51. if output.private:
  52. # TODO: find a way to do auth
  53. raise PagureEvException(
  54. "This issue is private and you are not allowed to view it")
  55. elif obj == 'pull-request':
  56. if not repo.settings.get('pull_requests', True):
  57. raise PagureEvException(
  58. "No pull-request tracker found for this project")
  59. output = pagure.lib.search_pull_requests(
  60. pagure.SESSION, project_id=repo.id, requestid=objid)
  61. if output is None or output.project != repo:
  62. raise PagureEvException("Pull-Request '%s' not found" % objid)
  63. else:
  64. raise PagureEvException("Invalid object provided: '%s'" % obj)
  65. return output
  66. @trollius.coroutine
  67. def handle_client(client_reader, client_writer):
  68. data = None
  69. while True:
  70. # give client a chance to respond, timeout after 10 seconds
  71. line = yield trollius.From(trollius.wait_for(
  72. client_reader.readline(),
  73. timeout=10.0))
  74. if not line.decode().strip():
  75. break
  76. line = line.decode().rstrip()
  77. if data is None:
  78. data = line
  79. if data is None:
  80. log.warning("Expected ticket uid, received None")
  81. return
  82. data = data.decode().rstrip().split()
  83. log.info("Received %s", data)
  84. if not data:
  85. log.warning("No URL provided: %s" % data)
  86. return
  87. if not '/' in data[1]:
  88. log.warning("Invalid URL provided: %s" % data[1])
  89. return
  90. url = urlparse.urlsplit(data[1])
  91. try:
  92. obj = get_obj_from_path(url.path)
  93. except PagureEvException as err:
  94. log.warning(err.message)
  95. return
  96. origin = pagure.APP.config.get('APP_URL')
  97. if origin.endswith('/'):
  98. origin = origin[:-1]
  99. client_writer.write((
  100. "HTTP/1.0 200 OK\n"
  101. "Content-Type: text/event-stream\n"
  102. "Cache: nocache\n"
  103. "Connection: keep-alive\n"
  104. "Access-Control-Allow-Origin: %s\n\n" % origin
  105. ).encode())
  106. connection = yield trollius.From(trollius_redis.Connection.create(
  107. host=pagure.APP.config['REDIS_HOST'],
  108. port=pagure.APP.config['REDIS_PORT'],
  109. db=pagure.APP.config['REDIS_DB']))
  110. try:
  111. # Create subscriber.
  112. subscriber = yield trollius.From(connection.start_subscribe())
  113. # Subscribe to channel.
  114. yield trollius.From(subscriber.subscribe(['pagure.%s' % obj.uid]))
  115. # Inside a while loop, wait for incoming events.
  116. while True:
  117. reply = yield trollius.From(subscriber.next_published())
  118. #print(u'Received: ', repr(reply.value), u'on channel', reply.channel)
  119. log.info(reply)
  120. log.info("Sending %s", reply.value)
  121. client_writer.write(('data: %s\n\n' % reply.value).encode())
  122. yield trollius.From(client_writer.drain())
  123. except trollius.ConnectionResetError as err:
  124. log.exception("ERROR: ConnectionResetError in handle_client")
  125. except Exception as err:
  126. log.exception("ERROR: Exception in handle_client")
  127. finally:
  128. # Wathever happens, close the connection.
  129. connection.close()
  130. client_writer.close()
  131. @trollius.coroutine
  132. def stats(client_reader, client_writer):
  133. try:
  134. log.info('Clients: %s', SERVER.active_count)
  135. client_writer.write((
  136. "HTTP/1.0 200 OK\n"
  137. "Cache: nocache\n\n"
  138. ).encode())
  139. client_writer.write(('data: %s\n\n' % SERVER.active_count).encode())
  140. yield trollius.From(client_writer.drain())
  141. except trollius.ConnectionResetError as err:
  142. log.info(err)
  143. pass
  144. finally:
  145. client_writer.close()
  146. return
  147. def main():
  148. global SERVER
  149. try:
  150. loop = trollius.get_event_loop()
  151. coro = trollius.start_server(
  152. handle_client,
  153. host=None,
  154. port=pagure.APP.config['EVENTSOURCE_PORT'],
  155. loop=loop)
  156. SERVER = loop.run_until_complete(coro)
  157. log.info('Serving server at {}'.format(SERVER.sockets[0].getsockname()))
  158. if pagure.APP.config.get('EV_STATS_PORT'):
  159. stats_coro = trollius.start_server(
  160. stats,
  161. host=None,
  162. port=pagure.APP.config.get('EV_STATS_PORT'),
  163. loop=loop)
  164. stats_server = loop.run_until_complete(stats_coro)
  165. log.info('Serving stats at {}'.format(
  166. stats_server.sockets[0].getsockname()))
  167. loop.run_forever()
  168. except KeyboardInterrupt:
  169. pass
  170. except trollius.ConnectionResetError as err:
  171. log.exception("ERROR: ConnectionResetError in main")
  172. except Exception as err:
  173. log.exception("ERROR: Exception in main")
  174. finally:
  175. # Close the server
  176. SERVER.close()
  177. if pagure.APP.config.get('EV_STATS_PORT'):
  178. stats_server.close()
  179. log.info("End Connection")
  180. loop.run_until_complete(SERVER.wait_closed())
  181. loop.close()
  182. log.info("End")
  183. if __name__ == '__main__':
  184. log = logging.getLogger("")
  185. formatter = logging.Formatter(
  186. "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s")
  187. # setup console logging
  188. log.setLevel(logging.DEBUG)
  189. ch = logging.StreamHandler()
  190. ch.setLevel(logging.DEBUG)
  191. aslog = logging.getLogger("asyncio")
  192. aslog.setLevel(logging.DEBUG)
  193. ch.setFormatter(formatter)
  194. log.addHandler(ch)
  195. main()