078-relay.cpp 16 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2025 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "utils/tls.h"
  6. #include "utils/format.hpp"
  7. #include "model/cluster.h"
  8. #include "model/messages.h"
  9. #include "model/diff/cluster_visitor.h"
  10. #include "model/diff/contact/relay_connect_request.h"
  11. #include "model/diff/contact/unknown_connected.h"
  12. #include "net/names.h"
  13. #include "net/messages.h"
  14. #include "net/relay_actor.h"
  15. #include "transport/stream.h"
  16. #include <rotor/asio.hpp>
  17. #include <boost/algorithm/string/replace.hpp>
  18. using namespace syncspirit;
  19. using namespace syncspirit::test;
  20. using namespace syncspirit::model;
  21. using namespace syncspirit::net;
  22. namespace asio = boost::asio;
  23. namespace sys = boost::system;
  24. namespace r = rotor;
  25. namespace ra = r::asio;
  26. using configure_callback_t = std::function<void(r::plugin::plugin_base_t &)>;
  27. auto timeout = r::pt::time_duration{r::pt::millisec{1500}};
  28. auto host = "127.0.0.1";
  29. struct supervisor_t : ra::supervisor_asio_t {
  30. using parent_t = ra::supervisor_asio_t;
  31. using parent_t::parent_t;
  32. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  33. parent_t::configure(plugin);
  34. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  35. p.register_name(names::coordinator, get_address());
  36. p.register_name(names::peer_supervisor, get_address());
  37. p.register_name(names::http11_relay, get_address());
  38. });
  39. if (configure_callback) {
  40. configure_callback(plugin);
  41. }
  42. }
  43. void on_child_shutdown(actor_base_t *actor) noexcept override {
  44. if (actor) {
  45. spdlog::info("child shutdown: {}, reason: {}", actor->get_identity(),
  46. actor->get_shutdown_reason()->message());
  47. }
  48. parent_t::on_child_shutdown(actor);
  49. }
  50. void shutdown_finish() noexcept override {
  51. parent_t::shutdown_finish();
  52. if (acceptor) {
  53. acceptor->cancel();
  54. }
  55. }
  56. auto get_state() noexcept { return state; }
  57. asio::ip::tcp::acceptor *acceptor = nullptr;
  58. configure_callback_t configure_callback;
  59. };
  60. using supervisor_ptr_t = r::intrusive_ptr_t<supervisor_t>;
  61. using actor_ptr_t = r::intrusive_ptr_t<relay_actor_t>;
  62. struct fixture_t : private model::diff::cluster_visitor_t {
  63. using acceptor_t = asio::ip::tcp::acceptor;
  64. fixture_t() noexcept : ctx(io_ctx), acceptor(io_ctx), peer_sock(io_ctx) {
  65. test::init_logging();
  66. log = utils::get_logger("fixture");
  67. relay_config = config::relay_config_t{
  68. true,
  69. true,
  70. utils::parse("https://some-endpoint.com/"),
  71. 1024 * 1024,
  72. };
  73. }
  74. void run() noexcept {
  75. auto strand = std::make_shared<asio::io_context::strand>(io_ctx);
  76. sup = ctx.create_supervisor<supervisor_t>().strand(strand).timeout(timeout).create_registry().finish();
  77. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  78. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  79. using model_update_t = model::message::model_update_t;
  80. p.subscribe_actor(r::lambda<model_update_t>([&](model_update_t &msg) { on(msg); }));
  81. using http_req_t = net::message::http_request_t;
  82. p.subscribe_actor(r::lambda<http_req_t>([&](http_req_t &req) {
  83. LOG_INFO(log, "received http request");
  84. http::response<http::string_body> res;
  85. res.result(200);
  86. res.body() = public_relays;
  87. sup->reply_to(req, std::move(res), public_relays.size());
  88. }));
  89. using connect_req_t = net::message::connect_request_t;
  90. p.subscribe_actor(r::lambda<connect_req_t>([&](connect_req_t &req) {
  91. LOG_INFO(log, "(connect request)");
  92. on(req);
  93. }));
  94. });
  95. };
  96. sup->start();
  97. sup->do_process();
  98. auto ep = asio::ip::tcp::endpoint(asio::ip::make_address(host), 0);
  99. acceptor.open(ep.protocol());
  100. acceptor.bind(ep);
  101. acceptor.listen();
  102. listening_ep = acceptor.local_endpoint();
  103. my_keys = utils::generate_pair("me").value();
  104. relay_keys = utils::generate_pair("relay").value();
  105. peer_keys = utils::generate_pair("peer").value();
  106. auto md = model::device_id_t::from_cert(my_keys.cert_data).value();
  107. auto rd = model::device_id_t::from_cert(relay_keys.cert_data).value();
  108. auto pd = model::device_id_t::from_cert(peer_keys.cert_data).value();
  109. my_device = device_t::create(md, "my-device").value();
  110. relay_device = device_t::create(rd, "relay-device").value();
  111. peer_device = device_t::create(rd, "peer-device").value();
  112. public_relays = generate_public_relays(listening_ep, relay_device);
  113. log->debug("public relays json: {}", public_relays);
  114. initiate_accept();
  115. cluster = new cluster_t(my_device, 1);
  116. cluster->get_devices().put(my_device);
  117. cluster->get_devices().put(peer_device);
  118. session_key = "lorem-imspum-dolor";
  119. main();
  120. }
  121. virtual void main() noexcept {}
  122. virtual std::string generate_public_relays(const asio::ip::tcp::endpoint &,
  123. model::device_ptr_t &relay_device) noexcept {
  124. std::string pattern = R""(
  125. {
  126. "relays": [
  127. {
  128. "url": "##URL##&pingInterval=0m1s&networkTimeout=2m0s&sessionLimitBps=0&globalLimitBps=0&statusAddr=:22070&providedBy=ina",
  129. "location": {
  130. "latitude": 50.1049,
  131. "longitude": 8.6295,
  132. "city": "Frankfurt am Main",
  133. "country": "DE",
  134. "continent": "EU"
  135. }
  136. }
  137. ]
  138. }
  139. )"";
  140. auto url = fmt::format("relay://{}/?id={}", listening_ep, relay_device->device_id().get_value());
  141. return boost::algorithm::replace_first_copy(pattern, "##URL##", url);
  142. }
  143. virtual void initiate_accept() noexcept {
  144. acceptor.async_accept(peer_sock, [this](auto ec) { this->accept(ec); });
  145. sup->acceptor = &acceptor;
  146. }
  147. virtual void accept(const sys::error_code &ec) noexcept {
  148. LOG_INFO(log, "accept (relay), ec: {}, remote = {}", ec.message(), peer_sock.remote_endpoint());
  149. auto uri = utils::parse("tcp://127.0.0.1:0/");
  150. auto cfg = transport::transport_config_t{{}, uri, *sup, std::move(peer_sock), false};
  151. relay_trans = transport::initiate_stream(cfg);
  152. relay_read();
  153. }
  154. virtual actor_ptr_t create_actor() noexcept {
  155. return sup->create_actor<actor_ptr_t::element_type>()
  156. .timeout(timeout)
  157. .cluster(cluster)
  158. .relay_config(relay_config)
  159. .escalate_failure()
  160. .finish();
  161. }
  162. virtual void on(net::message::connect_request_t &req) noexcept {
  163. auto &uri = req.payload.request_payload.uri;
  164. log->info("requested connect to {}", uri);
  165. auto cfg = transport::transport_config_t{{}, uri, *sup, {}, true};
  166. auto ip = asio::ip::make_address(host);
  167. auto peer_ep = tcp::endpoint(ip, uri->port_number());
  168. auto addresses = std::vector<tcp::endpoint>{peer_ep};
  169. auto addresses_ptr = std::make_shared<decltype(addresses)>(addresses);
  170. auto trans = transport::initiate_stream(cfg);
  171. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "active/connect, err: {}", ec.message()); };
  172. using ptr_t = model::intrusive_ptr_t<std::decay_t<decltype(req)>>;
  173. auto ptr = ptr_t(&req);
  174. transport::connect_fn_t on_connect = [ptr, trans, addresses_ptr, this](const tcp::endpoint &ep) {
  175. LOG_INFO(log, "active/connected");
  176. sup->reply_to(*ptr, trans, ep);
  177. };
  178. trans->async_connect(addresses_ptr, on_connect, on_error);
  179. }
  180. void send_relay(const proto::relay::message_t &msg) noexcept {
  181. proto::relay::serialize(msg, relay_tx);
  182. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "relay/write, err: {}", ec.message()); };
  183. transport::io_fn_t on_write = [&](size_t bytes) { LOG_TRACE(log, "relay/write, {} bytes", bytes); };
  184. relay_trans->async_send(asio::buffer(relay_tx), on_write, on_error);
  185. }
  186. virtual void on_relay(proto::relay::ping_t &) noexcept {};
  187. virtual void on_relay(proto::relay::pong_t &) noexcept {};
  188. virtual void on_relay(proto::relay::join_relay_request_t &) noexcept {
  189. LOG_INFO(log, "join_relay_request_t");
  190. send_relay(proto::relay::response_t{0, "ok"});
  191. };
  192. virtual void on_relay(proto::relay::join_session_request_t &) noexcept {};
  193. virtual void on_relay(proto::relay::response_t &) noexcept {};
  194. virtual void on_relay(proto::relay::connect_request_t &) noexcept {};
  195. virtual void on_relay(proto::relay::session_invitation_t &) noexcept {};
  196. virtual void on(model::message::model_update_t &update) noexcept {
  197. auto &diff = *update.payload.diff;
  198. auto r = diff.apply(*cluster, get_apply_controller());
  199. if (!r) {
  200. LOG_ERROR(log, "error applying diff: {}", r.error().message());
  201. }
  202. r = diff.visit(*this, nullptr);
  203. if (!r) {
  204. LOG_ERROR(log, "error visiting diff: {}", r.error().message());
  205. }
  206. }
  207. void relay_read() noexcept {
  208. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "relay/read, err: {}", ec.message()); };
  209. transport::io_fn_t on_read = [&](size_t bytes) {
  210. LOG_TRACE(log, "relay/read, {} bytes", bytes);
  211. auto msg = proto::relay::parse({relay_rx.data(), bytes});
  212. auto wrapped = std::get_if<proto::relay::wrapped_message_t>(&msg);
  213. if (!wrapped) {
  214. LOG_ERROR(log, "relay/read non-message?");
  215. return;
  216. }
  217. std::visit([&](auto &it) { on_relay(it); }, wrapped->message);
  218. };
  219. relay_rx.resize(1500);
  220. auto buff = asio::buffer(relay_rx.data(), relay_rx.size());
  221. relay_trans->async_recv(buff, on_read, on_error);
  222. LOG_TRACE(log, "relay/async recv");
  223. }
  224. config::relay_config_t relay_config;
  225. cluster_ptr_t cluster;
  226. asio::io_context io_ctx;
  227. ra::system_context_asio_t ctx;
  228. acceptor_t acceptor;
  229. supervisor_ptr_t sup;
  230. asio::ip::tcp::endpoint listening_ep;
  231. utils::logger_t log;
  232. asio::ip::tcp::socket peer_sock;
  233. std::string public_relays;
  234. utils::key_pair_t my_keys;
  235. utils::key_pair_t relay_keys;
  236. utils::key_pair_t peer_keys;
  237. model::device_ptr_t my_device;
  238. model::device_ptr_t relay_device;
  239. model::device_ptr_t peer_device;
  240. transport::stream_sp_t relay_trans;
  241. std::string relay_rx;
  242. std::string relay_tx;
  243. std::string session_key;
  244. };
  245. void test_master_connect() {
  246. struct F : fixture_t {
  247. void main() noexcept override {
  248. auto act = create_actor();
  249. io_ctx.run();
  250. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  251. REQUIRE(my_device->get_uris().size() == 1);
  252. CHECK(my_device->get_uris()[0]->scheme() == "relay");
  253. sup->shutdown();
  254. io_ctx.restart();
  255. io_ctx.run();
  256. CHECK(my_device->get_uris().size() == 0);
  257. io_ctx.restart();
  258. io_ctx.run();
  259. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  260. }
  261. void on(model::message::model_update_t &update) noexcept override {
  262. LOG_INFO(log, "contact_update_t");
  263. fixture_t::on(update);
  264. io_ctx.stop();
  265. }
  266. };
  267. F().run();
  268. }
  269. void test_passive() {
  270. struct F : fixture_t {
  271. void main() noexcept override {
  272. auto act = create_actor();
  273. io_ctx.run();
  274. CHECK(sent);
  275. CHECK(received);
  276. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  277. sup->shutdown();
  278. io_ctx.restart();
  279. io_ctx.run();
  280. CHECK(my_device->get_uris().size() == 0);
  281. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  282. }
  283. void on(model::message::model_update_t &update) noexcept override {
  284. LOG_INFO(log, "contact_update_t");
  285. fixture_t::on(update);
  286. if (my_device->get_uris().size() == 1 && !sent) {
  287. sent = true;
  288. auto msg = proto::relay::session_invitation_t{
  289. std::string(peer_device->device_id().get_sha256()), session_key, {}, 12345, true};
  290. send_relay(msg);
  291. }
  292. }
  293. outcome::result<void> operator()(const model::diff::contact::relay_connect_request_t &diff,
  294. void *) noexcept override {
  295. CHECK(diff.peer == peer_device->device_id());
  296. CHECK(diff.session_key == session_key);
  297. CHECK(diff.relay.port() == 12345);
  298. CHECK(diff.relay.address().to_string() == "127.0.0.1");
  299. received = true;
  300. io_ctx.stop();
  301. return outcome::success();
  302. }
  303. bool sent = false;
  304. bool received = false;
  305. };
  306. F().run();
  307. }
  308. void test_passive_ping_pong() {
  309. struct F : fixture_t {
  310. void main() noexcept override {
  311. auto act = create_actor();
  312. io_ctx.run();
  313. CHECK(received);
  314. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  315. sup->shutdown();
  316. io_ctx.restart();
  317. io_ctx.run();
  318. CHECK(my_device->get_uris().size() == 0);
  319. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  320. }
  321. void on_relay(proto::relay::join_relay_request_t &request) noexcept override {
  322. fixture_t::on_relay(request);
  323. relay_read();
  324. };
  325. void on_relay(proto::relay::ping_t &message) noexcept override {
  326. fixture_t::on_relay(message);
  327. send_relay(proto::relay::pong_t{});
  328. auto msg = proto::relay::session_invitation_t{
  329. std::string(peer_device->device_id().get_sha256()), session_key, {}, 12345, true};
  330. send_relay(msg);
  331. };
  332. outcome::result<void> operator()(const model::diff::contact::relay_connect_request_t &diff,
  333. void *) noexcept override {
  334. CHECK(diff.peer == peer_device->device_id());
  335. CHECK(diff.session_key == session_key);
  336. CHECK(diff.relay.port() == 12345);
  337. CHECK(diff.relay.address().to_string() == "127.0.0.1");
  338. received = true;
  339. io_ctx.stop();
  340. return outcome::success();
  341. }
  342. bool received = false;
  343. };
  344. F().run();
  345. }
  346. void test_passive_unknown() {
  347. struct F : fixture_t {
  348. void main() noexcept override {
  349. auto act = create_actor();
  350. cluster->get_devices().remove(peer_device);
  351. io_ctx.run();
  352. CHECK(sent);
  353. CHECK(unknownon_connected);
  354. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  355. sup->shutdown();
  356. io_ctx.restart();
  357. io_ctx.run();
  358. CHECK(my_device->get_uris().size() == 0);
  359. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  360. }
  361. void on(model::message::model_update_t &update) noexcept override {
  362. LOG_INFO(log, "contact_update_t");
  363. fixture_t::on(update);
  364. if (my_device->get_uris().size() == 1 && !sent) {
  365. sent = true;
  366. auto msg = proto::relay::session_invitation_t{
  367. std::string(peer_device->device_id().get_sha256()), session_key, {}, 12345, true};
  368. send_relay(msg);
  369. }
  370. }
  371. outcome::result<void> operator()(const model::diff::contact::unknown_connected_t &diff,
  372. void *) noexcept override {
  373. CHECK(diff.device_id == peer_device->device_id());
  374. unknownon_connected = true;
  375. io_ctx.stop();
  376. return outcome::success();
  377. }
  378. bool sent = false;
  379. bool unknownon_connected = false;
  380. };
  381. F().run();
  382. }
  383. int _init() {
  384. REGISTER_TEST_CASE(test_master_connect, "test_master_connect", "[relay]");
  385. REGISTER_TEST_CASE(test_passive, "test_passive", "[relay]");
  386. REGISTER_TEST_CASE(test_passive_ping_pong, "test_passive_ping_pong", "[relay]");
  387. REGISTER_TEST_CASE(test_passive_unknown, "test_passive_unknown", "[relay]");
  388. return 1;
  389. }
  390. static int v = _init();