077-initiator.cpp 35 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2024 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "utils/tls.h"
  6. #include "utils/format.hpp"
  7. #include "model/cluster.h"
  8. #include "model/messages.h"
  9. #include "net/names.h"
  10. #include "net/initiator_actor.h"
  11. #include "net/resolver_actor.h"
  12. #include "proto/relay_support.h"
  13. #include "transport/stream.h"
  14. #include <rotor/asio.hpp>
  15. using namespace syncspirit;
  16. using namespace syncspirit::test;
  17. using namespace syncspirit::model;
  18. using namespace syncspirit::net;
  19. namespace asio = boost::asio;
  20. namespace sys = boost::system;
  21. namespace r = rotor;
  22. namespace ra = r::asio;
  23. using configure_callback_t = std::function<void(r::plugin::plugin_base_t &)>;
  24. using finish_callback_t = std::function<void()>;
  25. auto timeout = r::pt::time_duration{r::pt::millisec{2000}};
  26. auto host = "127.0.0.1";
  27. struct supervisor_t : ra::supervisor_asio_t {
  28. using ra::supervisor_asio_t::supervisor_asio_t;
  29. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  30. ra::supervisor_asio_t::configure(plugin);
  31. plugin.with_casted<r::plugin::registry_plugin_t>(
  32. [&](auto &p) { p.register_name(names::coordinator, get_address()); });
  33. if (configure_callback) {
  34. configure_callback(plugin);
  35. }
  36. }
  37. void shutdown_finish() noexcept override {
  38. ra::supervisor_asio_t::shutdown_finish();
  39. if (finish_callback) {
  40. finish_callback();
  41. }
  42. }
  43. auto get_state() noexcept { return state; }
  44. finish_callback_t finish_callback;
  45. configure_callback_t configure_callback;
  46. };
  47. using supervisor_ptr_t = r::intrusive_ptr_t<supervisor_t>;
  48. using actor_ptr_t = r::intrusive_ptr_t<initiator_actor_t>;
  49. struct fixture_t {
  50. using acceptor_t = asio::ip::tcp::acceptor;
  51. using ready_ptr_t = r::intrusive_ptr_t<net::message::peer_connected_t>;
  52. using diff_ptr_t = r::intrusive_ptr_t<model::message::model_update_t>;
  53. using diff_msgs_t = std::vector<diff_ptr_t>;
  54. fixture_t() noexcept : ctx(io_ctx), acceptor(io_ctx), peer_sock(io_ctx) {
  55. utils::set_default("trace");
  56. log = utils::get_logger("fixture");
  57. }
  58. virtual void finish() {
  59. acceptor.cancel();
  60. if (peer_trans) {
  61. peer_trans->cancel();
  62. }
  63. }
  64. void run() noexcept {
  65. auto strand = std::make_shared<asio::io_context::strand>(io_ctx);
  66. sup = ctx.create_supervisor<supervisor_t>().strand(strand).timeout(timeout).create_registry().finish();
  67. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  68. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  69. using connected_t = typename ready_ptr_t::element_type;
  70. using diff_t = typename diff_ptr_t::element_type;
  71. p.subscribe_actor(r::lambda<connected_t>([&](connected_t &msg) {
  72. connected_message = &msg;
  73. LOG_INFO(log, "received message::peer_connected_t");
  74. }));
  75. p.subscribe_actor(r::lambda<diff_t>([&](diff_t &msg) {
  76. diff_msgs.emplace_back(&msg);
  77. LOG_INFO(log, "received diff message");
  78. }));
  79. });
  80. };
  81. sup->finish_callback = [&]() { finish(); };
  82. sup->start();
  83. sup->create_actor<resolver_actor_t>().resolve_timeout(timeout / 2).timeout(timeout).finish();
  84. sup->do_process();
  85. my_keys = utils::generate_pair("me").value();
  86. peer_keys = utils::generate_pair("peer").value();
  87. auto md = model::device_id_t::from_cert(my_keys.cert_data).value();
  88. auto pd = model::device_id_t::from_cert(peer_keys.cert_data).value();
  89. my_device = device_t::create(md, "my-device").value();
  90. peer_device = device_t::create(pd, "peer-device").value();
  91. auto ep = asio::ip::tcp::endpoint(asio::ip::make_address(host), 0);
  92. acceptor.open(ep.protocol());
  93. acceptor.bind(ep);
  94. acceptor.listen();
  95. listening_ep = acceptor.local_endpoint();
  96. peer_uri = utils::parse(get_uri(listening_ep));
  97. log->debug("listening on {}", peer_uri);
  98. initiate_accept();
  99. cluster = new cluster_t(my_device, 1, 1);
  100. cluster->get_devices().put(my_device);
  101. cluster->get_devices().put(peer_device);
  102. main();
  103. }
  104. virtual void initiate_accept() noexcept {
  105. acceptor.async_accept(peer_sock, [this](auto ec) { this->accept(ec); });
  106. }
  107. virtual std::string get_uri(const asio::ip::tcp::endpoint &) noexcept {
  108. return fmt::format("tcp://{}", listening_ep);
  109. }
  110. virtual void accept(const sys::error_code &ec) noexcept {
  111. LOG_INFO(log, "accept, ec: {}", ec.message());
  112. peer_trans = transport::initiate_tls_passive(*sup, peer_keys, std::move(peer_sock));
  113. initiate_peer_handshake();
  114. }
  115. virtual void initiate_peer_handshake() noexcept {
  116. transport::handshake_fn_t handshake_fn = [this](bool valid_peer, utils::x509_t &, const tcp::endpoint &,
  117. const model::device_id_t *) {
  118. valid_handshake = valid_peer;
  119. on_peer_handshake();
  120. };
  121. transport::error_fn_t on_error = [](const auto &) {};
  122. peer_trans->async_handshake(handshake_fn, on_error);
  123. }
  124. virtual void on_peer_handshake() noexcept { LOG_INFO(log, "peer handshake"); }
  125. void initiate_active() noexcept {
  126. auto ip = asio::ip::make_address(host);
  127. auto ep = tcp::endpoint(ip, listening_ep.port());
  128. auto addresses = std::vector<tcp::endpoint>{ep};
  129. auto addresses_ptr = std::make_shared<decltype(addresses)>(addresses);
  130. peer_trans = transport::initiate_tls_active(*sup, peer_keys, my_device->device_id(), peer_uri);
  131. transport::error_fn_t on_error = [&](auto &ec) {
  132. LOG_WARN(log, "initiate_active/connect, err: {}", ec.message());
  133. };
  134. transport::connect_fn_t on_connect = [&](auto) {
  135. LOG_INFO(log, "initiate_active/peer connect");
  136. active_connect();
  137. };
  138. peer_trans->async_connect(addresses_ptr, on_connect, on_error);
  139. }
  140. virtual void active_connect() {
  141. LOG_TRACE(log, "active_connect");
  142. transport::handshake_fn_t handshake_fn = [this](bool, utils::x509_t &, const tcp::endpoint &,
  143. const model::device_id_t *) {
  144. valid_handshake = true;
  145. LOG_INFO(log, "test_passive_success/peer handshake");
  146. };
  147. transport::error_fn_t on_hs_error = [&](const auto &ec) {
  148. LOG_WARN(log, "test_passive_success/peer handshake, err: {}", ec.message());
  149. };
  150. peer_trans->async_handshake(handshake_fn, on_hs_error);
  151. }
  152. virtual void main() noexcept {}
  153. virtual actor_ptr_t create_actor() noexcept {
  154. return sup->create_actor<initiator_actor_t>()
  155. .timeout(timeout)
  156. .peer_device_id(peer_device->device_id())
  157. .relay_session(relay_session)
  158. .relay_enabled(true)
  159. .uris(utils::uri_container_t{peer_uri})
  160. .cluster(use_model ? cluster : nullptr)
  161. .sink(sup->get_address())
  162. .ssl_pair(&my_keys)
  163. .router(*sup)
  164. .escalate_failure()
  165. .finish();
  166. }
  167. virtual actor_ptr_t create_passive_actor() noexcept {
  168. return sup->create_actor<initiator_actor_t>()
  169. .timeout(timeout)
  170. .sock(std::move(peer_sock))
  171. .ssl_pair(&my_keys)
  172. .router(*sup)
  173. .cluster(cluster)
  174. .sink(sup->get_address())
  175. .escalate_failure()
  176. .finish();
  177. }
  178. cluster_ptr_t cluster;
  179. asio::io_context io_ctx{1};
  180. ra::system_context_asio_t ctx;
  181. acceptor_t acceptor;
  182. supervisor_ptr_t sup;
  183. asio::ip::tcp::endpoint listening_ep;
  184. utils::logger_t log;
  185. asio::ip::tcp::socket peer_sock;
  186. config::bep_config_t bep_config;
  187. utils::key_pair_t my_keys;
  188. utils::key_pair_t peer_keys;
  189. utils::uri_ptr_t peer_uri;
  190. model::device_ptr_t my_device;
  191. model::device_ptr_t peer_device;
  192. transport::stream_sp_t peer_trans;
  193. ready_ptr_t connected_message;
  194. diff_msgs_t diff_msgs;
  195. std::string relay_session;
  196. bool use_model = true;
  197. bool valid_handshake = false;
  198. };
  199. void test_connect_timeout() {
  200. struct F : fixture_t {
  201. void initiate_accept() noexcept override {}
  202. void main() noexcept override {
  203. auto act = create_actor();
  204. io_ctx.run();
  205. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  206. CHECK(!connected_message);
  207. }
  208. };
  209. F().run();
  210. }
  211. void test_connect_unsupported_proto() {
  212. struct F : fixture_t {
  213. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  214. return fmt::format("xxx://{}", listening_ep);
  215. }
  216. void main() noexcept override {
  217. create_actor();
  218. io_ctx.run();
  219. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  220. CHECK(!connected_message);
  221. }
  222. };
  223. F().run();
  224. }
  225. void test_handshake_timeout() {
  226. struct F : fixture_t {
  227. void accept(const sys::error_code &ec) noexcept override { LOG_INFO(log, "accept (ignoring)", ec.message()); }
  228. void main() noexcept override {
  229. auto act = create_actor();
  230. io_ctx.run();
  231. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  232. CHECK(!connected_message);
  233. REQUIRE(diff_msgs.size() == 2);
  234. CHECK(diff_msgs[0]->payload.diff->apply(*cluster));
  235. CHECK(peer_device->get_state() == device_state_t::dialing);
  236. CHECK(diff_msgs[1]->payload.diff->apply(*cluster));
  237. CHECK(peer_device->get_state() == device_state_t::offline);
  238. }
  239. };
  240. F().run();
  241. }
  242. void test_handshake_garbage() {
  243. struct F : fixture_t {
  244. void accept(const sys::error_code &) noexcept override {
  245. auto buff = asio::buffer("garbage-garbage-garbage");
  246. peer_sock.write_some(buff);
  247. }
  248. void main() noexcept override {
  249. auto act = create_actor();
  250. io_ctx.run();
  251. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  252. CHECK(!connected_message);
  253. REQUIRE(diff_msgs.size() == 2);
  254. CHECK(diff_msgs[0]->payload.diff->apply(*cluster));
  255. CHECK(peer_device->get_state() == device_state_t::dialing);
  256. CHECK(diff_msgs[1]->payload.diff->apply(*cluster));
  257. CHECK(peer_device->get_state() == device_state_t::offline);
  258. }
  259. };
  260. F().run();
  261. }
  262. void test_connection_refused() {
  263. struct F : fixture_t {
  264. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  265. return fmt::format("tcp://{}:0", host);
  266. }
  267. void main() noexcept override {
  268. auto act = create_actor();
  269. io_ctx.run();
  270. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  271. CHECK(!connected_message);
  272. }
  273. };
  274. F().run();
  275. }
  276. void test_connection_refused_no_model() {
  277. struct F : fixture_t {
  278. F() { use_model = false; }
  279. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  280. return fmt::format("tcp://{}:0", host);
  281. }
  282. void main() noexcept override {
  283. auto act = create_actor();
  284. io_ctx.run();
  285. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  286. CHECK(!connected_message);
  287. }
  288. };
  289. F().run();
  290. }
  291. void test_resolve_failure() {
  292. struct F : fixture_t {
  293. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override { return "tcp://x.example.com"; }
  294. void main() noexcept override {
  295. auto act = create_actor();
  296. io_ctx.run();
  297. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  298. CHECK(!connected_message);
  299. }
  300. };
  301. F().run();
  302. }
  303. void test_success() {
  304. struct F : fixture_t {
  305. void main() noexcept override {
  306. auto act = create_actor();
  307. io_ctx.run();
  308. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  309. REQUIRE(connected_message);
  310. CHECK(connected_message->payload.proto == "tcp");
  311. CHECK(connected_message->payload.peer_device_id == peer_device->device_id());
  312. CHECK(valid_handshake);
  313. sup->do_shutdown();
  314. sup->do_process();
  315. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  316. REQUIRE(diff_msgs.size() == 1);
  317. CHECK(diff_msgs[0]->payload.diff->apply(*cluster));
  318. CHECK(peer_device->get_state() == device_state_t::dialing);
  319. }
  320. };
  321. F().run();
  322. }
  323. void test_success_no_model() {
  324. struct F : fixture_t {
  325. F() { use_model = false; }
  326. void main() noexcept override {
  327. auto act = create_actor();
  328. io_ctx.run();
  329. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  330. CHECK(connected_message);
  331. CHECK(connected_message->payload.peer_device_id == peer_device->device_id());
  332. CHECK(valid_handshake);
  333. sup->do_shutdown();
  334. sup->do_process();
  335. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  336. REQUIRE(diff_msgs.size() == 0);
  337. }
  338. };
  339. F().run();
  340. }
  341. struct passive_fixture_t : fixture_t {
  342. actor_ptr_t act;
  343. bool active_connect_invoked = false;
  344. void active_connect() override {
  345. LOG_TRACE(log, "active_connect");
  346. if (!act || active_connect_invoked) {
  347. return;
  348. }
  349. active_connect_invoked = true;
  350. active_connect_impl();
  351. }
  352. virtual void active_connect_impl() { fixture_t::active_connect(); }
  353. void accept(const sys::error_code &ec) noexcept override {
  354. LOG_INFO(log, "test_passive_success/accept, ec: {}", ec.message());
  355. act = create_passive_actor();
  356. sup->do_process();
  357. active_connect();
  358. }
  359. };
  360. void test_passive_success() {
  361. struct F : passive_fixture_t {
  362. void main() noexcept override {
  363. initiate_active();
  364. io_ctx.run();
  365. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  366. REQUIRE(connected_message);
  367. CHECK(connected_message->payload.proto == "tcp");
  368. CHECK(connected_message->payload.peer_device_id == peer_device->device_id());
  369. CHECK(valid_handshake);
  370. sup->do_shutdown();
  371. sup->do_process();
  372. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  373. }
  374. };
  375. F().run();
  376. }
  377. void test_passive_garbage() {
  378. struct F : passive_fixture_t {
  379. tcp::socket client_sock;
  380. tcp::resolver::results_type addresses;
  381. F() : client_sock{io_ctx} {}
  382. void active_connect_impl() noexcept override {
  383. tcp::resolver resolver(io_ctx);
  384. addresses = resolver.resolve(host, std::to_string(listening_ep.port()));
  385. asio::async_connect(client_sock, addresses.begin(), addresses.end(), [&](auto ec, auto) {
  386. LOG_INFO(log, "test_passive_garbage/peer connect, ec: {}", ec.message());
  387. auto buff = asio::buffer("garbage-garbage-garbage");
  388. client_sock.write_some(buff);
  389. sup->do_process();
  390. });
  391. }
  392. void main() noexcept override {
  393. initiate_active();
  394. io_ctx.run();
  395. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  396. CHECK(!connected_message);
  397. }
  398. };
  399. F().run();
  400. }
  401. void test_passive_timeout() {
  402. struct F : passive_fixture_t {
  403. void active_connect() noexcept override { LOG_INFO(log, "test_passive_timeout/active_connect NOOP"); }
  404. void main() noexcept override {
  405. initiate_active();
  406. io_ctx.run();
  407. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  408. CHECK(!connected_message);
  409. }
  410. };
  411. F().run();
  412. }
  413. struct passive_relay_fixture_t : fixture_t {
  414. std::string rx_buff;
  415. bool initiate_handshake = true;
  416. passive_relay_fixture_t() {
  417. relay_session = "relay-session-key";
  418. rx_buff.resize(128);
  419. }
  420. void on_read(size_t bytes) noexcept {
  421. LOG_TRACE(log, "read (relay/passive), {} bytes", bytes);
  422. auto r = proto::relay::parse({rx_buff.data(), bytes});
  423. auto &wrapped = std::get<proto::relay::wrapped_message_t>(r);
  424. auto &msg = std::get<proto::relay::join_session_request_t>(wrapped.message);
  425. CHECK(msg.key == relay_session);
  426. relay_reply();
  427. }
  428. virtual void on_write(size_t bytes) noexcept {
  429. LOG_TRACE(log, "write (relay/passive), {} bytes", bytes);
  430. if (initiate_handshake) {
  431. auto upgradeable = static_cast<transport::upgradeable_stream_base_t *>(peer_trans.get());
  432. auto ssl = transport::ssl_junction_t{my_device->device_id(), &peer_keys, false, "bep"};
  433. peer_trans = upgradeable->upgrade(ssl, true);
  434. initiate_peer_handshake();
  435. }
  436. }
  437. virtual void relay_reply() noexcept { write(proto::relay::response_t{0, "success"}); }
  438. virtual void write(const proto::relay::message_t &msg) noexcept {
  439. proto::relay::serialize(msg, rx_buff);
  440. transport::error_fn_t err_fn([&](auto ec) { log->error("(relay/passive), read_err: {}", ec.message()); });
  441. transport::io_fn_t write_fn = [this](size_t bytes) { on_write(bytes); };
  442. peer_trans->async_send(asio::buffer(rx_buff), write_fn, err_fn);
  443. }
  444. void accept(const sys::error_code &ec) noexcept override {
  445. LOG_INFO(log, "accept (relay/passive), ec: {}", ec.message());
  446. auto uri = utils::parse("tcp://127.0.0.1:0/");
  447. auto cfg = transport::transport_config_t{{}, uri, *sup, std::move(peer_sock), false};
  448. peer_trans = transport::initiate_stream(cfg);
  449. transport::error_fn_t read_err_fn([&](auto ec) { log->error("(relay/passive), read_err: {}", ec.message()); });
  450. transport::io_fn_t read_fn = [this](size_t bytes) { on_read(bytes); };
  451. peer_trans->async_recv(asio::buffer(rx_buff), read_fn, read_err_fn);
  452. }
  453. };
  454. void test_relay_passive_success() {
  455. struct F : passive_relay_fixture_t {
  456. void main() noexcept override {
  457. auto act = create_actor();
  458. io_ctx.run();
  459. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  460. REQUIRE(connected_message);
  461. CHECK(connected_message->payload.proto == "relay");
  462. CHECK(connected_message->payload.peer_device_id == peer_device->device_id());
  463. CHECK(valid_handshake);
  464. sup->do_shutdown();
  465. sup->do_process();
  466. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  467. CHECK(diff_msgs.size() == 0);
  468. }
  469. };
  470. F().run();
  471. }
  472. void test_relay_passive_garbage() {
  473. struct F : passive_relay_fixture_t {
  474. void write(const proto::relay::message_t &) noexcept override {
  475. rx_buff = "garbage-garbage-garbage";
  476. initiate_handshake = false;
  477. transport::error_fn_t err_fn([&](auto ec) { log->error("(relay/passive), read_err: {}", ec.message()); });
  478. transport::io_fn_t write_fn = [this](size_t bytes) { on_write(bytes); };
  479. peer_trans->async_send(asio::buffer(rx_buff), write_fn, err_fn);
  480. }
  481. void main() noexcept override {
  482. create_actor();
  483. io_ctx.run();
  484. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  485. CHECK(!connected_message);
  486. CHECK(!valid_handshake);
  487. sup->do_shutdown();
  488. sup->do_process();
  489. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  490. CHECK(diff_msgs.size() == 0);
  491. }
  492. };
  493. F().run();
  494. }
  495. void test_relay_passive_wrong_message() {
  496. struct F : passive_relay_fixture_t {
  497. void relay_reply() noexcept override { write(proto::relay::pong_t{}); }
  498. void main() noexcept override {
  499. initiate_handshake = false;
  500. auto act = create_actor();
  501. io_ctx.run();
  502. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  503. CHECK(!connected_message);
  504. CHECK(!valid_handshake);
  505. sup->do_shutdown();
  506. sup->do_process();
  507. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  508. CHECK(diff_msgs.size() == 0);
  509. }
  510. };
  511. F().run();
  512. }
  513. void test_relay_passive_unsuccessful_join() {
  514. struct F : passive_relay_fixture_t {
  515. void relay_reply() noexcept override { write(proto::relay::response_t{5, "some-fail-reason"}); }
  516. void main() noexcept override {
  517. initiate_handshake = false;
  518. auto act = create_actor();
  519. io_ctx.run();
  520. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  521. CHECK(!connected_message);
  522. CHECK(!valid_handshake);
  523. sup->do_shutdown();
  524. sup->do_process();
  525. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  526. CHECK(diff_msgs.size() == 0);
  527. }
  528. };
  529. F().run();
  530. }
  531. void test_relay_malformed_uri() {
  532. struct F : fixture_t {
  533. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  534. return fmt::format("relay://{}", listening_ep);
  535. }
  536. void main() noexcept override {
  537. auto act = create_actor();
  538. io_ctx.run();
  539. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  540. CHECK(!connected_message);
  541. CHECK(!valid_handshake);
  542. sup->do_shutdown();
  543. sup->do_process();
  544. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  545. CHECK(diff_msgs.size() == 2);
  546. }
  547. };
  548. F().run();
  549. }
  550. void test_relay_active_wrong_relay_device_id() {
  551. struct F : fixture_t {
  552. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  553. return fmt::format("relay://{}?id={}", listening_ep, my_device->device_id().get_value());
  554. }
  555. void main() noexcept override {
  556. auto act = create_actor();
  557. io_ctx.run();
  558. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  559. CHECK(!connected_message);
  560. CHECK(!valid_handshake);
  561. sup->do_shutdown();
  562. sup->do_process();
  563. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  564. CHECK(diff_msgs.size() == 2);
  565. }
  566. };
  567. F().run();
  568. }
  569. struct active_relay_fixture_t : fixture_t {
  570. utils::key_pair_t relay_keys;
  571. model::device_id_t relay_device;
  572. std::string rx_buff;
  573. std::string session_key = "lorem-session-dolor";
  574. transport::stream_sp_t relay_trans;
  575. bool session_mode = false;
  576. active_relay_fixture_t() {
  577. relay_keys = utils::generate_pair("relay").value();
  578. relay_device = model::device_id_t::from_cert(relay_keys.cert_data).value();
  579. rx_buff.resize(128);
  580. }
  581. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  582. return fmt::format("relay://{}?id={}", listening_ep, relay_device.get_value());
  583. }
  584. void accept(const sys::error_code &ec) noexcept override {
  585. LOG_INFO(log, "relay/accept, ec: {}", ec.message());
  586. if (!session_mode) {
  587. relay_trans = transport::initiate_tls_passive(*sup, relay_keys, std::move(peer_sock));
  588. transport::handshake_fn_t handshake_fn = [this](bool valid_peer, utils::x509_t &, const tcp::endpoint &,
  589. const model::device_id_t *) {
  590. valid_handshake = valid_peer;
  591. on_relay_handshake();
  592. };
  593. transport::error_fn_t on_error = [](const auto &) {};
  594. relay_trans->async_handshake(handshake_fn, on_error);
  595. return;
  596. }
  597. auto uri = utils::parse("tcp://127.0.0.1:0/");
  598. auto cfg = transport::transport_config_t{{}, uri, *sup, std::move(peer_sock), false};
  599. peer_trans = transport::initiate_stream(cfg);
  600. transport::error_fn_t read_err_fn([&](auto ec) { log->error("(relay/active), read_err: {}", ec.message()); });
  601. transport::io_fn_t read_fn = [this](size_t bytes) { on_read_peer(bytes); };
  602. peer_trans->async_recv(asio::buffer(rx_buff), read_fn, read_err_fn);
  603. }
  604. virtual void on_relay_handshake() noexcept {
  605. transport::error_fn_t read_err_fn([&](auto ec) { log->error("(relay/active), read_err: {}", ec.message()); });
  606. transport::io_fn_t read_fn = [this](size_t bytes) { on_read(bytes); };
  607. relay_trans->async_recv(asio::buffer(rx_buff), read_fn, read_err_fn);
  608. }
  609. virtual void relay_reply() noexcept {
  610. write(relay_trans, proto::relay::session_invitation_t{std::string(peer_device->device_id().get_sha256()),
  611. session_key, "", listening_ep.port(), false});
  612. }
  613. virtual void session_reply() noexcept { write(peer_trans, proto::relay::response_t{0, "ok"}); }
  614. virtual void write(transport::stream_sp_t &stream, const proto::relay::message_t &msg) noexcept {
  615. proto::relay::serialize(msg, rx_buff);
  616. transport::error_fn_t err_fn([&](auto ec) { log->error("(relay/passive), read_err: {}", ec.message()); });
  617. transport::io_fn_t write_fn = [this](size_t bytes) { on_write(bytes); };
  618. stream->async_send(asio::buffer(rx_buff), write_fn, err_fn);
  619. }
  620. virtual void on_read_peer(size_t bytes) {
  621. log->debug("(relay/active) read peer {} bytes", bytes);
  622. auto r = proto::relay::parse({rx_buff.data(), bytes});
  623. auto &wrapped = std::get<proto::relay::wrapped_message_t>(r);
  624. auto &msg = std::get<proto::relay::join_session_request_t>(wrapped.message);
  625. CHECK(msg.key == session_key);
  626. session_reply();
  627. }
  628. virtual void on_read(size_t bytes) {
  629. log->debug("(relay/active) read {} bytes", bytes);
  630. auto r = proto::relay::parse({rx_buff.data(), bytes});
  631. auto &wrapped = std::get<proto::relay::wrapped_message_t>(r);
  632. auto &msg = std::get<proto::relay::connect_request_t>(wrapped.message);
  633. CHECK(msg.device_id == peer_device->device_id().get_sha256());
  634. relay_reply();
  635. }
  636. virtual void on_write(size_t bytes) {
  637. log->debug("(relay/active) write {} bytes", bytes);
  638. if (!session_mode) {
  639. acceptor.async_accept(peer_sock, [this](auto ec) { this->accept(ec); });
  640. session_mode = true;
  641. } else {
  642. auto upgradeable = static_cast<transport::upgradeable_stream_base_t *>(peer_trans.get());
  643. auto ssl = transport::ssl_junction_t{my_device->device_id(), &peer_keys, false, "bep"};
  644. peer_trans = upgradeable->upgrade(ssl, false);
  645. initiate_peer_handshake();
  646. }
  647. }
  648. };
  649. void test_relay_active_success() {
  650. struct F : active_relay_fixture_t {
  651. void main() noexcept override {
  652. auto act = create_actor();
  653. io_ctx.run();
  654. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  655. REQUIRE(connected_message);
  656. CHECK(connected_message->payload.proto == "relay");
  657. CHECK(connected_message->payload.peer_device_id == peer_device->device_id());
  658. CHECK(valid_handshake);
  659. sup->do_shutdown();
  660. sup->do_process();
  661. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  662. REQUIRE(diff_msgs.size() == 1);
  663. CHECK(diff_msgs[0]->payload.diff->apply(*cluster));
  664. CHECK(peer_device->get_state() == device_state_t::dialing);
  665. }
  666. };
  667. F().run();
  668. }
  669. void test_relay_active_not_enabled() {
  670. struct F : active_relay_fixture_t {
  671. actor_ptr_t create_actor() noexcept override {
  672. return sup->create_actor<initiator_actor_t>()
  673. .timeout(timeout)
  674. .peer_device_id(peer_device->device_id())
  675. .relay_session(relay_session)
  676. .uris({peer_uri})
  677. .cluster(use_model ? cluster : nullptr)
  678. .sink(sup->get_address())
  679. .ssl_pair(&my_keys)
  680. .router(*sup)
  681. .escalate_failure()
  682. .finish();
  683. }
  684. void main() noexcept override {
  685. auto act = create_actor();
  686. io_ctx.run();
  687. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  688. sup->do_shutdown();
  689. sup->do_process();
  690. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  691. CHECK(peer_device->get_state() == device_state_t::offline);
  692. }
  693. };
  694. F().run();
  695. }
  696. void test_relay_wrong_device() {
  697. struct F : active_relay_fixture_t {
  698. void relay_reply() noexcept override {
  699. write(relay_trans, proto::relay::session_invitation_t{std::string(relay_device.get_sha256()), session_key,
  700. "", listening_ep.port(), false});
  701. }
  702. void on_write(size_t) override {}
  703. void main() noexcept override {
  704. auto act = create_actor();
  705. io_ctx.run();
  706. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  707. CHECK(!connected_message);
  708. CHECK(valid_handshake);
  709. sup->do_shutdown();
  710. sup->do_process();
  711. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  712. CHECK(diff_msgs.size() == 2);
  713. }
  714. };
  715. F().run();
  716. }
  717. void test_relay_non_connectable() {
  718. struct F : active_relay_fixture_t {
  719. void relay_reply() noexcept override {
  720. write(relay_trans, proto::relay::session_invitation_t{std::string(peer_device->device_id().get_sha256()),
  721. session_key, "", 0, false});
  722. }
  723. void main() noexcept override {
  724. auto act = create_actor();
  725. io_ctx.run();
  726. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  727. CHECK(!connected_message);
  728. sup->do_shutdown();
  729. sup->do_process();
  730. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  731. CHECK(diff_msgs.size() == 2);
  732. }
  733. };
  734. F().run();
  735. }
  736. void test_relay_malformed_address() {
  737. struct F : active_relay_fixture_t {
  738. void relay_reply() noexcept override {
  739. write(relay_trans, proto::relay::session_invitation_t{std::string(peer_device->device_id().get_sha256()),
  740. session_key, "8.8.8.8z", listening_ep.port(), false});
  741. }
  742. void main() noexcept override {
  743. auto act = create_actor();
  744. io_ctx.run();
  745. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  746. CHECK(!connected_message);
  747. sup->do_shutdown();
  748. sup->do_process();
  749. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  750. CHECK(diff_msgs.size() == 2);
  751. }
  752. };
  753. F().run();
  754. }
  755. void test_relay_garbage_reply() {
  756. struct F : active_relay_fixture_t {
  757. void write(transport::stream_sp_t &stream, const proto::relay::message_t &) noexcept override {
  758. rx_buff = "garbage-garbage-garbage";
  759. transport::error_fn_t err_fn([&](auto ec) { log->error("(relay/passive), read_err: {}", ec.message()); });
  760. transport::io_fn_t write_fn = [this](size_t bytes) { on_write(bytes); };
  761. stream->async_send(asio::buffer(rx_buff), write_fn, err_fn);
  762. }
  763. void on_write(size_t) override {}
  764. void main() noexcept override {
  765. auto act = create_actor();
  766. io_ctx.run();
  767. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  768. CHECK(!connected_message);
  769. sup->do_shutdown();
  770. sup->do_process();
  771. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  772. CHECK(diff_msgs.size() == 2);
  773. }
  774. };
  775. F().run();
  776. }
  777. void test_relay_non_invitation_reply() {
  778. struct F : active_relay_fixture_t {
  779. void relay_reply() noexcept override { write(relay_trans, proto::relay::pong_t{}); }
  780. void on_write(size_t) override {}
  781. void main() noexcept override {
  782. auto act = create_actor();
  783. io_ctx.run();
  784. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  785. CHECK(!connected_message);
  786. sup->do_shutdown();
  787. sup->do_process();
  788. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  789. CHECK(diff_msgs.size() == 2);
  790. }
  791. };
  792. F().run();
  793. }
  794. int _init() {
  795. REGISTER_TEST_CASE(test_connect_unsupported_proto, "test_connect_unsupported_proto", "[initiator]");
  796. REGISTER_TEST_CASE(test_connect_timeout, "test_connect_timeout", "[initiator]");
  797. REGISTER_TEST_CASE(test_handshake_timeout, "test_handshake_timeout", "[initiator]");
  798. REGISTER_TEST_CASE(test_handshake_garbage, "test_handshake_garbage", "[initiator]");
  799. REGISTER_TEST_CASE(test_connection_refused, "test_connection_refused", "[initiator]");
  800. REGISTER_TEST_CASE(test_connection_refused_no_model, "test_connection_refused_no_model", "[initiator]");
  801. REGISTER_TEST_CASE(test_resolve_failure, "test_resolve_failure", "[initiator]");
  802. REGISTER_TEST_CASE(test_success, "test_success", "[initiator]");
  803. REGISTER_TEST_CASE(test_success_no_model, "test_success_no_model", "[initiator]");
  804. REGISTER_TEST_CASE(test_passive_success, "test_passive_success", "[initiator]");
  805. REGISTER_TEST_CASE(test_passive_garbage, "test_passive_garbage", "[initiator]");
  806. REGISTER_TEST_CASE(test_passive_timeout, "test_passive_timeout", "[initiator]");
  807. REGISTER_TEST_CASE(test_relay_passive_success, "test_relay_passive_success", "[initiator]");
  808. REGISTER_TEST_CASE(test_relay_passive_garbage, "test_relay_passive_garbage", "[initiator]");
  809. REGISTER_TEST_CASE(test_relay_passive_wrong_message, "test_relay_passive_wrong_message", "[initiator]");
  810. REGISTER_TEST_CASE(test_relay_passive_unsuccessful_join, "test_relay_passive_unsuccessful_join", "[initiator]");
  811. REGISTER_TEST_CASE(test_relay_malformed_uri, "test_relay_malformed_uri", "[initiator]");
  812. REGISTER_TEST_CASE(test_relay_active_wrong_relay_device_id, "test_relay_active_wrong_relay_device_id",
  813. "[initiator]");
  814. REGISTER_TEST_CASE(test_relay_active_success, "test_relay_active_success", "[initiator]");
  815. REGISTER_TEST_CASE(test_relay_active_not_enabled, "test_relay_active_not_enabled", "[initiator]");
  816. REGISTER_TEST_CASE(test_relay_wrong_device, "test_relay_wrong_device", "[initiator]");
  817. REGISTER_TEST_CASE(test_relay_non_connectable, "test_relay_non_connectable", "[initiator]");
  818. REGISTER_TEST_CASE(test_relay_malformed_address, "test_relay_malformed_address", "[initiator]");
  819. REGISTER_TEST_CASE(test_relay_garbage_reply, "test_relay_garbage_reply", "[initiator]");
  820. REGISTER_TEST_CASE(test_relay_non_invitation_reply, "test_relay_non_invitation_reply", "[initiator]");
  821. return 1;
  822. }
  823. static int v = _init();