010-sup-start_stop.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575
  1. //
  2. // Copyright (c) 2019-2024 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include <catch2/matchers/catch_matchers_string.hpp>
  7. #include "rotor.hpp"
  8. #include "supervisor_test.h"
  9. #include "actor_test.h"
  10. #include "access.h"
  11. namespace r = rotor;
  12. namespace rt = rotor::test;
  13. using namespace Catch::Matchers;
  14. static std::uint32_t destroyed = 0;
  15. struct init_shutdown_plugin_t;
  16. namespace payload {
  17. struct sample_payload_t {};
  18. } // namespace payload
  19. namespace message {
  20. using sample_payload_t = r::message_t<payload::sample_payload_t>;
  21. }
  22. struct sample_sup_t : public rt::supervisor_test_t {
  23. using sup_base_t = rt::supervisor_test_t;
  24. using plugins_list_t = std::tuple<r::plugin::address_maker_plugin_t, r::plugin::locality_plugin_t,
  25. r::plugin::delivery_plugin_t<r::plugin::local_delivery_t>,
  26. r::plugin::lifetime_plugin_t, init_shutdown_plugin_t, /* use custom */
  27. r::plugin::foreigners_support_plugin_t, r::plugin::child_manager_plugin_t,
  28. r::plugin::starter_plugin_t>;
  29. std::uint32_t initialized = 0;
  30. std::uint32_t init_invoked = 0;
  31. std::uint32_t shutdown_started = 0;
  32. std::uint32_t shutdown_finished = 0;
  33. std::uint32_t shutdown_conf_invoked = 0;
  34. r::address_ptr_t shutdown_addr;
  35. using rt::supervisor_test_t::supervisor_test_t;
  36. ~sample_sup_t() override { ++destroyed; }
  37. void do_initialize(r::system_context_t *ctx) noexcept override {
  38. ++initialized;
  39. sup_base_t::do_initialize(ctx);
  40. }
  41. void shutdown_finish() noexcept override {
  42. ++shutdown_finished;
  43. rt::supervisor_test_t::shutdown_finish();
  44. }
  45. };
  46. struct init_shutdown_plugin_t : r::plugin::init_shutdown_plugin_t {
  47. using parent_t = r::plugin::init_shutdown_plugin_t;
  48. void deactivate() noexcept override { parent_t::deactivate(); }
  49. bool handle_shutdown(r::message::shutdown_request_t *message) noexcept override {
  50. auto sup = static_cast<sample_sup_t *>(actor);
  51. sup->shutdown_started++;
  52. return parent_t::handle_shutdown(message);
  53. }
  54. bool handle_init(r::message::init_request_t *message) noexcept override {
  55. auto sup = static_cast<sample_sup_t *>(actor);
  56. sup->init_invoked++;
  57. return parent_t::handle_init(message);
  58. }
  59. };
  60. struct sample_plugin_t : r::plugin::plugin_base_t {
  61. using parent_t = r::plugin::plugin_base_t;
  62. static std::type_index class_id;
  63. const std::type_index &identity() const noexcept override { return class_id; }
  64. void activate(r::actor_base_t *actor_) noexcept override {
  65. parent_t::activate(actor_);
  66. auto info = subscribe(&sample_plugin_t::on_message);
  67. info->tag_io();
  68. info->tag_io(); // for better coverage
  69. }
  70. void deactivate() noexcept override { parent_t::deactivate(); }
  71. void on_message(message::sample_payload_t &) noexcept { message_received = true; }
  72. bool message_received = false;
  73. };
  74. std::type_index sample_plugin_t::class_id = typeid(sample_plugin_t);
  75. struct sample_sup2_t : public rt::supervisor_test_t {
  76. using sup_base_t = rt::supervisor_test_t;
  77. std::uint32_t initialized = 0;
  78. std::uint32_t init_invoked = 0;
  79. std::uint32_t shutdown_finished = 0;
  80. std::uint32_t shutdown_conf_invoked = 0;
  81. r::address_ptr_t shutdown_addr;
  82. actor_base_t *init_child = nullptr;
  83. actor_base_t *shutdown_child = nullptr;
  84. r::extended_error_ptr_t init_ec;
  85. using rt::supervisor_test_t::supervisor_test_t;
  86. ~sample_sup2_t() override { ++destroyed; }
  87. void do_initialize(r::system_context_t *ctx) noexcept override {
  88. ++initialized;
  89. sup_base_t::do_initialize(ctx);
  90. }
  91. void init_finish() noexcept override {
  92. ++init_invoked;
  93. sup_base_t::init_finish();
  94. }
  95. virtual void shutdown_finish() noexcept override {
  96. ++shutdown_finished;
  97. rt::supervisor_test_t::shutdown_finish();
  98. }
  99. void on_child_init(actor_base_t *actor, const r::extended_error_ptr_t &ec) noexcept override {
  100. init_child = actor;
  101. init_ec = ec;
  102. }
  103. void on_child_shutdown(actor_base_t *actor) noexcept override { shutdown_child = actor; }
  104. };
  105. struct sample_sup3_t : public rt::supervisor_test_t {
  106. using sup_base_t = rt::supervisor_test_t;
  107. using rt::supervisor_test_t::supervisor_test_t;
  108. std::uint32_t received = 0;
  109. void make_subscription() noexcept {
  110. subscribe(&sample_sup3_t::on_sample);
  111. send<payload::sample_payload_t>(address);
  112. }
  113. void on_sample(message::sample_payload_t &) noexcept { ++received; }
  114. };
  115. struct sample_sup4_t : public rt::supervisor_test_t {
  116. using sup_base_t = rt::supervisor_test_t;
  117. using rt::supervisor_test_t::supervisor_test_t;
  118. std::uint32_t counter = 0;
  119. void intercept(r::message_ptr_t &, const void *tag, const r::continuation_t &continuation) noexcept override {
  120. CHECK(tag == rotor::tags::io);
  121. if (++counter % 2) {
  122. continuation();
  123. }
  124. }
  125. };
  126. struct unsubscriber_sup_t : public rt::supervisor_test_t {
  127. using sup_base_t = rt::supervisor_test_t;
  128. using rt::supervisor_test_t::supervisor_test_t;
  129. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  130. plugin.with_casted<r::plugin::starter_plugin_t>(
  131. [](auto &p) { p.subscribe_actor(&unsubscriber_sup_t::on_sample); });
  132. }
  133. void on_start() noexcept override {
  134. rt::supervisor_test_t::on_start();
  135. unsubscribe(&unsubscriber_sup_t::on_sample);
  136. }
  137. void on_sample(message::sample_payload_t &) noexcept {}
  138. };
  139. struct sample_actor_t : public r::actor_base_t {
  140. using r::actor_base_t::actor_base_t;
  141. };
  142. struct sample_actor2_t : public rt::actor_test_t {
  143. using rt::actor_test_t::actor_test_t;
  144. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  145. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) {
  146. alternative = p.create_address();
  147. p.set_identity("specific_name", false);
  148. });
  149. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  150. p.subscribe_actor(&sample_actor2_t::on_link, alternative);
  151. send<payload::sample_payload_t>(alternative);
  152. });
  153. }
  154. void on_link(message::sample_payload_t &) noexcept { ++received; }
  155. r::address_ptr_t alternative;
  156. int received = 0;
  157. };
  158. struct sample_actor3_t : public rt::actor_test_t {
  159. using rt::actor_test_t::actor_test_t;
  160. void shutdown_start() noexcept override {
  161. rt::actor_test_t::shutdown_start();
  162. resources->acquire();
  163. }
  164. };
  165. struct sample_actor4_t : public rt::actor_test_t {
  166. using rt::actor_test_t::actor_test_t;
  167. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  168. rt::actor_test_t::configure(plugin);
  169. plugin.with_casted<r::plugin::starter_plugin_t>(
  170. [&](auto &p) { p.subscribe_actor(&sample_actor4_t::on_message)->tag_io(); });
  171. }
  172. void on_start() noexcept override {
  173. rt::actor_test_t::on_start();
  174. send<payload::sample_payload_t>(get_address());
  175. send<payload::sample_payload_t>(get_address());
  176. }
  177. void on_message(message::sample_payload_t &) noexcept { ++received; }
  178. std::size_t received = 0;
  179. };
  180. struct sample_actor5_t : public rt::actor_test_t {
  181. using rt::actor_test_t::actor_test_t;
  182. // clang-format off
  183. using plugins_list_t = std::tuple<
  184. r::plugin::address_maker_plugin_t,
  185. r::plugin::lifetime_plugin_t,
  186. r::plugin::init_shutdown_plugin_t,
  187. r::plugin::link_server_plugin_t,
  188. r::plugin::link_client_plugin_t,
  189. r::plugin::registry_plugin_t,
  190. r::plugin::resources_plugin_t,
  191. r::plugin::starter_plugin_t,
  192. sample_plugin_t
  193. >;
  194. // clang-format on
  195. void on_start() noexcept override {
  196. rt::actor_test_t::on_start();
  197. send<payload::sample_payload_t>(get_address());
  198. send<payload::sample_payload_t>(get_address());
  199. }
  200. };
  201. struct sample_actor6_t : public rt::actor_test_t {
  202. using rt::actor_test_t::actor_test_t;
  203. void on_start() noexcept override {
  204. rt::actor_test_t::on_start();
  205. start_timer(r::pt::minutes(1), *this, &sample_actor6_t::on_timer);
  206. }
  207. void on_timer(r::request_id_t, bool cancelled_) noexcept { cancelled = cancelled_; }
  208. bool cancelled = false;
  209. };
  210. struct sample_actor7_t : public rt::actor_test_t {
  211. using rt::actor_test_t::actor_test_t;
  212. void on_start() noexcept override {
  213. rt::actor_test_t::on_start();
  214. start_timer(r::pt::minutes(1), *this,
  215. [](sample_actor7_t *actor, r::request_id_t, bool cancelled) { actor->cancelled = cancelled; });
  216. }
  217. void on_timer(r::request_id_t, bool cancelled_) noexcept { cancelled = cancelled_; }
  218. bool cancelled = false;
  219. };
  220. struct sample_actor8_t : public rt::actor_test_t {
  221. using rt::actor_test_t::actor_test_t;
  222. r::message_ptr_t msg;
  223. void on_start() noexcept override {
  224. rt::actor_test_t::on_start();
  225. subscribe(&sample_actor8_t::on_message);
  226. do_shutdown();
  227. }
  228. void shutdown_start() noexcept override {
  229. auto sup = static_cast<rt::supervisor_test_t *>(supervisor);
  230. sup->get_leader_queue().push_back(std::move(msg));
  231. rt::actor_test_t::shutdown_start();
  232. }
  233. void on_message(message::sample_payload_t &) noexcept {}
  234. };
  235. TEST_CASE("on_initialize, on_start, simple on_shutdown (handled by plugin)", "[supervisor]") {
  236. destroyed = 0;
  237. r::system_context_t *system_context = new r::system_context_t{};
  238. auto sup = system_context->create_supervisor<sample_sup_t>().timeout(rt::default_timeout).finish();
  239. REQUIRE(&sup->get_supervisor() == sup.get());
  240. REQUIRE(sup->initialized == 1);
  241. auto &identity = sup->get_identity();
  242. CHECK_THAT(identity, StartsWith("supervisor"));
  243. sup->do_process();
  244. CHECK(sup->init_invoked == 1);
  245. CHECK(sup->shutdown_started == 0);
  246. CHECK(sup->shutdown_conf_invoked == 0);
  247. CHECK(sup->active_timers.size() == 0);
  248. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  249. sup->do_shutdown();
  250. sup->do_process();
  251. REQUIRE(sup->shutdown_started == 1);
  252. REQUIRE(sup->shutdown_finished == 1);
  253. REQUIRE(sup->active_timers.size() == 0);
  254. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  255. REQUIRE(sup->get_leader_queue().size() == 0);
  256. REQUIRE(sup->get_points().size() == 0);
  257. REQUIRE(sup->get_shutdown_reason()->ec.message() == "normal shutdown");
  258. CHECK(rt::empty(sup->get_subscription()));
  259. REQUIRE(destroyed == 0);
  260. delete system_context;
  261. sup->shutdown_addr.reset();
  262. sup.reset();
  263. REQUIRE(destroyed == 1);
  264. }
  265. TEST_CASE("on_initialize, on_start, simple on_shutdown", "[supervisor]") {
  266. destroyed = 0;
  267. r::system_context_t *system_context = new r::system_context_t{};
  268. auto sup = system_context->create_supervisor<sample_sup2_t>().timeout(rt::default_timeout).finish();
  269. REQUIRE(&sup->get_supervisor() == sup.get());
  270. REQUIRE(sup->initialized == 1);
  271. REQUIRE(sup->init_child == nullptr);
  272. sup->do_process();
  273. REQUIRE(sup->init_invoked == 1);
  274. REQUIRE(sup->shutdown_conf_invoked == 0);
  275. REQUIRE(sup->active_timers.size() == 0);
  276. REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
  277. sup->do_shutdown();
  278. sup->do_process();
  279. REQUIRE(sup->shutdown_finished == 1);
  280. REQUIRE(sup->active_timers.size() == 0);
  281. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  282. REQUIRE(sup->get_leader_queue().size() == 0);
  283. REQUIRE(sup->get_points().size() == 0);
  284. CHECK(rt::empty(sup->get_subscription()));
  285. REQUIRE(sup->shutdown_child == nullptr);
  286. REQUIRE(destroyed == 0);
  287. delete system_context;
  288. sup->shutdown_addr.reset();
  289. sup.reset();
  290. REQUIRE(destroyed == 1);
  291. }
  292. TEST_CASE("start/shutdown 1 child & 1 supervisor", "[supervisor]") {
  293. r::system_context_ptr_t system_context = new r::system_context_t();
  294. auto sup = system_context->create_supervisor<sample_sup2_t>().timeout(rt::default_timeout).finish();
  295. auto act = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  296. CHECK_THAT(act->get_identity(), StartsWith("actor"));
  297. /* for better coverage */
  298. auto last = sup->access<rt::to::last_req_id>();
  299. auto &request_map = sup->access<rt::to::request_map>();
  300. request_map[last + 1] = r::request_curry_t();
  301. sup->do_process();
  302. request_map.clear();
  303. CHECK(sup->access<rt::to::last_req_id>() > 1);
  304. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  305. CHECK(act->access<rt::to::state>() == r::state_t::OPERATIONAL);
  306. CHECK(act->access<rt::to::resources>()->has() == 0);
  307. CHECK(sup->init_child == act.get());
  308. CHECK(!sup->init_ec);
  309. CHECK(sup->shutdown_child == nullptr);
  310. sup->do_shutdown();
  311. sup->do_process();
  312. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  313. CHECK(act->access<rt::to::state>() == r::state_t::SHUT_DOWN);
  314. CHECK(sup->shutdown_child == act.get());
  315. auto &reason = sup->shutdown_child->get_shutdown_reason();
  316. REQUIRE(reason);
  317. CHECK(reason->ec == r::shutdown_code_t::supervisor_shutdown);
  318. CHECK_THAT(reason->message(), Catch::Matchers::ContainsSubstring("shutdown has been requested by supervisor"));
  319. CHECK_THAT(reason->message(), Catch::Matchers::ContainsSubstring("normal shutdown"));
  320. auto &root = reason->next;
  321. CHECK(root);
  322. CHECK(root->ec.value() == static_cast<int>(r::shutdown_code_t::normal));
  323. CHECK(!root->next);
  324. }
  325. TEST_CASE("custom subscription", "[supervisor]") {
  326. r::system_context_ptr_t system_context = new r::system_context_t();
  327. auto sup = system_context->create_supervisor<sample_sup3_t>().timeout(rt::default_timeout).finish();
  328. sup->do_process();
  329. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  330. sup->make_subscription();
  331. sup->do_process();
  332. CHECK(sup->received == 1);
  333. sup->do_shutdown();
  334. sup->do_process();
  335. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  336. }
  337. TEST_CASE("shutdown immediately", "[supervisor]") {
  338. r::system_context_ptr_t system_context = new r::system_context_t();
  339. auto sup = system_context->create_supervisor<sample_sup3_t>().timeout(rt::default_timeout).finish();
  340. sup->do_shutdown();
  341. sup->do_process();
  342. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  343. }
  344. TEST_CASE("self unsubscriber", "[actor]") {
  345. r::system_context_ptr_t system_context = new r::system_context_t();
  346. auto sup = system_context->create_supervisor<unsubscriber_sup_t>().timeout(rt::default_timeout).finish();
  347. sup->do_process();
  348. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  349. sup->do_shutdown();
  350. sup->do_process();
  351. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  352. }
  353. TEST_CASE("alternative address subscriber", "[actor]") {
  354. r::system_context_ptr_t system_context = new r::system_context_t();
  355. auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  356. auto act = sup->create_actor<sample_actor2_t>().timeout(rt::default_timeout).finish();
  357. CHECK(act->get_identity() == "specific_name");
  358. sup->do_process();
  359. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  360. CHECK(act->get_state() == r::state_t::OPERATIONAL);
  361. CHECK(act->received == 1);
  362. sup->do_shutdown();
  363. sup->do_process();
  364. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  365. CHECK(act->get_state() == r::state_t::SHUT_DOWN);
  366. }
  367. TEST_CASE("acquire resources on shutdown start", "[actor]") {
  368. r::system_context_ptr_t system_context = new r::system_context_t();
  369. auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  370. auto act = sup->create_actor<sample_actor3_t>().timeout(rt::default_timeout).finish();
  371. sup->do_process();
  372. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  373. sup->do_shutdown();
  374. sup->do_process();
  375. CHECK(act->get_state() == r::state_t::SHUTTING_DOWN);
  376. act->access<rt::to::resources>()->release();
  377. sup->do_process();
  378. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  379. CHECK(act->get_state() == r::state_t::SHUT_DOWN);
  380. }
  381. TEST_CASE("io tagging & intercepting", "[actor]") {
  382. r::system_context_ptr_t system_context = new r::system_context_t();
  383. auto sup = system_context->create_supervisor<sample_sup4_t>().timeout(rt::default_timeout).finish();
  384. auto act = sup->create_actor<sample_actor4_t>().timeout(rt::default_timeout).finish();
  385. sup->do_process();
  386. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  387. CHECK(act->received == 1);
  388. CHECK(sup->counter == 2);
  389. sup->do_shutdown();
  390. sup->do_process();
  391. CHECK(act->get_state() == r::state_t::SHUT_DOWN);
  392. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  393. }
  394. TEST_CASE("io tagging (in plugin) & intercepting", "[actor]") {
  395. r::system_context_ptr_t system_context = new r::system_context_t();
  396. auto sup = system_context->create_supervisor<sample_sup4_t>().timeout(rt::default_timeout).finish();
  397. auto act = sup->create_actor<sample_actor5_t>().timeout(rt::default_timeout).finish();
  398. sup->do_process();
  399. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  400. CHECK(sup->counter == 2);
  401. auto plugin = act->access<rt::to::get_plugin>(&std::as_const(sample_plugin_t::class_id));
  402. CHECK(plugin);
  403. CHECK(static_cast<sample_plugin_t *>(plugin)->message_received);
  404. sup->do_shutdown();
  405. sup->do_process();
  406. CHECK(act->get_state() == r::state_t::SHUT_DOWN);
  407. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  408. }
  409. TEST_CASE("timers cancellation (1)", "[actor]") {
  410. r::system_context_ptr_t system_context = new r::system_context_t();
  411. auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  412. auto act = sup->create_actor<sample_actor6_t>().timeout(rt::default_timeout).finish();
  413. sup->do_process();
  414. CHECK(act->get_state() == r::state_t::OPERATIONAL);
  415. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  416. CHECK(!act->access<rt::to::timers_map>().empty());
  417. sup->do_shutdown();
  418. sup->do_process();
  419. CHECK(act->get_state() == r::state_t::SHUT_DOWN);
  420. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  421. CHECK(act->access<rt::to::timers_map>().empty());
  422. }
  423. TEST_CASE("timers cancellation (2)", "[actor]") {
  424. r::system_context_ptr_t system_context = new r::system_context_t();
  425. auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  426. auto act = sup->create_actor<sample_actor7_t>().timeout(rt::default_timeout).finish();
  427. sup->do_process();
  428. CHECK(act->get_state() == r::state_t::OPERATIONAL);
  429. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  430. CHECK(!act->access<rt::to::timers_map>().empty());
  431. sup->do_shutdown();
  432. sup->do_process();
  433. CHECK(act->get_state() == r::state_t::SHUT_DOWN);
  434. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  435. CHECK(act->access<rt::to::timers_map>().empty());
  436. }
  437. TEST_CASE("subscription confirmation arrives on non-init phase", "[actor]") {
  438. r::system_context_ptr_t system_context = new r::system_context_t();
  439. auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  440. auto act = sup->create_actor<sample_actor8_t>().timeout(rt::default_timeout).finish();
  441. auto act_configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  442. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  443. p.subscribe_actor(r::lambda<message::sample_payload_t>([](message::sample_payload_t &) noexcept { ; }));
  444. auto req = sup->get_leader_queue().back();
  445. sup->get_leader_queue().pop_back();
  446. act->msg = std::move(req);
  447. act->do_shutdown();
  448. });
  449. };
  450. act->configurer = act_configurer;
  451. sup->do_process();
  452. CHECK(act->get_state() == r::state_t::SHUT_DOWN);
  453. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  454. }