020-two-supervisors.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. //
  2. // Copyright (c) 2019-2022 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include "rotor.hpp"
  7. #include "supervisor_test.h"
  8. #include "access.h"
  9. namespace r = rotor;
  10. namespace rt = r::test;
  11. static size_t destroyed = 0;
  12. struct my_supervisor_t : public rt::supervisor_test_t {
  13. using rt::supervisor_test_t::supervisor_test_t;
  14. // clang-format off
  15. using plugins_list_t = std::tuple<
  16. r::plugin::address_maker_plugin_t,
  17. r::plugin::locality_plugin_t,
  18. r::plugin::delivery_plugin_t<r::plugin::local_delivery_t>, // for coverage
  19. r::plugin::lifetime_plugin_t,
  20. r::plugin::init_shutdown_plugin_t,
  21. r::plugin::foreigners_support_plugin_t,
  22. r::plugin::child_manager_plugin_t,
  23. r::plugin::link_server_plugin_t,
  24. r::plugin::link_client_plugin_t,
  25. r::plugin::registry_plugin_t,
  26. r::plugin::starter_plugin_t>;
  27. // clang-format on
  28. void init_start() noexcept override {
  29. rt::supervisor_test_t::init_start();
  30. assert(state == r::state_t::INITIALIZING);
  31. init_start_count++;
  32. }
  33. void init_finish() noexcept override {
  34. rt::supervisor_test_t::init_finish();
  35. assert(state == r::state_t::INITIALIZED);
  36. init_finish_count++;
  37. }
  38. void shutdown_start() noexcept override {
  39. rt::supervisor_test_t::shutdown_start();
  40. shutdown_start_count++;
  41. assert(state == r::state_t::SHUTTING_DOWN);
  42. }
  43. void shutdown_finish() noexcept override {
  44. rt::supervisor_test_t::shutdown_finish();
  45. shutdown_finish_count++;
  46. assert(state == r::state_t::SHUT_DOWN);
  47. }
  48. ~my_supervisor_t() { ++destroyed; }
  49. std::uint32_t init_start_count = 0;
  50. std::uint32_t init_finish_count = 0;
  51. std::uint32_t shutdown_start_count = 0;
  52. std::uint32_t shutdown_finish_count = 0;
  53. };
  54. TEST_CASE("two supervisors, different localities, shutdown 2nd", "[supervisor]") {
  55. r::system_context_t system_context;
  56. const char locality1[] = "abc";
  57. const char locality2[] = "def";
  58. auto sup1 =
  59. system_context.create_supervisor<my_supervisor_t>().locality(locality1).timeout(rt::default_timeout).finish();
  60. auto sup2 = sup1->create_actor<my_supervisor_t>().locality(locality2).timeout(rt::default_timeout).finish();
  61. REQUIRE(&sup2->get_supervisor() == sup2.get());
  62. REQUIRE(sup2->access<rt::to::parent_supervisor>() == sup1.get());
  63. sup1->do_process();
  64. REQUIRE(sup1->get_state() == r::state_t::INITIALIZING);
  65. REQUIRE(sup2->get_state() == r::state_t::INITIALIZING);
  66. REQUIRE(sup1->init_start_count == 1);
  67. REQUIRE(sup1->init_finish_count == 0);
  68. REQUIRE(sup2->init_start_count == 1);
  69. REQUIRE(sup2->init_finish_count == 0);
  70. sup2->do_process();
  71. REQUIRE(sup1->get_state() == r::state_t::INITIALIZING);
  72. REQUIRE(sup2->get_state() == r::state_t::INITIALIZED);
  73. REQUIRE(sup1->init_start_count == 1);
  74. REQUIRE(sup1->init_finish_count == 0);
  75. REQUIRE(sup2->init_start_count == 1);
  76. REQUIRE(sup2->init_finish_count == 1);
  77. sup1->do_process();
  78. REQUIRE(sup1->init_start_count == 1);
  79. REQUIRE(sup1->init_finish_count == 1);
  80. REQUIRE(sup2->init_start_count == 1);
  81. REQUIRE(sup2->init_finish_count == 1);
  82. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  83. REQUIRE(sup2->get_state() == r::state_t::INITIALIZED);
  84. REQUIRE(sup1->shutdown_start_count == 0);
  85. sup2->do_process();
  86. REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
  87. REQUIRE(sup1->init_start_count == 1);
  88. REQUIRE(sup1->init_finish_count == 1);
  89. REQUIRE(sup2->init_start_count == 1);
  90. REQUIRE(sup2->init_finish_count == 1);
  91. REQUIRE(sup2->shutdown_start_count == 0);
  92. REQUIRE(sup1->shutdown_start_count == 0);
  93. sup2->do_shutdown();
  94. sup2->do_process();
  95. REQUIRE(sup1->shutdown_start_count == 0);
  96. REQUIRE(sup1->shutdown_finish_count == 0);
  97. REQUIRE(sup2->shutdown_start_count == 0);
  98. REQUIRE(sup2->shutdown_finish_count == 0);
  99. sup1->do_process();
  100. REQUIRE(sup1->shutdown_start_count == 0);
  101. REQUIRE(sup1->shutdown_finish_count == 0);
  102. REQUIRE(sup2->shutdown_start_count == 0);
  103. REQUIRE(sup2->shutdown_finish_count == 0);
  104. sup2->do_process();
  105. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  106. REQUIRE(sup2->get_state() == r::state_t::SHUT_DOWN);
  107. REQUIRE(sup1->shutdown_start_count == 0);
  108. REQUIRE(sup1->shutdown_finish_count == 0);
  109. REQUIRE(sup2->shutdown_start_count == 1);
  110. REQUIRE(sup2->shutdown_finish_count == 1);
  111. sup1->do_process();
  112. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  113. REQUIRE(sup2->get_state() == r::state_t::SHUT_DOWN);
  114. REQUIRE(sup1->shutdown_start_count == 0);
  115. REQUIRE(sup1->shutdown_finish_count == 0);
  116. REQUIRE(sup2->shutdown_start_count == 1);
  117. REQUIRE(sup2->shutdown_finish_count == 1);
  118. sup1->do_shutdown();
  119. sup1->do_process();
  120. REQUIRE(sup1->get_state() == r::state_t::SHUT_DOWN);
  121. REQUIRE(sup1->shutdown_start_count == 1);
  122. REQUIRE(sup1->shutdown_finish_count == 1);
  123. REQUIRE(sup1->get_leader_queue().size() == 0);
  124. REQUIRE(sup1->get_points().size() == 0);
  125. REQUIRE(rt::empty(sup1->get_subscription()));
  126. REQUIRE(sup2->get_leader_queue().size() == 0);
  127. REQUIRE(sup2->get_points().size() == 0);
  128. REQUIRE(rt::empty(sup2->get_subscription()));
  129. }
  130. TEST_CASE("two supervisors, different localities, shutdown 1st", "[supervisor]") {
  131. r::system_context_t system_context;
  132. const char locality1[] = "abc";
  133. const char locality2[] = "def";
  134. auto sup1 =
  135. system_context.create_supervisor<my_supervisor_t>().locality(locality1).timeout(rt::default_timeout).finish();
  136. auto sup2 = sup1->create_actor<my_supervisor_t>().locality(locality2).timeout(rt::default_timeout).finish();
  137. REQUIRE(&sup2->get_supervisor() == sup2.get());
  138. REQUIRE(sup2->access<rt::to::parent_supervisor>() == sup1.get());
  139. sup1->do_process();
  140. sup2->do_process();
  141. sup1->do_process();
  142. sup2->do_process();
  143. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  144. REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
  145. sup1->do_shutdown();
  146. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  147. sup1->do_process();
  148. sup2->do_process();
  149. }
  150. CHECK(sup1->get_state() == r::state_t::SHUT_DOWN);
  151. CHECK(sup2->get_state() == r::state_t::SHUT_DOWN);
  152. REQUIRE(sup1->get_leader_queue().size() == 0);
  153. REQUIRE(sup1->get_points().size() == 0);
  154. REQUIRE(rt::empty(sup1->get_subscription()));
  155. REQUIRE(sup2->get_leader_queue().size() == 0);
  156. REQUIRE(sup2->get_points().size() == 0);
  157. REQUIRE(rt::empty(sup2->get_subscription()));
  158. }
  159. TEST_CASE("two supervisors & external subscription", "[supervisor]") {
  160. rt::system_test_context_t ctx1;
  161. rt::system_test_context_t ctx2;
  162. auto sup1 = ctx1.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  163. auto sup2 = ctx2.create_supervisor<rt::supervisor_test_t>()
  164. .configurer([&](auto &, r::plugin::plugin_base_t &plugin) {
  165. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  166. using message_t = rt::message::sample_t;
  167. auto lambda = r::lambda<message_t>([](message_t &) noexcept { ; });
  168. p.subscribe_actor(lambda, sup1->get_address());
  169. });
  170. })
  171. .timeout(rt::default_timeout)
  172. .finish();
  173. auto process = [&]() {
  174. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  175. bool progress = false;
  176. if (sup1->get_state() != r::state_t::SHUT_DOWN && !sup1->get_leader_queue().empty()) {
  177. auto msg = &sup1->get_leader_queue().front();
  178. sup1->do_process();
  179. progress = sup1->get_leader_queue().empty() || (msg != &sup1->get_leader_queue().front());
  180. }
  181. if (sup2->get_state() != r::state_t::SHUT_DOWN && !sup2->get_leader_queue().empty()) {
  182. auto msg = &sup2->get_leader_queue().front();
  183. sup2->do_process();
  184. progress = sup1->get_leader_queue().empty() || (msg != &sup1->get_leader_queue().front());
  185. }
  186. if (!progress) {
  187. break;
  188. }
  189. }
  190. };
  191. SECTION("server-client shutdown order") {
  192. process();
  193. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  194. REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
  195. sup1->do_shutdown();
  196. while (sup1->get_state() != r::state_t::SHUT_DOWN) {
  197. sup1->do_process();
  198. }
  199. CHECK(sup1->get_state() == r::state_t::SHUT_DOWN);
  200. CHECK(sup2->get_state() == r::state_t::OPERATIONAL);
  201. sup2->do_shutdown();
  202. process();
  203. }
  204. SECTION("client-server shutdown order, with message in progress") {
  205. process();
  206. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  207. REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
  208. sup2->do_shutdown();
  209. while (sup2->get_state() != r::state_t::SHUT_DOWN) {
  210. sup2->do_process();
  211. }
  212. CHECK(sup1->get_state() == r::state_t::OPERATIONAL);
  213. CHECK(sup2->get_state() == r::state_t::SHUT_DOWN);
  214. auto msg = sup1->get_leader_queue().front();
  215. sup1->get_leader_queue().pop_front();
  216. process();
  217. sup1->send<rt::payload::sample_t>(sup1->get_address(), 5);
  218. process();
  219. sup1->get_leader_queue().push_back(std::move(msg));
  220. sup1->do_shutdown();
  221. process();
  222. }
  223. CHECK(sup1->get_state() == r::state_t::SHUT_DOWN);
  224. CHECK(sup2->get_state() == r::state_t::SHUT_DOWN);
  225. }
  226. TEST_CASE("two supervisors, same locality", "[supervisor]") {
  227. r::system_context_ptr_t system_context = new r::system_context_t();
  228. auto mark = destroyed;
  229. const char locality[] = "locality";
  230. auto sup1 =
  231. system_context->create_supervisor<my_supervisor_t>().locality(locality).timeout(rt::default_timeout).finish();
  232. auto sup2 = sup1->create_actor<my_supervisor_t>().locality(locality).timeout(rt::default_timeout).finish();
  233. REQUIRE(&sup2->get_supervisor() == sup2.get());
  234. REQUIRE(sup2->access<rt::to::parent_supervisor>() == sup1.get());
  235. sup1->do_process();
  236. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  237. REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
  238. REQUIRE(sup1->init_start_count == 1);
  239. REQUIRE(sup1->init_finish_count == 1);
  240. REQUIRE(sup2->init_start_count == 1);
  241. REQUIRE(sup2->init_finish_count == 1);
  242. sup1->do_shutdown();
  243. sup1->do_process();
  244. REQUIRE(sup1->get_state() == r::state_t::SHUT_DOWN);
  245. REQUIRE(sup2->get_state() == r::state_t::SHUT_DOWN);
  246. REQUIRE(sup1->shutdown_start_count == 1);
  247. REQUIRE(sup1->get_leader_queue().size() == 0);
  248. REQUIRE(sup1->get_points().size() == 0);
  249. REQUIRE(rt::empty(sup1->get_subscription()));
  250. REQUIRE(sup2->get_leader_queue().size() == 0);
  251. REQUIRE(sup2->get_points().size() == 0);
  252. REQUIRE(rt::empty(sup2->get_subscription()));
  253. system_context.reset();
  254. sup1.reset();
  255. sup2.reset();
  256. REQUIRE(mark + 2 == destroyed);
  257. }
  258. TEST_CASE("two supervisors, down internal first, same locality", "[supervisor]") {
  259. r::system_context_t system_context;
  260. const char locality[] = "locality";
  261. auto sup1 =
  262. system_context.create_supervisor<my_supervisor_t>().timeout(rt::default_timeout).locality(locality).finish();
  263. auto sup2 = sup1->create_actor<my_supervisor_t>().timeout(rt::default_timeout).locality(locality).finish();
  264. REQUIRE(&sup2->get_supervisor() == sup2.get());
  265. REQUIRE(sup2->access<rt::to::parent_supervisor>() == sup1.get());
  266. sup1->do_process();
  267. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  268. REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
  269. // for better coverage
  270. auto &address = static_cast<r::actor_base_t *>(sup2.get())->get_address();
  271. auto ec = r::make_error_code(r::error_code_t::success);
  272. auto reason = r::make_error("some-ctx", ec);
  273. sup2->send<r::payload::shutdown_trigger_t>(address, address, reason);
  274. sup1->do_process();
  275. REQUIRE(sup2->get_state() == r::state_t::SHUT_DOWN);
  276. CHECK(sup2->get_shutdown_reason()->root()->ec.message() == "success");
  277. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  278. sup1->do_shutdown();
  279. sup1->do_process();
  280. REQUIRE(sup1->get_state() == r::state_t::SHUT_DOWN);
  281. REQUIRE(sup1->get_leader_queue().size() == 0);
  282. REQUIRE(sup1->get_points().size() == 0);
  283. REQUIRE(rt::empty(sup1->get_subscription()));
  284. REQUIRE(sup2->get_leader_queue().size() == 0);
  285. REQUIRE(sup2->get_points().size() == 0);
  286. REQUIRE(rt::empty(sup2->get_subscription()));
  287. }
  288. TEST_CASE("message arrival order", "[supervisor]") {
  289. r::system_context_t system_context;
  290. int model = 0;
  291. int states[] = {0, 0};
  292. auto sup1 = system_context.create_supervisor<rt::supervisor_test_t>()
  293. .timeout(rt::default_timeout)
  294. .configurer([&](auto &sup, r::plugin::plugin_base_t &plugin) {
  295. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  296. using message_t = rt::message::sample_t;
  297. auto lambda = r::lambda<message_t>([&](message_t &) noexcept {
  298. printf("sup1\n");
  299. if (!states[0]) {
  300. model += 1;
  301. } else {
  302. model *= 2;
  303. }
  304. ++states[0];
  305. });
  306. p.subscribe_actor(lambda, sup.get_address());
  307. });
  308. })
  309. .finish();
  310. auto sup2 = sup1->create_actor<rt::supervisor_test_t>()
  311. .timeout(rt::default_timeout)
  312. .configurer([&](auto &, r::plugin::plugin_base_t &plugin) {
  313. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  314. using message_t = rt::message::sample_t;
  315. auto lambda = r::lambda<message_t>([&](message_t &) noexcept {
  316. printf("sup2\n");
  317. if (!states[1]) {
  318. model += 2;
  319. } else {
  320. model *= 3;
  321. }
  322. ++states[1];
  323. });
  324. printf("sup2-sb\n");
  325. auto addr = sup1->get_address();
  326. p.subscribe_actor(lambda, addr);
  327. printf("sup2-sb\n");
  328. });
  329. })
  330. .finish();
  331. sup1->do_process();
  332. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  333. REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
  334. sup1->send<rt::payload::sample_t>(sup1->get_address(), 0);
  335. sup1->send<rt::payload::sample_t>(sup1->get_address(), 0);
  336. sup1->do_process();
  337. CHECK(model == ((0 + 1 + 2) * 2 * 3));
  338. sup1->do_shutdown();
  339. sup1->do_process();
  340. REQUIRE(sup1->get_state() == r::state_t::SHUT_DOWN);
  341. REQUIRE(sup2->get_state() == r::state_t::SHUT_DOWN);
  342. }