test_supervisor.cpp 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2024 Ivan Baidakou
  3. #include "test_supervisor.h"
  4. #include "model/diff/modify/clone_block.h"
  5. #include "model/diff/modify/append_block.h"
  6. #include "model/diff/modify/finish_file.h"
  7. #include "model/diff/advance/remote_copy.h"
  8. #include "net/names.h"
  9. namespace to {
  // Empty tag types; they are used only as template arguments that select the
  // `access<>` specializations defined right after this namespace.
  namespace to {

  // Selects the accessor that returns the supervisor's `queue` member.
  struct queue {};

  // Selects the accessor that forwards to `on_timer_trigger`.
  struct on_timer_trigger {};

  } // namespace to
  13. template <> inline auto &rotor::supervisor_t::access<to::queue>() noexcept { return queue; }
  14. namespace rotor {
  15. template <>
  16. inline auto rotor::actor_base_t::access<to::on_timer_trigger, request_id_t, bool>(request_id_t request_id,
  17. bool cancelled) noexcept {
  18. on_timer_trigger(request_id, cancelled);
  19. }
  20. } // namespace rotor
  21. using namespace syncspirit::net;
  22. using namespace syncspirit::test;
  23. supervisor_t::supervisor_t(config_t &cfg) : parent_t(cfg) {
  24. auto_finish = cfg.auto_finish;
  25. auto_ack_blocks = cfg.auto_ack_blocks;
  26. sequencer = model::make_sequencer(1234);
  27. }
  28. void supervisor_t::configure(r::plugin::plugin_base_t &plugin) noexcept {
  29. parent_t::configure(plugin);
  30. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) {
  31. p.set_identity(std::string(names::coordinator) + ".test", false);
  32. log = utils::get_logger(identity);
  33. sink = p.create_address();
  34. });
  35. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  36. p.register_name(names::coordinator, get_address());
  37. p.register_name(names::sink, get_address());
  38. });
  39. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  40. p.subscribe_actor(&supervisor_t::on_model_update);
  41. p.subscribe_actor(&supervisor_t::on_model_sink, sink);
  42. });
  43. if (configure_callback) {
  44. configure_callback(plugin);
  45. }
  46. }
  47. void supervisor_t::do_start_timer(const r::pt::time_duration &, r::timer_handler_base_t &handler) noexcept {
  48. timers.emplace_back(&handler);
  49. }
  50. void supervisor_t::do_cancel_timer(r::request_id_t timer_id) noexcept {
  51. auto it = timers.begin();
  52. while (it != timers.end()) {
  53. auto &handler = *it;
  54. if (handler->request_id == timer_id) {
  55. auto &actor_ptr = handler->owner;
  56. actor_ptr->access<to::on_timer_trigger, r::request_id_t, bool>(timer_id, true);
  57. on_timer_trigger(timer_id, true);
  58. timers.erase(it);
  59. return;
  60. } else {
  61. ++it;
  62. }
  63. }
  64. assert(0 && "should not happen");
  65. }
  66. void supervisor_t::do_invoke_timer(r::request_id_t timer_id) noexcept {
  67. LOG_DEBUG(log, "{}, invoking timer {}", identity, timer_id);
  68. auto predicate = [&](auto &handler) { return handler->request_id == timer_id; };
  69. auto it = std::find_if(timers.begin(), timers.end(), predicate);
  70. assert(it != timers.end());
  71. auto &handler = *it;
  72. auto &actor_ptr = handler->owner;
  73. actor_ptr->access<to::on_timer_trigger, r::request_id_t, bool>(timer_id, false);
  74. timers.erase(it);
  75. }
  76. void supervisor_t::start() noexcept {}
  77. void supervisor_t::shutdown() noexcept { do_shutdown(); }
  78. void supervisor_t::enqueue(r::message_ptr_t message) noexcept {
  79. locality_leader->access<to::queue>().emplace_back(std::move(message));
  80. }
  81. void supervisor_t::on_model_update(model::message::model_update_t &msg) noexcept {
  82. LOG_TRACE(log, "{}, updating model", identity);
  83. auto &diff = msg.payload.diff;
  84. auto r = diff->apply(*cluster, *this);
  85. if (!r) {
  86. LOG_ERROR(log, "{}, error updating model: {}", identity, r.assume_error().message());
  87. do_shutdown(make_error(r.assume_error()));
  88. }
  89. r = diff->visit(*this, nullptr);
  90. if (!r) {
  91. LOG_ERROR(log, "{}, error visiting model: {}", identity, r.assume_error().message());
  92. do_shutdown(make_error(r.assume_error()));
  93. }
  94. }
  95. void supervisor_t::on_model_sink(model::message::model_update_t &message) noexcept {
  96. LOG_TRACE(log, "on_model_sink");
  97. auto custom = const_cast<void *>(message.payload.custom);
  98. auto diff_ptr = reinterpret_cast<model::diff::cluster_diff_t *>(custom);
  99. if (diff_ptr) {
  100. auto diff = model::diff::cluster_diff_ptr_t(diff_ptr, false);
  101. send<model::payload::model_update_t>(get_address(), std::move(diff));
  102. }
  103. }
  104. auto supervisor_t::consume_errors() noexcept -> io_errors_t { return std::move(io_errors); }
  105. auto supervisor_t::operator()(const model::diff::local::io_failure_t &diff, void *custom) noexcept
  106. -> outcome::result<void> {
  107. auto &errs = diff.errors;
  108. std::copy(errs.begin(), errs.end(), std::back_inserter(io_errors));
  109. return diff.visit_next(*this, custom);
  110. }
  111. auto supervisor_t::operator()(const model::diff::modify::finish_file_t &diff, void *custom) noexcept
  112. -> outcome::result<void> {
  113. if (auto_finish) {
  114. auto folder = cluster->get_folders().by_id(diff.folder_id);
  115. auto file_info = folder->get_folder_infos().by_device_id(diff.peer_id);
  116. auto file = file_info->get_file_infos().by_name(diff.file_name);
  117. auto ack = model::diff::advance::advance_t::create(diff.action, *file, *sequencer);
  118. send<model::payload::model_update_t>(get_address(), std::move(ack), this);
  119. }
  120. return diff.visit_next(*this, custom);
  121. }
  122. auto supervisor_t::operator()(const model::diff::modify::append_block_t &diff, void *custom) noexcept
  123. -> outcome::result<void> {
  124. auto ack_diff = diff.ack();
  125. if (auto_ack_blocks) {
  126. send<model::payload::model_update_t>(address, diff.ack(), this);
  127. } else {
  128. if (delayed_ack_holder) {
  129. delayed_ack_current = delayed_ack_current->assign_sibling(ack_diff.get());
  130. } else {
  131. delayed_ack_holder = ack_diff;
  132. delayed_ack_current = ack_diff.get();
  133. }
  134. }
  135. return diff.visit_next(*this, custom);
  136. }
  137. auto supervisor_t::operator()(const model::diff::modify::clone_block_t &diff, void *custom) noexcept
  138. -> outcome::result<void> {
  139. if (auto_ack_blocks) {
  140. send<model::payload::model_update_t>(address, diff.ack(), this);
  141. }
  142. return diff.visit_next(*this, custom);
  143. }