actor_base.h 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503
  1. #pragma once
  2. //
  3. // Copyright (c) 2019-2024 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  4. //
  5. // Distributed under the MIT Software License
  6. //
  7. #include "forward.hpp"
  8. #include "address.hpp"
  9. #include "actor_config.h"
  10. #include "messages.hpp"
  11. #include "state.h"
  12. #include "handler.h"
  13. #include "extended_error.h"
  14. #include "timer_handler.hpp"
  15. #include <set>
  16. #if defined(_MSC_VER)
  17. #pragma warning(push)
  18. #pragma warning(disable : 4251)
  19. #endif
  20. namespace rotor {
  21. /** \struct actor_base_t
  22. * \brief universal primitive of concurrent computation
  23. *
  24. * The class is base class for user-defined actors. It is expected that
  25. * actors will react on incoming messages (e.g. by changing internal
  26. * /private state) or send (other) messages to other actors, or do
  27. * some side-effects (I/O, etc.).
  28. *
  29. * Message passing interface is asynchronous, they are send to {@link supervisor_t}.
  30. *
  31. * Every actor belong to some {@link supervisor_t}, which "injects" the thread-safe
  32. * execution context, in a sense, that the actor can call it's own methods as well
  33. * as supervisors without any need of synchronization.
  34. *
  35. * All actor methods are thread-unsafe, i.e. should not be called with except of
  36. * it's own supervisor. Communication with actor should be performed via messages.
  37. *
  38. * Actor is addressed by it's "main" address; however it is possible for an actor
  39. * to have multiple identities aka "virtual" addresses.
  40. *
  41. */
  42. struct ROTOR_API actor_base_t : public arc_base_t<actor_base_t> {
  43. /** \brief injects an alias for actor_config_t */
  44. using config_t = actor_config_t;
  45. /** \brief injects templated actor_config_builder_t */
  46. template <typename Actor> using config_builder_t = actor_config_builder_t<Actor>;
  47. /** \brief SFINAE handler detector
  48. *
  49. * Either handler can be constructed from member-to-function-pointer or
  50. * it is already constructed and have a base `handler_base_t`
  51. */
  52. template <typename Handler>
  53. using is_handler =
  54. std::enable_if_t<std::is_member_function_pointer_v<Handler> || std::is_base_of_v<handler_base_t, Handler>>;
  55. // clang-format off
  56. /** \brief the default list of plugins for an actor
  57. *
  58. * The order of plugins is very important, as they are initialized in the direct order
  59. * and deinitialized in the reverse order.
  60. *
  61. */
  62. using plugins_list_t = std::tuple<
  63. plugin::address_maker_plugin_t,
  64. plugin::lifetime_plugin_t,
  65. plugin::init_shutdown_plugin_t,
  66. plugin::link_server_plugin_t,
  67. plugin::link_client_plugin_t,
  68. plugin::registry_plugin_t,
  69. plugin::resources_plugin_t,
  70. plugin::starter_plugin_t>;
  71. // clang-format on
  72. /** \brief constructs actor and links it's supervisor
  73. *
  74. * An actor cannot outlive it's supervisor.
  75. *
  76. * Sets internal actor state to `NEW`
  77. *
  78. */
  79. actor_base_t(config_t &cfg);
  80. virtual ~actor_base_t();
  81. /** \brief early actor initialization (pre-initialization)
  82. *
  83. * Actor's plugins are activated, "main" address is created
  84. * (via {@link plugin::address_maker_plugin_t}), state is set
  85. * to `INITIALIZING` (via {@link plugin::init_shutdown_plugin_t}).
  86. *
  87. */
  88. virtual void do_initialize(system_context_t *ctx) noexcept;
  89. /** \brief convenient method to send actor's supervisor shutdown trigger message
  90. *
  91. * If actor is already shutting down, the method will do nothing, otherwise
  92. * it will send shutdown trigger to its supervisor.
  93. *
  94. * The shutdown reason is forwarded "as is". If it is missing, than it will
  95. * be constructed with the error code "normal shutdown".
  96. */
  97. virtual void do_shutdown(const extended_error_ptr_t &reason = {}) noexcept;
  98. /** \brief actor is fully initialized and it's supervisor has sent signal to start
  99. *
  100. * The actor state is set to `OPERATIONAL`.
  101. *
  102. */
  103. virtual void on_start() noexcept;
  104. /** \brief sends message to the destination address
  105. *
  106. * The provided arguments are used for the construction of **payload**, which
  107. * is, in turn, is wrapped into message.
  108. *
  109. * Internally the new message is placed into supervisor's outbound queue.
  110. *
  111. */
  112. template <typename M, typename... Args> void send(const address_ptr_t &addr, Args &&...args);
  113. /** \brief returns request builder for destination address using the "main" actor address
  114. *
  115. * The `args` are forwarded for construction of the request. The request is not actually sent,
  116. * until `send` method of {@link request_builder_t} will be invoked.
  117. *
  118. * Supervisor will spawn timeout timer upon `timeout` method.
  119. */
  120. template <typename R, typename... Args>
  121. request_builder_t<typename request_wrapper_t<R>::request_t> request(const address_ptr_t &dest_addr, Args &&...args);
  122. /** \brief returns request builder for destination address using the specified address for reply
  123. *
  124. * It is assumed, that the specified address belongs to the actor.
  125. *
  126. * The method is useful, when a different behavior is needed for the same
  127. * message response types. It serves at some extend as virtual dispatching within
  128. * the actor.
  129. *
  130. * See the description of `request` method.
  131. *
  132. */
  133. template <typename R, typename... Args>
  134. request_builder_t<typename request_wrapper_t<R>::request_t>
  135. request_via(const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&...args);
  136. /** \brief convenient method for constructing and sending response to a request
  137. *
  138. * `args` are forwarded to response payload construction
  139. */
  140. template <typename Request, typename... Args> void reply_to(Request &message, Args &&...args);
  141. /** \brief convenient method for constructing and sending error response to a request */
  142. template <typename Request> void reply_with_error(Request &message, const extended_error_ptr_t &ec);
  143. /** \brief makes response to the request, but does not send it.
  144. *
  145. * The return type is intrusive pointer to the message, not the message itself.
  146. *
  147. * It can be useful for delayed responses. The response can be dispatched later via
  148. * supervisor->put(std::move(response_ptr));
  149. *
  150. */
  151. template <typename Request, typename... Args> auto make_response(Request &message, Args &&...args);
  152. /** \brief makes error response to the request, but does not send it.
  153. *
  154. * The return type is intrusive pointer to the message, not the message itself.
  155. *
  156. * It can be useful for delayed responses. The response can be dispatched later via
  157. * supervisor->put(std::move(response_ptr));
  158. *
  159. */
  160. template <typename Request> auto make_response(Request &message, const extended_error_ptr_t &ec);
  161. /** \brief subscribes actor's handler to process messages on the specified address */
  162. template <typename Handler> subscription_info_ptr_t subscribe(Handler &&h, const address_ptr_t &addr) noexcept;
  163. /** \brief subscribes actor's handler to process messages on the actor's "main" address */
  164. template <typename Handler> subscription_info_ptr_t subscribe(Handler &&h) noexcept;
  165. /** \brief unsubscribes actor's handler from process messages on the specified address */
  166. template <typename Handler, typename = is_handler<Handler>>
  167. void unsubscribe(Handler &&h, address_ptr_t &addr) noexcept;
  168. /** \brief unsubscribes actor's handler from processing messages on the actor's "main" address */
  169. template <typename Handler, typename = is_handler<Handler>> void unsubscribe(Handler &&h) noexcept;
  170. /* \brief initiates handler unsubscription from the address
  171. *
  172. * If the address is local, then unsubscription confirmation is sent immediately,
  173. * otherwise {@link payload::external_subscription_t} request is sent to the external
  174. * supervisor, which owns the address.
  175. *
  176. * The optional call can be provided to be called upon message destruction.
  177. *
  178. */
  179. /** \brief initiates handler unsubscription from the default actor address */
  180. inline void unsubscribe(const handler_ptr_t &h) noexcept { lifetime->unsubscribe(h, address); }
  181. /** \brief starts plugins activation */
  182. void activate_plugins() noexcept;
  183. /** \brief finishes plugin activation, successful or not */
  184. void commit_plugin_activation(plugin::plugin_base_t &plugin, bool success) noexcept;
  185. /** \brief starts plugins deactivation */
  186. void deactivate_plugins() noexcept;
  187. /** \brief finishes plugin deactivation */
  188. void commit_plugin_deactivation(plugin::plugin_base_t &plugin) noexcept;
  189. /** \brief propagates subscription message to corresponding actors */
  190. void on_subscription(message::subscription_t &message) noexcept;
  191. /** \brief propagates unsubscription message to corresponding actors */
  192. void on_unsubscription(message::unsubscription_t &message) noexcept;
  193. /** \brief propagates external unsubscription message to corresponding actors */
  194. void on_unsubscription_external(message::unsubscription_external_t &message) noexcept;
  195. /** \brief creates new unique address for an actor (via address_maker plugin) */
  196. address_ptr_t create_address() noexcept;
  197. /** \brief starts shutdown procedure, e.g. upon receiving shutdown request
  198. *
  199. * The actor state is set to SHUTTING_DOWN.
  200. *
  201. */
  202. virtual void shutdown_start() noexcept;
  203. /** \brief polls plugins for shutdown
  204. *
  205. * The poll is performed in the reverse order. If all plugins, with active
  206. * shutdown reaction confirm they are ready to shutdown, then the
  207. * `shutdown_finish` method is invoked.
  208. *
  209. */
  210. void shutdown_continue() noexcept;
  211. /** \brief finalizes shutdown
  212. *
  213. * The shutdown response is sent and actor state is set to SHUT_DOWN.
  214. *
  215. * This is the last action in the shutdown sequence.
  216. * No further methods will be invoked on the actor.
  217. *
  218. * All unfinished requests and untriggered timers will be cancelled
  219. * by force in the method.
  220. *
  221. */
  222. virtual void shutdown_finish() noexcept;
  223. /** \brief starts initialization procedure
  224. *
  225. * The actor state is set to INITIALIZING.
  226. *
  227. */
  228. virtual void init_start() noexcept;
  229. /** \brief polls plugins whether they completed initialization.
  230. *
  231. * The poll is performed in the direct order. If all plugins, with active
  232. * init reaction confirm they are ready, then the `init_finish` method
  233. * is invoked.
  234. *
  235. */
  236. void init_continue() noexcept;
  237. /** \brief finalizes initialization
  238. *
  239. * The init response is sent and actor state is set to INITIALIZED.
  240. *
  241. */
  242. virtual void init_finish() noexcept;
  243. /** \brief main callback for plugin configuration when it's ready */
  244. virtual void configure(plugin::plugin_base_t &plugin) noexcept;
  245. /** \brief generic non-public fields accessor */
  246. template <typename T> auto &access() noexcept;
  247. /** \brief generic non-public methods accessor */
  248. template <typename T, typename... Args> auto access(Args... args) noexcept;
  249. /** \brief generic non-public fields accessor */
  250. template <typename T> auto &access() const noexcept;
  251. /** \brief generic non-public methods accessor */
  252. template <typename T, typename... Args> auto access(Args... args) const noexcept;
  253. /** \brief returns actor's main address */
  254. inline const address_ptr_t &get_address() const noexcept { return address; }
  255. /** \brief returns actor's supervisor */
  256. inline supervisor_t &get_supervisor() const noexcept { return *supervisor; }
  257. /** \brief spawns a new one-shot timer
  258. *
  259. * \param interval specifies amount of time, after which the timer will trigger.
  260. * \param delegate is an object of arbitrary class.
  261. * \param method is the pointer-to-member-function of the object or callback, which will be
  262. * invoked upon timer triggering or cancellation.
  263. *
  264. * The `method` parameter should have the following signatures:
  265. *
  266. * void Delegate::on_timer(request_id_t, bool cancelled) noexcept;
  267. *
  268. * or
  269. *
  270. * void(Delegate*,request_id_t, bool cancelled) noexcept
  271. *
  272. * `start_timer` returns timer identity. It will be supplied to the specified callback,
  273. * or the timer can be cancelled via it.
  274. */
  275. template <typename Delegate, typename Method,
  276. typename = std::enable_if_t<std::is_invocable_v<Method, Delegate *, request_id_t, bool>>>
  277. request_id_t start_timer(const pt::time_duration &interval, Delegate &delegate, Method method) noexcept;
  278. /** \brief cancels previously started timer
  279. *
  280. * If timer hasn't been triggered, then it is cancelled and the callback will be invoked
  281. * with `true` to mark that it was cancelled.
  282. *
  283. * Upon cancellation the timer callback will be invoked immediately, in the context of caller.
  284. */
  285. void cancel_timer(request_id_t request_id) noexcept;
  286. /** \brief returns actor shutdown reason
  287. *
  288. * The shutdown reason should be available if actors' state is already `SHUTTING_DOWN`
  289. *
  290. */
  291. inline const extended_error_ptr_t &get_shutdown_reason() const noexcept { return shutdown_reason; }
  292. /** \brief retuns human-readable actor identity
  293. *
  294. * The identity can be assigned either directly in ctor, or via address_maker plugin
  295. *
  296. */
  297. inline const std::string &get_identity() const noexcept { return identity; }
  298. /** \brief flag to mark, that actor is already executing initialization */
  299. static const constexpr std::uint32_t PROGRESS_INIT = 1 << 0;
  300. /** \brief flag to mark, that actor is already executing shutdown */
  301. static const constexpr std::uint32_t PROGRESS_SHUTDOWN = 1 << 1;
  302. /** \brief flag to mark, that actor is already executing shutdown
  303. *
  304. * When actor is shutdown due to failure, if this flag is ON, then
  305. * it will trigger it's supervisor shutdown.
  306. *
  307. * This policy is ignored when actor is spawned.
  308. *
  309. */
  310. static const constexpr std::uint32_t ESCALATE_FALIURE = 1 << 2;
  311. /** \brief flag to mark, that actor trigger supervisor shutdown
  312. *
  313. * When actor is shutdown (for whatever reason), if this flag is ON, then
  314. * it will trigger it's supervisor shutdown.
  315. *
  316. * This policy is ignored when actor is spawned.
  317. *
  318. */
  319. static const constexpr std::uint32_t AUTOSHUTDOWN_SUPERVISOR = 1 << 3;
  320. /** \brief whether spawner should create a new instance of the actor
  321. *
  322. * When then actor is spawned via a spawner, and it becomes down,
  323. * the spawner will ask the current instance whether it should
  324. * spawn another one.
  325. *
  326. * This method is consulted, only when spawner's restart_policy_t is
  327. * `ask_actor`.
  328. *
  329. */
  330. virtual bool should_restart() const noexcept;
  331. protected:
  332. /** \brief timer-id to timer-handler map (type) */
  333. using timers_map_t = std::unordered_map<request_id_t, timer_handler_ptr_t>;
  334. /** \brief list of ids of active requests (type) */
  335. using requests_t = std::unordered_set<request_id_t>;
  336. /** \brief triggers timer handler associated with the timer id */
  337. void on_timer_trigger(request_id_t request_id, bool cancelled) noexcept;
  338. /** \brief starts timer with pre-forged timer id (aka request-id */
  339. template <typename Delegate, typename Method>
  340. void start_timer(request_id_t request_id, const pt::time_duration &interval, Delegate &delegate,
  341. Method method) noexcept;
  342. /** \brief helper-method, which assigns shutdown reason if it isn't set */
  343. void assign_shutdown_reason(extended_error_ptr_t reason) noexcept;
  344. /** \brief makes extended error within the context of the actor */
  345. extended_error_ptr_t make_error(const std::error_code &ec, const extended_error_ptr_t &next = {},
  346. const message_ptr_t &request = {}) const noexcept;
  347. /** \brief notification, when actor has been unlinked from server actor
  348. *
  349. * Returns boolean, meaning whether actor should initiate shutdown. Default value is `true`.
  350. *
  351. */
  352. virtual bool on_unlink(const address_ptr_t &server_addr) noexcept;
  353. /** \brief suspended init request message */
  354. intrusive_ptr_t<message::init_request_t> init_request;
  355. /** \brief suspended shutdown request message */
  356. intrusive_ptr_t<message::shutdown_request_t> shutdown_request;
  357. /** \brief actor address */
  358. address_ptr_t address;
  359. /** \brief actor spawner address */
  360. address_ptr_t spawner_address;
  361. /** \brief actor identity, which might have some meaning for developers */
  362. std::string identity;
  363. /** \brief non-owning pointer to actor's execution / infrastructure context */
  364. supervisor_t *supervisor;
  365. /** \brief opaque plugins storage (owning) */
  366. plugin_storage_ptr_t plugins_storage;
  367. /** \brief non-owning list of plugins */
  368. plugins_t plugins;
  369. /** \brief timeout for actor initialization (used by supervisor) */
  370. pt::time_duration init_timeout;
  371. /** \brief timeout for actor shutdown (used by supervisor) */
  372. pt::time_duration shutdown_timeout;
  373. /** \brief current actor state */
  374. state_t state;
  375. /** \brief non-owning pointer to address_maker plugin */
  376. plugin::address_maker_plugin_t *address_maker = nullptr;
  377. /** \brief non-owning pointer to lifetime plugin */
  378. plugin::lifetime_plugin_t *lifetime = nullptr;
  379. /** \brief non-owning pointer to link_server plugin */
  380. plugin::link_server_plugin_t *link_server = nullptr;
  381. /** \brief non-owning pointer to resources plugin */
  382. plugin::resources_plugin_t *resources = nullptr;
  383. /** \brief finds plugin by plugin class identity
  384. *
  385. * `nullptr` is returned when plugin cannot be found
  386. */
  387. plugin::plugin_base_t *get_plugin(const std::type_index &) const noexcept;
  388. /** \brief set of activating plugin identities */
  389. std::set<const std::type_index *> activating_plugins;
  390. /** \brief set of deactivating plugin identities */
  391. std::set<const std::type_index *> deactivating_plugins;
  392. /** \brief timer-id to timer-handler map */
  393. timers_map_t timers_map;
  394. /** \brief list of ids of active requests */
  395. requests_t active_requests;
  396. /** \brief set of currently processing states, i.e. init or shutdown
  397. *
  398. * This is not the same as `state_t` flag, which just marks the state.
  399. *
  400. * The `continuation_mask` is mostly used by plugins to avoid recursion
  401. *
  402. */
  403. std::uint32_t continuation_mask = 0;
  404. /** \brief explanation, why actor is been requested for shut down */
  405. extended_error_ptr_t shutdown_reason;
  406. friend struct plugin::plugin_base_t;
  407. friend struct plugin::lifetime_plugin_t;
  408. friend struct supervisor_t;
  409. template <typename T> friend struct request_builder_t;
  410. template <typename T, typename M> friend struct accessor_t;
  411. };
  412. } // namespace rotor
  413. #if defined(_MSC_VER)
  414. #pragma warning(pop)
  415. #endif