077-initiator.cpp 34 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2023 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "utils/tls.h"
  6. #include "utils/format.hpp"
  7. #include "model/cluster.h"
  8. #include "model/messages.h"
  9. #include "net/names.h"
  10. #include "net/initiator_actor.h"
  11. #include "net/resolver_actor.h"
  12. #include "proto/relay_support.h"
  13. #include "transport/stream.h"
  14. #include <rotor/asio.hpp>
  15. using namespace syncspirit;
  16. using namespace syncspirit::test;
  17. using namespace syncspirit::model;
  18. using namespace syncspirit::net;
  19. namespace asio = boost::asio;
  20. namespace sys = boost::system;
  21. namespace r = rotor;
  22. namespace ra = r::asio;
  23. using configure_callback_t = std::function<void(r::plugin::plugin_base_t &)>;
  24. using finish_callback_t = std::function<void()>;
  25. auto timeout = r::pt::time_duration{r::pt::millisec{2000}};
  26. auto host = "127.0.0.1";
  27. struct supervisor_t : ra::supervisor_asio_t {
  28. using ra::supervisor_asio_t::supervisor_asio_t;
  29. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  30. ra::supervisor_asio_t::configure(plugin);
  31. plugin.with_casted<r::plugin::registry_plugin_t>(
  32. [&](auto &p) { p.register_name(names::coordinator, get_address()); });
  33. if (configure_callback) {
  34. configure_callback(plugin);
  35. }
  36. }
  37. void shutdown_finish() noexcept override {
  38. ra::supervisor_asio_t::shutdown_finish();
  39. if (finish_callback) {
  40. finish_callback();
  41. }
  42. }
  43. auto get_state() noexcept { return state; }
  44. finish_callback_t finish_callback;
  45. configure_callback_t configure_callback;
  46. };
  47. using supervisor_ptr_t = r::intrusive_ptr_t<supervisor_t>;
  48. using actor_ptr_t = r::intrusive_ptr_t<initiator_actor_t>;
  49. struct fixture_t {
  50. using acceptor_t = asio::ip::tcp::acceptor;
  51. using ready_ptr_t = r::intrusive_ptr_t<net::message::peer_connected_t>;
  52. using diff_ptr_t = r::intrusive_ptr_t<model::message::model_update_t>;
  53. using diff_msgs_t = std::vector<diff_ptr_t>;
  54. fixture_t() noexcept : ctx(io_ctx), acceptor(io_ctx), peer_sock(io_ctx) {
  55. utils::set_default("trace");
  56. log = utils::get_logger("fixture");
  57. }
  58. virtual void finish() {
  59. acceptor.cancel();
  60. if (peer_trans) {
  61. peer_trans->cancel();
  62. }
  63. }
  64. void run() noexcept {
  65. auto strand = std::make_shared<asio::io_context::strand>(io_ctx);
  66. sup = ctx.create_supervisor<supervisor_t>().strand(strand).timeout(timeout).create_registry().finish();
  67. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  68. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  69. using connected_t = typename ready_ptr_t::element_type;
  70. using diff_t = typename diff_ptr_t::element_type;
  71. p.subscribe_actor(r::lambda<connected_t>([&](connected_t &msg) {
  72. connected_message = &msg;
  73. LOG_INFO(log, "received message::peer_connected_t");
  74. }));
  75. p.subscribe_actor(r::lambda<diff_t>([&](diff_t &msg) {
  76. diff_msgs.emplace_back(&msg);
  77. LOG_INFO(log, "received diff message");
  78. }));
  79. });
  80. };
  81. sup->finish_callback = [&]() { finish(); };
  82. sup->start();
  83. sup->create_actor<resolver_actor_t>().resolve_timeout(timeout / 2).timeout(timeout).finish();
  84. sup->do_process();
  85. my_keys = utils::generate_pair("me").value();
  86. peer_keys = utils::generate_pair("peer").value();
  87. auto md = model::device_id_t::from_cert(my_keys.cert_data).value();
  88. auto pd = model::device_id_t::from_cert(peer_keys.cert_data).value();
  89. my_device = device_t::create(md, "my-device").value();
  90. peer_device = device_t::create(pd, "peer-device").value();
  91. auto ep = asio::ip::tcp::endpoint(asio::ip::make_address(host), 0);
  92. acceptor.open(ep.protocol());
  93. acceptor.bind(ep);
  94. acceptor.listen();
  95. listening_ep = acceptor.local_endpoint();
  96. peer_uri = utils::parse(get_uri(listening_ep)).value();
  97. log->debug("listening on {}", peer_uri.full);
  98. initiate_accept();
  99. cluster = new cluster_t(my_device, 1, 1);
  100. cluster->get_devices().put(my_device);
  101. cluster->get_devices().put(peer_device);
  102. main();
  103. }
  104. virtual void initiate_accept() noexcept {
  105. acceptor.async_accept(peer_sock, [this](auto ec) { this->accept(ec); });
  106. }
  107. virtual std::string get_uri(const asio::ip::tcp::endpoint &) noexcept {
  108. return fmt::format("tcp://{}", listening_ep);
  109. }
  110. virtual void accept(const sys::error_code &ec) noexcept {
  111. LOG_INFO(log, "accept, ec: {}", ec.message());
  112. peer_trans = transport::initiate_tls_passive(*sup, peer_keys, std::move(peer_sock));
  113. initiate_peer_handshake();
  114. }
  115. virtual void initiate_peer_handshake() noexcept {
  116. transport::handshake_fn_t handshake_fn = [this](bool valid_peer, utils::x509_t &, const tcp::endpoint &,
  117. const model::device_id_t *) {
  118. valid_handshake = valid_peer;
  119. on_peer_handshake();
  120. };
  121. transport::error_fn_t on_error = [](const auto &) {};
  122. peer_trans->async_handshake(handshake_fn, on_error);
  123. }
  124. virtual void on_peer_handshake() noexcept { LOG_INFO(log, "peer handshake"); }
  125. void initiate_active() noexcept {
  126. tcp::resolver resolver(io_ctx);
  127. auto addresses = resolver.resolve(host, std::to_string(listening_ep.port()));
  128. peer_trans = transport::initiate_tls_active(*sup, peer_keys, my_device->device_id(), peer_uri);
  129. transport::error_fn_t on_error = [&](auto &ec) {
  130. LOG_WARN(log, "initiate_active/connect, err: {}", ec.message());
  131. };
  132. transport::connect_fn_t on_connect = [&](auto) {
  133. LOG_INFO(log, "initiate_active/peer connect");
  134. active_connect();
  135. };
  136. peer_trans->async_connect(addresses, on_connect, on_error);
  137. }
  138. virtual void active_connect() {
  139. LOG_TRACE(log, "active_connect");
  140. transport::handshake_fn_t handshake_fn = [this](bool, utils::x509_t &, const tcp::endpoint &,
  141. const model::device_id_t *) {
  142. valid_handshake = true;
  143. LOG_INFO(log, "test_passive_success/peer handshake");
  144. };
  145. transport::error_fn_t on_hs_error = [&](const auto &ec) {
  146. LOG_WARN(log, "test_passive_success/peer handshake, err: {}", ec.message());
  147. };
  148. peer_trans->async_handshake(handshake_fn, on_hs_error);
  149. }
  150. virtual void main() noexcept {}
  151. virtual actor_ptr_t create_actor() noexcept {
  152. return sup->create_actor<initiator_actor_t>()
  153. .timeout(timeout)
  154. .peer_device_id(peer_device->device_id())
  155. .relay_session(relay_session)
  156. .relay_enabled(true)
  157. .uris({peer_uri})
  158. .cluster(use_model ? cluster : nullptr)
  159. .sink(sup->get_address())
  160. .ssl_pair(&my_keys)
  161. .router(*sup)
  162. .escalate_failure()
  163. .finish();
  164. }
  165. virtual actor_ptr_t create_passive_actor() noexcept {
  166. return sup->create_actor<initiator_actor_t>()
  167. .timeout(timeout)
  168. .sock(std::move(peer_sock))
  169. .ssl_pair(&my_keys)
  170. .router(*sup)
  171. .cluster(cluster)
  172. .sink(sup->get_address())
  173. .escalate_failure()
  174. .finish();
  175. }
  176. cluster_ptr_t cluster;
  177. asio::io_context io_ctx{1};
  178. ra::system_context_asio_t ctx;
  179. acceptor_t acceptor;
  180. supervisor_ptr_t sup;
  181. asio::ip::tcp::endpoint listening_ep;
  182. utils::logger_t log;
  183. asio::ip::tcp::socket peer_sock;
  184. config::bep_config_t bep_config;
  185. utils::key_pair_t my_keys;
  186. utils::key_pair_t peer_keys;
  187. utils::URI peer_uri;
  188. model::device_ptr_t my_device;
  189. model::device_ptr_t peer_device;
  190. transport::stream_sp_t peer_trans;
  191. ready_ptr_t connected_message;
  192. diff_msgs_t diff_msgs;
  193. std::string relay_session;
  194. bool use_model = true;
  195. bool valid_handshake = false;
  196. };
  197. void test_connect_timeout() {
  198. struct F : fixture_t {
  199. void initiate_accept() noexcept override {}
  200. void main() noexcept override {
  201. auto act = create_actor();
  202. io_ctx.run();
  203. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  204. CHECK(!connected_message);
  205. }
  206. };
  207. F().run();
  208. }
  209. void test_connect_unsupported_proto() {
  210. struct F : fixture_t {
  211. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  212. return fmt::format("xxx://{}", listening_ep);
  213. }
  214. void main() noexcept override {
  215. create_actor();
  216. io_ctx.run();
  217. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  218. CHECK(!connected_message);
  219. }
  220. };
  221. F().run();
  222. }
  223. void test_handshake_timeout() {
  224. struct F : fixture_t {
  225. void accept(const sys::error_code &ec) noexcept override { LOG_INFO(log, "accept (ignoring)", ec.message()); }
  226. void main() noexcept override {
  227. auto act = create_actor();
  228. io_ctx.run();
  229. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  230. CHECK(!connected_message);
  231. REQUIRE(diff_msgs.size() == 2);
  232. CHECK(diff_msgs[0]->payload.diff->apply(*cluster));
  233. CHECK(peer_device->get_state() == device_state_t::dialing);
  234. CHECK(diff_msgs[1]->payload.diff->apply(*cluster));
  235. CHECK(peer_device->get_state() == device_state_t::offline);
  236. }
  237. };
  238. F().run();
  239. }
  240. void test_handshake_garbage() {
  241. struct F : fixture_t {
  242. void accept(const sys::error_code &) noexcept override {
  243. auto buff = asio::buffer("garbage-garbage-garbage");
  244. peer_sock.write_some(buff);
  245. }
  246. void main() noexcept override {
  247. auto act = create_actor();
  248. io_ctx.run();
  249. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  250. CHECK(!connected_message);
  251. REQUIRE(diff_msgs.size() == 2);
  252. CHECK(diff_msgs[0]->payload.diff->apply(*cluster));
  253. CHECK(peer_device->get_state() == device_state_t::dialing);
  254. CHECK(diff_msgs[1]->payload.diff->apply(*cluster));
  255. CHECK(peer_device->get_state() == device_state_t::offline);
  256. }
  257. };
  258. F().run();
  259. }
  260. void test_connection_refused() {
  261. struct F : fixture_t {
  262. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  263. return fmt::format("tcp://{}:0", host);
  264. }
  265. void main() noexcept override {
  266. auto act = create_actor();
  267. io_ctx.run();
  268. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  269. CHECK(!connected_message);
  270. }
  271. };
  272. F().run();
  273. }
  274. void test_connection_refused_no_model() {
  275. struct F : fixture_t {
  276. F() { use_model = false; }
  277. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  278. return fmt::format("tcp://{}:0", host);
  279. }
  280. void main() noexcept override {
  281. auto act = create_actor();
  282. io_ctx.run();
  283. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  284. CHECK(!connected_message);
  285. }
  286. };
  287. F().run();
  288. }
  289. void test_resolve_failure() {
  290. struct F : fixture_t {
  291. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override { return "tcp://x.example.com"; }
  292. void main() noexcept override {
  293. auto act = create_actor();
  294. io_ctx.run();
  295. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  296. CHECK(!connected_message);
  297. }
  298. };
  299. F().run();
  300. }
  301. void test_success() {
  302. struct F : fixture_t {
  303. void main() noexcept override {
  304. auto act = create_actor();
  305. io_ctx.run();
  306. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  307. REQUIRE(connected_message);
  308. CHECK(connected_message->payload.proto == "tcp");
  309. CHECK(connected_message->payload.peer_device_id == peer_device->device_id());
  310. CHECK(valid_handshake);
  311. sup->do_shutdown();
  312. sup->do_process();
  313. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  314. REQUIRE(diff_msgs.size() == 1);
  315. CHECK(diff_msgs[0]->payload.diff->apply(*cluster));
  316. CHECK(peer_device->get_state() == device_state_t::dialing);
  317. }
  318. };
  319. F().run();
  320. }
  321. void test_success_no_model() {
  322. struct F : fixture_t {
  323. F() { use_model = false; }
  324. void main() noexcept override {
  325. auto act = create_actor();
  326. io_ctx.run();
  327. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  328. CHECK(connected_message);
  329. CHECK(connected_message->payload.peer_device_id == peer_device->device_id());
  330. CHECK(valid_handshake);
  331. sup->do_shutdown();
  332. sup->do_process();
  333. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  334. REQUIRE(diff_msgs.size() == 0);
  335. }
  336. };
  337. F().run();
  338. }
  339. struct passive_fixture_t : fixture_t {
  340. actor_ptr_t act;
  341. bool active_connect_invoked = false;
  342. void active_connect() override {
  343. LOG_TRACE(log, "active_connect");
  344. if (!act || active_connect_invoked) {
  345. return;
  346. }
  347. active_connect_invoked = true;
  348. active_connect_impl();
  349. }
  350. virtual void active_connect_impl() { fixture_t::active_connect(); }
  351. void accept(const sys::error_code &ec) noexcept override {
  352. LOG_INFO(log, "test_passive_success/accept, ec: {}", ec.message());
  353. act = create_passive_actor();
  354. sup->do_process();
  355. active_connect();
  356. }
  357. };
  358. void test_passive_success() {
  359. struct F : passive_fixture_t {
  360. void main() noexcept override {
  361. initiate_active();
  362. io_ctx.run();
  363. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  364. REQUIRE(connected_message);
  365. CHECK(connected_message->payload.proto == "tcp");
  366. CHECK(connected_message->payload.peer_device_id == peer_device->device_id());
  367. CHECK(valid_handshake);
  368. sup->do_shutdown();
  369. sup->do_process();
  370. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  371. }
  372. };
  373. F().run();
  374. }
  375. void test_passive_garbage() {
  376. struct F : passive_fixture_t {
  377. tcp::socket client_sock;
  378. tcp::resolver::results_type addresses;
  379. F() : client_sock{io_ctx} {}
  380. void active_connect_impl() noexcept override {
  381. tcp::resolver resolver(io_ctx);
  382. addresses = resolver.resolve(host, std::to_string(listening_ep.port()));
  383. asio::async_connect(client_sock, addresses.begin(), addresses.end(), [&](auto ec, auto) {
  384. LOG_INFO(log, "test_passive_garbage/peer connect, ec: {}", ec.message());
  385. auto buff = asio::buffer("garbage-garbage-garbage");
  386. client_sock.write_some(buff);
  387. sup->do_process();
  388. });
  389. }
  390. void main() noexcept override {
  391. initiate_active();
  392. io_ctx.run();
  393. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  394. CHECK(!connected_message);
  395. }
  396. };
  397. F().run();
  398. }
  399. void test_passive_timeout() {
  400. struct F : passive_fixture_t {
  401. void active_connect() noexcept override { LOG_INFO(log, "test_passive_timeout/active_connect NOOP"); }
  402. void main() noexcept override {
  403. initiate_active();
  404. io_ctx.run();
  405. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  406. CHECK(!connected_message);
  407. }
  408. };
  409. F().run();
  410. }
  411. struct passive_relay_fixture_t : fixture_t {
  412. std::string rx_buff;
  413. bool initiate_handshake = true;
  414. passive_relay_fixture_t() {
  415. relay_session = "relay-session-key";
  416. rx_buff.resize(128);
  417. }
  418. void on_read(size_t bytes) noexcept {
  419. LOG_TRACE(log, "read (relay/passive), {} bytes", bytes);
  420. auto r = proto::relay::parse({rx_buff.data(), bytes});
  421. auto &wrapped = std::get<proto::relay::wrapped_message_t>(r);
  422. auto &msg = std::get<proto::relay::join_session_request_t>(wrapped.message);
  423. CHECK(msg.key == relay_session);
  424. relay_reply();
  425. }
  426. virtual void on_write(size_t bytes) noexcept {
  427. LOG_TRACE(log, "write (relay/passive), {} bytes", bytes);
  428. if (initiate_handshake) {
  429. auto upgradeable = static_cast<transport::upgradeable_stream_base_t *>(peer_trans.get());
  430. auto ssl = transport::ssl_junction_t{my_device->device_id(), &peer_keys, false, "bep"};
  431. peer_trans = upgradeable->upgrade(ssl, true);
  432. initiate_peer_handshake();
  433. }
  434. }
  435. virtual void relay_reply() noexcept { write(proto::relay::response_t{0, "success"}); }
  436. virtual void write(const proto::relay::message_t &msg) noexcept {
  437. proto::relay::serialize(msg, rx_buff);
  438. transport::error_fn_t err_fn([&](auto ec) { log->error("(relay/passive), read_err: {}", ec.message()); });
  439. transport::io_fn_t write_fn = [this](size_t bytes) { on_write(bytes); };
  440. peer_trans->async_send(asio::buffer(rx_buff), write_fn, err_fn);
  441. }
  442. void accept(const sys::error_code &ec) noexcept override {
  443. LOG_INFO(log, "accept (relay/passive), ec: {}", ec.message());
  444. auto uri = utils::parse("tcp://127.0.0.1:0/").value();
  445. auto cfg = transport::transport_config_t{{}, uri, *sup, std::move(peer_sock), false};
  446. peer_trans = transport::initiate_stream(cfg);
  447. transport::error_fn_t read_err_fn([&](auto ec) { log->error("(relay/passive), read_err: {}", ec.message()); });
  448. transport::io_fn_t read_fn = [this](size_t bytes) { on_read(bytes); };
  449. peer_trans->async_recv(asio::buffer(rx_buff), read_fn, read_err_fn);
  450. }
  451. };
  452. void test_relay_passive_success() {
  453. struct F : passive_relay_fixture_t {
  454. void main() noexcept override {
  455. auto act = create_actor();
  456. io_ctx.run();
  457. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  458. REQUIRE(connected_message);
  459. CHECK(connected_message->payload.proto == "relay");
  460. CHECK(connected_message->payload.peer_device_id == peer_device->device_id());
  461. CHECK(valid_handshake);
  462. sup->do_shutdown();
  463. sup->do_process();
  464. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  465. CHECK(diff_msgs.size() == 0);
  466. }
  467. };
  468. F().run();
  469. }
  470. void test_relay_passive_garbage() {
  471. struct F : passive_relay_fixture_t {
  472. void write(const proto::relay::message_t &) noexcept override {
  473. rx_buff = "garbage-garbage-garbage";
  474. initiate_handshake = false;
  475. transport::error_fn_t err_fn([&](auto ec) { log->error("(relay/passive), read_err: {}", ec.message()); });
  476. transport::io_fn_t write_fn = [this](size_t bytes) { on_write(bytes); };
  477. peer_trans->async_send(asio::buffer(rx_buff), write_fn, err_fn);
  478. }
  479. void main() noexcept override {
  480. create_actor();
  481. io_ctx.run();
  482. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  483. CHECK(!connected_message);
  484. CHECK(!valid_handshake);
  485. sup->do_shutdown();
  486. sup->do_process();
  487. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  488. CHECK(diff_msgs.size() == 0);
  489. }
  490. };
  491. F().run();
  492. }
  493. void test_relay_passive_wrong_message() {
  494. struct F : passive_relay_fixture_t {
  495. void relay_reply() noexcept override { write(proto::relay::pong_t{}); }
  496. void main() noexcept override {
  497. initiate_handshake = false;
  498. auto act = create_actor();
  499. io_ctx.run();
  500. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  501. CHECK(!connected_message);
  502. CHECK(!valid_handshake);
  503. sup->do_shutdown();
  504. sup->do_process();
  505. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  506. CHECK(diff_msgs.size() == 0);
  507. }
  508. };
  509. F().run();
  510. }
  511. void test_relay_passive_unsuccessful_join() {
  512. struct F : passive_relay_fixture_t {
  513. void relay_reply() noexcept override { write(proto::relay::response_t{5, "some-fail-reason"}); }
  514. void main() noexcept override {
  515. initiate_handshake = false;
  516. auto act = create_actor();
  517. io_ctx.run();
  518. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  519. CHECK(!connected_message);
  520. CHECK(!valid_handshake);
  521. sup->do_shutdown();
  522. sup->do_process();
  523. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  524. CHECK(diff_msgs.size() == 0);
  525. }
  526. };
  527. F().run();
  528. }
  529. void test_relay_malformed_uri() {
  530. struct F : fixture_t {
  531. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  532. return fmt::format("relay://{}", listening_ep);
  533. }
  534. void main() noexcept override {
  535. auto act = create_actor();
  536. io_ctx.run();
  537. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  538. CHECK(!connected_message);
  539. CHECK(!valid_handshake);
  540. sup->do_shutdown();
  541. sup->do_process();
  542. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  543. CHECK(diff_msgs.size() == 2);
  544. }
  545. };
  546. F().run();
  547. }
  548. void test_relay_active_wrong_relay_device_id() {
  549. struct F : fixture_t {
  550. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  551. return fmt::format("relay://{}?id={}", listening_ep, my_device->device_id().get_value());
  552. }
  553. void main() noexcept override {
  554. auto act = create_actor();
  555. io_ctx.run();
  556. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  557. CHECK(!connected_message);
  558. CHECK(!valid_handshake);
  559. sup->do_shutdown();
  560. sup->do_process();
  561. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  562. CHECK(diff_msgs.size() == 2);
  563. }
  564. };
  565. F().run();
  566. }
  567. struct active_relay_fixture_t : fixture_t {
  568. utils::key_pair_t relay_keys;
  569. model::device_id_t relay_device;
  570. std::string rx_buff;
  571. std::string session_key = "lorem-session-dolor";
  572. transport::stream_sp_t relay_trans;
  573. bool session_mode = false;
  574. active_relay_fixture_t() {
  575. relay_keys = utils::generate_pair("relay").value();
  576. relay_device = model::device_id_t::from_cert(relay_keys.cert_data).value();
  577. rx_buff.resize(128);
  578. }
  579. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  580. return fmt::format("relay://{}?id={}", listening_ep, relay_device.get_value());
  581. }
  582. void accept(const sys::error_code &ec) noexcept override {
  583. LOG_INFO(log, "relay/accept, ec: {}", ec.message());
  584. if (!session_mode) {
  585. relay_trans = transport::initiate_tls_passive(*sup, relay_keys, std::move(peer_sock));
  586. transport::handshake_fn_t handshake_fn = [this](bool valid_peer, utils::x509_t &, const tcp::endpoint &,
  587. const model::device_id_t *) {
  588. valid_handshake = valid_peer;
  589. on_relay_handshake();
  590. };
  591. transport::error_fn_t on_error = [](const auto &) {};
  592. relay_trans->async_handshake(handshake_fn, on_error);
  593. return;
  594. }
  595. auto uri = utils::parse("tcp://127.0.0.1:0/").value();
  596. auto cfg = transport::transport_config_t{{}, uri, *sup, std::move(peer_sock), false};
  597. peer_trans = transport::initiate_stream(cfg);
  598. transport::error_fn_t read_err_fn([&](auto ec) { log->error("(relay/active), read_err: {}", ec.message()); });
  599. transport::io_fn_t read_fn = [this](size_t bytes) { on_read_peer(bytes); };
  600. peer_trans->async_recv(asio::buffer(rx_buff), read_fn, read_err_fn);
  601. }
  602. virtual void on_relay_handshake() noexcept {
  603. transport::error_fn_t read_err_fn([&](auto ec) { log->error("(relay/active), read_err: {}", ec.message()); });
  604. transport::io_fn_t read_fn = [this](size_t bytes) { on_read(bytes); };
  605. relay_trans->async_recv(asio::buffer(rx_buff), read_fn, read_err_fn);
  606. }
  607. virtual void relay_reply() noexcept {
  608. write(relay_trans, proto::relay::session_invitation_t{std::string(peer_device->device_id().get_sha256()),
  609. session_key, "", listening_ep.port(), false});
  610. }
  611. virtual void session_reply() noexcept { write(peer_trans, proto::relay::response_t{0, "ok"}); }
  612. virtual void write(transport::stream_sp_t &stream, const proto::relay::message_t &msg) noexcept {
  613. proto::relay::serialize(msg, rx_buff);
  614. transport::error_fn_t err_fn([&](auto ec) { log->error("(relay/passive), read_err: {}", ec.message()); });
  615. transport::io_fn_t write_fn = [this](size_t bytes) { on_write(bytes); };
  616. stream->async_send(asio::buffer(rx_buff), write_fn, err_fn);
  617. }
  618. virtual void on_read_peer(size_t bytes) {
  619. log->debug("(relay/active) read peer {} bytes", bytes);
  620. auto r = proto::relay::parse({rx_buff.data(), bytes});
  621. auto &wrapped = std::get<proto::relay::wrapped_message_t>(r);
  622. auto &msg = std::get<proto::relay::join_session_request_t>(wrapped.message);
  623. CHECK(msg.key == session_key);
  624. session_reply();
  625. }
  626. virtual void on_read(size_t bytes) {
  627. log->debug("(relay/active) read {} bytes", bytes);
  628. auto r = proto::relay::parse({rx_buff.data(), bytes});
  629. auto &wrapped = std::get<proto::relay::wrapped_message_t>(r);
  630. auto &msg = std::get<proto::relay::connect_request_t>(wrapped.message);
  631. CHECK(msg.device_id == peer_device->device_id().get_sha256());
  632. relay_reply();
  633. }
  634. virtual void on_write(size_t bytes) {
  635. log->debug("(relay/active) write {} bytes", bytes);
  636. if (!session_mode) {
  637. acceptor.async_accept(peer_sock, [this](auto ec) { this->accept(ec); });
  638. session_mode = true;
  639. } else {
  640. auto upgradeable = static_cast<transport::upgradeable_stream_base_t *>(peer_trans.get());
  641. auto ssl = transport::ssl_junction_t{my_device->device_id(), &peer_keys, false, "bep"};
  642. peer_trans = upgradeable->upgrade(ssl, false);
  643. initiate_peer_handshake();
  644. }
  645. }
  646. };
  647. void test_relay_active_success() {
  648. struct F : active_relay_fixture_t {
  649. void main() noexcept override {
  650. auto act = create_actor();
  651. io_ctx.run();
  652. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  653. REQUIRE(connected_message);
  654. CHECK(connected_message->payload.proto == "relay");
  655. CHECK(connected_message->payload.peer_device_id == peer_device->device_id());
  656. CHECK(valid_handshake);
  657. sup->do_shutdown();
  658. sup->do_process();
  659. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  660. REQUIRE(diff_msgs.size() == 1);
  661. CHECK(diff_msgs[0]->payload.diff->apply(*cluster));
  662. CHECK(peer_device->get_state() == device_state_t::dialing);
  663. }
  664. };
  665. F().run();
  666. }
  667. void test_relay_active_not_enabled() {
  668. struct F : active_relay_fixture_t {
  669. actor_ptr_t create_actor() noexcept override {
  670. return sup->create_actor<initiator_actor_t>()
  671. .timeout(timeout)
  672. .peer_device_id(peer_device->device_id())
  673. .relay_session(relay_session)
  674. .uris({peer_uri})
  675. .cluster(use_model ? cluster : nullptr)
  676. .sink(sup->get_address())
  677. .ssl_pair(&my_keys)
  678. .router(*sup)
  679. .escalate_failure()
  680. .finish();
  681. }
  682. void main() noexcept override {
  683. auto act = create_actor();
  684. io_ctx.run();
  685. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  686. sup->do_shutdown();
  687. sup->do_process();
  688. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  689. CHECK(peer_device->get_state() == device_state_t::offline);
  690. }
  691. };
  692. F().run();
  693. }
  694. void test_relay_wrong_device() {
  695. struct F : active_relay_fixture_t {
  696. void relay_reply() noexcept override {
  697. write(relay_trans, proto::relay::session_invitation_t{std::string(relay_device.get_sha256()), session_key,
  698. "", listening_ep.port(), false});
  699. }
  700. void on_write(size_t) override {}
  701. void main() noexcept override {
  702. auto act = create_actor();
  703. io_ctx.run();
  704. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  705. CHECK(!connected_message);
  706. CHECK(valid_handshake);
  707. sup->do_shutdown();
  708. sup->do_process();
  709. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  710. CHECK(diff_msgs.size() == 2);
  711. }
  712. };
  713. F().run();
  714. }
  715. void test_relay_non_connectable() {
  716. struct F : active_relay_fixture_t {
  717. void relay_reply() noexcept override {
  718. write(relay_trans, proto::relay::session_invitation_t{std::string(peer_device->device_id().get_sha256()),
  719. session_key, "", 0, false});
  720. }
  721. void main() noexcept override {
  722. auto act = create_actor();
  723. io_ctx.run();
  724. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  725. CHECK(!connected_message);
  726. sup->do_shutdown();
  727. sup->do_process();
  728. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  729. CHECK(diff_msgs.size() == 2);
  730. }
  731. };
  732. F().run();
  733. }
  734. void test_relay_malformed_address() {
  735. struct F : active_relay_fixture_t {
  736. void relay_reply() noexcept override {
  737. write(relay_trans, proto::relay::session_invitation_t{std::string(peer_device->device_id().get_sha256()),
  738. session_key, "8.8.8.8z", listening_ep.port(), false});
  739. }
  740. void main() noexcept override {
  741. auto act = create_actor();
  742. io_ctx.run();
  743. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  744. CHECK(!connected_message);
  745. sup->do_shutdown();
  746. sup->do_process();
  747. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  748. CHECK(diff_msgs.size() == 2);
  749. }
  750. };
  751. F().run();
  752. }
  753. void test_relay_garbage_reply() {
  754. struct F : active_relay_fixture_t {
  755. void write(transport::stream_sp_t &stream, const proto::relay::message_t &) noexcept override {
  756. rx_buff = "garbage-garbage-garbage";
  757. transport::error_fn_t err_fn([&](auto ec) { log->error("(relay/passive), read_err: {}", ec.message()); });
  758. transport::io_fn_t write_fn = [this](size_t bytes) { on_write(bytes); };
  759. stream->async_send(asio::buffer(rx_buff), write_fn, err_fn);
  760. }
  761. void on_write(size_t) override {}
  762. void main() noexcept override {
  763. auto act = create_actor();
  764. io_ctx.run();
  765. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  766. CHECK(!connected_message);
  767. sup->do_shutdown();
  768. sup->do_process();
  769. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  770. CHECK(diff_msgs.size() == 2);
  771. }
  772. };
  773. F().run();
  774. }
  775. void test_relay_non_invitation_reply() {
  776. struct F : active_relay_fixture_t {
  777. void relay_reply() noexcept override { write(relay_trans, proto::relay::pong_t{}); }
  778. void on_write(size_t) override {}
  779. void main() noexcept override {
  780. auto act = create_actor();
  781. io_ctx.run();
  782. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  783. CHECK(!connected_message);
  784. sup->do_shutdown();
  785. sup->do_process();
  786. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  787. CHECK(diff_msgs.size() == 2);
  788. }
  789. };
  790. F().run();
  791. }
  792. int _init() {
  793. REGISTER_TEST_CASE(test_connect_unsupported_proto, "test_connect_unsupported_proto", "[initiator]");
  794. REGISTER_TEST_CASE(test_connect_timeout, "test_connect_timeout", "[initiator]");
  795. REGISTER_TEST_CASE(test_handshake_timeout, "test_handshake_timeout", "[initiator]");
  796. REGISTER_TEST_CASE(test_handshake_garbage, "test_handshake_garbage", "[initiator]");
  797. REGISTER_TEST_CASE(test_connection_refused, "test_connection_refused", "[initiator]");
  798. REGISTER_TEST_CASE(test_connection_refused_no_model, "test_connection_refused_no_model", "[initiator]");
  799. REGISTER_TEST_CASE(test_resolve_failure, "test_resolve_failure", "[initiator]");
  800. REGISTER_TEST_CASE(test_success, "test_success", "[initiator]");
  801. REGISTER_TEST_CASE(test_success_no_model, "test_success_no_model", "[initiator]");
  802. REGISTER_TEST_CASE(test_passive_success, "test_passive_success", "[initiator]");
  803. REGISTER_TEST_CASE(test_passive_garbage, "test_passive_garbage", "[initiator]");
  804. REGISTER_TEST_CASE(test_passive_timeout, "test_passive_timeout", "[initiator]");
  805. REGISTER_TEST_CASE(test_relay_passive_success, "test_relay_passive_success", "[initiator]");
  806. REGISTER_TEST_CASE(test_relay_passive_garbage, "test_relay_passive_garbage", "[initiator]");
  807. REGISTER_TEST_CASE(test_relay_passive_wrong_message, "test_relay_passive_wrong_message", "[initiator]");
  808. REGISTER_TEST_CASE(test_relay_passive_unsuccessful_join, "test_relay_passive_unsuccessful_join", "[initiator]");
  809. REGISTER_TEST_CASE(test_relay_malformed_uri, "test_relay_malformed_uri", "[initiator]");
  810. REGISTER_TEST_CASE(test_relay_active_wrong_relay_device_id, "test_relay_active_wrong_relay_device_id",
  811. "[initiator]");
  812. REGISTER_TEST_CASE(test_relay_active_success, "test_relay_active_success", "[initiator]");
  813. REGISTER_TEST_CASE(test_relay_active_not_enabled, "test_relay_active_not_enabled", "[initiator]");
  814. REGISTER_TEST_CASE(test_relay_wrong_device, "test_relay_wrong_device", "[initiator]");
  815. REGISTER_TEST_CASE(test_relay_non_connectable, "test_relay_non_connectable", "[initiator]");
  816. REGISTER_TEST_CASE(test_relay_malformed_address, "test_relay_malformed_address", "[initiator]");
  817. REGISTER_TEST_CASE(test_relay_garbage_reply, "test_relay_garbage_reply", "[initiator]");
  818. REGISTER_TEST_CASE(test_relay_non_invitation_reply, "test_relay_non_invitation_reply", "[initiator]");
  819. return 1;
  820. }
  821. static int v = _init();