030-registry.cpp 21 KB


  1. //
  2. // Copyright (c) 2019-2023 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include "access.h"
  7. #include "rotor.hpp"
  8. #include "supervisor_test.h"
  9. #include "actor_test.h"
  10. namespace r = rotor;
  11. namespace rt = r::test;
  12. struct manual_actor_t : public r::actor_base_t {
  13. using r::actor_base_t::actor_base_t;
  14. // no registry plugin
  15. // clang-format off
  16. using plugins_list_t = std::tuple<
  17. r::plugin::address_maker_plugin_t,
  18. r::plugin::lifetime_plugin_t,
  19. r::plugin::init_shutdown_plugin_t,
  20. r::plugin::starter_plugin_t>;
  21. // clang-format on
  22. using discovery_reply_t = r::intrusive_ptr_t<r::message::discovery_response_t>;
  23. using future_reply_t = r::intrusive_ptr_t<r::message::discovery_future_t>;
  24. using registration_reply_t = r::intrusive_ptr_t<r::message::registration_response_t>;
  25. r::address_ptr_t registry_addr;
  26. discovery_reply_t discovery_reply;
  27. future_reply_t future_reply;
  28. registration_reply_t registration_reply;
  29. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  30. r::actor_base_t::configure(plugin);
  31. plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) {
  32. p.subscribe_actor(&manual_actor_t::on_discovery);
  33. p.subscribe_actor(&manual_actor_t::on_registration_reply);
  34. p.subscribe_actor(&manual_actor_t::on_future);
  35. });
  36. }
  37. void query_name(const std::string &name) {
  38. request<r::payload::discovery_request_t>(registry_addr, name).send(rt::default_timeout);
  39. }
  40. r::request_id_t promise_name(const std::string &name) {
  41. return request<r::payload::discovery_promise_t>(registry_addr, name).send(rt::default_timeout);
  42. }
  43. void cancel_name(r::request_id_t request_id) {
  44. using payload_t = r::message::discovery_cancel_t::payload_t;
  45. send<payload_t>(registry_addr, request_id, address);
  46. }
  47. void register_name(const std::string &name) {
  48. request<r::payload::registration_request_t>(registry_addr, name, address).send(rt::default_timeout);
  49. }
  50. void unregister_all() { send<r::payload::deregistration_notify_t>(registry_addr, address); }
  51. void unregister_name(const std::string &name) { send<r::payload::deregistration_service_t>(registry_addr, name); }
  52. void on_discovery(r::message::discovery_response_t &reply) noexcept { discovery_reply.reset(&reply); }
  53. void on_future(r::message::discovery_future_t &reply) noexcept { future_reply.reset(&reply); }
  54. void on_registration_reply(r::message::registration_response_t &reply) noexcept {
  55. registration_reply.reset(&reply);
  56. }
  57. };
  58. struct sample_actor_t : rt::actor_test_t {
  59. using rt::actor_test_t::actor_test_t;
  60. r::address_ptr_t service_addr;
  61. };
  62. TEST_CASE("supervisor related tests", "[registry][supervisor]") {
  63. r::system_context_t system_context;
  64. rt::supervisor_test_ptr_t sup;
  65. SECTION("no registry on supervisor by default") {
  66. sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  67. sup->do_process();
  68. CHECK(!sup->access<rt::to::registry>());
  69. }
  70. SECTION("registry is created, when asked") {
  71. sup = system_context.create_supervisor<rt::supervisor_test_t>()
  72. .timeout(rt::default_timeout)
  73. .create_registry(true)
  74. .finish();
  75. sup->do_process();
  76. CHECK(sup->access<rt::to::registry>());
  77. }
  78. SECTION("registry is inherited") {
  79. sup = system_context.create_supervisor<rt::supervisor_test_t>()
  80. .timeout(rt::default_timeout)
  81. .create_registry(true)
  82. .finish();
  83. auto sup2 = sup->create_actor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  84. sup->do_process();
  85. CHECK(sup->access<rt::to::registry>());
  86. CHECK(sup2->access<rt::to::registry>());
  87. }
  88. SECTION("registry is set from different locality") {
  89. const char locality1[] = "abc";
  90. const char locality2[] = "def";
  91. sup = system_context.create_supervisor<rt::supervisor_test_t>()
  92. .timeout(rt::default_timeout)
  93. .locality(locality1)
  94. .finish();
  95. auto reg = sup->create_actor<r::registry_t>().timeout(rt::default_timeout).finish();
  96. sup->do_process();
  97. CHECK(!sup->access<rt::to::registry>());
  98. auto sup2 = sup->create_actor<rt::supervisor_test_t>()
  99. .timeout(rt::default_timeout)
  100. .locality(locality2)
  101. .registry_address(reg->get_address())
  102. .finish();
  103. while (!sup->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  104. sup->do_process();
  105. sup2->do_process();
  106. }
  107. CHECK(sup2->access<rt::to::registry>());
  108. sup2->do_shutdown();
  109. while (!sup->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  110. sup->do_process();
  111. sup2->do_process();
  112. }
  113. }
  114. sup->do_shutdown();
  115. sup->do_process();
  116. }
  117. TEST_CASE("registry actor (server)", "[registry][supervisor]") {
  118. r::system_context_t system_context;
  119. auto sup = system_context.create_supervisor<rt::supervisor_test_t>()
  120. .timeout(rt::default_timeout)
  121. .create_registry(true)
  122. .finish();
  123. auto act = sup->create_actor<manual_actor_t>().timeout(rt::default_timeout).finish();
  124. act->registry_addr = sup->access<rt::to::registry>();
  125. sup->do_process();
  126. SECTION("discovery non-existing name") {
  127. act->query_name("some-name");
  128. sup->do_process();
  129. REQUIRE((bool)act->discovery_reply);
  130. auto &ec = act->discovery_reply->payload.ee->ec;
  131. CHECK(ec == r::error_code_t::unknown_service);
  132. CHECK(ec.message() == "the requested service name is not registered");
  133. }
  134. SECTION("duplicate registration attempt") {
  135. act->register_name("nnn");
  136. sup->do_process();
  137. REQUIRE((bool)act->registration_reply);
  138. REQUIRE(!act->registration_reply->payload.ee);
  139. act->register_name("nnn");
  140. sup->do_process();
  141. auto &ec = act->registration_reply->payload.ee->ec;
  142. REQUIRE((bool)ec);
  143. REQUIRE(ec == r::error_code_t::already_registered);
  144. REQUIRE(ec.message() == "service name is already registered");
  145. }
  146. SECTION("reg 2 names, check, unreg on, check") {
  147. act->register_name("s1");
  148. sup->do_process();
  149. REQUIRE((bool)act->registration_reply);
  150. REQUIRE(!act->registration_reply->payload.ee);
  151. act->query_name("s1");
  152. sup->do_process();
  153. REQUIRE((bool)act->discovery_reply);
  154. REQUIRE(!act->discovery_reply->payload.ee);
  155. REQUIRE(act->discovery_reply->payload.res.service_addr.get() == act->get_address().get());
  156. act->register_name("s2");
  157. sup->do_process();
  158. REQUIRE((bool)act->registration_reply);
  159. REQUIRE(!act->registration_reply->payload.ee);
  160. act->query_name("s2");
  161. sup->do_process();
  162. REQUIRE((bool)act->discovery_reply);
  163. REQUIRE(!act->discovery_reply->payload.ee);
  164. REQUIRE(act->discovery_reply->payload.res.service_addr.get() == act->get_address().get());
  165. act->register_name("s3");
  166. sup->do_process();
  167. REQUIRE((bool)act->registration_reply);
  168. REQUIRE(!act->registration_reply->payload.ee);
  169. act->unregister_name("s2");
  170. act->query_name("s2");
  171. sup->do_process();
  172. REQUIRE(act->discovery_reply->payload.ee->ec == r::error_code_t::unknown_service);
  173. act->unregister_all();
  174. act->query_name("s1");
  175. sup->do_process();
  176. REQUIRE(act->discovery_reply->payload.ee->ec == r::error_code_t::unknown_service);
  177. act->query_name("s3");
  178. sup->do_process();
  179. REQUIRE(act->discovery_reply->payload.ee->ec == r::error_code_t::unknown_service);
  180. }
  181. SECTION("promise & future") {
  182. REQUIRE(!act->future_reply);
  183. SECTION("promise, register, future") {
  184. act->promise_name("s1");
  185. act->register_name("s1");
  186. sup->do_process();
  187. CHECK(act->future_reply);
  188. CHECK(act->future_reply->payload.res.service_addr.get() == act->get_address().get());
  189. }
  190. SECTION("future, register, promise") {
  191. act->register_name("s1");
  192. act->promise_name("s1");
  193. sup->do_process();
  194. CHECK(act->future_reply);
  195. CHECK(act->future_reply->payload.res.service_addr.get() == act->get_address().get());
  196. }
  197. SECTION("cancel") {
  198. auto req_id = act->promise_name("s1");
  199. act->cancel_name(req_id);
  200. sup->do_process();
  201. auto id = &std::as_const(r::plugin::child_manager_plugin_t::class_identity);
  202. auto plugin = static_cast<r::actor_base_t *>(sup.get())->access<rt::to::get_plugin>(id);
  203. auto &reply = act->future_reply;
  204. CHECK(reply->payload.ee);
  205. CHECK(reply->payload.ee->ec.message() == "request has been cancelled");
  206. auto &actors_map = static_cast<r::plugin::child_manager_plugin_t *>(plugin)->access<rt::to::actors_map>();
  207. auto actor_state = actors_map.find(act->registry_addr);
  208. auto &registry = actor_state->second->actor;
  209. auto &promises = static_cast<r::registry_t *>(registry.get())->access<rt::to::promises>();
  210. CHECK(promises.empty());
  211. }
  212. }
  213. sup->do_shutdown();
  214. sup->do_process();
  215. }
  216. TEST_CASE("registry plugin (client)", "[registry][supervisor]") {
  217. r::system_context_t system_context;
  218. auto sup = system_context.create_supervisor<rt::supervisor_test_t>()
  219. .timeout(rt::default_timeout)
  220. .create_registry(true)
  221. .finish();
  222. SECTION("common case (just discover)") {
  223. auto act_s = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  224. act_s->configurer = [&](auto &actor, r::plugin::plugin_base_t &plugin) {
  225. plugin.with_casted<r::plugin::registry_plugin_t>(
  226. [&actor](auto &p) { p.register_name("service-name", actor.get_address()); });
  227. };
  228. sup->do_process();
  229. REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
  230. auto act_c = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  231. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  232. plugin.with_casted<r::plugin::registry_plugin_t>(
  233. [&](auto &p) { p.discover_name("service-name", act_c->service_addr); });
  234. };
  235. sup->do_process();
  236. CHECK(act_c->get_state() == r::state_t::OPERATIONAL);
  237. CHECK(act_c->service_addr == act_s->get_address());
  238. sup->do_shutdown();
  239. sup->do_process();
  240. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  241. CHECK(act_s->get_state() == r::state_t::SHUT_DOWN);
  242. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  243. }
  244. SECTION("common case (discover & link)") {
  245. auto act_s = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  246. act_s->configurer = [&](auto &actor, r::plugin::plugin_base_t &plugin) {
  247. plugin.with_casted<r::plugin::registry_plugin_t>(
  248. [&actor](auto &p) { p.register_name("service-name", actor.get_address()); });
  249. };
  250. sup->do_process();
  251. REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
  252. auto act_c = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  253. int successes = 0;
  254. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  255. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  256. p.discover_name("service-name", act_c->service_addr)
  257. .link(true)
  258. .callback([&](auto /*phase*/, auto &ec) mutable {
  259. REQUIRE(!ec);
  260. ++successes;
  261. });
  262. });
  263. };
  264. sup->do_process();
  265. CHECK(act_c->get_state() == r::state_t::OPERATIONAL);
  266. CHECK(act_c->service_addr == act_s->get_address());
  267. CHECK(successes == 2);
  268. sup->do_shutdown();
  269. sup->do_process();
  270. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  271. CHECK(act_s->get_state() == r::state_t::SHUT_DOWN);
  272. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  273. }
  274. SECTION("aliasing (discover & link)") {
  275. auto act_s = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  276. act_s->configurer = [&](auto &actor, r::plugin::plugin_base_t &plugin) {
  277. plugin.with_casted<r::plugin::registry_plugin_t>([&actor](auto &p) {
  278. p.register_name("service-name", actor.get_address());
  279. p.register_name("service-alias", actor.get_address());
  280. });
  281. };
  282. sup->do_process();
  283. REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
  284. auto act_c = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  285. int successes = 0;
  286. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  287. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  288. p.discover_name("service-name", act_c->service_addr)
  289. .link(true)
  290. .callback([&](auto /*phase*/, auto &ec) mutable {
  291. REQUIRE(!ec);
  292. ++successes;
  293. });
  294. p.discover_name("service-alias", act_c->service_addr)
  295. .link(true)
  296. .callback([&](auto /*phase*/, auto &ec) mutable {
  297. REQUIRE(!ec);
  298. ++successes;
  299. });
  300. });
  301. };
  302. sup->do_process();
  303. CHECK(act_c->get_state() == r::state_t::OPERATIONAL);
  304. CHECK(act_c->service_addr == act_s->get_address());
  305. CHECK(successes == 4);
  306. sup->do_shutdown();
  307. sup->do_process();
  308. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  309. CHECK(act_s->get_state() == r::state_t::SHUT_DOWN);
  310. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  311. }
  312. SECTION("common case (promise & link)") {
  313. auto act_c = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  314. int successes = 0;
  315. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  316. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  317. p.discover_name("service-name", act_c->service_addr, true)
  318. .link(true)
  319. .callback([&](auto /*phase*/, auto &ec) mutable {
  320. REQUIRE(!ec);
  321. ++successes;
  322. });
  323. });
  324. };
  325. sup->do_process();
  326. CHECK(successes == 0);
  327. SECTION("successful link") {
  328. auto act_s = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  329. act_s->configurer = [&](auto &actor, r::plugin::plugin_base_t &plugin) {
  330. plugin.with_casted<r::plugin::registry_plugin_t>(
  331. [&actor](auto &p) { p.register_name("service-name", actor.get_address()); });
  332. };
  333. sup->do_process();
  334. CHECK(successes == 2);
  335. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  336. CHECK(act_c->get_state() == r::state_t::OPERATIONAL);
  337. CHECK(act_c->service_addr == act_s->get_address());
  338. }
  339. SECTION("cancel promise") {
  340. CHECK(act_c->get_state() == r::state_t::INITIALIZING);
  341. act_c->do_shutdown();
  342. sup->do_process();
  343. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  344. auto id = &std::as_const(r::plugin::registry_plugin_t::class_identity);
  345. auto plugin = act_c->access<rt::to::get_plugin>(id);
  346. auto p = static_cast<r::plugin::registry_plugin_t *>(plugin);
  347. auto &dm = p->access<rt::to::discovery_map>();
  348. CHECK(dm.size() == 0);
  349. }
  350. sup->do_shutdown();
  351. sup->do_process();
  352. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  353. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  354. }
  355. SECTION("discovery non-existing name => fail to init") {
  356. auto act = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  357. act->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  358. plugin.with_casted<r::plugin::registry_plugin_t>(
  359. [&act](auto &p) { p.discover_name("non-existing-service", act->service_addr); });
  360. };
  361. sup->do_process();
  362. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  363. auto &reason = act->get_shutdown_reason();
  364. CHECK(reason->ec == r::shutdown_code_t::supervisor_shutdown);
  365. CHECK(reason->ec.message() == "actor shutdown has been requested by supervisor");
  366. CHECK(reason->next->ec == r::shutdown_code_t::child_init_failed);
  367. CHECK(reason->next->ec.message() == "supervisor shutdown due to child init failure");
  368. auto &down_reason = reason->next->next->next;
  369. REQUIRE(down_reason);
  370. CHECK(down_reason->ec == r::error_code_t::discovery_failed);
  371. CHECK(down_reason->ec.message() == "discovery has been failed");
  372. }
  373. SECTION("double name registration => fail") {
  374. auto act1 = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  375. auto act2 = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  376. printf("act1 = %p(%p), act2 = %p(%p)\n", (void *)act1.get(), (void *)act1->get_address().get(),
  377. (void *)act2.get(), (void *)act2->get_address().get());
  378. auto configurer = [](auto &actor, r::plugin::plugin_base_t &plugin) {
  379. plugin.with_casted<r::plugin::registry_plugin_t>(
  380. [&actor](auto &p) { p.register_name("service-name", actor.get_address()); });
  381. };
  382. act1->configurer = configurer;
  383. act2->configurer = configurer;
  384. sup->do_process();
  385. CHECK(act1->get_state() == r::state_t::SHUT_DOWN);
  386. CHECK(act2->get_state() == r::state_t::SHUT_DOWN);
  387. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  388. auto &reason = act2->get_shutdown_reason();
  389. auto &down_reason = reason->next->next->next;
  390. REQUIRE(down_reason);
  391. CHECK(down_reason->ec == r::error_code_t::registration_failed);
  392. CHECK(down_reason->ec.message() == "registration has been failed");
  393. }
  394. }
  395. TEST_CASE("notify linked clients about going to shutdown", "[registry][supervisor]") {
  396. r::system_context_t system_context;
  397. auto sup = system_context.create_supervisor<rt::supervisor_test_t>()
  398. .timeout(rt::default_timeout)
  399. .create_registry(true)
  400. .finish();
  401. auto act1 = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  402. act1->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  403. plugin.with_casted<r::plugin::registry_plugin_t>([&act1](auto &p) {
  404. p.register_name("my-actor", act1->get_address());
  405. p.discover_name("non-existing-service", act1->service_addr, true);
  406. });
  407. };
  408. auto act2 = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  409. act2->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  410. plugin.with_casted<r::plugin::registry_plugin_t>(
  411. [&act1](auto &p) { p.discover_name("my-actor", act1->service_addr, true).link(false); });
  412. };
  413. sup->do_process();
  414. REQUIRE(act1->get_state() == r::state_t::INITIALIZING);
  415. REQUIRE(act2->get_state() == r::state_t::OPERATIONAL);
  416. REQUIRE(sup->get_state() == r::state_t::INITIALIZING);
  417. act1->do_shutdown();
  418. sup->do_process();
  419. CHECK(act1->get_state() == r::state_t::SHUT_DOWN);
  420. CHECK(act2->get_state() == r::state_t::SHUT_DOWN);
  421. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  422. }
  423. TEST_CASE("no problems when supervisor registers self in a registry", "[registry][supervisor]") {
  424. r::system_context_t system_context;
  425. auto sup = system_context.create_supervisor<rt::supervisor_test_t>()
  426. .timeout(rt::default_timeout)
  427. .create_registry(true)
  428. .configurer([](auto &actor, r::plugin::plugin_base_t &plugin) {
  429. plugin.with_casted<r::plugin::registry_plugin_t>(
  430. [&actor](auto &p) { p.register_name("service-name", actor.get_address()); });
  431. })
  432. .finish();
  433. SECTION("single supervisor and it's registry") {
  434. sup->do_process();
  435. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  436. }
  437. SECTION("supervisor + actor") {
  438. sup->do_process();
  439. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  440. auto act = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  441. act->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  442. plugin.with_casted<r::plugin::registry_plugin_t>(
  443. [&](auto &p) { p.discover_name("service-name", act->service_addr, false).link(false); });
  444. };
  445. sup->do_process();
  446. CHECK(act->get_state() == r::state_t::OPERATIONAL);
  447. }
  448. sup->do_shutdown();
  449. sup->do_process();
  450. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  451. }