123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412 |
- //
- // Copyright (c) 2019-2022 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
- //
- // Distributed under the MIT Software License
- //
- #include "rotor.hpp"
- #include "supervisor_test.h"
- #include "access.h"
- namespace r = rotor;
- namespace rt = r::test;
- static size_t destroyed = 0;
- struct my_supervisor_t : public rt::supervisor_test_t {
- using rt::supervisor_test_t::supervisor_test_t;
- // clang-format off
- using plugins_list_t = std::tuple<
- r::plugin::address_maker_plugin_t,
- r::plugin::locality_plugin_t,
- r::plugin::delivery_plugin_t<r::plugin::local_delivery_t>, // for coverage
- r::plugin::lifetime_plugin_t,
- r::plugin::init_shutdown_plugin_t,
- r::plugin::foreigners_support_plugin_t,
- r::plugin::child_manager_plugin_t,
- r::plugin::link_server_plugin_t,
- r::plugin::link_client_plugin_t,
- r::plugin::registry_plugin_t,
- r::plugin::starter_plugin_t>;
- // clang-format on
- void init_start() noexcept override {
- rt::supervisor_test_t::init_start();
- assert(state == r::state_t::INITIALIZING);
- init_start_count++;
- }
- void init_finish() noexcept override {
- rt::supervisor_test_t::init_finish();
- assert(state == r::state_t::INITIALIZED);
- init_finish_count++;
- }
- void shutdown_start() noexcept override {
- rt::supervisor_test_t::shutdown_start();
- shutdown_start_count++;
- assert(state == r::state_t::SHUTTING_DOWN);
- }
- void shutdown_finish() noexcept override {
- rt::supervisor_test_t::shutdown_finish();
- shutdown_finish_count++;
- assert(state == r::state_t::SHUT_DOWN);
- }
- ~my_supervisor_t() { ++destroyed; }
- std::uint32_t init_start_count = 0;
- std::uint32_t init_finish_count = 0;
- std::uint32_t shutdown_start_count = 0;
- std::uint32_t shutdown_finish_count = 0;
- };
- TEST_CASE("two supervisors, different localities, shutdown 2nd", "[supervisor]") {
- r::system_context_t system_context;
- const char locality1[] = "abc";
- const char locality2[] = "def";
- auto sup1 =
- system_context.create_supervisor<my_supervisor_t>().locality(locality1).timeout(rt::default_timeout).finish();
- auto sup2 = sup1->create_actor<my_supervisor_t>().locality(locality2).timeout(rt::default_timeout).finish();
- REQUIRE(²->get_supervisor() == sup2.get());
- REQUIRE(sup2->access<rt::to::parent_supervisor>() == sup1.get());
- sup1->do_process();
- REQUIRE(sup1->get_state() == r::state_t::INITIALIZING);
- REQUIRE(sup2->get_state() == r::state_t::INITIALIZING);
- REQUIRE(sup1->init_start_count == 1);
- REQUIRE(sup1->init_finish_count == 0);
- REQUIRE(sup2->init_start_count == 1);
- REQUIRE(sup2->init_finish_count == 0);
- sup2->do_process();
- REQUIRE(sup1->get_state() == r::state_t::INITIALIZING);
- REQUIRE(sup2->get_state() == r::state_t::INITIALIZED);
- REQUIRE(sup1->init_start_count == 1);
- REQUIRE(sup1->init_finish_count == 0);
- REQUIRE(sup2->init_start_count == 1);
- REQUIRE(sup2->init_finish_count == 1);
- sup1->do_process();
- REQUIRE(sup1->init_start_count == 1);
- REQUIRE(sup1->init_finish_count == 1);
- REQUIRE(sup2->init_start_count == 1);
- REQUIRE(sup2->init_finish_count == 1);
- REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
- REQUIRE(sup2->get_state() == r::state_t::INITIALIZED);
- REQUIRE(sup1->shutdown_start_count == 0);
- sup2->do_process();
- REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
- REQUIRE(sup1->init_start_count == 1);
- REQUIRE(sup1->init_finish_count == 1);
- REQUIRE(sup2->init_start_count == 1);
- REQUIRE(sup2->init_finish_count == 1);
- REQUIRE(sup2->shutdown_start_count == 0);
- REQUIRE(sup1->shutdown_start_count == 0);
- sup2->do_shutdown();
- sup2->do_process();
- REQUIRE(sup1->shutdown_start_count == 0);
- REQUIRE(sup1->shutdown_finish_count == 0);
- REQUIRE(sup2->shutdown_start_count == 0);
- REQUIRE(sup2->shutdown_finish_count == 0);
- sup1->do_process();
- REQUIRE(sup1->shutdown_start_count == 0);
- REQUIRE(sup1->shutdown_finish_count == 0);
- REQUIRE(sup2->shutdown_start_count == 0);
- REQUIRE(sup2->shutdown_finish_count == 0);
- sup2->do_process();
- REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
- REQUIRE(sup2->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup1->shutdown_start_count == 0);
- REQUIRE(sup1->shutdown_finish_count == 0);
- REQUIRE(sup2->shutdown_start_count == 1);
- REQUIRE(sup2->shutdown_finish_count == 1);
- sup1->do_process();
- REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
- REQUIRE(sup2->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup1->shutdown_start_count == 0);
- REQUIRE(sup1->shutdown_finish_count == 0);
- REQUIRE(sup2->shutdown_start_count == 1);
- REQUIRE(sup2->shutdown_finish_count == 1);
- sup1->do_shutdown();
- sup1->do_process();
- REQUIRE(sup1->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup1->shutdown_start_count == 1);
- REQUIRE(sup1->shutdown_finish_count == 1);
- REQUIRE(sup1->get_leader_queue().size() == 0);
- REQUIRE(sup1->get_points().size() == 0);
- REQUIRE(rt::empty(sup1->get_subscription()));
- REQUIRE(sup2->get_leader_queue().size() == 0);
- REQUIRE(sup2->get_points().size() == 0);
- REQUIRE(rt::empty(sup2->get_subscription()));
- }
- TEST_CASE("two supervisors, different localities, shutdown 1st", "[supervisor]") {
- r::system_context_t system_context;
- const char locality1[] = "abc";
- const char locality2[] = "def";
- auto sup1 =
- system_context.create_supervisor<my_supervisor_t>().locality(locality1).timeout(rt::default_timeout).finish();
- auto sup2 = sup1->create_actor<my_supervisor_t>().locality(locality2).timeout(rt::default_timeout).finish();
- REQUIRE(²->get_supervisor() == sup2.get());
- REQUIRE(sup2->access<rt::to::parent_supervisor>() == sup1.get());
- sup1->do_process();
- sup2->do_process();
- sup1->do_process();
- sup2->do_process();
- REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
- REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
- sup1->do_shutdown();
- while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
- sup1->do_process();
- sup2->do_process();
- }
- CHECK(sup1->get_state() == r::state_t::SHUT_DOWN);
- CHECK(sup2->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup1->get_leader_queue().size() == 0);
- REQUIRE(sup1->get_points().size() == 0);
- REQUIRE(rt::empty(sup1->get_subscription()));
- REQUIRE(sup2->get_leader_queue().size() == 0);
- REQUIRE(sup2->get_points().size() == 0);
- REQUIRE(rt::empty(sup2->get_subscription()));
- }
- TEST_CASE("two supervisors & external subscription", "[supervisor]") {
- rt::system_test_context_t ctx1;
- rt::system_test_context_t ctx2;
- auto sup1 = ctx1.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
- auto sup2 = ctx2.create_supervisor<rt::supervisor_test_t>()
- .configurer([&](auto &, r::plugin::plugin_base_t &plugin) {
- plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
- using message_t = rt::message::sample_t;
- auto lambda = r::lambda<message_t>([](message_t &) noexcept { ; });
- p.subscribe_actor(lambda, sup1->get_address());
- });
- })
- .timeout(rt::default_timeout)
- .finish();
- auto process = [&]() {
- while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
- bool progress = false;
- if (sup1->get_state() != r::state_t::SHUT_DOWN && !sup1->get_leader_queue().empty()) {
- auto msg = ¹->get_leader_queue().front();
- sup1->do_process();
- progress = sup1->get_leader_queue().empty() || (msg != ¹->get_leader_queue().front());
- }
- if (sup2->get_state() != r::state_t::SHUT_DOWN && !sup2->get_leader_queue().empty()) {
- auto msg = ²->get_leader_queue().front();
- sup2->do_process();
- progress = sup1->get_leader_queue().empty() || (msg != ¹->get_leader_queue().front());
- }
- if (!progress) {
- break;
- }
- }
- };
- SECTION("server-client shutdown order") {
- process();
- REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
- REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
- sup1->do_shutdown();
- while (sup1->get_state() != r::state_t::SHUT_DOWN) {
- sup1->do_process();
- }
- CHECK(sup1->get_state() == r::state_t::SHUT_DOWN);
- CHECK(sup2->get_state() == r::state_t::OPERATIONAL);
- sup2->do_shutdown();
- process();
- }
- SECTION("client-server shutdown order, with message in progress") {
- process();
- REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
- REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
- sup2->do_shutdown();
- while (sup2->get_state() != r::state_t::SHUT_DOWN) {
- sup2->do_process();
- }
- CHECK(sup1->get_state() == r::state_t::OPERATIONAL);
- CHECK(sup2->get_state() == r::state_t::SHUT_DOWN);
- auto msg = sup1->get_leader_queue().front();
- sup1->get_leader_queue().pop_front();
- process();
- sup1->send<rt::payload::sample_t>(sup1->get_address(), 5);
- process();
- sup1->get_leader_queue().push_back(std::move(msg));
- sup1->do_shutdown();
- process();
- }
- CHECK(sup1->get_state() == r::state_t::SHUT_DOWN);
- CHECK(sup2->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("two supervisors, same locality", "[supervisor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto mark = destroyed;
- const char locality[] = "locality";
- auto sup1 =
- system_context->create_supervisor<my_supervisor_t>().locality(locality).timeout(rt::default_timeout).finish();
- auto sup2 = sup1->create_actor<my_supervisor_t>().locality(locality).timeout(rt::default_timeout).finish();
- REQUIRE(²->get_supervisor() == sup2.get());
- REQUIRE(sup2->access<rt::to::parent_supervisor>() == sup1.get());
- sup1->do_process();
- REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
- REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
- REQUIRE(sup1->init_start_count == 1);
- REQUIRE(sup1->init_finish_count == 1);
- REQUIRE(sup2->init_start_count == 1);
- REQUIRE(sup2->init_finish_count == 1);
- sup1->do_shutdown();
- sup1->do_process();
- REQUIRE(sup1->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup2->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup1->shutdown_start_count == 1);
- REQUIRE(sup1->get_leader_queue().size() == 0);
- REQUIRE(sup1->get_points().size() == 0);
- REQUIRE(rt::empty(sup1->get_subscription()));
- REQUIRE(sup2->get_leader_queue().size() == 0);
- REQUIRE(sup2->get_points().size() == 0);
- REQUIRE(rt::empty(sup2->get_subscription()));
- system_context.reset();
- sup1.reset();
- sup2.reset();
- REQUIRE(mark + 2 == destroyed);
- }
- TEST_CASE("two supervisors, down internal first, same locality", "[supervisor]") {
- r::system_context_t system_context;
- const char locality[] = "locality";
- auto sup1 =
- system_context.create_supervisor<my_supervisor_t>().timeout(rt::default_timeout).locality(locality).finish();
- auto sup2 = sup1->create_actor<my_supervisor_t>().timeout(rt::default_timeout).locality(locality).finish();
- REQUIRE(²->get_supervisor() == sup2.get());
- REQUIRE(sup2->access<rt::to::parent_supervisor>() == sup1.get());
- sup1->do_process();
- REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
- REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
- // for better coverage
- auto &address = static_cast<r::actor_base_t *>(sup2.get())->get_address();
- auto ec = r::make_error_code(r::error_code_t::success);
- auto reason = r::make_error("some-ctx", ec);
- sup2->send<r::payload::shutdown_trigger_t>(address, address, reason);
- sup1->do_process();
- REQUIRE(sup2->get_state() == r::state_t::SHUT_DOWN);
- CHECK(sup2->get_shutdown_reason()->root()->ec.message() == "success");
- REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
- sup1->do_shutdown();
- sup1->do_process();
- REQUIRE(sup1->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup1->get_leader_queue().size() == 0);
- REQUIRE(sup1->get_points().size() == 0);
- REQUIRE(rt::empty(sup1->get_subscription()));
- REQUIRE(sup2->get_leader_queue().size() == 0);
- REQUIRE(sup2->get_points().size() == 0);
- REQUIRE(rt::empty(sup2->get_subscription()));
- }
- TEST_CASE("message arrival order", "[supervisor]") {
- r::system_context_t system_context;
- int model = 0;
- int states[] = {0, 0};
- auto sup1 = system_context.create_supervisor<rt::supervisor_test_t>()
- .timeout(rt::default_timeout)
- .configurer([&](auto &sup, r::plugin::plugin_base_t &plugin) {
- plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
- using message_t = rt::message::sample_t;
- auto lambda = r::lambda<message_t>([&](message_t &) noexcept {
- printf("sup1\n");
- if (!states[0]) {
- model += 1;
- } else {
- model *= 2;
- }
- ++states[0];
- });
- p.subscribe_actor(lambda, sup.get_address());
- });
- })
- .finish();
- auto sup2 = sup1->create_actor<rt::supervisor_test_t>()
- .timeout(rt::default_timeout)
- .configurer([&](auto &, r::plugin::plugin_base_t &plugin) {
- plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
- using message_t = rt::message::sample_t;
- auto lambda = r::lambda<message_t>([&](message_t &) noexcept {
- printf("sup2\n");
- if (!states[1]) {
- model += 2;
- } else {
- model *= 3;
- }
- ++states[1];
- });
- printf("sup2-sb\n");
- auto addr = sup1->get_address();
- p.subscribe_actor(lambda, addr);
- printf("sup2-sb\n");
- });
- })
- .finish();
- sup1->do_process();
- REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
- REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
- sup1->send<rt::payload::sample_t>(sup1->get_address(), 0);
- sup1->send<rt::payload::sample_t>(sup1->get_address(), 0);
- sup1->do_process();
- CHECK(model == ((0 + 1 + 2) * 2 * 3));
- sup1->do_shutdown();
- sup1->do_process();
- REQUIRE(sup1->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup2->get_state() == r::state_t::SHUT_DOWN);
- }
|