072-global_discovery.cpp 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2023 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "model/cluster.h"
  6. #include "model/diff/modify/update_contact.h"
  7. #include "utils/tls.h"
  8. #include "utils/error_code.h"
  9. #include "net/global_discovery_actor.h"
  10. #include "net/names.h"
  11. #include "net/messages.h"
  12. #include "access.h"
  13. #include "test_supervisor.h"
  14. #include <nlohmann/json.hpp>
  15. using namespace syncspirit;
  16. using namespace syncspirit::db;
  17. using namespace syncspirit::test;
  18. using namespace syncspirit::model;
  19. using namespace syncspirit::net;
  20. namespace http = boost::beast::http;
  21. using json = nlohmann::json;
  22. namespace {
  23. static auto ssl_pair = utils::generate_pair("sample").value();
  24. struct dummy_http_actor_t : r::actor_base_t {
  25. using response_t = r::intrusive_ptr_t<net::payload::http_response_t>;
  26. using queue_t = std::list<response_t>;
  27. using r::actor_base_t::actor_base_t;
  28. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  29. r::actor_base_t::configure(plugin);
  30. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) { p.set_identity("dummy-http", true); });
  31. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  32. p.subscribe_actor(&dummy_http_actor_t::on_request);
  33. p.subscribe_actor(&dummy_http_actor_t::on_close_connection);
  34. });
  35. plugin.with_casted<r::plugin::registry_plugin_t>(
  36. [&](auto &p) { p.register_name(names::http11_gda, get_address()); });
  37. }
  38. void on_request(net::message::http_request_t &req) noexcept {
  39. if (!responses.empty()) {
  40. auto &res = *responses.front();
  41. reply_to(req, std::move(res.response), res.bytes, std::move(res.local_addr));
  42. connected = true;
  43. responses.pop_front();
  44. } else {
  45. auto ec = utils::make_error_code(utils::error_code_t::timed_out);
  46. reply_with_error(req, make_error(ec));
  47. }
  48. }
  49. void on_close_connection(net::message::http_close_connection_t &) noexcept { closed = true; }
  50. queue_t responses;
  51. bool connected = false;
  52. bool closed = false;
  53. };
  54. struct fixture_t {
  55. using http_actor_ptr_t = r::intrusive_ptr_t<dummy_http_actor_t>;
  56. using announce_msg_t = net::message::announce_notification_t;
  57. using announce_ptr_t = r::intrusive_ptr_t<announce_msg_t>;
  58. fixture_t() noexcept { utils::set_default("trace"); }
  59. virtual void run() noexcept {
  60. auto peer_id =
  61. device_id_t::from_string("VUV42CZ-IQD5A37-RPEBPM4-VVQK6E4-6WSKC7B-PVJQHHD-4PZD44V-ENC6WAZ").value();
  62. peer_device = device_t::create(peer_id, "peer-device").value();
  63. auto my_id =
  64. device_id_t::from_string("KHQNO2S-5QSILRK-YX4JZZ4-7L77APM-QNVGZJT-EKU7IFI-PNEPBMY-4MXFMQD").value();
  65. auto my_device = device_t::create(my_id, "my-device").value();
  66. cluster = new cluster_t(my_device, 1, 1);
  67. cluster->get_devices().put(my_device);
  68. cluster->get_devices().put(peer_device);
  69. r::system_context_t ctx;
  70. sup = ctx.create_supervisor<supervisor_t>().timeout(timeout).create_registry().finish();
  71. sup->cluster = cluster;
  72. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  73. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  74. p.subscribe_actor(r::lambda<announce_msg_t>([&](announce_msg_t &msg) { announce = &msg; }));
  75. });
  76. };
  77. sup->start();
  78. http_actor = sup->create_actor<dummy_http_actor_t>().timeout(timeout).finish();
  79. sup->do_process();
  80. CHECK(static_cast<r::actor_base_t *>(sup.get())->access<to::state>() == r::state_t::OPERATIONAL);
  81. auto global_device_id =
  82. model::device_id_t::from_string("LYXKCHX-VI3NYZR-ALCJBHF-WMZYSPK-QG6QJA3-MPFYMSO-U56GTUK-NA2MIAW");
  83. gda = sup->create_actor<global_discovery_actor_t>()
  84. .cluster(cluster)
  85. .ssl_pair(&ssl_pair)
  86. .announce_url(utils::parse("https://discovery.syncthing.net/").value())
  87. .device_id(std::move(global_device_id.value()))
  88. .rx_buff_size(32768ul)
  89. .io_timeout(5ul)
  90. .timeout(timeout)
  91. .finish();
  92. bool started = preprocess();
  93. sup->do_process();
  94. if (started) {
  95. CHECK(static_cast<r::actor_base_t *>(gda.get())->access<to::state>() == r::state_t::OPERATIONAL);
  96. target_addr = gda->get_address();
  97. main();
  98. }
  99. sup->shutdown();
  100. sup->do_process();
  101. CHECK(http_actor->closed);
  102. CHECK(static_cast<r::actor_base_t *>(sup.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  103. }
  104. virtual bool preprocess() noexcept { return true; }
  105. virtual void main() noexcept {}
  106. http_actor_ptr_t http_actor;
  107. r::address_ptr_t target_addr;
  108. r::pt::time_duration timeout = r::pt::millisec{10};
  109. cluster_ptr_t cluster;
  110. device_ptr_t peer_device;
  111. r::intrusive_ptr_t<supervisor_t> sup;
  112. r::intrusive_ptr_t<net::global_discovery_actor_t> gda;
  113. announce_ptr_t announce;
  114. r::system_context_t ctx;
  115. };
  116. } // namespace
  117. void test_successful_announcement() {
  118. struct F : fixture_t {
  119. bool preprocess() noexcept override {
  120. auto uri = utils::parse("tcp://127.0.0.1").value();
  121. cluster->get_device()->assign_uris({uri});
  122. SECTION("successful (and empty) announce response") {
  123. http::response<http::string_body> res;
  124. res.result(204);
  125. res.set("Reannounce-After", "123");
  126. http_actor->responses.push_back(new net::payload::http_response_t(std::move(res), 0));
  127. sup->do_process();
  128. CHECK(http_actor->connected);
  129. CHECK(announce);
  130. }
  131. return true;
  132. }
  133. };
  134. F().run();
  135. }
  136. void test_failed_announcement() {
  137. struct F : fixture_t {
  138. bool preprocess() noexcept override {
  139. auto uri = utils::parse("tcp://127.0.0.1").value();
  140. cluster->get_device()->assign_uris({uri});
  141. SECTION("successful (and empty) announce response") {
  142. http::response<http::string_body> res;
  143. res.result(204);
  144. http_actor->responses.push_back(new net::payload::http_response_t(std::move(res), 0));
  145. sup->do_process();
  146. CHECK(http_actor->connected);
  147. CHECK(!announce);
  148. CHECK(static_cast<r::actor_base_t *>(gda.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  149. }
  150. return false;
  151. }
  152. };
  153. F().run();
  154. }
  155. void test_peer_discovery() {
  156. struct F : fixture_t {
  157. void main() noexcept override {
  158. sup->send<net::payload::discovery_notification_t>(sup->get_address(), peer_device->device_id());
  159. http::response<http::string_body> res;
  160. auto j = json::object();
  161. j["addresses"] = json::array({"tcp://127.0.0.2"});
  162. j["seen"] = "2020-10-13T18:41:37.02287354Z";
  163. SECTION("successful case") {
  164. res.body() = j.dump();
  165. http_actor->responses.push_back(new net::payload::http_response_t(std::move(res), 0));
  166. sup->do_process();
  167. REQUIRE(peer_device->get_uris().size() == 1);
  168. CHECK(peer_device->get_uris()[0].full == "tcp://127.0.0.2");
  169. // 2nd attempt
  170. peer_device->assign_uris({});
  171. res = {};
  172. res.body() = j.dump();
  173. http_actor->responses.push_back(new net::payload::http_response_t(std::move(res), 0));
  174. sup->send<net::payload::discovery_notification_t>(sup->get_address(), peer_device->device_id());
  175. sup->do_process();
  176. REQUIRE(peer_device->get_uris().size() == 1);
  177. CHECK(peer_device->get_uris()[0].full == "tcp://127.0.0.2");
  178. }
  179. SECTION("gargbage in response") {
  180. http_actor->responses.push_back(new net::payload::http_response_t(std::move(res), 0));
  181. sup->do_process();
  182. REQUIRE(peer_device->get_uris().size() == 0);
  183. CHECK(static_cast<r::actor_base_t *>(gda.get())->access<to::state>() == r::state_t::OPERATIONAL);
  184. }
  185. }
  186. };
  187. F().run();
  188. }
  189. void test_late_announcement() {
  190. struct F : fixture_t {
  191. void main() noexcept override {
  192. auto diff = model::diff::contact_diff_ptr_t{};
  193. diff = new model::diff::modify::update_contact_t(*cluster, {"127.0.0.3"});
  194. sup->send<model::payload::contact_update_t>(sup->get_address(), diff);
  195. http::response<http::string_body> res;
  196. res.result(204);
  197. res.set("Reannounce-After", "123");
  198. http_actor->responses.push_back(new net::payload::http_response_t(std::move(res), 0));
  199. sup->do_process();
  200. CHECK(http_actor->connected);
  201. CHECK(announce);
  202. }
  203. };
  204. F().run();
  205. }
  206. int _init() {
  207. REGISTER_TEST_CASE(test_successful_announcement, "test_successful_announcement", "[net]");
  208. REGISTER_TEST_CASE(test_failed_announcement, "test_failed_announcement", "[net]");
  209. REGISTER_TEST_CASE(test_peer_discovery, "test_peer_discovery", "[net]");
  210. REGISTER_TEST_CASE(test_late_announcement, "test_late_announcement", "[net]");
  211. return 1;
  212. }
  213. static int v = _init();