078-relay.cpp 13 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2024 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "utils/tls.h"
  6. #include "utils/format.hpp"
  7. #include "model/cluster.h"
  8. #include "model/messages.h"
  9. #include "model/diff/contact_visitor.h"
  10. #include "model/diff/contact/relay_connect_request.h"
  11. #include "net/names.h"
  12. #include "net/messages.h"
  13. #include "net/relay_actor.h"
  14. #include "transport/stream.h"
  15. #include <rotor/asio.hpp>
  16. #include <boost/algorithm/string/replace.hpp>
  17. using namespace syncspirit;
  18. using namespace syncspirit::test;
  19. using namespace syncspirit::model;
  20. using namespace syncspirit::net;
  21. namespace asio = boost::asio;
  22. namespace sys = boost::system;
  23. namespace r = rotor;
  24. namespace ra = r::asio;
  25. using configure_callback_t = std::function<void(r::plugin::plugin_base_t &)>;
  26. auto timeout = r::pt::time_duration{r::pt::millisec{1500}};
  27. auto host = "127.0.0.1";
  28. struct supervisor_t : ra::supervisor_asio_t {
  29. using parent_t = ra::supervisor_asio_t;
  30. using parent_t::parent_t;
  31. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  32. parent_t::configure(plugin);
  33. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  34. p.register_name(names::coordinator, get_address());
  35. p.register_name(names::peer_supervisor, get_address());
  36. p.register_name(names::http11_relay, get_address());
  37. });
  38. if (configure_callback) {
  39. configure_callback(plugin);
  40. }
  41. }
  42. void on_child_shutdown(actor_base_t *actor) noexcept override {
  43. if (actor) {
  44. spdlog::info("child shutdown: {}, reason: {}", actor->get_identity(),
  45. actor->get_shutdown_reason()->message());
  46. }
  47. parent_t::on_child_shutdown(actor);
  48. }
  49. void shutdown_finish() noexcept override {
  50. parent_t::shutdown_finish();
  51. if (acceptor) {
  52. acceptor->cancel();
  53. }
  54. }
  55. auto get_state() noexcept { return state; }
  56. asio::ip::tcp::acceptor *acceptor = nullptr;
  57. configure_callback_t configure_callback;
  58. };
  59. using supervisor_ptr_t = r::intrusive_ptr_t<supervisor_t>;
  60. using actor_ptr_t = r::intrusive_ptr_t<relay_actor_t>;
  61. struct fixture_t : private model::diff::contact_visitor_t {
  62. using acceptor_t = asio::ip::tcp::acceptor;
  63. fixture_t() noexcept : ctx(io_ctx), acceptor(io_ctx), peer_sock(io_ctx) {
  64. utils::set_default("trace");
  65. log = utils::get_logger("fixture");
  66. relay_config = config::relay_config_t{
  67. true,
  68. true,
  69. utils::parse("https://some-endpoint.com/"),
  70. 1024 * 1024,
  71. };
  72. }
  73. void run() noexcept {
  74. auto strand = std::make_shared<asio::io_context::strand>(io_ctx);
  75. sup = ctx.create_supervisor<supervisor_t>().strand(strand).timeout(timeout).create_registry().finish();
  76. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  77. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  78. using contact_update_t = model::message::contact_update_t;
  79. p.subscribe_actor(r::lambda<contact_update_t>([&](contact_update_t &msg) { on(msg); }));
  80. using http_req_t = net::message::http_request_t;
  81. p.subscribe_actor(r::lambda<http_req_t>([&](http_req_t &req) {
  82. LOG_INFO(log, "received http request");
  83. http::response<http::string_body> res;
  84. res.result(200);
  85. res.body() = public_relays;
  86. sup->reply_to(req, std::move(res), public_relays.size());
  87. }));
  88. using connect_req_t = net::message::connect_request_t;
  89. p.subscribe_actor(r::lambda<connect_req_t>([&](connect_req_t &req) {
  90. LOG_INFO(log, "(connect request)");
  91. on(req);
  92. }));
  93. });
  94. };
  95. sup->start();
  96. sup->do_process();
  97. auto ep = asio::ip::tcp::endpoint(asio::ip::make_address(host), 0);
  98. acceptor.open(ep.protocol());
  99. acceptor.bind(ep);
  100. acceptor.listen();
  101. listening_ep = acceptor.local_endpoint();
  102. my_keys = utils::generate_pair("me").value();
  103. relay_keys = utils::generate_pair("relay").value();
  104. peer_keys = utils::generate_pair("peer").value();
  105. auto md = model::device_id_t::from_cert(my_keys.cert_data).value();
  106. auto rd = model::device_id_t::from_cert(relay_keys.cert_data).value();
  107. auto pd = model::device_id_t::from_cert(peer_keys.cert_data).value();
  108. my_device = device_t::create(md, "my-device").value();
  109. relay_device = device_t::create(rd, "relay-device").value();
  110. peer_device = device_t::create(rd, "peer-device").value();
  111. public_relays = generate_public_relays(listening_ep, relay_device);
  112. log->debug("public relays json: {}", public_relays);
  113. initiate_accept();
  114. cluster = new cluster_t(my_device, 1);
  115. cluster->get_devices().put(my_device);
  116. cluster->get_devices().put(peer_device);
  117. session_key = "lorem-imspum-dolor";
  118. main();
  119. }
  120. virtual void main() noexcept {}
  121. virtual std::string generate_public_relays(const asio::ip::tcp::endpoint &,
  122. model::device_ptr_t &relay_device) noexcept {
  123. std::string pattern = R""(
  124. {
  125. "relays": [
  126. {
  127. "url": "##URL##&pingInterval=0m5s&networkTimeout=2m0s&sessionLimitBps=0&globalLimitBps=0&statusAddr=:22070&providedBy=ina",
  128. "location": {
  129. "latitude": 50.1049,
  130. "longitude": 8.6295,
  131. "city": "Frankfurt am Main",
  132. "country": "DE",
  133. "continent": "EU"
  134. }
  135. }
  136. ]
  137. }
  138. )"";
  139. auto url = fmt::format("relay://{}/?id={}", listening_ep, relay_device->device_id().get_value());
  140. return boost::algorithm::replace_first_copy(pattern, "##URL##", url);
  141. }
  142. virtual void initiate_accept() noexcept {
  143. acceptor.async_accept(peer_sock, [this](auto ec) { this->accept(ec); });
  144. sup->acceptor = &acceptor;
  145. }
  146. virtual void accept(const sys::error_code &ec) noexcept {
  147. LOG_INFO(log, "accept (relay), ec: {}, remote = {}", ec.message(), peer_sock.remote_endpoint());
  148. auto uri = utils::parse("tcp://127.0.0.1:0/");
  149. auto cfg = transport::transport_config_t{{}, uri, *sup, std::move(peer_sock), false};
  150. relay_trans = transport::initiate_stream(cfg);
  151. relay_read();
  152. }
  153. virtual actor_ptr_t create_actor() noexcept {
  154. return sup->create_actor<actor_ptr_t::element_type>()
  155. .timeout(timeout)
  156. .cluster(cluster)
  157. .relay_config(relay_config)
  158. .escalate_failure()
  159. .finish();
  160. }
  161. virtual void on(net::message::connect_request_t &req) noexcept {
  162. auto &uri = req.payload.request_payload.uri;
  163. log->info("requested connect to {}", uri);
  164. auto cfg = transport::transport_config_t{{}, uri, *sup, {}, true};
  165. auto ip = asio::ip::make_address(host);
  166. auto peer_ep = tcp::endpoint(ip, uri->port_number());
  167. auto addresses = std::vector<tcp::endpoint>{peer_ep};
  168. auto addresses_ptr = std::make_shared<decltype(addresses)>(addresses);
  169. auto trans = transport::initiate_stream(cfg);
  170. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "active/connect, err: {}", ec.message()); };
  171. using ptr_t = model::intrusive_ptr_t<std::decay_t<decltype(req)>>;
  172. auto ptr = ptr_t(&req);
  173. transport::connect_fn_t on_connect = [ptr, trans, addresses_ptr, this](const tcp::endpoint &ep) {
  174. LOG_INFO(log, "active/connected");
  175. sup->reply_to(*ptr, trans, ep);
  176. };
  177. trans->async_connect(addresses_ptr, on_connect, on_error);
  178. }
  179. void send_relay(const proto::relay::message_t &msg) noexcept {
  180. proto::relay::serialize(msg, relay_tx);
  181. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "relay/write, err: {}", ec.message()); };
  182. transport::io_fn_t on_write = [&](size_t bytes) { LOG_TRACE(log, "relay/write, {} bytes", bytes); };
  183. relay_trans->async_send(asio::buffer(relay_tx), on_write, on_error);
  184. }
  185. void on(proto::relay::ping_t &) noexcept {
  186. };
  187. void on(proto::relay::pong_t &) noexcept {
  188. };
  189. void on(proto::relay::join_relay_request_t &) noexcept {
  190. LOG_INFO(log, "join_relay_request_t");
  191. send_relay(proto::relay::response_t{0, "ok"});
  192. };
  193. void on(proto::relay::join_session_request_t &) noexcept {
  194. };
  195. void on(proto::relay::response_t &) noexcept {
  196. };
  197. void on(proto::relay::connect_request_t &) noexcept {
  198. };
  199. void on(proto::relay::session_invitation_t &) noexcept {
  200. };
  201. virtual void on(model::message::contact_update_t &update) noexcept {
  202. auto &diff = *update.payload.diff;
  203. auto r = diff.apply(*cluster);
  204. if (!r) {
  205. LOG_ERROR(log, "error applying diff: {}", r.error().message());
  206. }
  207. r = diff.visit(*this, nullptr);
  208. if (!r) {
  209. LOG_ERROR(log, "error visiting diff: {}", r.error().message());
  210. }
  211. }
  212. void relay_read() noexcept {
  213. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "relay/read, err: {}", ec.message()); };
  214. transport::io_fn_t on_read = [&](size_t bytes) {
  215. LOG_TRACE(log, "relay/read, {} bytes", bytes);
  216. auto msg = proto::relay::parse({relay_rx.data(), bytes});
  217. auto wrapped = std::get_if<proto::relay::wrapped_message_t>(&msg);
  218. if (!wrapped) {
  219. LOG_ERROR(log, "relay/read non-message?");
  220. return;
  221. }
  222. std::visit([&](auto &it) { on(it); }, wrapped->message);
  223. };
  224. relay_rx.resize(1500);
  225. auto buff = asio::buffer(relay_rx.data(), relay_rx.size());
  226. relay_trans->async_recv(buff, on_read, on_error);
  227. LOG_TRACE(log, "relay/async recv");
  228. }
  229. config::relay_config_t relay_config;
  230. cluster_ptr_t cluster;
  231. asio::io_context io_ctx;
  232. ra::system_context_asio_t ctx;
  233. acceptor_t acceptor;
  234. supervisor_ptr_t sup;
  235. asio::ip::tcp::endpoint listening_ep;
  236. utils::logger_t log;
  237. asio::ip::tcp::socket peer_sock;
  238. std::string public_relays;
  239. utils::key_pair_t my_keys;
  240. utils::key_pair_t relay_keys;
  241. utils::key_pair_t peer_keys;
  242. model::device_ptr_t my_device;
  243. model::device_ptr_t relay_device;
  244. model::device_ptr_t peer_device;
  245. transport::stream_sp_t relay_trans;
  246. std::string relay_rx;
  247. std::string relay_tx;
  248. std::string session_key;
  249. };
  250. void test_master_connect() {
  251. struct F : fixture_t {
  252. void main() noexcept override {
  253. auto act = create_actor();
  254. io_ctx.run();
  255. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  256. REQUIRE(my_device->get_uris().size() == 1);
  257. CHECK(my_device->get_uris()[0]->scheme() == "relay");
  258. sup->shutdown();
  259. io_ctx.restart();
  260. io_ctx.run();
  261. CHECK(my_device->get_uris().size() == 0);
  262. io_ctx.restart();
  263. io_ctx.run();
  264. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  265. }
  266. void on(model::message::contact_update_t &update) noexcept override {
  267. LOG_INFO(log, "contact_update_t");
  268. fixture_t::on(update);
  269. io_ctx.stop();
  270. }
  271. };
  272. F().run();
  273. }
  274. void test_passive() {
  275. struct F : fixture_t {
  276. void main() noexcept override {
  277. auto act = create_actor();
  278. io_ctx.run();
  279. CHECK(sent);
  280. CHECK(received);
  281. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  282. sup->shutdown();
  283. io_ctx.restart();
  284. io_ctx.run();
  285. CHECK(my_device->get_uris().size() == 0);
  286. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  287. }
  288. void on(model::message::contact_update_t &update) noexcept override {
  289. LOG_INFO(log, "contact_update_t");
  290. fixture_t::on(update);
  291. if (my_device->get_uris().size() == 1 && !sent) {
  292. sent = true;
  293. auto msg = proto::relay::session_invitation_t{
  294. std::string(peer_device->device_id().get_sha256()), session_key, {}, 12345, true};
  295. send_relay(msg);
  296. }
  297. }
  298. outcome::result<void> operator()(const model::diff::contact::relay_connect_request_t &diff,
  299. void *) noexcept override {
  300. CHECK(diff.peer == peer_device->device_id());
  301. CHECK(diff.session_key == session_key);
  302. CHECK(diff.relay.port() == 12345);
  303. CHECK(diff.relay.address().to_string() == "127.0.0.1");
  304. received = true;
  305. io_ctx.stop();
  306. return outcome::success();
  307. }
  308. bool sent = false;
  309. bool received = false;
  310. };
  311. F().run();
  312. }
  313. int _init() {
  314. REGISTER_TEST_CASE(test_master_connect, "test_master_connect", "[relay]");
  315. REGISTER_TEST_CASE(test_passive, "test_passive", "[relay]");
  316. return 1;
  317. }
  318. static int v = _init();