server.py 11 KB


  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # SPDX-FileCopyrightText: Copyright (C) 2022-2023 MH3SP Server Project
  4. # SPDX-License-Identifier: AGPL-3.0-or-later
  5. """Monster Hunter PAT Server module."""
  6. import multiprocessing
  7. import random
  8. import socket
  9. import struct
  10. import threading
  11. from mh.time_utils import Timer
  12. from other.utils import wii_ssl_wrap_socket
  13. try:
  14. # Python 3
  15. import queue
  16. import selectors
  17. except ImportError:
  18. # Python 2
  19. import Queue as queue
  20. import externals.selectors2 as selectors
  21. try:
  22. from typing import List, Tuple # noqa: F401
  23. except ImportError:
  24. pass
  25. class BasicPatHandler(object):
  26. def __init__(self, socket, client_address, server):
  27. # type: (socket.socket, Tuple[str, int], BasicPatServer) -> None
  28. self.socket = socket
  29. self.client_address = client_address
  30. self.server = server
  31. self.finished = False
  32. self.rw = threading.Lock()
  33. self.setup()
  34. def fileno(self):
  35. # type: () -> int
  36. return self.socket.fileno()
  37. def setup(self):
  38. self.rfile = self.socket.makefile('rb', -1)
  39. self.wfile = self.socket.makefile('wb', 0)
  40. self.on_init()
  41. def on_init(self):
  42. """Called after setup"""
  43. pass
  44. def on_exception(self, e):
  45. # type: (Exception) -> None
  46. """Called when during recv/write an exception ocurred"""
  47. pass
  48. def on_recv(self):
  49. """Called when the socket have bytes to be readed
  50. ** This method would be called by the server thread
  51. """
  52. header = self.rfile.read(8)
  53. if not len(header):
  54. # The socket was closed by externally
  55. return None
  56. if len(header) < 8:
  57. # Invalid packet header
  58. return None
  59. return self.recv_packet(header)
  60. def on_packet(self, data):
  61. """ Called when there is a packet to be handled
  62. ** This method would be called from a worker thread (Not Thread Safe)
  63. """
  64. def recv_packet(self, header):
  65. """Receive PAT packet."""
  66. size, seq, packet_id = struct.unpack(">HHI", header)
  67. data = self.rfile.read(size)
  68. return packet_id, data, seq
  69. def send_packet(self, packet_id=0, data=b'', seq=0):
  70. """Send PAT packet."""
  71. self.wfile.write(struct.pack(
  72. ">HHI",
  73. len(data), seq, packet_id
  74. ))
  75. if data:
  76. self.wfile.write(data)
  77. def on_tick(self):
  78. """Called every time the server tick
  79. ** Currently executed from the server thread
  80. """
  81. pass
  82. def on_finish(self):
  83. """Called before finish"""
  84. pass
  85. def is_finished(self):
  86. return self.finished
  87. def finish(self):
  88. """Called when the handler is being disposed"""
  89. if self.finished:
  90. return
  91. try:
  92. self.on_finish()
  93. except Exception:
  94. pass
  95. self.finished = True
  96. try:
  97. self.wfile.close()
  98. except Exception:
  99. pass
  100. try:
  101. self.rfile.close()
  102. except Exception:
  103. pass
  104. class BasicPatServer(object):
  105. socket_queue_size = 5
  106. address_family = socket.AF_INET
  107. socket_type = socket.SOCK_STREAM
  108. def __init__(self, server_address, RequestHandlerClass, max_threads,
  109. bind_and_activate=True, ssl_cert=None, ssl_key=None):
  110. # type: (Tuple[str, int], BasicPatHandler, int, bool, str|None, str|None) -> None
  111. """Constructor. May be extended, do not override."""
  112. self.server_address = server_address
  113. self.RequestHandlerClass = RequestHandlerClass
  114. self.__is_shut_down = threading.Event()
  115. self.__is_shut_down.set()
  116. self.__shutdown_request = False
  117. self.socket = socket.socket(self.address_family, self.socket_type)
  118. self._random = random.SystemRandom() # type: random.SystemRandom
  119. self.handlers = [] # type: List[BasicPatHandler]
  120. self.worker_threads = [] # type: List[threading.Thread]
  121. self.worker_queues = [] # type: list[queue.queue]
  122. self.selector = selectors.DefaultSelector()
  123. self.max_threads = max_threads or multiprocessing.cpu_count()
  124. self.ssl_cert = ssl_cert
  125. self.ssl_key = ssl_key
  126. if bind_and_activate:
  127. try:
  128. self.server_bind()
  129. self.server_activate()
  130. except Exception:
  131. self.close()
  132. raise
  133. def server_bind(self):
  134. self.socket.bind(self.server_address)
  135. self.server_address = self.socket.getsockname()
  136. def server_activate(self):
  137. self.socket.listen(0)
  138. def fileno(self):
  139. """Return server socket file descriptor.
  140. Interface required by selector.
  141. """
  142. return self.socket.fileno()
  143. def initialize_workers(self):
  144. """Initialize workers queues/threads.
  145. This needs to be deferred, otherwise the close method might try to
  146. join threads that aren't started yet when an error occurs early.
  147. """
  148. for n in range(self.max_threads):
  149. thread_queue = queue.Queue()
  150. thread = threading.Thread(
  151. target=self._worker_target,
  152. args=(thread_queue,),
  153. name="{}.Worker-{}".format(self.__class__.__name__, n)
  154. )
  155. self.worker_queues.append(thread_queue)
  156. self.worker_threads.append(thread)
  157. thread.start()
  158. def serve_forever(self):
  159. self.__is_shut_down.clear()
  160. try:
  161. self.initialize_workers()
  162. with self.selector as selector:
  163. selector.register(self, selectors.EVENT_READ)
  164. write_watch = Timer()
  165. write_timeout = 1 # Seconds
  166. while not self.__shutdown_request:
  167. ready = selector.select(write_timeout)
  168. if self.__shutdown_request:
  169. break
  170. for (key, event) in ready:
  171. selected = key.fileobj
  172. if selected == self:
  173. self.accept_new_connection()
  174. else:
  175. assert event == selectors.EVENT_READ
  176. try:
  177. packet = selected.on_recv()
  178. if packet is None:
  179. if selected.is_finished():
  180. self.remove_handler(selected)
  181. continue
  182. self._queue_work(selected, packet, event)
  183. except Exception as e:
  184. selected.on_exception(e)
  185. if selected.is_finished():
  186. self.remove_handler(selected)
  187. if write_watch.elapsed() >= write_timeout:
  188. try:
  189. for handler in self.handlers:
  190. try:
  191. handler.on_tick()
  192. except Exception as e:
  193. handler.on_exception(e)
  194. if handler.is_finished():
  195. self.remove_handler(handler)
  196. finally:
  197. write_watch.restart()
  198. finally:
  199. self.__is_shut_down.set()
  200. def _worker_target(self, work_queue):
  201. # type: (queue.Queue) -> None
  202. while not self.__shutdown_request:
  203. try:
  204. handler, packet, event = work_queue.get(block=True)
  205. except queue.Empty:
  206. continue
  207. if self.__shutdown_request:
  208. break
  209. if handler.is_finished():
  210. continue
  211. assert event == selectors.EVENT_READ
  212. try:
  213. handler.on_packet(packet)
  214. except Exception as e:
  215. handler.on_exception(e)
  216. if handler.is_finished():
  217. self.remove_handler(handler)
  218. def accept_new_connection(self):
  219. # type: () -> None
  220. try:
  221. client_socket, client_address = self.socket.accept()
  222. except Exception as e:
  223. self.error('Error accepting connection (1). {}'.format(e))
  224. return
  225. try:
  226. # TODO: Find a cleaner way to process ill-formed packets.
  227. # Currently, they get stuck on `packet = selected.on_recv()`,
  228. # thus blocking the `serve_forever` method.
  229. client_socket.settimeout(2.0)
  230. # TODO: Ensure this is the correct way to fix the server not
  231. # accepting SSL connection anymore.
  232. #
  233. # See https://stackoverflow.com/a/68214507
  234. if self.ssl_cert and self.ssl_key:
  235. client_socket = wii_ssl_wrap_socket(
  236. client_socket, self.ssl_cert, self.ssl_key
  237. )
  238. handler = self.RequestHandlerClass(client_socket, client_address,
  239. self)
  240. except Exception as e:
  241. self.error('Error accepting connection (2). {}'.format(e))
  242. return
  243. handler.__worker_thread = \
  244. self._random.randint(0, len(self.worker_queues)-1)
  245. self.selector.register(handler, selectors.EVENT_READ)
  246. self.handlers.append(handler)
  247. def _queue_work(self, handler, work_data, event):
  248. # type: (BasicPatHandler, any, int) -> None
  249. if handler.is_finished():
  250. return
  251. thread_queue = self.worker_queues[handler.__worker_thread]
  252. thread_queue.put((handler, work_data, event), block=True)
  253. def remove_handler(self, handler):
  254. # type: (BasicPatHandler) -> None
  255. try:
  256. self.handlers.remove(handler)
  257. except Exception:
  258. pass
  259. try:
  260. self.selector.unregister(handler)
  261. except Exception:
  262. pass
  263. try:
  264. handler.finish()
  265. except Exception:
  266. pass
  267. try:
  268. handler.socket.close()
  269. except Exception:
  270. pass
  271. def close(self):
  272. """Called to clean-up the server.
  273. May be overridden.
  274. """
  275. self.__shutdown_request = True
  276. self.socket.close()
  277. self.__is_shut_down.wait()
  278. for h in self.handlers:
  279. try:
  280. h.finish()
  281. except Exception:
  282. pass
  283. for q in self.worker_queues:
  284. q.put((None, None, None), block=True)
  285. for t in self.worker_threads:
  286. t.join()
  287. self.worker_queues = []
  288. self.selector = None
  289. self.worker_threads = []
  290. self.__shutdown_request = False
  291. self.info('Server Closed')