test_supervisor.cpp 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2023 Ivan Baidakou
  3. #include "test_supervisor.h"
  4. #include "model/diff/modify/finish_file.h"
  5. #include "model/diff/modify/finish_file_ack.h"
  6. #include "net/names.h"
  7. namespace to {
  8. struct queue {};
  9. struct on_timer_trigger {};
  10. } // namespace to
  11. template <> inline auto &rotor::supervisor_t::access<to::queue>() noexcept { return queue; }
  12. namespace rotor {
  13. template <>
  14. inline auto rotor::actor_base_t::access<to::on_timer_trigger, request_id_t, bool>(request_id_t request_id,
  15. bool cancelled) noexcept {
  16. on_timer_trigger(request_id, cancelled);
  17. }
  18. } // namespace rotor
  19. using namespace syncspirit::net;
  20. using namespace syncspirit::test;
  21. supervisor_t::supervisor_t(config_t &cfg) : parent_t(cfg) {
  22. log = utils::get_logger("net.test_supervisor");
  23. auto_finish = cfg.auto_finish;
  24. }
  25. void supervisor_t::configure(r::plugin::plugin_base_t &plugin) noexcept {
  26. parent_t::configure(plugin);
  27. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) { p.set_identity(names::coordinator, false); });
  28. plugin.with_casted<r::plugin::registry_plugin_t>(
  29. [&](auto &p) { p.register_name(names::coordinator, get_address()); });
  30. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  31. p.subscribe_actor(&supervisor_t::on_model_update);
  32. p.subscribe_actor(&supervisor_t::on_block_update);
  33. p.subscribe_actor(&supervisor_t::on_contact_update);
  34. });
  35. if (configure_callback) {
  36. configure_callback(plugin);
  37. }
  38. }
  39. void supervisor_t::do_start_timer(const r::pt::time_duration &, r::timer_handler_base_t &handler) noexcept {
  40. timers.emplace_back(&handler);
  41. }
  42. void supervisor_t::do_cancel_timer(r::request_id_t timer_id) noexcept {
  43. auto it = timers.begin();
  44. while (it != timers.end()) {
  45. auto &handler = *it;
  46. if (handler->request_id == timer_id) {
  47. auto &actor_ptr = handler->owner;
  48. actor_ptr->access<to::on_timer_trigger, r::request_id_t, bool>(timer_id, true);
  49. on_timer_trigger(timer_id, true);
  50. timers.erase(it);
  51. return;
  52. } else {
  53. ++it;
  54. }
  55. }
  56. assert(0 && "should not happen");
  57. }
  58. void supervisor_t::do_invoke_timer(r::request_id_t timer_id) noexcept {
  59. LOG_DEBUG(log, "{}, invoking timer {}", identity, timer_id);
  60. auto predicate = [&](auto &handler) { return handler->request_id == timer_id; };
  61. auto it = std::find_if(timers.begin(), timers.end(), predicate);
  62. assert(it != timers.end());
  63. auto &handler = *it;
  64. auto &actor_ptr = handler->owner;
  65. actor_ptr->access<to::on_timer_trigger, r::request_id_t, bool>(timer_id, false);
  66. timers.erase(it);
  67. }
  68. void supervisor_t::start() noexcept {}
  69. void supervisor_t::shutdown() noexcept { do_shutdown(); }
  70. void supervisor_t::enqueue(r::message_ptr_t message) noexcept {
  71. locality_leader->access<to::queue>().emplace_back(std::move(message));
  72. }
  73. void supervisor_t::on_model_update(model::message::model_update_t &msg) noexcept {
  74. LOG_TRACE(log, "{}, updating model", identity);
  75. auto &diff = msg.payload.diff;
  76. auto r = diff->apply(*cluster);
  77. if (!r) {
  78. LOG_ERROR(log, "{}, error updating model: {}", identity, r.assume_error().message());
  79. do_shutdown(make_error(r.assume_error()));
  80. }
  81. r = diff->visit(*this, nullptr);
  82. if (!r) {
  83. LOG_ERROR(log, "{}, error visiting model: {}", identity, r.assume_error().message());
  84. do_shutdown(make_error(r.assume_error()));
  85. }
  86. }
  87. void supervisor_t::on_block_update(model::message::block_update_t &msg) noexcept {
  88. LOG_TRACE(log, "{}, updating block", identity);
  89. auto &diff = msg.payload.diff;
  90. auto r = diff->apply(*cluster);
  91. if (!r) {
  92. LOG_ERROR(log, "{}, error updating block: {}", identity, r.assume_error().message());
  93. do_shutdown(make_error(r.assume_error()));
  94. }
  95. }
  96. void supervisor_t::on_contact_update(model::message::contact_update_t &msg) noexcept {
  97. LOG_TRACE(log, "{}, updating contact", identity);
  98. auto &diff = msg.payload.diff;
  99. auto r = diff->apply(*cluster);
  100. if (!r) {
  101. LOG_ERROR(log, "{}, error updating contact: {}", identity, r.assume_error().message());
  102. do_shutdown(make_error(r.assume_error()));
  103. }
  104. }
  105. auto supervisor_t::operator()(const model::diff::modify::finish_file_t &diff, void *custom) noexcept
  106. -> outcome::result<void> {
  107. if (auto_finish) {
  108. auto folder = cluster->get_folders().by_id(diff.folder_id);
  109. auto file_info = folder->get_folder_infos().by_device(*cluster->get_device());
  110. auto file = file_info->get_file_infos().by_name(diff.file_name);
  111. auto ack = model::diff::cluster_diff_ptr_t{};
  112. ack = new model::diff::modify::finish_file_ack_t(*file);
  113. send<model::payload::model_update_t>(get_address(), std::move(ack), this);
  114. }
  115. return outcome::success();
  116. }