079-peer.cpp 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2023 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "utils/tls.h"
  6. #include "utils/format.hpp"
  7. #include "model/cluster.h"
  8. #include "model/messages.h"
  9. #include "model/diff/peer/peer_state.h"
  10. #include "net/names.h"
  11. #include "net/messages.h"
  12. #include "net/peer_actor.h"
  13. #include "transport/stream.h"
  14. #include <rotor/asio.hpp>
  15. #include <boost/algorithm/string/replace.hpp>
  16. using namespace syncspirit;
  17. using namespace syncspirit::test;
  18. using namespace syncspirit::model;
  19. using namespace syncspirit::net;
  20. namespace asio = boost::asio;
  21. namespace sys = boost::system;
  22. namespace r = rotor;
  23. namespace ra = r::asio;
  24. using configure_callback_t = std::function<void(r::plugin::plugin_base_t &)>;
  25. auto timeout = r::pt::time_duration{r::pt::millisec{1500}};
  26. auto host = "127.0.0.1";
  27. struct supervisor_t : ra::supervisor_asio_t {
  28. using parent_t = ra::supervisor_asio_t;
  29. using parent_t::parent_t;
  30. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  31. parent_t::configure(plugin);
  32. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  33. p.register_name(names::coordinator, get_address());
  34. p.register_name(names::peer_supervisor, get_address());
  35. });
  36. if (configure_callback) {
  37. configure_callback(plugin);
  38. }
  39. }
  40. void on_child_shutdown(actor_base_t *actor) noexcept override {
  41. if (actor) {
  42. spdlog::info("child shutdown: {}, reason: {}", actor->get_identity(),
  43. actor->get_shutdown_reason()->message());
  44. }
  45. parent_t::on_child_shutdown(actor);
  46. }
  47. void shutdown_finish() noexcept override {
  48. parent_t::shutdown_finish();
  49. if (acceptor) {
  50. acceptor->cancel();
  51. }
  52. }
  53. auto get_state() noexcept { return state; }
  54. asio::ip::tcp::acceptor *acceptor = nullptr;
  55. configure_callback_t configure_callback;
  56. };
  57. using supervisor_ptr_t = r::intrusive_ptr_t<supervisor_t>;
  58. using actor_ptr_t = r::intrusive_ptr_t<peer_actor_t>;
  59. struct fixture_t : private model::diff::contact_visitor_t {
  60. using acceptor_t = asio::ip::tcp::acceptor;
  61. using diff_ptr_t = r::intrusive_ptr_t<model::message::model_update_t>;
  62. fixture_t() noexcept : ctx(io_ctx), acceptor(io_ctx), peer_sock(io_ctx) {
  63. utils::set_default("trace");
  64. log = utils::get_logger("fixture");
  65. }
  66. virtual void run() noexcept {
  67. auto strand = std::make_shared<asio::io_context::strand>(io_ctx);
  68. sup = ctx.create_supervisor<supervisor_t>().strand(strand).timeout(timeout).create_registry().finish();
  69. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  70. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  71. using diff_t = typename diff_ptr_t::element_type;
  72. p.subscribe_actor(r::lambda<diff_t>([&](diff_t &msg) {
  73. LOG_INFO(log, "received diff message");
  74. auto &diff = msg.payload.diff;
  75. auto r = diff->apply(*cluster);
  76. if (!r) {
  77. LOG_ERROR(log, "error updating model: {}", r.assume_error().message());
  78. sup->do_shutdown();
  79. }
  80. }));
  81. });
  82. };
  83. sup->start();
  84. sup->do_process();
  85. my_keys = utils::generate_pair("my").value();
  86. auto md = model::device_id_t::from_cert(my_keys.cert_data).value();
  87. my_device = device_t::create(md, "my-device").value();
  88. peer_keys = utils::generate_pair("peer").value();
  89. auto pd = model::device_id_t::from_cert(peer_keys.cert_data).value();
  90. peer_device = device_t::create(pd, "peer-device").value();
  91. cluster = new cluster_t(my_device, 1, 1);
  92. cluster->get_devices().put(my_device);
  93. cluster->get_devices().put(peer_device);
  94. auto ep = asio::ip::tcp::endpoint(asio::ip::make_address(host), 0);
  95. acceptor.open(ep.protocol());
  96. acceptor.bind(ep);
  97. acceptor.listen();
  98. auto local_ep = acceptor.local_endpoint();
  99. acceptor.async_accept(peer_sock, [this](auto ec) { this->accept(ec); });
  100. sup->acceptor = &acceptor;
  101. auto uri_str = fmt::format("tcp://{}:{}/", local_ep.address(), local_ep.port());
  102. LOG_TRACE(log, "Connecting to {}", uri_str);
  103. auto uri = utils::parse(uri_str).value();
  104. auto cfg = transport::transport_config_t{{}, uri, *sup, {}, true};
  105. client_trans = transport::initiate_stream(cfg);
  106. tcp::resolver resolver(io_ctx);
  107. auto addresses = resolver.resolve(host, std::to_string(local_ep.port()));
  108. auto addresses_ptr = std::make_shared<decltype(addresses)>(addresses);
  109. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "active/connect, err: {}", ec.message()); };
  110. transport::connect_fn_t on_connect = [addresses_ptr, this](const tcp::endpoint &ep) {
  111. LOG_INFO(log, "active/connected");
  112. main();
  113. };
  114. client_trans->async_connect(*addresses_ptr, on_connect, on_error);
  115. io_ctx.run();
  116. }
  117. virtual void main() noexcept {}
  118. virtual actor_ptr_t create_actor() noexcept {
  119. auto diff = model::diff::cluster_diff_ptr_t();
  120. auto state = model::device_state_t::dialing;
  121. auto sha256 = peer_device->device_id().get_sha256();
  122. diff = new model::diff::peer::peer_state_t(*cluster, sha256, nullptr, state);
  123. sup->send<model::payload::model_update_t>(sup->get_address(), std::move(diff));
  124. auto bep_config = config::bep_config_t();
  125. bep_config.rx_buff_size = 1024;
  126. return sup->create_actor<actor_ptr_t::element_type>()
  127. .timeout(timeout)
  128. .cluster(cluster)
  129. .coordinator(sup->get_address())
  130. .bep_config(bep_config)
  131. .transport(peer_trans)
  132. .peer_device_id(peer_device->device_id())
  133. .device_name("peer-device")
  134. .peer_proto("tcp")
  135. .escalate_failure()
  136. .finish();
  137. }
  138. virtual void accept(const sys::error_code &ec) noexcept {
  139. LOG_INFO(log, "accept, ec: {}, remote = {}", ec.message(), peer_sock.remote_endpoint());
  140. auto uri = utils::parse("tcp://127.0.0.1:0/").value();
  141. auto cfg = transport::transport_config_t{{}, uri, *sup, std::move(peer_sock), false};
  142. peer_trans = transport::initiate_stream(cfg);
  143. main();
  144. }
  145. cluster_ptr_t cluster;
  146. supervisor_ptr_t sup;
  147. asio::io_context io_ctx;
  148. ra::system_context_asio_t ctx;
  149. acceptor_t acceptor;
  150. asio::ip::tcp::socket peer_sock;
  151. utils::logger_t log;
  152. utils::key_pair_t peer_keys;
  153. utils::key_pair_t my_keys;
  154. model::device_ptr_t peer_device;
  155. model::device_ptr_t my_device;
  156. transport::stream_sp_t peer_trans;
  157. transport::stream_sp_t client_trans;
  158. };
  159. void test_shutdown_on_hello_timeout() {
  160. struct F : fixture_t {
  161. void main() noexcept override { auto act = create_actor(); }
  162. void run() noexcept override {
  163. fixture_t::run();
  164. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  165. }
  166. };
  167. F().run();
  168. }
  169. int _init() {
  170. REGISTER_TEST_CASE(test_shutdown_on_hello_timeout, "test_shutdown_on_hello_timeout", "[peer]");
  171. return 1;
  172. }
  173. static int v = _init();