019-link-unlink.cpp 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660
  1. //
  2. // Copyright (c) 2019-2024 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include "rotor.hpp"
  7. #include "actor_test.h"
  8. #include "supervisor_test.h"
  9. #include "system_context_test.h"
  10. #include "access.h"
  11. namespace r = rotor;
  12. namespace rt = r::test;
  13. struct double_linked_actor_t : r::actor_base_t {
  14. using r::actor_base_t::actor_base_t;
  15. using message_ptr_t = r::intrusive_ptr_t<r::message::link_response_t>;
  16. struct resource {
  17. static const constexpr r::plugin::resource_id_t linking = 0;
  18. static const constexpr r::plugin::resource_id_t unlinking = 1;
  19. };
  20. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  21. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) { alternative = p.create_address(); });
  22. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  23. p.subscribe_actor(&double_linked_actor_t::on_link_res, alternative);
  24. p.subscribe_actor(&double_linked_actor_t::on_unlink_req, alternative);
  25. for (auto i = 0; i < 2; ++i) {
  26. resources->acquire(resource::linking);
  27. request_via<r::payload::link_request_t>(target, alternative, false).send(rt::default_timeout);
  28. }
  29. });
  30. }
  31. void on_link_res(r::message::link_response_t &res) noexcept {
  32. resources->release(resource::linking);
  33. if (!message1)
  34. message1 = &res;
  35. else if (!message2)
  36. message2 = &res;
  37. }
  38. virtual void on_start() noexcept override {
  39. r::actor_base_t::on_start();
  40. resources->acquire(resource::unlinking);
  41. }
  42. void on_unlink_req(r::message::unlink_request_t &message) noexcept {
  43. reply_to(message, alternative);
  44. if (resources->has(resource::unlinking))
  45. resources->release(resource::unlinking);
  46. }
  47. r::address_ptr_t target;
  48. message_ptr_t message1, message2;
  49. r::address_ptr_t alternative;
  50. };
  51. struct tracked_actor_t : rt::actor_test_t {
  52. using rt::actor_test_t::actor_test_t;
  53. std::uint32_t shutdown_event = 0;
  54. };
  55. struct ignore_unlink_actor_t : rt::actor_test_t {
  56. using rt::actor_test_t::actor_test_t;
  57. r::address_ptr_t server_addr;
  58. bool on_unlink(const r::address_ptr_t &addr) noexcept override {
  59. server_addr = addr;
  60. return false;
  61. }
  62. };
  63. TEST_CASE("client/server, common workflow", "[actor]") {
  64. r::system_context_t system_context;
  65. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  66. auto act_s = sup->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  67. auto act_c = sup->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  68. auto &addr_s = act_s->get_address();
  69. bool invoked = false;
  70. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  71. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) {
  72. p.link(addr_s, false, [&](auto &ee) mutable {
  73. REQUIRE(!ee);
  74. invoked = true;
  75. });
  76. });
  77. };
  78. sup->do_process();
  79. REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
  80. REQUIRE(invoked);
  81. SECTION("simultaneous shutdown") {
  82. sup->do_shutdown();
  83. sup->do_process();
  84. }
  85. SECTION("controlled shutdown") {
  86. SECTION("indirect client-initiated unlink via client-shutdown") {
  87. act_c->do_shutdown();
  88. sup->do_process();
  89. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  90. }
  91. SECTION("indirect client-initiated unlink via server-shutdown") {
  92. act_s->do_shutdown();
  93. sup->do_process();
  94. CHECK(act_s->get_state() == r::state_t::SHUT_DOWN);
  95. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  96. }
  97. sup->do_shutdown();
  98. sup->do_process();
  99. }
  100. }
  101. TEST_CASE("link not possible (timeout) => shutdown", "[actor]") {
  102. r::system_context_t system_context;
  103. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  104. auto act_c = sup->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  105. auto some_addr = sup->make_address();
  106. bool invoked = false;
  107. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  108. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) {
  109. p.link(some_addr, false, [&](auto &ec) mutable {
  110. REQUIRE(ec);
  111. invoked = true;
  112. });
  113. });
  114. };
  115. sup->do_process();
  116. REQUIRE(sup->get_state() == r::state_t::INITIALIZING);
  117. REQUIRE(sup->active_timers.size() == 3);
  118. auto timer_it = *(sup->active_timers.rbegin());
  119. sup->do_invoke_timer(timer_it->request_id);
  120. sup->do_process();
  121. REQUIRE(invoked);
  122. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  123. }
  // Disabled test: everything between `#if 0` and `#endif` is compiled out.
  #if 0
  TEST_CASE("link not possible => supervisor is shutted down", "[actor]") {
      r::system_context_t system_context;
      auto sup = system_context.create_supervisor<rt::supervisor_test_t>()
                     .timeout(rt::default_timeout)
                     .finish();
      auto server = sup->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
      auto client = sup->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();

      auto server_addr = server->get_address();
      client->link_request(server_addr, rt::default_timeout);
      sup->do_process();

      REQUIRE(client->get_state() == r::state_t::SHUT_DOWN);
      REQUIRE(server->get_state() == r::state_t::SHUT_DOWN);
      REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  }
  #endif
  138. TEST_CASE("link (supervisor) not possible => supervisor is shutted down", "[actor]") {
  139. r::system_context_t system_context;
  140. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  141. auto addr = sup->create_address();
  142. sup->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  143. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(addr); });
  144. };
  145. sup->do_process();
  146. sup->do_invoke_timer((*sup->active_timers.begin())->request_id);
  147. sup->do_process();
  148. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  149. }
  150. TEST_CASE("unlink", "[actor]") {
  151. rt::system_context_test_t system_context;
  152. const char l1[] = "abc";
  153. const char l2[] = "def";
  154. auto sup1 =
  155. system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l1).finish();
  156. auto sup2 = sup1->create_actor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l2).finish();
  157. auto act_s = sup1->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  158. auto act_c = sup2->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  159. auto &addr_s = act_s->get_address();
  160. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  161. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(addr_s, false, [&](auto &) {}); });
  162. };
  163. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  164. sup1->do_process();
  165. sup2->do_process();
  166. }
  167. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  168. SECTION("unlink failure") {
  169. act_s->do_shutdown();
  170. sup1->do_process();
  171. REQUIRE(sup1->active_timers.size() == 2);
  172. auto unlink_req = sup1->get_timer(1);
  173. sup1->do_invoke_timer(unlink_req);
  174. sup1->do_process();
  175. REQUIRE(system_context.reason->ec == r::error_code_t::request_timeout);
  176. REQUIRE(act_s->get_state() == r::state_t::SHUTTING_DOWN);
  177. act_s->force_cleanup();
  178. }
  179. SECTION("unlink-notify on unlink-request") {
  180. SECTION("client, then server") {
  181. act_s->do_shutdown();
  182. act_c->do_shutdown();
  183. sup2->do_process();
  184. sup1->do_process();
  185. sup2->do_process();
  186. sup1->do_process();
  187. }
  188. SECTION("server, then client") {
  189. act_s->do_shutdown();
  190. act_c->do_shutdown();
  191. sup1->do_process();
  192. sup2->do_process();
  193. sup1->do_process();
  194. sup2->do_process();
  195. }
  196. }
  197. sup1->do_shutdown();
  198. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  199. sup1->do_process();
  200. sup2->do_process();
  201. }
  202. REQUIRE(sup1->get_state() == r::state_t::SHUT_DOWN);
  203. }
  204. TEST_CASE("unlink reaction", "[actor]") {
  205. using request_ptr_t = r::intrusive_ptr_t<r::message::unlink_request_t>;
  206. rt::system_context_test_t system_context;
  207. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  208. auto act_s = sup->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  209. auto act_c = sup->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  210. auto &addr_s = act_s->get_address();
  211. request_ptr_t unlink_req;
  212. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  213. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) {
  214. p.link(addr_s, false, [&](auto &) {});
  215. p.on_unlink([&](auto &req) {
  216. unlink_req = &req;
  217. p.forget_link(req);
  218. return true;
  219. });
  220. });
  221. };
  222. sup->do_process();
  223. act_s->do_shutdown();
  224. sup->do_process();
  225. REQUIRE(unlink_req);
  226. REQUIRE(unlink_req->message_type == r::message::unlink_request_t::message_type);
  227. sup->do_shutdown();
  228. sup->do_process();
  229. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  230. }
  231. TEST_CASE("auto-unlink on shutdown", "[actor]") {
  232. rt::system_context_test_t ctx1;
  233. rt::system_context_test_t ctx2;
  234. const char l1[] = "abc";
  235. const char l2[] = "def";
  236. auto sup1 = ctx1.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l1).finish();
  237. auto sup2 = ctx2.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l2).finish();
  238. auto act_c = sup1->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  239. auto act_s = sup2->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  240. auto &addr_s = act_s->get_address();
  241. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  242. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(addr_s, false, [&](auto &) {}); });
  243. };
  244. sup1->do_process();
  245. REQUIRE(act_c->get_state() == r::state_t::INITIALIZING);
  246. act_c->do_shutdown();
  247. sup1->do_process();
  248. REQUIRE(act_c->get_state() == r::state_t::SHUT_DOWN);
  249. REQUIRE(sup1->get_state() == r::state_t::SHUT_DOWN);
  250. sup2->do_process();
  251. REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
  252. sup2->do_shutdown();
  253. sup2->do_process();
  254. REQUIRE(sup2->get_state() == r::state_t::SHUT_DOWN);
  255. }
  256. TEST_CASE("link to operational only", "[actor]") {
  257. rt::system_context_test_t ctx1;
  258. rt::system_context_test_t ctx2;
  259. rt::system_context_test_t ctx3;
  260. const char l1[] = "abc";
  261. const char l2[] = "def";
  262. const char l3[] = "ghi";
  263. auto sup1 = ctx1.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l1).finish();
  264. auto sup2 = ctx2.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l2).finish();
  265. auto sup3 = ctx3.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l3).finish();
  266. auto act_c = sup1->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  267. auto act_s1 = sup2->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  268. auto act_s2 = sup3->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  269. auto &addr_s1 = act_s1->get_address();
  270. auto &addr_s2 = act_s2->get_address();
  271. auto process_12 = [&]() {
  272. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  273. sup1->do_process();
  274. sup2->do_process();
  275. }
  276. };
  277. auto process_123 = [&]() {
  278. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty() ||
  279. !sup3->get_leader_queue().empty()) {
  280. sup1->do_process();
  281. sup2->do_process();
  282. sup3->do_process();
  283. }
  284. };
  285. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  286. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(addr_s1, true, [&](auto &) {}); });
  287. };
  288. act_s1->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  289. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(addr_s2, true, [&](auto &) {}); });
  290. };
  291. process_12();
  292. CHECK(act_c->get_state() == r::state_t::INITIALIZING);
  293. CHECK(act_s1->get_state() == r::state_t::INITIALIZING);
  294. process_123();
  295. CHECK(act_c->get_state() == r::state_t::OPERATIONAL);
  296. CHECK(act_s1->get_state() == r::state_t::OPERATIONAL);
  297. CHECK(act_s2->get_state() == r::state_t::OPERATIONAL);
  298. sup1->do_shutdown();
  299. sup2->do_shutdown();
  300. sup3->do_shutdown();
  301. process_123();
  302. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  303. CHECK(act_s1->get_state() == r::state_t::SHUT_DOWN);
  304. CHECK(act_s2->get_state() == r::state_t::SHUT_DOWN);
  305. }
  306. TEST_CASE("unlink notify / response race", "[actor]") {
  307. rt::system_context_test_t system_context;
  308. const char l1[] = "abc";
  309. const char l2[] = "def";
  310. auto sup1 =
  311. system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l1).finish();
  312. auto sup2 = sup1->create_actor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l2).finish();
  313. auto act_s = sup1->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  314. auto act_c = sup2->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  315. auto &addr_s = act_s->get_address();
  316. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  317. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(addr_s, true, [&](auto &) {}); });
  318. };
  319. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  320. sup1->do_process();
  321. sup2->do_process();
  322. }
  323. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  324. act_s->do_shutdown();
  325. act_c->do_shutdown();
  326. sup1->do_process();
  327. // extract unlink request to let it produce unlink notify
  328. auto unlink_request = sup2->get_leader_queue().back();
  329. REQUIRE(unlink_request->type_index == r::message::unlink_request_t::message_type);
  330. sup2->get_leader_queue().pop_back();
  331. sup2->do_process();
  332. sup1->do_shutdown();
  333. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  334. sup1->do_process();
  335. sup2->do_process();
  336. }
  337. CHECK(sup1->active_timers.size() == 0);
  338. CHECK(sup1->get_state() == r::state_t::SHUT_DOWN);
  339. }
  340. TEST_CASE("link errors", "[actor]") {
  341. rt::system_context_test_t ctx1;
  342. rt::system_context_test_t ctx2;
  343. const char l1[] = "abc";
  344. const char l2[] = "def";
  345. auto sup1 = ctx1.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l1).finish();
  346. auto sup2 = ctx2.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l2).finish();
  347. auto process_12 = [&]() {
  348. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  349. sup1->do_process();
  350. sup2->do_process();
  351. }
  352. };
  353. SECTION("double link attempt") {
  354. auto act_c = sup1->create_actor<double_linked_actor_t>().timeout(rt::default_timeout).finish();
  355. auto act_s = sup2->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  356. act_c->target = act_s->get_address();
  357. process_12();
  358. REQUIRE(act_c->message1);
  359. CHECK(!act_c->message1->payload.ee);
  360. REQUIRE(act_c->message2);
  361. CHECK(act_c->message2->payload.ee);
  362. CHECK(act_c->message2->payload.ee->ec.message() == std::string("already linked"));
  363. }
  364. SECTION("not linkable") {
  365. auto act_s = sup2->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  366. sup2->do_process();
  367. act_s->access<rt::to::resources>()->acquire();
  368. act_s->do_shutdown();
  369. sup2->do_process();
  370. REQUIRE(act_s->get_state() == r::state_t::SHUTTING_DOWN);
  371. SECTION("check error") {
  372. r::extended_error_ptr_t err;
  373. auto act_c = sup1->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  374. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  375. plugin.with_casted<r::plugin::link_client_plugin_t>(
  376. [&](auto &p) { p.link(act_s->get_address(), false, [&](auto &ec) { err = ec; }); });
  377. };
  378. process_12();
  379. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  380. REQUIRE(err);
  381. CHECK(err->ec.message() == std::string("actor is not linkeable"));
  382. }
  383. SECTION("get the error during shutdown") {
  384. auto act_c = sup1->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  385. sup1->do_process();
  386. CHECK(act_c->get_state() == r::state_t::OPERATIONAL);
  387. auto id = &std::as_const(r::plugin::link_client_plugin_t::class_identity);
  388. auto plugin1 = act_c->access<rt::to::get_plugin>(id);
  389. auto p1 = static_cast<r::plugin::link_client_plugin_t *>(plugin1);
  390. p1->link(act_s->get_address(), false, [&](auto &) {});
  391. act_c->access<rt::to::resources>()->acquire();
  392. act_c->do_shutdown();
  393. process_12();
  394. CHECK(act_c->get_state() == r::state_t::SHUTTING_DOWN);
  395. act_c->access<rt::to::resources>()->release();
  396. }
  397. act_s->access<rt::to::resources>()->release();
  398. }
  399. SECTION("unlink during shutting down") {
  400. auto act_c = sup1->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  401. auto act_s = sup2->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  402. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  403. plugin.with_casted<r::plugin::link_client_plugin_t>(
  404. [&](auto &p) { p.link(act_s->get_address(), false, [&](auto &) {}); });
  405. };
  406. process_12();
  407. CHECK(sup1->get_state() == r::state_t::OPERATIONAL);
  408. CHECK(sup2->get_state() == r::state_t::OPERATIONAL);
  409. act_c->do_shutdown();
  410. act_c->access<rt::to::resources>()->acquire();
  411. sup1->do_process();
  412. CHECK(act_c->get_state() == r::state_t::SHUTTING_DOWN);
  413. act_s->do_shutdown();
  414. sup2->do_process();
  415. sup1->do_process();
  416. CHECK(act_c->get_state() == r::state_t::SHUTTING_DOWN);
  417. act_c->access<rt::to::resources>()->release();
  418. process_12();
  419. }
  420. sup1->do_shutdown();
  421. sup2->do_shutdown();
  422. process_12();
  423. CHECK(sup1->get_state() == r::state_t::SHUT_DOWN);
  424. CHECK(sup2->get_state() == r::state_t::SHUT_DOWN);
  425. }
  426. TEST_CASE("proper shutdown order, defined by linkage", "[actor]") {
  427. r::system_context_t system_context;
  428. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  429. auto act_1 = sup->create_actor<tracked_actor_t>().timeout(rt::default_timeout).finish();
  430. auto act_2 = sup->create_actor<tracked_actor_t>().timeout(rt::default_timeout).finish();
  431. auto act_3 = sup->create_actor<tracked_actor_t>().timeout(rt::default_timeout).finish();
  432. /*
  433. printf("a1 = %p(%p), a2 = %p(%p), a3 = %p(%p)\n", act_1.get(), act_1->get_address().get(),
  434. act_2.get(), act_2->get_address().get(), act_3.get(), act_3->get_address().get());
  435. */
  436. std::uint32_t event_id = 1;
  437. auto shutdowner = [&](auto &me) {
  438. auto &self = static_cast<tracked_actor_t &>(me);
  439. self.shutdown_event = event_id++;
  440. };
  441. act_1->shutdowner = act_2->shutdowner = act_3->shutdowner = shutdowner;
  442. act_1->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  443. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) {
  444. p.link(act_2->get_address(), false);
  445. p.link(act_3->get_address(), false);
  446. });
  447. };
  448. act_2->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  449. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(act_3->get_address(), false); });
  450. };
  451. sup->do_process();
  452. REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
  453. sup->do_shutdown();
  454. sup->do_process();
  455. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  456. CHECK(act_1->shutdown_event == 1);
  457. CHECK(act_2->shutdown_event == 2);
  458. CHECK(act_3->shutdown_event == 3);
  459. }
  460. TEST_CASE("unlink of root supervisor", "[actor]") {
  461. rt::system_context_test_t ctx;
  462. rt::system_context_test_t ctx1;
  463. rt::system_context_test_t ctx2;
  464. const char l1[] = "abc";
  465. const char l2[] = "def";
  466. auto sup1 = ctx1.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l1).finish();
  467. auto sup2 = ctx2.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l2).finish();
  468. sup2->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  469. plugin.with_casted<r::plugin::link_client_plugin_t>(
  470. [&](auto &p) { p.link(sup1->get_address(), false, [&](auto &) {}); });
  471. };
  472. auto process_12 = [&]() {
  473. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  474. sup1->do_process();
  475. sup2->do_process();
  476. }
  477. };
  478. process_12();
  479. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  480. REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
  481. sup1->do_shutdown();
  482. sup1->do_process();
  483. sup2->do_shutdown();
  484. process_12();
  485. CHECK(sup1->get_state() == r::state_t::SHUT_DOWN);
  486. CHECK(sup2->get_state() == r::state_t::SHUT_DOWN);
  487. }
  488. TEST_CASE("ignore unlink", "[actor]") {
  489. rt::system_context_test_t ctx;
  490. auto sup = ctx.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  491. auto act_c = sup->create_actor<ignore_unlink_actor_t>().timeout(rt::default_timeout).finish();
  492. auto act_s = sup->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  493. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  494. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(act_s->get_address(), true); });
  495. };
  496. sup->do_process();
  497. REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
  498. act_s->do_shutdown();
  499. sup->do_process();
  500. CHECK(act_c->get_state() == r::state_t::OPERATIONAL);
  501. CHECK(act_s->get_state() == r::state_t::SHUT_DOWN);
  502. CHECK(act_c->server_addr == act_s->get_address());
  503. sup->do_shutdown();
  504. sup->do_process();
  505. }
  506. TEST_CASE("unlink in supervisor", "[supervisor]") {
  507. rt::system_context_test_t ctx1;
  508. rt::system_context_test_t ctx2;
  509. auto sup1 = ctx1.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality("abc").finish();
  510. auto sup2 = ctx2.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality("def").finish();
  511. sup2->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  512. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(sup1->get_address(), false); });
  513. };
  514. sup1->do_process();
  515. auto p = sup1->get_casted_plugin<r::plugin::resources_plugin_t>();
  516. REQUIRE(p);
  517. p->acquire(0);
  518. sup1->do_shutdown();
  519. sup1->do_process();
  520. sup2->do_process();
  521. sup2->do_shutdown();
  522. sup2->do_process();
  523. sup1->do_process();
  524. p->release(0);
  525. sup1->do_process();
  526. sup2->do_process();
  527. CHECK(sup1->get_state() == r::state_t::SHUT_DOWN);
  528. CHECK(sup2->get_state() == r::state_t::SHUT_DOWN);
  529. }