078-relay.cpp 13 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2024 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "utils/tls.h"
  6. #include "utils/format.hpp"
  7. #include "model/cluster.h"
  8. #include "model/messages.h"
  9. #include "model/diff/modify/relay_connect_request.h"
  10. #include "net/names.h"
  11. #include "net/messages.h"
  12. #include "net/relay_actor.h"
  13. #include "transport/stream.h"
  14. #include <rotor/asio.hpp>
  15. #include <boost/algorithm/string/replace.hpp>
  16. using namespace syncspirit;
  17. using namespace syncspirit::test;
  18. using namespace syncspirit::model;
  19. using namespace syncspirit::net;
  20. namespace asio = boost::asio;
  21. namespace sys = boost::system;
  22. namespace r = rotor;
  23. namespace ra = r::asio;
  24. using configure_callback_t = std::function<void(r::plugin::plugin_base_t &)>;
  25. auto timeout = r::pt::time_duration{r::pt::millisec{1500}};
  26. auto host = "127.0.0.1";
  27. struct supervisor_t : ra::supervisor_asio_t {
  28. using parent_t = ra::supervisor_asio_t;
  29. using parent_t::parent_t;
  30. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  31. parent_t::configure(plugin);
  32. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  33. p.register_name(names::coordinator, get_address());
  34. p.register_name(names::peer_supervisor, get_address());
  35. p.register_name(names::http11_relay, get_address());
  36. });
  37. if (configure_callback) {
  38. configure_callback(plugin);
  39. }
  40. }
  41. void on_child_shutdown(actor_base_t *actor) noexcept override {
  42. if (actor) {
  43. spdlog::info("child shutdown: {}, reason: {}", actor->get_identity(),
  44. actor->get_shutdown_reason()->message());
  45. }
  46. parent_t::on_child_shutdown(actor);
  47. }
  48. void shutdown_finish() noexcept override {
  49. parent_t::shutdown_finish();
  50. if (acceptor) {
  51. acceptor->cancel();
  52. }
  53. }
  54. auto get_state() noexcept { return state; }
  55. asio::ip::tcp::acceptor *acceptor = nullptr;
  56. configure_callback_t configure_callback;
  57. };
  58. using supervisor_ptr_t = r::intrusive_ptr_t<supervisor_t>;
  59. using actor_ptr_t = r::intrusive_ptr_t<relay_actor_t>;
  60. struct fixture_t : private model::diff::contact_visitor_t {
  61. using acceptor_t = asio::ip::tcp::acceptor;
  62. fixture_t() noexcept : ctx(io_ctx), acceptor(io_ctx), peer_sock(io_ctx) {
  63. utils::set_default("trace");
  64. log = utils::get_logger("fixture");
  65. relay_config = config::relay_config_t{
  66. true,
  67. utils::parse("https://some-endpoint.com/"),
  68. 1024 * 1024,
  69. };
  70. }
  71. void run() noexcept {
  72. auto strand = std::make_shared<asio::io_context::strand>(io_ctx);
  73. sup = ctx.create_supervisor<supervisor_t>().strand(strand).timeout(timeout).create_registry().finish();
  74. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  75. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  76. using contact_update_t = model::message::contact_update_t;
  77. p.subscribe_actor(r::lambda<contact_update_t>([&](contact_update_t &msg) { on(msg); }));
  78. using http_req_t = net::message::http_request_t;
  79. p.subscribe_actor(r::lambda<http_req_t>([&](http_req_t &req) {
  80. LOG_INFO(log, "received http request");
  81. http::response<http::string_body> res;
  82. res.result(200);
  83. res.body() = public_relays;
  84. sup->reply_to(req, std::move(res), public_relays.size());
  85. }));
  86. using connect_req_t = net::message::connect_request_t;
  87. p.subscribe_actor(r::lambda<connect_req_t>([&](connect_req_t &req) {
  88. LOG_INFO(log, "(connect request)");
  89. on(req);
  90. }));
  91. });
  92. };
  93. sup->start();
  94. sup->do_process();
  95. auto ep = asio::ip::tcp::endpoint(asio::ip::make_address(host), 0);
  96. acceptor.open(ep.protocol());
  97. acceptor.bind(ep);
  98. acceptor.listen();
  99. listening_ep = acceptor.local_endpoint();
  100. my_keys = utils::generate_pair("me").value();
  101. relay_keys = utils::generate_pair("relay").value();
  102. peer_keys = utils::generate_pair("peer").value();
  103. auto md = model::device_id_t::from_cert(my_keys.cert_data).value();
  104. auto rd = model::device_id_t::from_cert(relay_keys.cert_data).value();
  105. auto pd = model::device_id_t::from_cert(peer_keys.cert_data).value();
  106. my_device = device_t::create(md, "my-device").value();
  107. relay_device = device_t::create(rd, "relay-device").value();
  108. peer_device = device_t::create(rd, "peer-device").value();
  109. public_relays = generate_public_relays(listening_ep, relay_device);
  110. log->debug("public relays json: {}", public_relays);
  111. initiate_accept();
  112. cluster = new cluster_t(my_device, 1, 1);
  113. cluster->get_devices().put(my_device);
  114. cluster->get_devices().put(peer_device);
  115. session_key = "lorem-imspum-dolor";
  116. main();
  117. }
  118. virtual void main() noexcept {}
  119. virtual std::string generate_public_relays(const asio::ip::tcp::endpoint &,
  120. model::device_ptr_t &relay_device) noexcept {
  121. std::string pattern = R""(
  122. {
  123. "relays": [
  124. {
  125. "url": "##URL##&pingInterval=0m5s&networkTimeout=2m0s&sessionLimitBps=0&globalLimitBps=0&statusAddr=:22070&providedBy=ina",
  126. "location": {
  127. "latitude": 50.1049,
  128. "longitude": 8.6295,
  129. "city": "Frankfurt am Main",
  130. "country": "DE",
  131. "continent": "EU"
  132. }
  133. }
  134. ]
  135. }
  136. )"";
  137. auto url = fmt::format("relay://{}/?id={}", listening_ep, relay_device->device_id().get_value());
  138. return boost::algorithm::replace_first_copy(pattern, "##URL##", url);
  139. }
  140. virtual void initiate_accept() noexcept {
  141. acceptor.async_accept(peer_sock, [this](auto ec) { this->accept(ec); });
  142. sup->acceptor = &acceptor;
  143. }
  144. virtual void accept(const sys::error_code &ec) noexcept {
  145. LOG_INFO(log, "accept (relay), ec: {}, remote = {}", ec.message(), peer_sock.remote_endpoint());
  146. auto uri = utils::parse("tcp://127.0.0.1:0/");
  147. auto cfg = transport::transport_config_t{{}, uri, *sup, std::move(peer_sock), false};
  148. relay_trans = transport::initiate_stream(cfg);
  149. relay_read();
  150. }
  151. virtual actor_ptr_t create_actor() noexcept {
  152. return sup->create_actor<actor_ptr_t::element_type>()
  153. .timeout(timeout)
  154. .cluster(cluster)
  155. .relay_config(relay_config)
  156. .escalate_failure()
  157. .finish();
  158. }
  159. virtual void on(net::message::connect_request_t &req) noexcept {
  160. auto &uri = req.payload.request_payload.uri;
  161. log->info("requested connect to {}", uri);
  162. auto cfg = transport::transport_config_t{{}, uri, *sup, {}, true};
  163. auto ip = asio::ip::make_address(host);
  164. auto peer_ep = tcp::endpoint(ip, uri->port_number());
  165. auto addresses = std::vector<tcp::endpoint>{peer_ep};
  166. auto addresses_ptr = std::make_shared<decltype(addresses)>(addresses);
  167. auto trans = transport::initiate_stream(cfg);
  168. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "active/connect, err: {}", ec.message()); };
  169. using ptr_t = model::intrusive_ptr_t<std::decay_t<decltype(req)>>;
  170. auto ptr = ptr_t(&req);
  171. transport::connect_fn_t on_connect = [ptr, trans, addresses_ptr, this](const tcp::endpoint &ep) {
  172. LOG_INFO(log, "active/connected");
  173. sup->reply_to(*ptr, trans, ep);
  174. };
  175. trans->async_connect(*addresses_ptr, on_connect, on_error);
  176. }
  177. void send_relay(const proto::relay::message_t &msg) noexcept {
  178. proto::relay::serialize(msg, relay_tx);
  179. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "relay/write, err: {}", ec.message()); };
  180. transport::io_fn_t on_write = [&](size_t bytes) { LOG_TRACE(log, "relay/write, {} bytes", bytes); };
  181. relay_trans->async_send(asio::buffer(relay_tx), on_write, on_error);
  182. }
  183. void on(proto::relay::ping_t &) noexcept {
  184. };
  185. void on(proto::relay::pong_t &) noexcept {
  186. };
  187. void on(proto::relay::join_relay_request_t &) noexcept {
  188. LOG_INFO(log, "join_relay_request_t");
  189. send_relay(proto::relay::response_t{0, "ok"});
  190. };
  191. void on(proto::relay::join_session_request_t &) noexcept {
  192. };
  193. void on(proto::relay::response_t &) noexcept {
  194. };
  195. void on(proto::relay::connect_request_t &) noexcept {
  196. };
  197. void on(proto::relay::session_invitation_t &) noexcept {
  198. };
  199. virtual void on(model::message::contact_update_t &update) noexcept {
  200. auto &diff = *update.payload.diff;
  201. auto r = diff.apply(*cluster);
  202. if (!r) {
  203. LOG_ERROR(log, "error applying diff: {}", r.error().message());
  204. }
  205. r = diff.visit(*this, nullptr);
  206. if (!r) {
  207. LOG_ERROR(log, "error visiting diff: {}", r.error().message());
  208. }
  209. }
  210. void relay_read() noexcept {
  211. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "relay/read, err: {}", ec.message()); };
  212. transport::io_fn_t on_read = [&](size_t bytes) {
  213. LOG_TRACE(log, "relay/read, {} bytes", bytes);
  214. auto msg = proto::relay::parse({relay_rx.data(), bytes});
  215. auto wrapped = std::get_if<proto::relay::wrapped_message_t>(&msg);
  216. if (!wrapped) {
  217. LOG_ERROR(log, "relay/read non-message?");
  218. return;
  219. }
  220. std::visit([&](auto &it) { on(it); }, wrapped->message);
  221. };
  222. relay_rx.resize(1500);
  223. auto buff = asio::buffer(relay_rx.data(), relay_rx.size());
  224. relay_trans->async_recv(buff, on_read, on_error);
  225. LOG_TRACE(log, "relay/async recv");
  226. }
  227. config::relay_config_t relay_config;
  228. cluster_ptr_t cluster;
  229. asio::io_context io_ctx;
  230. ra::system_context_asio_t ctx;
  231. acceptor_t acceptor;
  232. supervisor_ptr_t sup;
  233. asio::ip::tcp::endpoint listening_ep;
  234. utils::logger_t log;
  235. asio::ip::tcp::socket peer_sock;
  236. std::string public_relays;
  237. utils::key_pair_t my_keys;
  238. utils::key_pair_t relay_keys;
  239. utils::key_pair_t peer_keys;
  240. model::device_ptr_t my_device;
  241. model::device_ptr_t relay_device;
  242. model::device_ptr_t peer_device;
  243. transport::stream_sp_t relay_trans;
  244. std::string relay_rx;
  245. std::string relay_tx;
  246. std::string session_key;
  247. };
  248. void test_master_connect() {
  249. struct F : fixture_t {
  250. void main() noexcept override {
  251. auto act = create_actor();
  252. io_ctx.run();
  253. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  254. REQUIRE(my_device->get_uris().size() == 1);
  255. CHECK(my_device->get_uris()[0]->scheme() == "relay");
  256. sup->shutdown();
  257. io_ctx.restart();
  258. io_ctx.run();
  259. CHECK(my_device->get_uris().size() == 0);
  260. io_ctx.restart();
  261. io_ctx.run();
  262. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  263. }
  264. void on(model::message::contact_update_t &update) noexcept override {
  265. LOG_INFO(log, "contact_update_t");
  266. fixture_t::on(update);
  267. io_ctx.stop();
  268. }
  269. };
  270. F().run();
  271. }
  272. void test_passive() {
  273. struct F : fixture_t {
  274. void main() noexcept override {
  275. auto act = create_actor();
  276. io_ctx.run();
  277. CHECK(sent);
  278. CHECK(received);
  279. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  280. sup->shutdown();
  281. io_ctx.restart();
  282. io_ctx.run();
  283. CHECK(my_device->get_uris().size() == 0);
  284. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  285. }
  286. void on(model::message::contact_update_t &update) noexcept override {
  287. LOG_INFO(log, "contact_update_t");
  288. fixture_t::on(update);
  289. if (my_device->get_uris().size() == 1 && !sent) {
  290. sent = true;
  291. auto msg = proto::relay::session_invitation_t{
  292. std::string(peer_device->device_id().get_sha256()), session_key, {}, 12345, true};
  293. send_relay(msg);
  294. }
  295. }
  296. outcome::result<void> operator()(const model::diff::modify::relay_connect_request_t &diff,
  297. void *) noexcept override {
  298. CHECK(diff.peer == peer_device->device_id());
  299. CHECK(diff.session_key == session_key);
  300. CHECK(diff.relay.port() == 12345);
  301. CHECK(diff.relay.address().to_string() == "127.0.0.1");
  302. received = true;
  303. io_ctx.stop();
  304. return outcome::success();
  305. }
  306. bool sent = false;
  307. bool received = false;
  308. };
  309. F().run();
  310. }
  311. int _init() {
  312. REGISTER_TEST_CASE(test_master_connect, "test_master_connect", "[relay]");
  313. REGISTER_TEST_CASE(test_passive, "test_passive", "[relay]");
  314. return 1;
  315. }
  316. static int v = _init();