test_supervisor.cpp 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. #include "test_supervisor.h"
  2. #include "net/names.h"
  3. namespace to {
  4. struct queue{};
  5. struct on_timer_trigger{};
  6. }
  7. template <> inline auto &rotor::supervisor_t::access<to::queue>() noexcept { return queue; }
  8. namespace rotor {
  9. template <> inline auto rotor::actor_base_t::access<to::on_timer_trigger, request_id_t, bool>(request_id_t request_id,
  10. bool cancelled) noexcept {
  11. on_timer_trigger(request_id, cancelled);
  12. }
  13. }
  14. using namespace syncspirit::net;
  15. using namespace syncspirit::test;
  16. supervisor_t::supervisor_t(r::supervisor_config_t& cfg): r::supervisor_t(cfg) {
  17. log = utils::get_logger("net.test_supervisor");
  18. }
  19. void supervisor_t::configure(r::plugin::plugin_base_t &plugin) noexcept {
  20. parent_t::configure(plugin);
  21. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) { p.set_identity(names::coordinator, false); });
  22. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  23. p.register_name(names::coordinator, get_address());
  24. });
  25. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  26. p.subscribe_actor(&supervisor_t::on_model_update);
  27. p.subscribe_actor(&supervisor_t::on_block_update);
  28. p.subscribe_actor(&supervisor_t::on_contact_update);
  29. });
  30. if (configure_callback) {
  31. configure_callback(plugin);
  32. }
  33. }
  34. void supervisor_t::do_start_timer(const r::pt::time_duration &interval, r::timer_handler_base_t &handler) noexcept {
  35. timers.emplace_back(&handler);
  36. }
  37. void supervisor_t::do_cancel_timer(r::request_id_t timer_id) noexcept {
  38. auto it = timers.begin();
  39. while (it != timers.end()) {
  40. auto& handler = *it;
  41. if (handler->request_id == timer_id) {
  42. auto& actor_ptr = handler->owner;
  43. actor_ptr->access<to::on_timer_trigger, r::request_id_t, bool>(timer_id, true);
  44. on_timer_trigger(timer_id, true);
  45. timers.erase(it);
  46. return;
  47. } else {
  48. ++it;
  49. }
  50. }
  51. assert(0 && "should not happen");
  52. }
  53. void supervisor_t::do_invoke_timer(r::request_id_t timer_id) noexcept {
  54. LOG_DEBUG(log, "{}, invoking timer {}", identity, timer_id);
  55. auto predicate = [&](auto& handler) { return handler->request_id == timer_id; };
  56. auto it = std::find_if(timers.begin(), timers.end(), predicate);
  57. assert(it != timers.end());
  58. auto& handler = *it;
  59. auto& actor_ptr = handler->owner;
  60. actor_ptr->access<to::on_timer_trigger, r::request_id_t, bool>(timer_id, false);
  61. timers.erase(it);
  62. }
  63. void supervisor_t::start() noexcept {}
  64. void supervisor_t::shutdown() noexcept { do_shutdown(); }
  65. void supervisor_t::enqueue(r::message_ptr_t message) noexcept {
  66. locality_leader->access<to::queue>().emplace_back(std::move(message));
  67. }
  68. void supervisor_t::on_model_update(model::message::model_update_t &msg) noexcept {
  69. LOG_TRACE(log, "{}, updating model", identity);
  70. auto& diff = msg.payload.diff;
  71. auto r = diff->apply(*cluster);
  72. if (!r) {
  73. LOG_ERROR(log, "{}, error updating model: {}", identity, r.assume_error().message());
  74. do_shutdown(make_error(r.assume_error()));
  75. }
  76. }
  77. void supervisor_t::on_block_update(model::message::block_update_t &msg) noexcept {
  78. LOG_TRACE(log, "{}, updating block", identity);
  79. auto& diff = msg.payload.diff;
  80. auto r = diff->apply(*cluster);
  81. if (!r) {
  82. LOG_ERROR(log, "{}, error updating block: {}", identity, r.assume_error().message());
  83. do_shutdown(make_error(r.assume_error()));
  84. }
  85. }
  86. void supervisor_t::on_contact_update(model::message::contact_update_t &msg) noexcept {
  87. LOG_TRACE(log, "{}, updating contact", identity);
  88. auto& diff = msg.payload.diff;
  89. auto r = diff->apply(*cluster);
  90. if (!r) {
  91. LOG_ERROR(log, "{}, error updating contact: {}", identity, r.assume_error().message());
  92. do_shutdown(make_error(r.assume_error()));
  93. }
  94. }