supervisor_asio.h 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. #pragma once
  2. //
  3. // Copyright (c) 2019-2022 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  4. //
  5. // Distributed under the MIT Software License
  6. //
  7. #include "rotor/supervisor.h"
  8. #include "rotor/asio/export.h"
  9. #include "supervisor_config_asio.h"
  10. #include "system_context_asio.h"
  11. #include "forwarder.hpp"
  12. #include <boost/asio.hpp>
  13. #include <unordered_map>
  14. #include <memory>
  15. #if defined(_MSC_VER)
  16. #pragma warning(push)
  17. #pragma warning(disable : 4251)
  18. #endif
  19. namespace rotor {
  20. namespace asio {
  21. namespace asio = boost::asio;
  22. namespace sys = boost::system;
  23. template <typename Actor, typename Handler, typename ArgsCount, typename ErrHandler> struct forwarder_t;
  24. /** \struct supervisor_asio_t
  25. *
  26. * \brief delivers rotor-messages on top of boost asio event loop
  27. * using `strand` for serialization
  28. *
  29. * The boost::asio `strand` guarantees, that handler, which "belongs" to
  30. * the same `strand` will be executed sequentially. It might be called
  31. * on different threads, however.
  32. *
  33. * The `supervisor_asio_t` uses that advantage to let the messages to
  34. * the supervisor and it's actors be delivered sequentially.
  35. *
  36. * The "sub-supervisors" can be created, but they do not extend the
  37. * sequential executing context, i.e. in the execution sense the
  38. * supervisors are independent.
  39. *
  40. * When there is need to "uplift" the boost::asio low-level event,
  41. * into high-level `rotor` message, the {@link forwarder_t} can be used.
  42. * It uses the `strand` under the hood.
  43. *
  44. * If there is need to change some actor's internals from boost::asio
  45. * handler, the change should be performed in synchronized way, i.e.
  46. * via `strand`.
  47. *
  48. */
  49. struct ROTOR_ASIO_API supervisor_asio_t : public supervisor_t {
  50. /** \brief injects an alias for supervisor_config_asio_t */
  51. using config_t = supervisor_config_asio_t;
  52. /** \brief injects templated supervisor_config_asio_builder_t */
  53. template <typename Supervisor> using config_builder_t = supervisor_config_asio_builder_t<Supervisor>;
  54. /** \brief constructs new supervisor from asio supervisor config */
  55. supervisor_asio_t(supervisor_config_asio_t &config);
  56. virtual address_ptr_t make_address() noexcept override;
  57. virtual void start() noexcept override;
  58. virtual void shutdown() noexcept override;
  59. virtual void enqueue(message_ptr_t message) noexcept override;
  60. virtual void shutdown_finish() noexcept override;
  61. /** \brief an helper for creation {@link forwarder_t} */
  62. template <typename Handler, typename ErrHandler>
  63. auto create_forwarder(Handler &&handler, ErrHandler &&err_handler) {
  64. return forwarder_t{*this, std::move(handler), std::move(err_handler)};
  65. }
  66. /** \brief an helper for creation {@link forwarder_t} (no-error handler case) */
  67. template <typename Handler> auto create_forwarder(Handler &&handler) {
  68. return forwarder_t{*this, std::move(handler)};
  69. }
  70. /** \brief returns execution strand */
  71. inline asio::io_context::strand &get_strand() noexcept { return *strand; }
  72. /** \brief process queue of messages of locality leader */
  73. void do_process() noexcept;
  74. protected:
  75. /** \struct timer_t
  76. * \brief boos::asio::deadline_timer with embedded timer handler */
  77. struct timer_t : public asio::deadline_timer {
  78. /** \brief non-owning pointer to timer handler */
  79. timer_handler_base_t *handler;
  80. /** \brief constructs timer using timer handler and boost asio io_context */
  81. timer_t(timer_handler_base_t *handler_, asio::io_context &io_context)
  82. : asio::deadline_timer(io_context), handler{handler_} {}
  83. };
  84. /** \brief unique pointer to timer */
  85. using timer_ptr_t = std::unique_ptr<timer_t>;
  86. /** \brief timer id to timer pointer mapping type */
  87. using timers_map_t = std::unordered_map<request_id_t, timer_ptr_t>;
  88. void do_start_timer(const pt::time_duration &interval, timer_handler_base_t &handler) noexcept override;
  89. void do_cancel_timer(request_id_t timer_id) noexcept override;
  90. /** \brief guard type : alias for asio executor_work_guard */
  91. using guard_t = asio::executor_work_guard<asio::io_context::executor_type>;
  92. /** \brief alias for a guard */
  93. using guard_ptr_t = std::unique_ptr<guard_t>;
  94. /** \brief timer id to timer pointer mapping */
  95. timers_map_t timers_map;
  96. /** \brief config for the supervisor */
  97. supervisor_config_asio_t::strand_ptr_t strand;
  98. /** \brief guard to control ownership of the io-context */
  99. guard_ptr_t guard;
  100. private:
  101. void invoke_shutdown() noexcept;
  102. };
  103. template <typename Actor> inline boost::asio::io_context::strand &get_strand(Actor &actor) {
  104. return actor.get_strand();
  105. }
  106. } // namespace asio
  107. } // namespace rotor
  108. #if defined(_MSC_VER)
  109. #pragma warning(pop)
  110. #endif