078-relay.cpp 13 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2023 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "utils/tls.h"
  6. #include "utils/format.hpp"
  7. #include "model/cluster.h"
  8. #include "model/messages.h"
  9. #include "model/diff/modify/relay_connect_request.h"
  10. #include "net/names.h"
  11. #include "net/messages.h"
  12. #include "net/relay_actor.h"
  13. #include "transport/stream.h"
  14. #include <rotor/asio.hpp>
  15. #include <boost/algorithm/string/replace.hpp>
  16. using namespace syncspirit;
  17. using namespace syncspirit::test;
  18. using namespace syncspirit::model;
  19. using namespace syncspirit::net;
  20. namespace asio = boost::asio;
  21. namespace sys = boost::system;
  22. namespace r = rotor;
  23. namespace ra = r::asio;
  24. using configure_callback_t = std::function<void(r::plugin::plugin_base_t &)>;
  25. auto timeout = r::pt::time_duration{r::pt::millisec{1500}};
  26. auto host = "127.0.0.1";
  27. struct supervisor_t : ra::supervisor_asio_t {
  28. using parent_t = ra::supervisor_asio_t;
  29. using parent_t::parent_t;
  30. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  31. parent_t::configure(plugin);
  32. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  33. p.register_name(names::coordinator, get_address());
  34. p.register_name(names::peer_supervisor, get_address());
  35. p.register_name(names::http11_relay, get_address());
  36. });
  37. if (configure_callback) {
  38. configure_callback(plugin);
  39. }
  40. }
  41. void on_child_shutdown(actor_base_t *actor) noexcept override {
  42. if (actor) {
  43. spdlog::info("child shutdown: {}, reason: {}", actor->get_identity(),
  44. actor->get_shutdown_reason()->message());
  45. }
  46. parent_t::on_child_shutdown(actor);
  47. }
  48. void shutdown_finish() noexcept override {
  49. parent_t::shutdown_finish();
  50. if (acceptor) {
  51. acceptor->cancel();
  52. }
  53. }
  54. auto get_state() noexcept { return state; }
  55. asio::ip::tcp::acceptor *acceptor = nullptr;
  56. configure_callback_t configure_callback;
  57. };
  58. using supervisor_ptr_t = r::intrusive_ptr_t<supervisor_t>;
  59. using actor_ptr_t = r::intrusive_ptr_t<relay_actor_t>;
  60. struct fixture_t : private model::diff::contact_visitor_t {
  61. using acceptor_t = asio::ip::tcp::acceptor;
  62. fixture_t() noexcept : ctx(io_ctx), acceptor(io_ctx), peer_sock(io_ctx) {
  63. utils::set_default("trace");
  64. log = utils::get_logger("fixture");
  65. relay_config = config::relay_config_t{
  66. true,
  67. "https://some-endpoint.com/",
  68. 1024 * 1024,
  69. };
  70. }
  71. void run() noexcept {
  72. auto strand = std::make_shared<asio::io_context::strand>(io_ctx);
  73. sup = ctx.create_supervisor<supervisor_t>().strand(strand).timeout(timeout).create_registry().finish();
  74. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  75. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  76. using contact_update_t = model::message::contact_update_t;
  77. p.subscribe_actor(r::lambda<contact_update_t>([&](contact_update_t &msg) { on(msg); }));
  78. using http_req_t = net::message::http_request_t;
  79. p.subscribe_actor(r::lambda<http_req_t>([&](http_req_t &req) {
  80. LOG_INFO(log, "received http request");
  81. http::response<http::string_body> res;
  82. res.result(200);
  83. res.body() = public_relays;
  84. sup->reply_to(req, std::move(res), public_relays.size());
  85. }));
  86. using connect_req_t = net::message::connect_request_t;
  87. p.subscribe_actor(r::lambda<connect_req_t>([&](connect_req_t &req) {
  88. LOG_INFO(log, "(connect request)");
  89. on(req);
  90. }));
  91. });
  92. };
  93. sup->start();
  94. sup->do_process();
  95. auto ep = asio::ip::tcp::endpoint(asio::ip::make_address(host), 0);
  96. acceptor.open(ep.protocol());
  97. acceptor.bind(ep);
  98. acceptor.listen();
  99. listening_ep = acceptor.local_endpoint();
  100. my_keys = utils::generate_pair("me").value();
  101. relay_keys = utils::generate_pair("relay").value();
  102. peer_keys = utils::generate_pair("peer").value();
  103. auto md = model::device_id_t::from_cert(my_keys.cert_data).value();
  104. auto rd = model::device_id_t::from_cert(relay_keys.cert_data).value();
  105. auto pd = model::device_id_t::from_cert(peer_keys.cert_data).value();
  106. my_device = device_t::create(md, "my-device").value();
  107. relay_device = device_t::create(rd, "relay-device").value();
  108. peer_device = device_t::create(rd, "peer-device").value();
  109. public_relays = generate_public_relays(listening_ep, relay_device);
  110. log->debug("public relays json: {}", public_relays);
  111. initiate_accept();
  112. cluster = new cluster_t(my_device, 1, 1);
  113. cluster->get_devices().put(my_device);
  114. cluster->get_devices().put(peer_device);
  115. session_key = "lorem-imspum-dolor";
  116. main();
  117. }
  118. virtual void main() noexcept {}
  119. virtual std::string generate_public_relays(const asio::ip::tcp::endpoint &,
  120. model::device_ptr_t &relay_device) noexcept {
  121. std::string pattern = R""(
  122. {
  123. "relays": [
  124. {
  125. "url": "##URL##&pingInterval=0m5s&networkTimeout=2m0s&sessionLimitBps=0&globalLimitBps=0&statusAddr=:22070&providedBy=ina",
  126. "location": {
  127. "latitude": 50.1049,
  128. "longitude": 8.6295,
  129. "city": "Frankfurt am Main",
  130. "country": "DE",
  131. "continent": "EU"
  132. }
  133. }
  134. ]
  135. }
  136. )"";
  137. auto url = fmt::format("relay://{}/?id={}", listening_ep, relay_device->device_id().get_value());
  138. return boost::algorithm::replace_first_copy(pattern, "##URL##", url);
  139. }
  140. virtual void initiate_accept() noexcept {
  141. acceptor.async_accept(peer_sock, [this](auto ec) { this->accept(ec); });
  142. sup->acceptor = &acceptor;
  143. }
  144. virtual void accept(const sys::error_code &ec) noexcept {
  145. LOG_INFO(log, "accept (relay), ec: {}, remote = {}", ec.message(), peer_sock.remote_endpoint());
  146. auto uri = utils::parse("tcp://127.0.0.1:0/").value();
  147. auto cfg = transport::transport_config_t{{}, uri, *sup, std::move(peer_sock), false};
  148. relay_trans = transport::initiate_stream(cfg);
  149. relay_read();
  150. }
  151. virtual actor_ptr_t create_actor() noexcept {
  152. return sup->create_actor<actor_ptr_t::element_type>()
  153. .timeout(timeout)
  154. .cluster(cluster)
  155. .relay_config(relay_config)
  156. .escalate_failure()
  157. .finish();
  158. }
  159. virtual void on(net::message::connect_request_t &req) noexcept {
  160. auto &uri = req.payload.request_payload.uri;
  161. log->info("requested connect to {}", uri.full);
  162. auto cfg = transport::transport_config_t{{}, uri, *sup, {}, true};
  163. tcp::resolver resolver(io_ctx);
  164. auto addresses = resolver.resolve(host, std::to_string(uri.port));
  165. auto addresses_ptr = std::make_shared<decltype(addresses)>(addresses);
  166. auto trans = transport::initiate_stream(cfg);
  167. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "active/connect, err: {}", ec.message()); };
  168. using ptr_t = model::intrusive_ptr_t<std::decay_t<decltype(req)>>;
  169. auto ptr = ptr_t(&req);
  170. transport::connect_fn_t on_connect = [ptr, trans, addresses_ptr, this](const tcp::endpoint &ep) {
  171. LOG_INFO(log, "active/connected");
  172. sup->reply_to(*ptr, trans, ep);
  173. };
  174. trans->async_connect(*addresses_ptr, on_connect, on_error);
  175. }
  176. void send_relay(const proto::relay::message_t &msg) noexcept {
  177. proto::relay::serialize(msg, relay_tx);
  178. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "relay/write, err: {}", ec.message()); };
  179. transport::io_fn_t on_write = [&](size_t bytes) { LOG_TRACE(log, "relay/write, {} bytes", bytes); };
  180. relay_trans->async_send(asio::buffer(relay_tx), on_write, on_error);
  181. }
  182. void on(proto::relay::ping_t &) noexcept {
  183. };
  184. void on(proto::relay::pong_t &) noexcept {
  185. };
  186. void on(proto::relay::join_relay_request_t &) noexcept {
  187. LOG_INFO(log, "join_relay_request_t");
  188. send_relay(proto::relay::response_t{0, "ok"});
  189. };
  190. void on(proto::relay::join_session_request_t &) noexcept {
  191. };
  192. void on(proto::relay::response_t &) noexcept {
  193. };
  194. void on(proto::relay::connect_request_t &) noexcept {
  195. };
  196. void on(proto::relay::session_invitation_t &) noexcept {
  197. };
  198. virtual void on(model::message::contact_update_t &update) noexcept {
  199. auto &diff = *update.payload.diff;
  200. auto r = diff.apply(*cluster);
  201. if (!r) {
  202. LOG_ERROR(log, "error applying diff: {}", r.error().message());
  203. }
  204. r = diff.visit(*this, nullptr);
  205. if (!r) {
  206. LOG_ERROR(log, "error visiting diff: {}", r.error().message());
  207. }
  208. }
  209. void relay_read() noexcept {
  210. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "relay/read, err: {}", ec.message()); };
  211. transport::io_fn_t on_read = [&](size_t bytes) {
  212. LOG_TRACE(log, "relay/read, {} bytes", bytes);
  213. auto msg = proto::relay::parse({relay_rx.data(), bytes});
  214. auto wrapped = std::get_if<proto::relay::wrapped_message_t>(&msg);
  215. if (!wrapped) {
  216. LOG_ERROR(log, "relay/read non-message?");
  217. return;
  218. }
  219. std::visit([&](auto &it) { on(it); }, wrapped->message);
  220. };
  221. relay_rx.resize(1500);
  222. auto buff = asio::buffer(relay_rx.data(), relay_rx.size());
  223. relay_trans->async_recv(buff, on_read, on_error);
  224. LOG_TRACE(log, "relay/async recv");
  225. }
  226. config::relay_config_t relay_config;
  227. cluster_ptr_t cluster;
  228. asio::io_context io_ctx;
  229. ra::system_context_asio_t ctx;
  230. acceptor_t acceptor;
  231. supervisor_ptr_t sup;
  232. asio::ip::tcp::endpoint listening_ep;
  233. utils::logger_t log;
  234. asio::ip::tcp::socket peer_sock;
  235. std::string public_relays;
  236. utils::key_pair_t my_keys;
  237. utils::key_pair_t relay_keys;
  238. utils::key_pair_t peer_keys;
  239. model::device_ptr_t my_device;
  240. model::device_ptr_t relay_device;
  241. model::device_ptr_t peer_device;
  242. transport::stream_sp_t relay_trans;
  243. std::string relay_rx;
  244. std::string relay_tx;
  245. std::string session_key;
  246. };
  247. void test_master_connect() {
  248. struct F : fixture_t {
  249. void main() noexcept override {
  250. auto act = create_actor();
  251. io_ctx.run();
  252. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  253. REQUIRE(my_device->get_uris().size() == 1);
  254. CHECK(my_device->get_uris()[0].proto == "relay");
  255. sup->shutdown();
  256. io_ctx.restart();
  257. io_ctx.run();
  258. CHECK(my_device->get_uris().size() == 0);
  259. io_ctx.restart();
  260. io_ctx.run();
  261. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  262. }
  263. void on(model::message::contact_update_t &update) noexcept override {
  264. LOG_INFO(log, "contact_update_t");
  265. fixture_t::on(update);
  266. io_ctx.stop();
  267. }
  268. };
  269. F().run();
  270. }
  271. void test_passive() {
  272. struct F : fixture_t {
  273. void main() noexcept override {
  274. auto act = create_actor();
  275. io_ctx.run();
  276. CHECK(sent);
  277. CHECK(received);
  278. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  279. sup->shutdown();
  280. io_ctx.restart();
  281. io_ctx.run();
  282. CHECK(my_device->get_uris().size() == 0);
  283. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  284. }
  285. void on(model::message::contact_update_t &update) noexcept override {
  286. LOG_INFO(log, "contact_update_t");
  287. fixture_t::on(update);
  288. if (my_device->get_uris().size() == 1 && !sent) {
  289. sent = true;
  290. auto msg = proto::relay::session_invitation_t{
  291. std::string(peer_device->device_id().get_sha256()), session_key, {}, 12345, true};
  292. send_relay(msg);
  293. }
  294. }
  295. outcome::result<void> operator()(const model::diff::modify::relay_connect_request_t &diff,
  296. void *) noexcept override {
  297. CHECK(diff.peer == peer_device->device_id());
  298. CHECK(diff.session_key == session_key);
  299. CHECK(diff.relay.port() == 12345);
  300. CHECK(diff.relay.address().to_string() == "127.0.0.1");
  301. received = true;
  302. io_ctx.stop();
  303. return outcome::success();
  304. }
  305. bool sent = false;
  306. bool received = false;
  307. };
  308. F().run();
  309. }
  310. int _init() {
  311. REGISTER_TEST_CASE(test_master_connect, "test_master_connect", "[relay]");
  312. REGISTER_TEST_CASE(test_passive, "test_passive", "[relay]");
  313. return 1;
  314. }
  315. static int v = _init();