123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575 |
- //
- // Copyright (c) 2019-2024 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
- //
- // Distributed under the MIT Software License
- //
- #include <catch2/matchers/catch_matchers_string.hpp>
- #include "rotor.hpp"
- #include "supervisor_test.h"
- #include "actor_test.h"
- #include "access.h"
- namespace r = rotor;
- namespace rt = rotor::test;
- using namespace Catch::Matchers;
- static std::uint32_t destroyed = 0;
- struct init_shutdown_plugin_t;
namespace payload {

// Empty payload type; it carries no data.
struct sample_payload_t {};

} // namespace payload
- namespace message {
- using sample_payload_t = r::message_t<payload::sample_payload_t>;
- }
- struct sample_sup_t : public rt::supervisor_test_t {
- using sup_base_t = rt::supervisor_test_t;
- using plugins_list_t = std::tuple<r::plugin::address_maker_plugin_t, r::plugin::locality_plugin_t,
- r::plugin::delivery_plugin_t<r::plugin::local_delivery_t>,
- r::plugin::lifetime_plugin_t, init_shutdown_plugin_t, /* use custom */
- r::plugin::foreigners_support_plugin_t, r::plugin::child_manager_plugin_t,
- r::plugin::starter_plugin_t>;
- std::uint32_t initialized = 0;
- std::uint32_t init_invoked = 0;
- std::uint32_t shutdown_started = 0;
- std::uint32_t shutdown_finished = 0;
- std::uint32_t shutdown_conf_invoked = 0;
- r::address_ptr_t shutdown_addr;
- using rt::supervisor_test_t::supervisor_test_t;
- ~sample_sup_t() override { ++destroyed; }
- void do_initialize(r::system_context_t *ctx) noexcept override {
- ++initialized;
- sup_base_t::do_initialize(ctx);
- }
- void shutdown_finish() noexcept override {
- ++shutdown_finished;
- rt::supervisor_test_t::shutdown_finish();
- }
- };
- struct init_shutdown_plugin_t : r::plugin::init_shutdown_plugin_t {
- using parent_t = r::plugin::init_shutdown_plugin_t;
- void deactivate() noexcept override { parent_t::deactivate(); }
- bool handle_shutdown(r::message::shutdown_request_t *message) noexcept override {
- auto sup = static_cast<sample_sup_t *>(actor);
- sup->shutdown_started++;
- return parent_t::handle_shutdown(message);
- }
- bool handle_init(r::message::init_request_t *message) noexcept override {
- auto sup = static_cast<sample_sup_t *>(actor);
- sup->init_invoked++;
- return parent_t::handle_init(message);
- }
- };
- struct sample_plugin_t : r::plugin::plugin_base_t {
- using parent_t = r::plugin::plugin_base_t;
- static std::type_index class_id;
- const std::type_index &identity() const noexcept override { return class_id; }
- void activate(r::actor_base_t *actor_) noexcept override {
- parent_t::activate(actor_);
- auto info = subscribe(&sample_plugin_t::on_message);
- info->tag_io();
- info->tag_io(); // for better coverage
- }
- void deactivate() noexcept override { parent_t::deactivate(); }
- void on_message(message::sample_payload_t &) noexcept { message_received = true; }
- bool message_received = false;
- };
- std::type_index sample_plugin_t::class_id = typeid(sample_plugin_t);
- struct sample_sup2_t : public rt::supervisor_test_t {
- using sup_base_t = rt::supervisor_test_t;
- std::uint32_t initialized = 0;
- std::uint32_t init_invoked = 0;
- std::uint32_t shutdown_finished = 0;
- std::uint32_t shutdown_conf_invoked = 0;
- r::address_ptr_t shutdown_addr;
- actor_base_t *init_child = nullptr;
- actor_base_t *shutdown_child = nullptr;
- r::extended_error_ptr_t init_ec;
- using rt::supervisor_test_t::supervisor_test_t;
- ~sample_sup2_t() override { ++destroyed; }
- void do_initialize(r::system_context_t *ctx) noexcept override {
- ++initialized;
- sup_base_t::do_initialize(ctx);
- }
- void init_finish() noexcept override {
- ++init_invoked;
- sup_base_t::init_finish();
- }
- virtual void shutdown_finish() noexcept override {
- ++shutdown_finished;
- rt::supervisor_test_t::shutdown_finish();
- }
- void on_child_init(actor_base_t *actor, const r::extended_error_ptr_t &ec) noexcept override {
- init_child = actor;
- init_ec = ec;
- }
- void on_child_shutdown(actor_base_t *actor) noexcept override { shutdown_child = actor; }
- };
- struct sample_sup3_t : public rt::supervisor_test_t {
- using sup_base_t = rt::supervisor_test_t;
- using rt::supervisor_test_t::supervisor_test_t;
- std::uint32_t received = 0;
- void make_subscription() noexcept {
- subscribe(&sample_sup3_t::on_sample);
- send<payload::sample_payload_t>(address);
- }
- void on_sample(message::sample_payload_t &) noexcept { ++received; }
- };
- struct sample_sup4_t : public rt::supervisor_test_t {
- using sup_base_t = rt::supervisor_test_t;
- using rt::supervisor_test_t::supervisor_test_t;
- std::uint32_t counter = 0;
- void intercept(r::message_ptr_t &, const void *tag, const r::continuation_t &continuation) noexcept override {
- CHECK(tag == rotor::tags::io);
- if (++counter % 2) {
- continuation();
- }
- }
- };
- struct unsubscriber_sup_t : public rt::supervisor_test_t {
- using sup_base_t = rt::supervisor_test_t;
- using rt::supervisor_test_t::supervisor_test_t;
- void configure(r::plugin::plugin_base_t &plugin) noexcept override {
- plugin.with_casted<r::plugin::starter_plugin_t>(
- [](auto &p) { p.subscribe_actor(&unsubscriber_sup_t::on_sample); });
- }
- void on_start() noexcept override {
- rt::supervisor_test_t::on_start();
- unsubscribe(&unsubscriber_sup_t::on_sample);
- }
- void on_sample(message::sample_payload_t &) noexcept {}
- };
- struct sample_actor_t : public r::actor_base_t {
- using r::actor_base_t::actor_base_t;
- };
- struct sample_actor2_t : public rt::actor_test_t {
- using rt::actor_test_t::actor_test_t;
- void configure(r::plugin::plugin_base_t &plugin) noexcept override {
- plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) {
- alternative = p.create_address();
- p.set_identity("specific_name", false);
- });
- plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
- p.subscribe_actor(&sample_actor2_t::on_link, alternative);
- send<payload::sample_payload_t>(alternative);
- });
- }
- void on_link(message::sample_payload_t &) noexcept { ++received; }
- r::address_ptr_t alternative;
- int received = 0;
- };
- struct sample_actor3_t : public rt::actor_test_t {
- using rt::actor_test_t::actor_test_t;
- void shutdown_start() noexcept override {
- rt::actor_test_t::shutdown_start();
- resources->acquire();
- }
- };
- struct sample_actor4_t : public rt::actor_test_t {
- using rt::actor_test_t::actor_test_t;
- void configure(r::plugin::plugin_base_t &plugin) noexcept override {
- rt::actor_test_t::configure(plugin);
- plugin.with_casted<r::plugin::starter_plugin_t>(
- [&](auto &p) { p.subscribe_actor(&sample_actor4_t::on_message)->tag_io(); });
- }
- void on_start() noexcept override {
- rt::actor_test_t::on_start();
- send<payload::sample_payload_t>(get_address());
- send<payload::sample_payload_t>(get_address());
- }
- void on_message(message::sample_payload_t &) noexcept { ++received; }
- std::size_t received = 0;
- };
- struct sample_actor5_t : public rt::actor_test_t {
- using rt::actor_test_t::actor_test_t;
- // clang-format off
- using plugins_list_t = std::tuple<
- r::plugin::address_maker_plugin_t,
- r::plugin::lifetime_plugin_t,
- r::plugin::init_shutdown_plugin_t,
- r::plugin::link_server_plugin_t,
- r::plugin::link_client_plugin_t,
- r::plugin::registry_plugin_t,
- r::plugin::resources_plugin_t,
- r::plugin::starter_plugin_t,
- sample_plugin_t
- >;
- // clang-format on
- void on_start() noexcept override {
- rt::actor_test_t::on_start();
- send<payload::sample_payload_t>(get_address());
- send<payload::sample_payload_t>(get_address());
- }
- };
- struct sample_actor6_t : public rt::actor_test_t {
- using rt::actor_test_t::actor_test_t;
- void on_start() noexcept override {
- rt::actor_test_t::on_start();
- start_timer(r::pt::minutes(1), *this, &sample_actor6_t::on_timer);
- }
- void on_timer(r::request_id_t, bool cancelled_) noexcept { cancelled = cancelled_; }
- bool cancelled = false;
- };
- struct sample_actor7_t : public rt::actor_test_t {
- using rt::actor_test_t::actor_test_t;
- void on_start() noexcept override {
- rt::actor_test_t::on_start();
- start_timer(r::pt::minutes(1), *this,
- [](sample_actor7_t *actor, r::request_id_t, bool cancelled) { actor->cancelled = cancelled; });
- }
- void on_timer(r::request_id_t, bool cancelled_) noexcept { cancelled = cancelled_; }
- bool cancelled = false;
- };
- struct sample_actor8_t : public rt::actor_test_t {
- using rt::actor_test_t::actor_test_t;
- r::message_ptr_t msg;
- void on_start() noexcept override {
- rt::actor_test_t::on_start();
- subscribe(&sample_actor8_t::on_message);
- do_shutdown();
- }
- void shutdown_start() noexcept override {
- auto sup = static_cast<rt::supervisor_test_t *>(supervisor);
- sup->get_leader_queue().push_back(std::move(msg));
- rt::actor_test_t::shutdown_start();
- }
- void on_message(message::sample_payload_t &) noexcept {}
- };
- TEST_CASE("on_initialize, on_start, simple on_shutdown (handled by plugin)", "[supervisor]") {
- destroyed = 0;
- r::system_context_t *system_context = new r::system_context_t{};
- auto sup = system_context->create_supervisor<sample_sup_t>().timeout(rt::default_timeout).finish();
- REQUIRE(&sup->get_supervisor() == sup.get());
- REQUIRE(sup->initialized == 1);
- auto &identity = sup->get_identity();
- CHECK_THAT(identity, StartsWith("supervisor"));
- sup->do_process();
- CHECK(sup->init_invoked == 1);
- CHECK(sup->shutdown_started == 0);
- CHECK(sup->shutdown_conf_invoked == 0);
- CHECK(sup->active_timers.size() == 0);
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- sup->do_shutdown();
- sup->do_process();
- REQUIRE(sup->shutdown_started == 1);
- REQUIRE(sup->shutdown_finished == 1);
- REQUIRE(sup->active_timers.size() == 0);
- REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup->get_leader_queue().size() == 0);
- REQUIRE(sup->get_points().size() == 0);
- REQUIRE(sup->get_shutdown_reason()->ec.message() == "normal shutdown");
- CHECK(rt::empty(sup->get_subscription()));
- REQUIRE(destroyed == 0);
- delete system_context;
- sup->shutdown_addr.reset();
- sup.reset();
- REQUIRE(destroyed == 1);
- }
- TEST_CASE("on_initialize, on_start, simple on_shutdown", "[supervisor]") {
- destroyed = 0;
- r::system_context_t *system_context = new r::system_context_t{};
- auto sup = system_context->create_supervisor<sample_sup2_t>().timeout(rt::default_timeout).finish();
- REQUIRE(&sup->get_supervisor() == sup.get());
- REQUIRE(sup->initialized == 1);
- REQUIRE(sup->init_child == nullptr);
- sup->do_process();
- REQUIRE(sup->init_invoked == 1);
- REQUIRE(sup->shutdown_conf_invoked == 0);
- REQUIRE(sup->active_timers.size() == 0);
- REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
- sup->do_shutdown();
- sup->do_process();
- REQUIRE(sup->shutdown_finished == 1);
- REQUIRE(sup->active_timers.size() == 0);
- REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup->get_leader_queue().size() == 0);
- REQUIRE(sup->get_points().size() == 0);
- CHECK(rt::empty(sup->get_subscription()));
- REQUIRE(sup->shutdown_child == nullptr);
- REQUIRE(destroyed == 0);
- delete system_context;
- sup->shutdown_addr.reset();
- sup.reset();
- REQUIRE(destroyed == 1);
- }
- TEST_CASE("start/shutdown 1 child & 1 supervisor", "[supervisor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<sample_sup2_t>().timeout(rt::default_timeout).finish();
- auto act = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
- CHECK_THAT(act->get_identity(), StartsWith("actor"));
- /* for better coverage */
- auto last = sup->access<rt::to::last_req_id>();
- auto &request_map = sup->access<rt::to::request_map>();
- request_map[last + 1] = r::request_curry_t();
- sup->do_process();
- request_map.clear();
- CHECK(sup->access<rt::to::last_req_id>() > 1);
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- CHECK(act->access<rt::to::state>() == r::state_t::OPERATIONAL);
- CHECK(act->access<rt::to::resources>()->has() == 0);
- CHECK(sup->init_child == act.get());
- CHECK(!sup->init_ec);
- CHECK(sup->shutdown_child == nullptr);
- sup->do_shutdown();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- CHECK(act->access<rt::to::state>() == r::state_t::SHUT_DOWN);
- CHECK(sup->shutdown_child == act.get());
- auto &reason = sup->shutdown_child->get_shutdown_reason();
- REQUIRE(reason);
- CHECK(reason->ec == r::shutdown_code_t::supervisor_shutdown);
- CHECK_THAT(reason->message(), Catch::Matchers::ContainsSubstring("shutdown has been requested by supervisor"));
- CHECK_THAT(reason->message(), Catch::Matchers::ContainsSubstring("normal shutdown"));
- auto &root = reason->next;
- CHECK(root);
- CHECK(root->ec.value() == static_cast<int>(r::shutdown_code_t::normal));
- CHECK(!root->next);
- }
- TEST_CASE("custom subscription", "[supervisor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<sample_sup3_t>().timeout(rt::default_timeout).finish();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- sup->make_subscription();
- sup->do_process();
- CHECK(sup->received == 1);
- sup->do_shutdown();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("shutdown immediately", "[supervisor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<sample_sup3_t>().timeout(rt::default_timeout).finish();
- sup->do_shutdown();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("self unsubscriber", "[actor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<unsubscriber_sup_t>().timeout(rt::default_timeout).finish();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- sup->do_shutdown();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("alternative address subscriber", "[actor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
- auto act = sup->create_actor<sample_actor2_t>().timeout(rt::default_timeout).finish();
- CHECK(act->get_identity() == "specific_name");
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- CHECK(act->get_state() == r::state_t::OPERATIONAL);
- CHECK(act->received == 1);
- sup->do_shutdown();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- CHECK(act->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("acquire resources on shutdown start", "[actor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
- auto act = sup->create_actor<sample_actor3_t>().timeout(rt::default_timeout).finish();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- sup->do_shutdown();
- sup->do_process();
- CHECK(act->get_state() == r::state_t::SHUTTING_DOWN);
- act->access<rt::to::resources>()->release();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- CHECK(act->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("io tagging & intercepting", "[actor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<sample_sup4_t>().timeout(rt::default_timeout).finish();
- auto act = sup->create_actor<sample_actor4_t>().timeout(rt::default_timeout).finish();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- CHECK(act->received == 1);
- CHECK(sup->counter == 2);
- sup->do_shutdown();
- sup->do_process();
- CHECK(act->get_state() == r::state_t::SHUT_DOWN);
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("io tagging (in plugin) & intercepting", "[actor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<sample_sup4_t>().timeout(rt::default_timeout).finish();
- auto act = sup->create_actor<sample_actor5_t>().timeout(rt::default_timeout).finish();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- CHECK(sup->counter == 2);
- auto plugin = act->access<rt::to::get_plugin>(&std::as_const(sample_plugin_t::class_id));
- CHECK(plugin);
- CHECK(static_cast<sample_plugin_t *>(plugin)->message_received);
- sup->do_shutdown();
- sup->do_process();
- CHECK(act->get_state() == r::state_t::SHUT_DOWN);
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("timers cancellation (1)", "[actor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
- auto act = sup->create_actor<sample_actor6_t>().timeout(rt::default_timeout).finish();
- sup->do_process();
- CHECK(act->get_state() == r::state_t::OPERATIONAL);
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- CHECK(!act->access<rt::to::timers_map>().empty());
- sup->do_shutdown();
- sup->do_process();
- CHECK(act->get_state() == r::state_t::SHUT_DOWN);
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- CHECK(act->access<rt::to::timers_map>().empty());
- }
- TEST_CASE("timers cancellation (2)", "[actor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
- auto act = sup->create_actor<sample_actor7_t>().timeout(rt::default_timeout).finish();
- sup->do_process();
- CHECK(act->get_state() == r::state_t::OPERATIONAL);
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- CHECK(!act->access<rt::to::timers_map>().empty());
- sup->do_shutdown();
- sup->do_process();
- CHECK(act->get_state() == r::state_t::SHUT_DOWN);
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- CHECK(act->access<rt::to::timers_map>().empty());
- }
- TEST_CASE("subscription confirmation arrives on non-init phase", "[actor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
- auto act = sup->create_actor<sample_actor8_t>().timeout(rt::default_timeout).finish();
- auto act_configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
- plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
- p.subscribe_actor(r::lambda<message::sample_payload_t>([](message::sample_payload_t &) noexcept { ; }));
- auto req = sup->get_leader_queue().back();
- sup->get_leader_queue().pop_back();
- act->msg = std::move(req);
- act->do_shutdown();
- });
- };
- act->configurer = act_configurer;
- sup->do_process();
- CHECK(act->get_state() == r::state_t::SHUT_DOWN);
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- }
|