072-global_discovery.cpp 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2024 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "model/cluster.h"
  6. #include "model/diff/modify/update_contact.h"
  7. #include "utils/tls.h"
  8. #include "utils/error_code.h"
  9. #include "net/global_discovery_actor.h"
  10. #include "net/names.h"
  11. #include "net/messages.h"
  12. #include "test_supervisor.h"
  13. #include <nlohmann/json.hpp>
  14. using namespace syncspirit;
  15. using namespace syncspirit::db;
  16. using namespace syncspirit::test;
  17. using namespace syncspirit::model;
  18. using namespace syncspirit::net;
  19. namespace http = boost::beast::http;
  20. using json = nlohmann::json;
  21. namespace {
  22. static auto ssl_pair = utils::generate_pair("sample").value();
  23. struct dummy_http_actor_t : r::actor_base_t {
  24. using response_t = r::intrusive_ptr_t<net::payload::http_response_t>;
  25. using queue_t = std::list<response_t>;
  26. using r::actor_base_t::actor_base_t;
  27. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  28. r::actor_base_t::configure(plugin);
  29. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) { p.set_identity("dummy-http", true); });
  30. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  31. p.subscribe_actor(&dummy_http_actor_t::on_request);
  32. p.subscribe_actor(&dummy_http_actor_t::on_close_connection);
  33. });
  34. plugin.with_casted<r::plugin::registry_plugin_t>(
  35. [&](auto &p) { p.register_name(names::http11_gda, get_address()); });
  36. }
  37. void on_request(net::message::http_request_t &req) noexcept {
  38. if (!responses.empty()) {
  39. auto &res = *responses.front();
  40. reply_to(req, std::move(res.response), res.bytes, std::move(res.local_addr));
  41. connected = true;
  42. responses.pop_front();
  43. } else {
  44. auto ec = utils::make_error_code(utils::error_code_t::timed_out);
  45. reply_with_error(req, make_error(ec));
  46. }
  47. }
  48. void on_close_connection(net::message::http_close_connection_t &) noexcept { closed = true; }
  49. queue_t responses;
  50. bool connected = false;
  51. bool closed = false;
  52. };
  53. struct fixture_t {
  54. using http_actor_ptr_t = r::intrusive_ptr_t<dummy_http_actor_t>;
  55. using announce_msg_t = net::message::announce_notification_t;
  56. using announce_ptr_t = r::intrusive_ptr_t<announce_msg_t>;
  57. fixture_t() noexcept { utils::set_default("trace"); }
  58. virtual void run() noexcept {
  59. auto peer_id =
  60. device_id_t::from_string("VUV42CZ-IQD5A37-RPEBPM4-VVQK6E4-6WSKC7B-PVJQHHD-4PZD44V-ENC6WAZ").value();
  61. peer_device = device_t::create(peer_id, "peer-device").value();
  62. auto my_id =
  63. device_id_t::from_string("KHQNO2S-5QSILRK-YX4JZZ4-7L77APM-QNVGZJT-EKU7IFI-PNEPBMY-4MXFMQD").value();
  64. auto my_device = device_t::create(my_id, "my-device").value();
  65. cluster = new cluster_t(my_device, 1, 1);
  66. cluster->get_devices().put(my_device);
  67. cluster->get_devices().put(peer_device);
  68. r::system_context_t ctx;
  69. sup = ctx.create_supervisor<supervisor_t>().timeout(timeout).create_registry().finish();
  70. sup->cluster = cluster;
  71. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  72. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  73. p.subscribe_actor(r::lambda<announce_msg_t>([&](announce_msg_t &msg) { announce = &msg; }));
  74. });
  75. };
  76. sup->start();
  77. http_actor = sup->create_actor<dummy_http_actor_t>().timeout(timeout).finish();
  78. sup->do_process();
  79. CHECK(static_cast<r::actor_base_t *>(sup.get())->access<to::state>() == r::state_t::OPERATIONAL);
  80. auto global_device_id =
  81. model::device_id_t::from_string("LYXKCHX-VI3NYZR-ALCJBHF-WMZYSPK-QG6QJA3-MPFYMSO-U56GTUK-NA2MIAW");
  82. gda = sup->create_actor<global_discovery_actor_t>()
  83. .cluster(cluster)
  84. .ssl_pair(&ssl_pair)
  85. .announce_url(utils::parse("https://discovery.syncthing.net/"))
  86. .device_id(std::move(global_device_id.value()))
  87. .rx_buff_size(32768ul)
  88. .io_timeout(5ul)
  89. .timeout(timeout)
  90. .finish();
  91. bool started = preprocess();
  92. sup->do_process();
  93. if (started) {
  94. CHECK(static_cast<r::actor_base_t *>(gda.get())->access<to::state>() == r::state_t::OPERATIONAL);
  95. target_addr = gda->get_address();
  96. main();
  97. }
  98. sup->shutdown();
  99. sup->do_process();
  100. CHECK(http_actor->closed);
  101. CHECK(static_cast<r::actor_base_t *>(sup.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  102. }
  103. virtual bool preprocess() noexcept { return true; }
  104. virtual void main() noexcept {}
  105. http_actor_ptr_t http_actor;
  106. r::address_ptr_t target_addr;
  107. r::pt::time_duration timeout = r::pt::millisec{10};
  108. cluster_ptr_t cluster;
  109. device_ptr_t peer_device;
  110. r::intrusive_ptr_t<supervisor_t> sup;
  111. r::intrusive_ptr_t<net::global_discovery_actor_t> gda;
  112. announce_ptr_t announce;
  113. r::system_context_t ctx;
  114. };
  115. } // namespace
  116. void test_successful_announcement() {
  117. struct F : fixture_t {
  118. bool preprocess() noexcept override {
  119. auto uri = utils::parse("tcp://127.0.0.1");
  120. cluster->get_device()->assign_uris({uri});
  121. SECTION("successful (and empty) announce response") {
  122. http::response<http::string_body> res;
  123. res.result(204);
  124. res.set("Reannounce-After", "123");
  125. http_actor->responses.push_back(new net::payload::http_response_t(std::move(res), 0));
  126. sup->do_process();
  127. CHECK(http_actor->connected);
  128. CHECK(announce);
  129. }
  130. return true;
  131. }
  132. };
  133. F().run();
  134. }
  135. void test_failed_announcement() {
  136. struct F : fixture_t {
  137. bool preprocess() noexcept override {
  138. auto uri = utils::parse("tcp://127.0.0.1");
  139. cluster->get_device()->assign_uris({uri});
  140. SECTION("successful (and empty) announce response") {
  141. http::response<http::string_body> res;
  142. res.result(204);
  143. http_actor->responses.push_back(new net::payload::http_response_t(std::move(res), 0));
  144. sup->do_process();
  145. CHECK(http_actor->connected);
  146. CHECK(!announce);
  147. CHECK(static_cast<r::actor_base_t *>(gda.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  148. }
  149. return false;
  150. }
  151. };
  152. F().run();
  153. }
  154. void test_peer_discovery() {
  155. struct F : fixture_t {
  156. void main() noexcept override {
  157. sup->send<net::payload::discovery_notification_t>(sup->get_address(), peer_device->device_id());
  158. http::response<http::string_body> res;
  159. auto j = json::object();
  160. j["addresses"] = json::array({"tcp://127.0.0.2"});
  161. j["seen"] = "2020-10-13T18:41:37.02287354Z";
  162. SECTION("successful case") {
  163. res.body() = j.dump();
  164. http_actor->responses.push_back(new net::payload::http_response_t(std::move(res), 0));
  165. sup->do_process();
  166. REQUIRE(peer_device->get_uris().size() == 1);
  167. CHECK(peer_device->get_uris()[0]->buffer() == "tcp://127.0.0.2");
  168. // 2nd attempt
  169. peer_device->assign_uris({});
  170. res = {};
  171. res.body() = j.dump();
  172. http_actor->responses.push_back(new net::payload::http_response_t(std::move(res), 0));
  173. sup->send<net::payload::discovery_notification_t>(sup->get_address(), peer_device->device_id());
  174. sup->do_process();
  175. REQUIRE(peer_device->get_uris().size() == 1);
  176. CHECK(peer_device->get_uris()[0]->buffer() == "tcp://127.0.0.2");
  177. }
  178. SECTION("gargbage in response") {
  179. http_actor->responses.push_back(new net::payload::http_response_t(std::move(res), 0));
  180. sup->do_process();
  181. REQUIRE(peer_device->get_uris().size() == 0);
  182. CHECK(static_cast<r::actor_base_t *>(gda.get())->access<to::state>() == r::state_t::OPERATIONAL);
  183. }
  184. }
  185. };
  186. F().run();
  187. }
  188. void test_late_announcement() {
  189. struct F : fixture_t {
  190. void main() noexcept override {
  191. auto diff = model::diff::contact_diff_ptr_t{};
  192. diff = new model::diff::modify::update_contact_t(*cluster, {"127.0.0.3"});
  193. sup->send<model::payload::contact_update_t>(sup->get_address(), diff);
  194. http::response<http::string_body> res;
  195. res.result(204);
  196. res.set("Reannounce-After", "123");
  197. http_actor->responses.push_back(new net::payload::http_response_t(std::move(res), 0));
  198. sup->do_process();
  199. CHECK(http_actor->connected);
  200. CHECK(announce);
  201. }
  202. };
  203. F().run();
  204. }
  205. int _init() {
  206. REGISTER_TEST_CASE(test_successful_announcement, "test_successful_announcement", "[net]");
  207. REGISTER_TEST_CASE(test_failed_announcement, "test_failed_announcement", "[net]");
  208. REGISTER_TEST_CASE(test_peer_discovery, "test_peer_discovery", "[net]");
  209. REGISTER_TEST_CASE(test_late_announcement, "test_late_announcement", "[net]");
  210. return 1;
  211. }
  212. static int v = _init();