075-controller.cpp 84 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2025 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "test_supervisor.h"
  6. #include "model/cluster.h"
  7. #include "model/diff/contact/peer_state.h"
  8. #include "diff-builder.h"
  9. #include "hasher/hasher_proxy_actor.h"
  10. #include "hasher/hasher_actor.h"
  11. #include "net/controller_actor.h"
  12. #include "net/names.h"
  13. #include "fs/messages.h"
  14. #include "utils/error_code.h"
  15. #include "utils/tls.h"
  16. #include "proto/bep_support.h"
  17. #include <boost/core/demangle.hpp>
  18. #include <type_traits>
  19. using namespace syncspirit;
  20. using namespace syncspirit::test;
  21. using namespace syncspirit::model;
  22. using namespace syncspirit::net;
  23. using namespace syncspirit::hasher;
  24. namespace {
  25. struct sample_peer_config_t : public r::actor_config_t {
  26. model::device_id_t peer_device_id;
  27. bool auto_share = false;
  28. };
  29. template <typename Actor> struct sample_peer_config_builder_t : r::actor_config_builder_t<Actor> {
  30. using builder_t = typename Actor::template config_builder_t<Actor>;
  31. using parent_t = r::actor_config_builder_t<Actor>;
  32. using parent_t::parent_t;
  33. builder_t &&peer_device_id(const model::device_id_t &value) && noexcept {
  34. parent_t::config.peer_device_id = value;
  35. return std::move(*static_cast<typename parent_t::builder_t *>(this));
  36. }
  37. builder_t &&auto_share(bool value) && noexcept {
  38. parent_t::config.auto_share = value;
  39. return std::move(*static_cast<typename parent_t::builder_t *>(this));
  40. }
  41. };
  42. struct sample_peer_t : r::actor_base_t {
  43. using config_t = sample_peer_config_t;
  44. template <typename Actor> using config_builder_t = sample_peer_config_builder_t<Actor>;
  45. using remote_message_t = r::intrusive_ptr_t<net::message::forwarded_message_t>;
  46. using remote_messages_t = std::list<remote_message_t>;
  47. struct block_response_t {
  48. std::string name;
  49. size_t block_index;
  50. std::string data;
  51. sys::error_code ec;
  52. };
  53. using allowed_index_updates_t = std::unordered_set<std::string>;
  54. using block_responses_t = std::list<block_response_t>;
  55. using block_request_t = r::intrusive_ptr_t<net::message::block_request_t>;
  56. using block_requests_t = std::list<block_request_t>;
  57. using uploaded_blocks_t = std::list<proto::message::Response>;
  58. sample_peer_t(config_t &config)
  59. : r::actor_base_t{config}, auto_share(config.auto_share), peer_device{config.peer_device_id} {
  60. log = utils::get_logger("test.sample_peer");
  61. }
  62. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  63. r::actor_base_t::configure(plugin);
  64. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) { p.set_identity("sample_peer", false); });
  65. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  66. p.subscribe_actor(&sample_peer_t::on_start_reading);
  67. p.subscribe_actor(&sample_peer_t::on_termination);
  68. p.subscribe_actor(&sample_peer_t::on_transfer);
  69. p.subscribe_actor(&sample_peer_t::on_block_request);
  70. });
  71. }
  72. void shutdown_start() noexcept override {
  73. LOG_TRACE(log, "{}, shutdown_start", identity);
  74. if (controller) {
  75. send<net::payload::termination_t>(controller, shutdown_reason);
  76. }
  77. r::actor_base_t::shutdown_start();
  78. }
  79. void shutdown_finish() noexcept override {
  80. r::actor_base_t::shutdown_finish();
  81. LOG_TRACE(log, "{}, shutdown_finish, blocks requested = {}", identity, blocks_requested);
  82. if (controller) {
  83. send<net::payload::termination_t>(controller, shutdown_reason);
  84. }
  85. }
  86. void on_start_reading(net::message::start_reading_t &msg) noexcept {
  87. LOG_TRACE(log, "{}, on_start_reading", identity);
  88. controller = msg.payload.controller;
  89. reading = msg.payload.start;
  90. }
  91. void on_termination(net::message::termination_signal_t &msg) noexcept {
  92. LOG_TRACE(log, "{}, on_termination", identity);
  93. if (!shutdown_reason) {
  94. auto &ee = msg.payload.ee;
  95. auto reason = ee->message();
  96. LOG_TRACE(log, "{}, on_termination: {}", identity, reason);
  97. do_shutdown(ee);
  98. }
  99. }
  100. void on_transfer(net::message::transfer_data_t &message) noexcept {
  101. auto &data = message.payload.data;
  102. LOG_TRACE(log, "{}, on_transfer, bytes = {}", identity, data.size());
  103. auto buff = boost::asio::buffer(data.data(), data.size());
  104. auto result = proto::parse_bep(buff);
  105. auto orig = std::move(result.value().message);
  106. auto variant = net::payload::forwarded_message_t();
  107. std::visit(
  108. [&](auto &msg) {
  109. using boost::core::demangle;
  110. using T = std::decay_t<decltype(msg)>;
  111. LOG_TRACE(log, "{}, received '{}' message", identity, demangle(typeid(T).name()));
  112. using V = net::payload::forwarded_message_t;
  113. if constexpr (std::is_constructible_v<V, T>) {
  114. variant = std::move(msg);
  115. } else if constexpr (std::is_same_v<T, proto::message::Response>) {
  116. uploaded_blocks.push_back(std::move(msg));
  117. }
  118. },
  119. orig);
  120. auto fwd_msg = new net::message::forwarded_message_t(address, std::move(variant));
  121. messages.emplace_back(fwd_msg);
  122. for (auto &msg : messages) {
  123. auto &p = msg->payload;
  124. if (auto m = std::get_if<proto::message::Index>(&p); m) {
  125. allowed_index_updates.emplace((*m)->folder());
  126. }
  127. if (auto m = std::get_if<proto::message::IndexUpdate>(&p); m) {
  128. auto &folder_id = (*m)->folder();
  129. if ((allowed_index_updates.count(folder_id) == 0) && !auto_share) {
  130. LOG_WARN(log, "{}, IndexUpdate w/o previously recevied index", identity);
  131. std::abort();
  132. }
  133. }
  134. }
  135. }
  136. void process_block_requests() noexcept {
  137. auto condition = [&]() -> bool {
  138. if (block_requests.size() && block_responses.size()) {
  139. auto &req = block_requests.front();
  140. auto &res = block_responses.front();
  141. auto &req_payload = req->payload.request_payload;
  142. if (req_payload.block_index == res.block_index) {
  143. auto &name = res.name;
  144. return name.empty() || name == req_payload.file_name;
  145. }
  146. }
  147. return false;
  148. };
  149. while (condition()) {
  150. auto &reply = block_responses.front();
  151. auto &request = *block_requests.front();
  152. log->debug("{}, matched '{}', replying..., ec = {}", identity, reply.name, reply.ec.value());
  153. if (!reply.ec) {
  154. reply_to(request, reply.data);
  155. } else {
  156. reply_with_error(request, make_error(reply.ec));
  157. }
  158. block_responses.pop_front();
  159. block_requests.pop_front();
  160. }
  161. }
  162. void on_block_request(net::message::block_request_t &req) noexcept {
  163. block_requests.push_front(&req);
  164. ++blocks_requested;
  165. log->debug("{}, requesting block # {}", identity, block_requests.front()->payload.request_payload.block_index);
  166. if (block_responses.size()) {
  167. log->debug("{}, top response block # {}", identity, block_responses.front().block_index);
  168. }
  169. process_block_requests();
  170. }
  171. void forward(net::payload::forwarded_message_t payload) noexcept {
  172. send<net::payload::forwarded_message_t>(controller, std::move(payload));
  173. }
  174. static const constexpr size_t next_block = 1000000;
  175. void push_block(std::string_view data, size_t index, std::string name = {}) {
  176. if (index == next_block) {
  177. index = block_responses.size();
  178. }
  179. block_responses.push_back(block_response_t{std::move(name), index, std::string(data), {}});
  180. }
  181. void push_block(sys::error_code ec, size_t index) {
  182. if (index == next_block) {
  183. index = block_responses.size();
  184. }
  185. block_responses.push_back(block_response_t{std::string{}, index, std::string{}, ec});
  186. }
  187. int blocks_requested = 0;
  188. bool reading = false;
  189. bool auto_share = false;
  190. remote_messages_t messages;
  191. r::address_ptr_t controller;
  192. model::device_id_t peer_device;
  193. utils::logger_t log;
  194. block_requests_t block_requests;
  195. block_responses_t block_responses;
  196. uploaded_blocks_t uploaded_blocks;
  197. allowed_index_updates_t allowed_index_updates;
  198. };
  199. struct hasher_config_t : hasher::hasher_actor_config_t {
  200. uint32_t index;
  201. bool auto_reply = true;
  202. };
  203. template <typename Actor> struct hasher_config_builder_t : hasher::hasher_actor_config_builder_t<Actor> {
  204. using builder_t = typename Actor::template config_builder_t<Actor>;
  205. using parent_t = ::hasher_actor_config_builder_t<Actor>;
  206. using parent_t::parent_t;
  207. builder_t &&auto_reply(uint32_t value) && noexcept {
  208. parent_t::config.auto_reply = value;
  209. return std::move(*static_cast<typename parent_t::builder_t *>(this));
  210. }
  211. };
  212. struct managed_hasher_t : r::actor_base_t {
  213. using config_t = hasher_config_t;
  214. template <typename Actor> using config_builder_t = hasher_config_builder_t<Actor>;
  215. using validation_request_t = hasher::message::validation_request_t;
  216. using validation_request_ptr_t = model::intrusive_ptr_t<validation_request_t>;
  217. using queue_t = std::deque<validation_request_ptr_t>;
  218. managed_hasher_t(config_t &cfg) : r::actor_base_t{cfg}, index{cfg.index}, auto_reply{cfg.auto_reply} {}
  219. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  220. r::actor_base_t::configure(plugin);
  221. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) {
  222. p.set_identity(fmt::format("hasher-{}", 1), false);
  223. log = utils::get_logger(fmt::format("test-hasher-{}", 1));
  224. });
  225. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) { p.register_name(identity, get_address()); });
  226. plugin.with_casted<r::plugin::starter_plugin_t>(
  227. [&](auto &p) { p.subscribe_actor(&managed_hasher_t::on_validation); });
  228. }
  229. void on_validation(validation_request_t &req) noexcept {
  230. queue.emplace_back(&req);
  231. if (auto_reply) {
  232. process_requests();
  233. }
  234. }
  235. void process_requests() noexcept {
  236. static const constexpr size_t SZ = SHA256_DIGEST_LENGTH;
  237. LOG_TRACE(log, "{}, process_requests", identity);
  238. while (!queue.empty()) {
  239. auto req = queue.front();
  240. queue.pop_front();
  241. auto &payload = *req->payload.request_payload;
  242. char digest[SZ];
  243. auto &data = payload.data;
  244. utils::digest(data.data(), data.length(), digest);
  245. bool eq = payload.hash == std::string_view(digest, SZ);
  246. reply_to(*req, eq);
  247. }
  248. }
  249. uint32_t index;
  250. bool auto_reply;
  251. utils::logger_t log;
  252. queue_t queue;
  253. };
  254. struct fixture_t {
  255. using peer_ptr_t = r::intrusive_ptr_t<sample_peer_t>;
  256. using target_ptr_t = r::intrusive_ptr_t<net::controller_actor_t>;
  257. using blk_req_t = fs::message::block_request_t;
  258. using blk_req_ptr_t = r::intrusive_ptr_t<blk_req_t>;
  259. using blk_res_t = fs::message::block_response_t;
  260. using blk_res_ptr_t = r::intrusive_ptr_t<blk_res_t>;
  261. using block_requests_t = std::deque<blk_req_ptr_t>;
  262. using block_responses_t = std::deque<r::message_ptr_t>;
  263. fixture_t(bool auto_start_, int64_t max_sequence_, bool auto_share_ = true) noexcept
  264. : auto_start{auto_start_}, auto_share{auto_share_}, max_sequence{max_sequence_} {
  265. test::init_logging();
  266. }
  267. void _start_target(std::string connection_id) {
  268. peer_actor = sup->create_actor<sample_peer_t>().auto_share(auto_share).timeout(timeout).finish();
  269. auto diff = model::diff::contact::peer_state_t::create(*cluster, peer_device->device_id().get_sha256(),
  270. peer_actor->get_address(), device_state_t::online,
  271. connection_id);
  272. sup->send<model::payload::model_update_t>(sup->get_address(), std::move(diff), nullptr);
  273. target = sup->create_actor<controller_actor_t>()
  274. .peer(peer_device)
  275. .peer_addr(peer_actor->get_address())
  276. .connection_id(connection_id)
  277. .request_pool(1024)
  278. .outgoing_buffer_max(1024'000)
  279. .cluster(cluster)
  280. .sequencer(sup->sequencer)
  281. .timeout(timeout)
  282. .request_timeout(timeout)
  283. .blocks_max_requested(get_blocks_max_requested())
  284. .finish();
  285. sup->do_process();
  286. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  287. target_addr = target->get_address();
  288. }
  289. virtual void start_target() noexcept { _start_target("test-common://1.2.3.4:5"); }
  290. virtual void run() noexcept {
  291. auto peer_id =
  292. device_id_t::from_string("VUV42CZ-IQD5A37-RPEBPM4-VVQK6E4-6WSKC7B-PVJQHHD-4PZD44V-ENC6WAZ").value();
  293. peer_device = device_t::create(peer_id, "peer-device").value();
  294. auto my_id =
  295. device_id_t::from_string("KHQNO2S-5QSILRK-YX4JZZ4-7L77APM-QNVGZJT-EKU7IFI-PNEPBMY-4MXFMQD").value();
  296. my_device = device_t::create(my_id, "my-device").value();
  297. cluster = new cluster_t(my_device, 1);
  298. cluster->get_devices().put(my_device);
  299. cluster->get_devices().put(peer_device);
  300. auto folder_id_1 = "1234-5678";
  301. auto folder_id_2 = "5555";
  302. auto builder = diff_builder_t(*cluster);
  303. auto sha256 = peer_id.get_sha256();
  304. builder.upsert_folder(folder_id_1, "")
  305. .upsert_folder(folder_id_2, "")
  306. .configure_cluster(sha256)
  307. .add(sha256, folder_id_1, 123, max_sequence)
  308. .finish();
  309. REQUIRE(builder.apply());
  310. if (auto_share) {
  311. REQUIRE(builder.share_folder(peer_id.get_sha256(), folder_id_1).apply());
  312. }
  313. r::system_context_t ctx;
  314. sup = ctx.create_supervisor<supervisor_t>().timeout(timeout).create_registry().finish();
  315. sup->cluster = cluster;
  316. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  317. plugin.template with_casted<r::plugin::registry_plugin_t>(
  318. [&](auto &p) { p.register_name(net::names::fs_actor, sup->get_address()); });
  319. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  320. p.subscribe_actor(r::lambda<blk_req_t>([&](blk_req_t &msg) {
  321. block_requests.push_back(&msg);
  322. if (block_responses.size()) {
  323. sup->put(block_responses.front());
  324. block_responses.pop_front();
  325. }
  326. }));
  327. });
  328. };
  329. sup->start();
  330. sup->do_process();
  331. CHECK(static_cast<r::actor_base_t *>(sup.get())->access<to::state>() == r::state_t::OPERATIONAL);
  332. create_hasher();
  333. sup->create_actor<hasher::hasher_proxy_actor_t>()
  334. .timeout(timeout)
  335. .hasher_threads(1)
  336. .name(net::names::hasher_proxy)
  337. .finish();
  338. auto &folders = cluster->get_folders();
  339. folder_1 = folders.by_id(folder_id_1);
  340. folder_2 = folders.by_id(folder_id_2);
  341. folder_1_peer = folder_1->get_folder_infos().by_device_id(peer_id.get_sha256());
  342. start_target();
  343. if (auto_start) {
  344. REQUIRE(peer_actor->reading);
  345. REQUIRE(peer_actor->messages.size() == 1);
  346. auto &msg = (*peer_actor->messages.front()).payload;
  347. REQUIRE(std::get_if<proto::message::ClusterConfig>(&msg));
  348. peer_actor->messages.pop_front();
  349. }
  350. main(builder);
  351. sup->shutdown();
  352. sup->do_process();
  353. CHECK(static_cast<r::actor_base_t *>(sup.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  354. }
  355. virtual void create_hasher() noexcept { sup->create_actor<hasher_actor_t>().index(1).timeout(timeout).finish(); }
  356. virtual void main(diff_builder_t &) noexcept {}
  357. virtual std::uint32_t get_blocks_max_requested() { return 8; }
  358. bool auto_start;
  359. bool auto_share;
  360. int64_t max_sequence;
  361. peer_ptr_t peer_actor;
  362. target_ptr_t target;
  363. r::address_ptr_t target_addr;
  364. r::pt::time_duration timeout = r::pt::millisec{10};
  365. cluster_ptr_t cluster;
  366. device_ptr_t peer_device;
  367. device_ptr_t my_device;
  368. r::intrusive_ptr_t<supervisor_t> sup;
  369. r::system_context_t ctx;
  370. model::folder_ptr_t folder_1;
  371. model::folder_info_ptr_t folder_1_peer;
  372. model::folder_ptr_t folder_2;
  373. block_requests_t block_requests;
  374. block_responses_t block_responses;
  375. };
  376. } // namespace
  377. void test_startup() {
  378. struct F : fixture_t {
  379. using fixture_t::fixture_t;
  380. void main(diff_builder_t &) noexcept override {
  381. REQUIRE(peer_actor->reading);
  382. REQUIRE(peer_actor->messages.size() == 1);
  383. auto &msg = (*peer_actor->messages.front()).payload;
  384. REQUIRE(std::get_if<proto::message::ClusterConfig>(&msg));
  385. peer_actor->messages.pop_front();
  386. CHECK(peer_actor->messages.empty());
  387. auto cc = proto::ClusterConfig{};
  388. auto payload = proto::message::ClusterConfig(new proto::ClusterConfig(cc));
  389. peer_actor->forward(std::move(payload));
  390. sup->do_process();
  391. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  392. CHECK(peer_actor->messages.empty());
  393. }
  394. };
  395. F(false, 10, false).run();
  396. }
  397. void test_overwhelm() {
  398. struct F : fixture_t {
  399. using fixture_t::fixture_t;
  400. void main(diff_builder_t &) noexcept override {
  401. auto msg = &(*peer_actor->messages.front()).payload;
  402. REQUIRE(std::get_if<proto::message::ClusterConfig>(msg));
  403. peer_actor->messages.pop_front();
  404. CHECK(peer_actor->messages.empty());
  405. auto cc = proto::ClusterConfig{};
  406. auto payload = proto::message::ClusterConfig(new proto::ClusterConfig(cc));
  407. peer_actor->forward(std::move(payload));
  408. sup->do_process();
  409. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  410. CHECK(peer_actor->messages.empty());
  411. auto ex_peer = peer_actor;
  412. auto ex_target = target;
  413. _start_target("best://1.2.3.4:5");
  414. sup->do_process();
  415. REQUIRE(ex_peer != peer_actor);
  416. REQUIRE(ex_target != target);
  417. CHECK(static_cast<r::actor_base_t *>(ex_peer.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  418. CHECK(static_cast<r::actor_base_t *>(ex_target.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  419. msg = &(*peer_actor->messages.front()).payload;
  420. REQUIRE(std::get_if<proto::message::ClusterConfig>(msg));
  421. peer_actor->messages.pop_front();
  422. payload = proto::message::ClusterConfig(new proto::ClusterConfig(cc));
  423. peer_actor->forward(std::move(payload));
  424. sup->do_process();
  425. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  426. CHECK(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  427. CHECK(peer_actor->messages.empty());
  428. }
  429. };
  430. F(false, 10, false).run();
  431. }
  // Checks how the controller handles Index / IndexUpdate messages forwarded
  // by the peer. The fixture is created with auto_start = true, so the initial
  // ClusterConfig has already been consumed when main() runs.
  432. void test_index_receiving() {
  433. struct F : fixture_t {
  434. using fixture_t::fixture_t;
  435. void main(diff_builder_t &) noexcept override {
  436. auto cc = proto::ClusterConfig{};
  437. auto index = proto::Index{};
  // An Index for a folder id that was never created: both the controller and
  // the peer are expected to end up shut down.
  438. SECTION("wrong index") {
  439. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  440. index.set_folder("non-existing-folder");
  441. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  442. sup->do_process();
  443. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  444. CHECK(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  445. }
  // An Index for folder_1 carrying a single directory entry.
  446. SECTION("index is applied") {
  // The ClusterConfig lists folder_1 with the peer device, using the index id
  // already stored for the peer and max sequence 10.
  447. auto folder = cc.add_folders();
  448. folder->set_id(std::string(folder_1->get_id()));
  449. auto d_peer = folder->add_devices();
  450. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  451. REQUIRE(cluster->get_pending_folders().size() == 0);
  452. d_peer->set_max_sequence(10);
  453. d_peer->set_index_id(folder_1_peer->get_index());
  454. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  455. index.set_folder(std::string(folder_1->get_id()));
  456. auto file = index.add_files();
  457. file->set_name("some-dir");
  458. file->set_type(proto::FileInfoType::DIRECTORY);
  459. file->set_sequence(10);
  460. auto v = file->mutable_version();
  461. auto c = v->add_counters();
  462. c->set_id(peer_device->device_id().get_uint());
  463. c->set_value(1);
  464. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  465. sup->do_process();
  466. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  467. CHECK(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  // The directory must show up in the peer's folder info ...
  468. auto &folder_infos = folder_1->get_folder_infos();
  469. auto folder_peer = folder_infos.by_device(*peer_device);
  470. REQUIRE(folder_peer);
  471. CHECK(folder_peer->get_max_sequence() == 10ul);
  472. REQUIRE(folder_peer->get_file_infos().size() == 1);
  473. CHECK(folder_peer->get_file_infos().begin()->item->get_name() == file->name());
  // ... and in the local device's folder info as well.
  474. auto folder_my = folder_infos.by_device(*my_device);
  475. REQUIRE(folder_my);
  476. CHECK(folder_my->get_max_sequence() == 1ul);
  477. REQUIRE(folder_my->get_file_infos().size() == 1);
  478. CHECK(folder_my->get_file_infos().begin()->item->get_name() == file->name());
  // Follow-up IndexUpdate adding a second directory on top of the applied Index.
  479. SECTION("then index update is applied") {
  480. auto index_update = proto::IndexUpdate{};
  481. index_update.set_folder(std::string(folder_1->get_id()));
  482. auto file = index_update.add_files();
  483. file->set_name("some-dir-2");
  484. file->set_type(proto::FileInfoType::DIRECTORY);
  485. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  486. auto v = file->mutable_version();
  487. auto c = v->add_counters();
  488. c->set_id(peer_device->device_id().get_uint());
  489. c->set_value(1);
  490. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index_update)));
  491. sup->do_process();
  492. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  493. CHECK(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() ==
  494. r::state_t::OPERATIONAL);
  // Both folder infos are expected to contain two entries now.
  495. CHECK(folder_peer->get_max_sequence() == file->sequence());
  496. REQUIRE(folder_peer->get_file_infos().size() == 2);
  497. CHECK(folder_peer->get_file_infos().by_name("some-dir-2"));
  498. CHECK(folder_my->get_max_sequence() == 2ul);
  499. REQUIRE(folder_my->get_file_infos().size() == 2);
  500. CHECK(folder_my->get_file_infos().by_name("some-dir-2"));
  501. }
  502. }
  503. }
  504. };
  505. F(true, 10).run();
  506. }
  // Checks which index messages the controller sends to the peer, depending on
  // the view of the local device that the peer reports in its ClusterConfig.
  507. void test_index_sending() {
  508. struct F : fixture_t {
  509. using fixture_t::fixture_t;
  510. void main(diff_builder_t &) noexcept override {
  // Create one local file (a symlink) in folder_1 so there is something to send.
  511. proto::FileInfo pr_file_info;
  512. pr_file_info.set_name("link");
  513. pr_file_info.set_type(proto::FileInfoType::SYMLINK);
  514. pr_file_info.set_symlink_target("/some/where");
  515. auto builder = diff_builder_t(*cluster);
  516. builder.local_update(folder_1->get_id(), pr_file_info);
  517. builder.apply(*sup);
  518. auto folder_1_my = folder_1->get_folder_infos().by_device(*my_device);
  // Common part of the ClusterConfig: folder_1 with the peer's own entry.
  519. auto cc = proto::ClusterConfig{};
  520. auto folder = cc.add_folders();
  521. folder->set_id(std::string(folder_1->get_id()));
  522. auto d_peer = folder->add_devices();
  523. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  524. d_peer->set_max_sequence(folder_1_peer->get_max_sequence());
  525. d_peer->set_index_id(folder_1_peer->get_index());
  // The peer reports a max sequence one below the local one: a single
  // IndexUpdate with one file is expected.
  526. SECTION("peer has outdated by sequence view") {
  527. auto d_my = folder->add_devices();
  528. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  529. d_my->set_max_sequence(folder_1_my->get_max_sequence() - 1);
  530. d_my->set_index_id(folder_1_my->get_index());
  531. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  532. sup->do_process();
  533. auto &queue = peer_actor->messages;
  534. REQUIRE(queue.size() == 1);
  535. auto msg = &(*queue.front()).payload;
  536. auto &my_index_update = *std::get<proto::message::IndexUpdate>(*msg);
  537. REQUIRE(my_index_update.files_size() == 1);
  538. }
  // The peer reports a different index id: an empty Index followed by an
  // IndexUpdate with one file is expected.
  539. SECTION("peer has outdated by index view") {
  540. auto d_my = folder->add_devices();
  541. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  542. d_my->set_max_sequence(folder_1_my->get_max_sequence());
  543. d_my->set_index_id(folder_1_my->get_index() + 5);
  544. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  545. sup->do_process();
  546. auto &queue = peer_actor->messages;
  547. REQUIRE(queue.size() == 2);
  548. auto msg = &(*queue.front()).payload;
  549. auto &my_index = *std::get<proto::message::Index>(*msg);
  550. REQUIRE(my_index.files_size() == 0);
  551. queue.pop_front();
  552. msg = &(*queue.front()).payload;
  553. auto &my_index_update = *std::get<proto::message::IndexUpdate>(*msg);
  554. REQUIRE(my_index_update.files_size() == 1);
  555. }
  // The peer reports the same index id and max sequence: nothing is sent.
  556. SECTION("peer has actual view") {
  557. auto d_my = folder->add_devices();
  558. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  559. d_my->set_max_sequence(folder_1_my->get_max_sequence());
  560. d_my->set_index_id(folder_1_my->get_index());
  561. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  562. sup->do_process();
  563. auto &queue = peer_actor->messages;
  564. REQUIRE(queue.size() == 0);
  565. }
  566. }
  567. };
  568. F(true, 10).run();
  569. }
  570. void test_downloading() {
  571. struct F : fixture_t {
  572. using fixture_t::fixture_t;
  573. void main(diff_builder_t &) noexcept override {
  574. auto &folder_infos = folder_1->get_folder_infos();
  575. auto folder_my = folder_infos.by_device(*my_device);
  576. auto cc = proto::ClusterConfig{};
  577. auto folder = cc.add_folders();
  578. folder->set_id(std::string(folder_1->get_id()));
  579. auto d_peer = folder->add_devices();
  580. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  581. d_peer->set_max_sequence(10);
  582. d_peer->set_index_id(folder_1_peer->get_index());
  583. auto d_my = folder->add_devices();
  584. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  585. d_my->set_max_sequence(folder_my->get_max_sequence());
  586. d_my->set_index_id(folder_my->get_index());
  587. SECTION("cluster config & index has a new file => download it") {
  588. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  589. auto index = proto::Index{};
  590. index.set_folder(std::string(folder_1->get_id()));
  591. auto file = index.add_files();
  592. file->set_name("some-file");
  593. file->set_type(proto::FileInfoType::FILE);
  594. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  595. file->set_block_size(5);
  596. file->set_size(5);
  597. auto version = file->mutable_version();
  598. auto counter = version->add_counters();
  599. counter->set_id(1ul);
  600. counter->set_value(1ul);
  601. auto b1 = file->add_blocks();
  602. b1->set_hash(utils::sha256_digest("12345").value());
  603. b1->set_offset(0);
  604. b1->set_size(5);
  605. auto folder_my = folder_infos.by_device(*my_device);
  606. CHECK(folder_my->get_max_sequence() == 0ul);
  607. CHECK(!folder_my->get_folder()->is_synchronizing());
  608. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  609. sup->do_process();
  610. CHECK(folder_my->get_folder()->is_synchronizing());
  611. peer_actor->push_block("12345", 0);
  612. peer_actor->process_block_requests();
  613. sup->do_process();
  614. CHECK(!folder_my->get_folder()->is_synchronizing());
  615. REQUIRE(folder_my);
  616. CHECK(folder_my->get_max_sequence() == 1ul);
  617. REQUIRE(folder_my->get_file_infos().size() == 1);
  618. auto f = folder_my->get_file_infos().begin()->item;
  619. REQUIRE(f);
  620. CHECK(f->get_name() == file->name());
  621. CHECK(f->get_size() == 5);
  622. CHECK(f->get_blocks().size() == 1);
  623. CHECK(f->is_locally_available());
  624. CHECK(!f->is_locked());
  625. CHECK(peer_actor->blocks_requested == 1);
  626. auto &queue = peer_actor->messages;
  627. REQUIRE(queue.size() > 0);
  628. auto msg = &(*queue.back()).payload;
  629. auto &my_index_update = *std::get<proto::message::IndexUpdate>(*msg);
  630. REQUIRE(my_index_update.files_size() == 1);
  631. SECTION("dont redownload file only if metadata has changed") {
  632. auto index_update = proto::IndexUpdate{};
  633. index_update.set_folder(index.folder());
  634. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  635. counter->set_value(2ul);
  636. *index_update.add_files() = *file;
  637. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index_update)));
  638. sup->do_process();
  639. CHECK(peer_actor->blocks_requested == 1);
  640. CHECK(folder_my->get_max_sequence() == 2ul);
  641. f = folder_my->get_file_infos().begin()->item;
  642. CHECK(f->is_locally_available());
  643. CHECK(f->get_sequence() == 2ul);
  644. }
  645. }
  646. SECTION("download 2 files") {
  647. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  648. auto index = proto::Index{};
  649. index.set_folder(std::string(folder_1->get_id()));
  650. auto file_1 = index.add_files();
  651. file_1->set_name("file-1");
  652. file_1->set_type(proto::FileInfoType::FILE);
  653. file_1->set_sequence(folder_1_peer->get_max_sequence() + 1);
  654. file_1->set_block_size(5);
  655. file_1->set_size(5);
  656. auto version_1 = file_1->mutable_version();
  657. auto counter_1 = version_1->add_counters();
  658. counter_1->set_id(1ul);
  659. counter_1->set_value(1ul);
  660. auto file_2 = index.add_files();
  661. file_2->set_name("file-2");
  662. file_2->set_type(proto::FileInfoType::FILE);
  663. file_2->set_sequence(folder_1_peer->get_max_sequence() + 2);
  664. file_2->set_block_size(5);
  665. file_2->set_size(5);
  666. auto version_2 = file_2->mutable_version();
  667. auto counter_2 = version_2->add_counters();
  668. counter_2->set_id(1ul);
  669. counter_2->set_value(2ul);
  670. auto b1 = file_1->add_blocks();
  671. b1->set_hash(utils::sha256_digest("12345").value());
  672. b1->set_offset(0);
  673. b1->set_size(5);
  674. SECTION("with different blocks") {
  675. auto b2 = file_2->add_blocks();
  676. b2->set_hash(utils::sha256_digest("67890").value());
  677. b2->set_offset(0);
  678. b2->set_size(5);
  679. auto folder_my = folder_infos.by_device(*my_device);
  680. CHECK(folder_my->get_max_sequence() == 0ul);
  681. CHECK(!folder_my->get_folder()->is_synchronizing());
  682. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  683. peer_actor->push_block("12345", 0, file_1->name());
  684. peer_actor->push_block("67890", 0, file_2->name());
  685. sup->do_process();
  686. CHECK(!folder_my->get_folder()->is_synchronizing());
  687. CHECK(peer_actor->blocks_requested == 2);
  688. REQUIRE(folder_my);
  689. CHECK(folder_my->get_max_sequence() == 2ul);
  690. REQUIRE(folder_my->get_file_infos().size() == 2);
  691. {
  692. auto f = folder_my->get_file_infos().by_name(file_1->name());
  693. REQUIRE(f);
  694. CHECK(f->get_size() == 5);
  695. CHECK(f->get_blocks().size() == 1);
  696. CHECK(f->is_locally_available());
  697. CHECK(!f->is_locked());
  698. }
  699. {
  700. auto f = folder_my->get_file_infos().by_name(file_2->name());
  701. REQUIRE(f);
  702. CHECK(f->get_size() == 5);
  703. CHECK(f->get_blocks().size() == 1);
  704. CHECK(f->is_locally_available());
  705. CHECK(!f->is_locked());
  706. }
  707. }
  708. SECTION("with the same block") {
  709. *file_2->add_blocks() = *b1;
  710. auto folder_my = folder_infos.by_device(*my_device);
  711. CHECK(folder_my->get_max_sequence() == 0ul);
  712. CHECK(!folder_my->get_folder()->is_synchronizing());
  713. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  714. peer_actor->push_block("12345", 0, file_1->name());
  715. sup->do_process();
  716. CHECK(!folder_my->get_folder()->is_synchronizing());
  717. CHECK(peer_actor->blocks_requested == 1);
  718. REQUIRE(folder_my);
  719. CHECK(folder_my->get_max_sequence() == 2ul);
  720. REQUIRE(folder_my->get_file_infos().size() == 2);
  721. {
  722. auto f = folder_my->get_file_infos().by_name(file_1->name());
  723. REQUIRE(f);
  724. CHECK(f->get_size() == 5);
  725. CHECK(f->get_blocks().size() == 1);
  726. CHECK(f->is_locally_available());
  727. CHECK(!f->is_locked());
  728. }
  729. {
  730. auto f = folder_my->get_file_infos().by_name(file_2->name());
  731. REQUIRE(f);
  732. CHECK(f->get_size() == 5);
  733. CHECK(f->get_blocks().size() == 1);
  734. CHECK(f->is_locally_available());
  735. CHECK(!f->is_locked());
  736. }
  737. }
  738. SECTION("with the same blocks") {
  739. auto concurrent_writes = GENERATE(1, 5);
  740. cluster->modify_write_requests(concurrent_writes);
  741. *file_2->add_blocks() = *b1;
  742. *file_2->add_blocks() = *b1;
  743. file_2->set_size(10);
  744. auto folder_my = folder_infos.by_device(*my_device);
  745. CHECK(folder_my->get_max_sequence() == 0ul);
  746. CHECK(!folder_my->get_folder()->is_synchronizing());
  747. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  748. peer_actor->push_block("12345", 0, file_1->name());
  749. sup->do_process();
  750. CHECK(!folder_my->get_folder()->is_synchronizing());
  751. CHECK(peer_actor->blocks_requested == 1);
  752. REQUIRE(folder_my);
  753. CHECK(folder_my->get_max_sequence() == 2ul);
  754. REQUIRE(folder_my->get_file_infos().size() == 2);
  755. {
  756. auto f = folder_my->get_file_infos().by_name(file_1->name());
  757. REQUIRE(f);
  758. CHECK(f->get_size() == 5);
  759. CHECK(f->get_blocks().size() == 1);
  760. CHECK(f->is_locally_available());
  761. CHECK(!f->is_locked());
  762. }
  763. {
  764. auto f = folder_my->get_file_infos().by_name(file_2->name());
  765. REQUIRE(f);
  766. CHECK(f->get_size() == 10);
  767. CHECK(f->get_blocks().size() == 2);
  768. CHECK(f->is_locally_available());
  769. CHECK(!f->is_locked());
  770. }
  771. }
  772. }
  773. SECTION("don't attempt to download a file, which is deleted") {
  774. auto folder_peer = folder_infos.by_device(*peer_device);
  775. auto pr_fi = proto::FileInfo{};
  776. pr_fi.set_name("some-file");
  777. pr_fi.set_type(proto::FileInfoType::FILE);
  778. pr_fi.set_sequence(folder_1_peer->get_max_sequence() + 1);
  779. pr_fi.set_block_size(5);
  780. pr_fi.set_size(5);
  781. auto version = pr_fi.mutable_version();
  782. auto counter = version->add_counters();
  783. counter->set_id(1ul);
  784. counter->set_value(1ul);
  785. auto b1 = pr_fi.add_blocks();
  786. b1->set_hash(utils::sha256_digest("12345").value());
  787. b1->set_offset(0);
  788. b1->set_size(5);
  789. auto b = model::block_info_t::create(*b1).value();
  790. auto uuid = sup->sequencer->next_uuid();
  791. auto file_info = model::file_info_t::create(uuid, pr_fi, folder_peer).value();
  792. file_info->assign_block(b, 0);
  793. REQUIRE(folder_peer->add_strict(file_info));
  794. cluster->get_blocks().put(b);
  795. d_peer->set_max_sequence(folder_1_peer->get_max_sequence() + 1);
  796. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  797. sup->do_process();
  798. auto blocks_requested = peer_actor->blocks_requested;
  799. auto index = proto::IndexUpdate{};
  800. index.set_folder(std::string(folder_1->get_id()));
  801. auto file = index.add_files();
  802. file->set_name("some-file");
  803. file->set_type(proto::FileInfoType::FILE);
  804. file->set_deleted(true);
  805. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  806. file->set_block_size(0);
  807. file->set_size(0);
  808. auto v = file->mutable_version();
  809. auto c = v->add_counters();
  810. c->set_id(peer_device->device_id().get_uint());
  811. c->set_value(1);
  812. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index)));
  813. sup->do_process();
  814. CHECK(folder_my->get_max_sequence() == 1ul);
  815. REQUIRE(folder_my->get_file_infos().size() == 1);
  816. auto f = folder_my->get_file_infos().begin()->item;
  817. REQUIRE(f);
  818. CHECK(f->get_name() == pr_fi.name());
  819. CHECK(f->get_size() == 0);
  820. CHECK(f->get_blocks().size() == 0);
  821. CHECK(f->is_locally_available());
  822. CHECK(f->is_deleted());
  823. CHECK(!f->is_locked());
  824. CHECK(f->get_sequence() == 1ul);
  825. CHECK(peer_actor->blocks_requested == blocks_requested);
  826. }
  827. SECTION("new file via index_update => download it") {
  828. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  829. auto index = proto::Index{};
  830. index.set_folder(std::string(folder_1->get_id()));
  831. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  832. auto index_update = proto::IndexUpdate{};
  833. index_update.set_folder(std::string(folder_1->get_id()));
  834. auto file = index_update.add_files();
  835. file->set_name("some-file");
  836. file->set_type(proto::FileInfoType::FILE);
  837. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  838. file->set_block_size(5);
  839. file->set_size(5);
  840. auto version = file->mutable_version();
  841. auto counter = version->add_counters();
  842. counter->set_id(1);
  843. counter->set_value(peer_device->device_id().get_uint());
  844. auto b1 = file->add_blocks();
  845. b1->set_hash(utils::sha256_digest("12345").value());
  846. b1->set_offset(0);
  847. b1->set_size(5);
  848. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index_update)));
  849. peer_actor->push_block("12345", 0);
  850. sup->do_process();
  851. auto folder_my = folder_infos.by_device(*my_device);
  852. CHECK(folder_my->get_max_sequence() == 1);
  853. REQUIRE(folder_my->get_file_infos().size() == 1);
  854. auto f = folder_my->get_file_infos().begin()->item;
  855. REQUIRE(f);
  856. CHECK(f->get_name() == file->name());
  857. CHECK(f->get_size() == 5);
  858. CHECK(f->get_blocks().size() == 1);
  859. CHECK(f->is_locally_available());
  860. CHECK(!f->is_locked());
  861. auto fp = folder_1_peer->get_file_infos().begin()->item;
  862. REQUIRE(fp);
  863. CHECK(!fp->is_locked());
  864. }
  865. SECTION("deleted file, has been restored => download it") {
  866. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  867. sup->do_process();
  868. auto index = proto::Index{};
  869. index.set_folder(std::string(folder_1->get_id()));
  870. auto file_1 = index.add_files();
  871. file_1->set_name("some-file");
  872. file_1->set_type(proto::FileInfoType::FILE);
  873. file_1->set_sequence(folder_1_peer->get_max_sequence() + 1);
  874. file_1->set_deleted(true);
  875. auto v1 = file_1->mutable_version();
  876. auto c1 = v1->add_counters();
  877. c1->set_id(1u);
  878. c1->set_value(1u);
  879. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  880. sup->do_process();
  881. CHECK(!folder_my->get_folder()->is_synchronizing());
  882. auto folder_my = folder_infos.by_device(*my_device);
  883. CHECK(folder_my->get_max_sequence() == 1);
  884. auto index_update = proto::IndexUpdate{};
  885. index_update.set_folder(std::string(folder_1->get_id()));
  886. auto file_2 = index_update.add_files();
  887. file_2->set_name("some-file");
  888. file_2->set_type(proto::FileInfoType::FILE);
  889. file_2->set_sequence(folder_1_peer->get_max_sequence() + 1);
  890. file_2->set_block_size(128 * 1024);
  891. file_2->set_size(5);
  892. auto v2 = file_2->mutable_version();
  893. auto c2 = v2->add_counters();
  894. c2->set_id(1u);
  895. c2->set_value(2u);
  896. auto b1 = file_2->add_blocks();
  897. b1->set_hash(utils::sha256_digest("12345").value());
  898. b1->set_offset(0);
  899. b1->set_size(5);
  900. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index_update)));
  901. peer_actor->push_block("12345", 0);
  902. sup->do_process();
  903. REQUIRE(folder_my->get_file_infos().size() == 1);
  904. auto f = folder_my->get_file_infos().begin()->item;
  905. REQUIRE(f);
  906. CHECK(f->get_name() == file_1->name());
  907. CHECK(f->get_size() == 5);
  908. CHECK(f->get_blocks().size() == 1);
  909. CHECK(f->is_locally_available());
  910. CHECK(!f->is_locked());
  911. CHECK(!f->is_deleted());
  912. }
  913. SECTION("download a file, which has the same blocks locally") {
  914. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  915. sup->do_process();
  916. auto index = proto::Index{};
  917. index.set_folder(std::string(folder_1->get_id()));
  918. auto file_1 = index.add_files();
  919. file_1->set_name("some-file");
  920. file_1->set_type(proto::FileInfoType::FILE);
  921. file_1->set_sequence(folder_1_peer->get_max_sequence() + 1);
  922. auto v1 = file_1->mutable_version();
  923. auto c1 = v1->add_counters();
  924. c1->set_id(1u);
  925. c1->set_value(1u);
  926. file_1->set_block_size(5);
  927. file_1->set_size(10);
  928. auto b1 = file_1->add_blocks();
  929. b1->set_hash(utils::sha256_digest("12345").value());
  930. b1->set_offset(0);
  931. b1->set_size(5);
  932. auto bi_1 = model::block_info_t::create(*b1).value();
  933. auto b2 = file_1->add_blocks();
  934. b2->set_hash(utils::sha256_digest("67890").value());
  935. b2->set_offset(5);
  936. b2->set_size(5);
  937. auto bi_2 = model::block_info_t::create(*b2).value();
  938. auto &blocks = cluster->get_blocks();
  939. blocks.put(bi_1);
  940. blocks.put(bi_2);
  941. auto pr_my = proto::FileInfo{};
  942. pr_my.set_name("some-file.source");
  943. pr_my.set_type(proto::FileInfoType::FILE);
  944. pr_my.set_sequence(2ul);
  945. pr_my.set_block_size(5);
  946. pr_my.set_size(5);
  947. pr_my.mutable_version()->add_counters()->set_id(my_device->device_id().get_uint());
  948. auto uuid = sup->sequencer->next_uuid();
  949. auto file_my = model::file_info_t::create(uuid, pr_my, folder_my).value();
  950. file_my->assign_block(bi_1, 0);
  951. file_my->mark_local_available(0);
  952. REQUIRE(folder_my->add_strict(file_my));
  953. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  954. peer_actor->push_block("67890", 1);
  955. cluster->modify_write_requests(10);
  956. sup->do_process();
  957. REQUIRE(folder_my->get_file_infos().size() == 2);
  958. auto f = folder_my->get_file_infos().by_name(file_1->name());
  959. REQUIRE(f);
  960. CHECK(f->get_name() == file_1->name());
  961. CHECK(f->get_size() == 10);
  962. CHECK(f->get_blocks().size() == 2);
  963. CHECK(f->is_locally_available());
  964. CHECK(!f->is_locked());
  965. }
  966. }
  967. };
  968. F(true, 10).run();
  969. }
  // Scenario: a block download from the peer fails. The checks at the end expect the
  // controller (`target`) to remain OPERATIONAL and the peer's copy of the file to be
  // flagged unreachable, unlocked and not synchronizing.
  // The peer announces a single file "some-file" (size 15, block size 5, three blocks);
  // each SECTION queues a different bad reply for the first block.
  970. void test_downloading_errors() {
  971. struct F : fixture_t {
  972. using fixture_t::fixture_t;
  // Override returning 1. NOTE(review): judging by the name this caps the number of
  // outstanding block requests; the consumer of the hook is outside this view - confirm.
  973. std::uint32_t get_blocks_max_requested() override { return 1; }
  974. void main(diff_builder_t &) noexcept override {
  975. auto &folder_infos = folder_1->get_folder_infos();
  976. auto folder_my = folder_infos.by_device(*my_device);
  // ClusterConfig for folder_1 listing both devices; the peer entry advertises a
  // max_sequence one above the value currently stored in folder_1_peer.
  977. auto cc = proto::ClusterConfig{};
  978. auto folder = cc.add_folders();
  979. folder->set_id(std::string(folder_1->get_id()));
  980. auto d_peer = folder->add_devices();
  981. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  982. d_peer->set_max_sequence(folder_1_peer->get_max_sequence() + 1);
  983. d_peer->set_index_id(folder_1_peer->get_index());
  984. auto d_my = folder->add_devices();
  985. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  986. d_my->set_max_sequence(folder_my->get_max_sequence());
  987. d_my->set_index_id(folder_my->get_index());
  988. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  // Index with one file made of three 5-byte blocks, hashed from "12345", "67890", "11111".
  989. auto index = proto::Index{};
  990. index.set_folder(std::string(folder_1->get_id()));
  991. auto file = index.add_files();
  992. file->set_name("some-file");
  993. file->set_type(proto::FileInfoType::FILE);
  994. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  995. file->set_block_size(5);
  996. file->set_size(15);
  997. auto version = file->mutable_version();
  998. auto counter = version->add_counters();
  999. counter->set_id(1ul);
  1000. counter->set_value(1ul);
  1001. auto b1 = file->add_blocks();
  1002. b1->set_hash(utils::sha256_digest("12345").value());
  1003. b1->set_offset(0);
  1004. b1->set_size(5);
  1005. auto b2 = file->add_blocks();
  1006. b2->set_hash(utils::sha256_digest("67890").value());
  1007. b2->set_offset(5);
  1008. b2->set_size(5);
  1009. auto b3 = file->add_blocks();
  1010. b3->set_hash(utils::sha256_digest("11111").value());
  1011. b3->set_offset(10);
  1012. b3->set_size(5);
  1013. CHECK(folder_my->get_max_sequence() == 0ul);
  1014. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  // Variant 1: the queued reply is an error code instead of data.
  1015. SECTION("general error, ok, do not shutdown") {
  1016. auto ec = utils::make_error_code(utils::request_error_code_t::generic);
  1017. peer_actor->push_block(ec, 0);
  1018. }
  // Variant 2: the queued data is "zzz", which differs from the "12345" the first
  // block's hash was computed from (presumably the second argument of push_block is
  // the block index - TODO confirm against sample_peer_t).
  1019. SECTION("hash mismatch, do not shutdown") {
  1020. peer_actor->push_block("zzz", 0);
  1021. peer_actor->push_block("67890", 1); // needed to terminate/shutdown controller
  1022. }
  1023. sup->do_process();
  // Common expectations for both variants.
  1024. CHECK(peer_actor->blocks_requested <= 2);
  1025. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  1026. auto folder_peer = folder_infos.by_device(*peer_device);
  1027. REQUIRE(folder_peer->get_file_infos().size() == 1);
  1028. auto f = folder_peer->get_file_infos().begin()->item;
  1029. REQUIRE(f);
  1030. CHECK(f->is_unreachable());
  1031. CHECK(!f->is_synchronizing());
  1032. CHECK(!f->is_locked());
  1033. CHECK(!f->local_file());
  1034. CHECK(!folder_my->get_folder()->is_synchronizing());
  1035. sup->do_process();
  1036. }
  1037. };
  // Fixture constructor arguments are defined outside this view.
  1038. F(true, 10).run();
  1039. }
  // Scenario: the peer announces folder_1 before it is shared locally; the folder is then
  // shared via the diff builder, the peer sends an Index with one single-block file, and
  // the file is expected to be downloaded. The test also pins the exact order of the
  // messages delivered to the peer: ClusterConfig, Index, IndexUpdate.
  1040. void test_download_from_scratch() {
  1041. struct F : fixture_t {
  1042. using fixture_t::fixture_t;
  1043. void main(diff_builder_t &) noexcept override {
  1044. sup->do_process();
  // Drop whatever the peer received so far, so the size check at the end only counts
  // messages produced by the steps below.
  1045. peer_actor->messages.clear();
  1046. auto builder = diff_builder_t(*cluster);
  1047. auto sha256 = peer_device->device_id().get_sha256();
  // Peer's ClusterConfig: folder_1 with the peer at max_sequence 15 / index id 12345
  // and the local device at max_sequence 0 / index id 0.
  1048. auto cc = proto::ClusterConfig{};
  1049. auto folder = cc.add_folders();
  1050. folder->set_id(std::string(folder_1->get_id()));
  1051. {
  1052. auto device = folder->add_devices();
  1053. device->set_id(std::string(peer_device->device_id().get_sha256()));
  1054. device->set_max_sequence(15);
  1055. device->set_index_id(12345);
  1056. }
  1057. {
  1058. auto device = folder->add_devices();
  1059. device->set_id(std::string(my_device->device_id().get_sha256()));
  1060. device->set_max_sequence(0);
  1061. device->set_index_id(0);
  1062. }
  1063. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  1064. sup->do_process();
  // Share folder_1 with the peer only after its ClusterConfig has been processed.
  1065. builder.share_folder(sha256, folder_1->get_id()).apply(*sup);
  // Index with one 5-byte file consisting of a single block hashed from "12345".
  1066. auto index = proto::Index{};
  1067. index.set_folder(std::string(folder_1->get_id()));
  1068. auto file = index.add_files();
  1069. file->set_name("some-file");
  1070. file->set_type(proto::FileInfoType::FILE);
  1071. file->set_sequence(154);
  1072. file->set_block_size(5);
  1073. file->set_size(5);
  1074. auto version = file->mutable_version();
  1075. auto counter = version->add_counters();
  1076. counter->set_id(1ul);
  1077. counter->set_value(1ul);
  1078. auto b1 = file->add_blocks();
  1079. b1->set_hash(utils::sha256_digest("12345").value());
  1080. b1->set_offset(0);
  1081. b1->set_size(5);
  1082. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  1083. peer_actor->push_block("12345", 0, file->name());
  1084. sup->do_process();
  // The local folder is expected to hold the complete, unlocked file with local sequence 1.
  1085. auto folder_my = folder_1->get_folder_infos().by_device(*my_device);
  1086. CHECK(folder_my->get_max_sequence() == 1ul);
  1087. CHECK(!folder_my->get_folder()->is_synchronizing());
  1088. auto f = folder_my->get_file_infos().by_name(file->name());
  1089. REQUIRE(f);
  1090. CHECK(f->get_size() == 5);
  1091. CHECK(f->get_blocks().size() == 1);
  1092. CHECK(f->is_locally_available());
  1093. CHECK(!f->is_locked());
  // Exactly three messages must have reached the peer, in this order.
  1094. REQUIRE(peer_actor->messages.size() == 3);
  1095. {
  1096. auto peer_msg = &peer_actor->messages.front()->payload;
  1097. REQUIRE(std::get_if<proto::message::ClusterConfig>(peer_msg));
  1098. peer_actor->messages.pop_front();
  1099. peer_msg = &peer_actor->messages.front()->payload;
  1100. REQUIRE(std::get_if<proto::message::Index>(peer_msg));
  1101. peer_actor->messages.pop_front();
  1102. peer_msg = &peer_actor->messages.front()->payload;
  1103. REQUIRE(std::get_if<proto::message::IndexUpdate>(peer_msg));
  1104. }
  1105. }
  1106. };
  // Fixture constructor arguments are defined outside this view.
  1107. F(false, 10, false).run();
  1108. }
  // Scenario: a two-block file is announced but only the first block is supplied before
  // the controller (`target`) is shut down. After start_target() the peer re-sends its
  // ClusterConfig and supplies the second block; the file is then expected to be complete.
  1109. void test_download_resuming() {
  1110. struct F : fixture_t {
  1111. using fixture_t::fixture_t;
  1112. void main(diff_builder_t &) noexcept override {
  1113. sup->do_process();
  1114. auto builder = diff_builder_t(*cluster);
  1115. auto sha256 = peer_device->device_id().get_sha256();
  // Peer's ClusterConfig lists only the peer device (max_sequence 15, index id 12345).
  1116. auto cc = proto::ClusterConfig{};
  1117. auto folder = cc.add_folders();
  1118. folder->set_id(std::string(folder_1->get_id()));
  1119. auto d_peer = folder->add_devices();
  1120. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  1121. d_peer->set_max_sequence(15);
  1122. d_peer->set_index_id(12345);
  1123. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  1124. sup->do_process();
  1125. builder.share_folder(sha256, folder_1->get_id()).apply(*sup);
  // After sharing, the peer's folder info must carry the index id announced above.
  1126. auto folder_peer = folder_1->get_folder_infos().by_device(*peer_device);
  1127. REQUIRE(folder_peer->get_index() == d_peer->index_id());
  // Index with one 10-byte file made of two 5-byte blocks ("12345", "67890").
  1128. auto index = proto::Index{};
  1129. index.set_folder(std::string(folder_1->get_id()));
  1130. auto file = index.add_files();
  1131. file->set_name("some-file");
  1132. file->set_type(proto::FileInfoType::FILE);
  1133. file->set_sequence(154);
  1134. file->set_block_size(5);
  1135. file->set_size(10);
  1136. auto version = file->mutable_version();
  1137. auto counter = version->add_counters();
  1138. counter->set_id(1ul);
  1139. counter->set_value(1ul);
  1140. auto b1 = file->add_blocks();
  1141. b1->set_hash(utils::sha256_digest("12345").value());
  1142. b1->set_offset(0);
  1143. b1->set_size(5);
  1144. auto b2 = file->add_blocks();
  1145. b2->set_hash(utils::sha256_digest("67890").value());
  1146. b2->set_offset(5);
  1147. b2->set_size(5);
  1148. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  // Only the first block is made available before the shutdown.
  1149. peer_actor->push_block("12345", 0, file->name());
  1150. sup->do_process();
  1151. target->do_shutdown();
  1152. sup->do_process();
  // After the shutdown the folder must not be synchronizing and no block in the
  // cluster may remain locked.
  1153. CHECK(!folder_1->is_synchronizing());
  1154. for (auto &it : cluster->get_blocks()) {
  1155. REQUIRE(!it.item->is_locked());
  1156. }
  // Second phase: start the target again, repeat the ClusterConfig, supply block 1.
  1157. start_target();
  1158. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  1159. peer_actor->push_block("67890", 1, file->name());
  1160. sup->do_process();
  1161. auto folder_my = folder_1->get_folder_infos().by_device(*my_device);
  1162. CHECK(folder_my->get_max_sequence() == 1ul);
  1163. CHECK(!folder_my->get_folder()->is_synchronizing());
  1164. auto f = folder_my->get_file_infos().by_name(file->name());
  1165. REQUIRE(f);
  1166. CHECK(f->get_size() == 10);
  1167. CHECK(f->get_blocks().size() == 2);
  1168. CHECK(f->is_locally_available());
  1169. CHECK(!f->is_locked());
  1170. }
  1171. };
  // Fixture constructor arguments are defined outside this view.
  1172. F(false, 10, false).run();
  1173. }
  // Scenario: sharing is initiated from the local side. The peer first sends an empty
  // ClusterConfig; the test then shares and later unshares folder_1 through the diff
  // builder and inspects the ClusterConfig delivered to the peer after each step
  // (0 folders -> 1 folder with both devices -> 0 folders).
  1174. void test_initiate_my_sharing() {
  1175. struct F : fixture_t {
  1176. using fixture_t::fixture_t;
  1177. void main(diff_builder_t &) noexcept override {
  1178. sup->do_process();
  1179. auto cc = proto::ClusterConfig{};
  1180. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  1181. // nothing is shared
  1182. sup->do_process();
  1183. REQUIRE(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  1184. REQUIRE(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  1185. REQUIRE(peer_actor->messages.size() == 1);
  // The single message must be a non-null ClusterConfig without folders.
  1186. auto peer_msg = &peer_actor->messages.front()->payload;
  1187. auto peer_cluster_msg = std::get_if<proto::message::ClusterConfig>(peer_msg);
  1188. REQUIRE(peer_cluster_msg);
  1189. REQUIRE(*peer_cluster_msg);
  1190. REQUIRE((*peer_cluster_msg)->folders_size() == 0);
  1191. // share folder_1
  1192. peer_actor->messages.clear();
  1193. auto sha256 = peer_device->device_id().get_sha256();
  1194. diff_builder_t(*cluster).share_folder(sha256, folder_1->get_id()).apply(*sup);
  1195. REQUIRE(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  1196. REQUIRE(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  1197. REQUIRE(peer_actor->messages.size() == 1);
  1198. {
  1199. auto peer_msg = &peer_actor->messages.front()->payload;
  1200. auto peer_cluster_msg = std::get_if<proto::message::ClusterConfig>(peer_msg);
  1201. REQUIRE((peer_cluster_msg && *peer_cluster_msg));
  1202. auto &msg = *peer_cluster_msg;
  1203. REQUIRE(msg->folders_size() == 1);
  // `f` is a local copy of the folder entry; the device pointers collected below
  // point into this copy and are only used inside this scope.
  1204. auto f = msg->folders(0);
  1205. REQUIRE(f.devices_size() == 2);
  1206. using f_t = std::remove_reference_t<std::remove_cv_t<decltype(f.devices(0))>>;
  1207. auto f_my = (f_t *){};
  1208. auto f_peer = (f_t *){};
  // Locate the entries of the local and the peer device by their sha256 ids.
  1209. for (int i = 0; i < f.devices_size(); ++i) {
  1210. auto &d = f.devices(i);
  1211. if (d.id() == my_device->device_id().get_sha256()) {
  1212. f_my = &d;
  1213. } else if (d.id() == peer_device->device_id().get_sha256()) {
  1214. f_peer = &d;
  1215. }
  1216. }
  // Peer entry: zero index id and max_sequence 0; local entry: the local folder's
  // index id and max_sequence 0.
  1217. REQUIRE(f_peer);
  1218. CHECK(!f_peer->index_id());
  1219. CHECK(f_peer->max_sequence() == 0);
  1220. REQUIRE(f_my);
  1221. auto folder_my = folder_1->get_folder_infos().by_device(*my_device);
  1222. CHECK(f_my->index_id() == folder_my->get_index());
  1223. CHECK(f_my->max_sequence() == 0);
  1224. }
  1225. // unshare folder_1
  1226. auto peer_fi = folder_1->get_folder_infos().by_device(*peer_device);
  1227. peer_actor->messages.clear();
  1228. diff_builder_t(*cluster).unshare_folder(*peer_fi).apply(*sup);
  1229. REQUIRE(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  1230. REQUIRE(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  1231. REQUIRE(peer_actor->messages.size() == 1);
  // After unsharing, the delivered ClusterConfig must again contain no folders.
  1232. peer_msg = &peer_actor->messages.front()->payload;
  1233. peer_cluster_msg = std::get_if<proto::message::ClusterConfig>(peer_msg);
  1234. REQUIRE(peer_cluster_msg);
  1235. REQUIRE(*peer_cluster_msg);
  1236. REQUIRE((*peer_cluster_msg)->folders_size() == 0);
  1237. }
  1238. };
  // Fixture constructor arguments are defined outside this view.
  1239. F(false, 10, false).run();
  1240. }
  // Scenario: the peer announces folder_1 first (index id 0x12345, max_sequence 15).
  // The message delivered to the peer is expected to list no folders until folder_1 is
  // shared locally; after sharing, a ClusterConfig followed by an empty Index is expected,
  // and after unsharing a ClusterConfig without folders again.
  1241. void test_initiate_peer_sharing() {
  1242. struct F : fixture_t {
  1243. using fixture_t::fixture_t;
  1244. void main(diff_builder_t &) noexcept override {
  1245. sup->do_process();
  // Peer's ClusterConfig: peer at max_sequence 15 / index id 0x12345, local device zeroed.
  1246. auto cc = proto::ClusterConfig{};
  1247. auto folder = cc.add_folders();
  1248. folder->set_id(std::string(folder_1->get_id()));
  1249. {
  1250. auto device = folder->add_devices();
  1251. device->set_id(std::string(peer_device->device_id().get_sha256()));
  1252. device->set_max_sequence(15);
  1253. device->set_index_id(0x12345);
  1254. }
  1255. {
  1256. auto device = folder->add_devices();
  1257. device->set_id(std::string(my_device->device_id().get_sha256()));
  1258. device->set_max_sequence(0);
  1259. device->set_index_id(0x0);
  1260. }
  1261. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  1262. sup->do_process();
  1263. REQUIRE(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  1264. REQUIRE(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  1265. REQUIRE(peer_actor->messages.size() == 1);
  // Before the local share, the ClusterConfig sent to the peer has no folders.
  1266. {
  1267. auto peer_msg = &peer_actor->messages.front()->payload;
  1268. auto peer_cluster_msg = std::get_if<proto::message::ClusterConfig>(peer_msg);
  1269. REQUIRE(peer_cluster_msg);
  1270. REQUIRE(*peer_cluster_msg);
  1271. REQUIRE((*peer_cluster_msg)->folders_size() == 0);
  1272. }
  1273. // share folder_1
  1274. peer_actor->messages.clear();
  1275. auto sha256 = peer_device->device_id().get_sha256();
  1276. diff_builder_t(*cluster).share_folder(sha256, folder_1->get_id()).apply(*sup);
  1277. REQUIRE(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  1278. REQUIRE(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  // Two messages are expected now: a ClusterConfig and then an Index.
  1279. REQUIRE(peer_actor->messages.size() == 2);
  1280. {
  1281. auto peer_msg = &peer_actor->messages.front()->payload;
  1282. auto peer_cluster_msg = std::get_if<proto::message::ClusterConfig>(peer_msg);
  1283. REQUIRE((peer_cluster_msg && *peer_cluster_msg));
  1284. auto &msg = *peer_cluster_msg;
  1285. REQUIRE(msg->folders_size() == 1);
  // `f` is a local copy of the folder entry; the device pointers collected below
  // point into this copy and are only used inside this scope.
  1286. auto f = msg->folders(0);
  1287. REQUIRE(f.devices_size() == 2);
  1288. using f_t = std::remove_reference_t<std::remove_cv_t<decltype(f.devices(0))>>;
  1289. auto f_my = (f_t *){};
  1290. auto f_peer = (f_t *){};
  // Locate the entries of the local and the peer device by their sha256 ids.
  1291. for (int i = 0; i < f.devices_size(); ++i) {
  1292. auto &d = f.devices(i);
  1293. if (d.id() == my_device->device_id().get_sha256()) {
  1294. f_my = &d;
  1295. } else if (d.id() == peer_device->device_id().get_sha256()) {
  1296. f_peer = &d;
  1297. }
  1298. }
  // The peer entry must echo the index id the peer announced (0x12345) with
  // max_sequence 0; the local entry carries the local folder's index id.
  1299. REQUIRE(f_peer);
  1300. CHECK(f_peer->index_id() == 0x12345);
  1301. CHECK(f_peer->max_sequence() == 0);
  1302. REQUIRE(f_my);
  1303. auto folder_my = folder_1->get_folder_infos().by_device(*my_device);
  1304. CHECK(f_my->index_id() == folder_my->get_index());
  1305. CHECK(f_my->max_sequence() == 0);
  // Second message: an Index for folder_1 without files. std::get is used here,
  // so a different payload alternative would throw std::bad_variant_access.
  1306. peer_actor->messages.pop_front();
  1307. peer_msg = &peer_actor->messages.front()->payload;
  1308. auto &index_msg = std::get<proto::message::Index>(*peer_msg);
  1309. CHECK(index_msg->folder() == folder_1->get_id());
  1310. CHECK(index_msg->files_size() == 0);
  1311. }
  1312. // unshare folder_1
  1313. auto peer_fi = folder_1->get_folder_infos().by_device(*peer_device);
  1314. peer_actor->messages.clear();
  1315. diff_builder_t(*cluster).unshare_folder(*peer_fi).apply(*sup);
  1316. REQUIRE(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  1317. REQUIRE(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  1318. REQUIRE(peer_actor->messages.size() == 1);
  // After unsharing, the delivered ClusterConfig must contain no folders.
  1319. {
  1320. auto peer_msg = &peer_actor->messages.front()->payload;
  1321. auto peer_cluster_msg = std::get_if<proto::message::ClusterConfig>(peer_msg);
  1322. REQUIRE(peer_cluster_msg);
  1323. REQUIRE(*peer_cluster_msg);
  1324. REQUIRE((*peer_cluster_msg)->folders_size() == 0);
  1325. }
  1326. }
  1327. };
  // Fixture constructor arguments are defined outside this view.
  1328. F(false, 10, false).run();
  1329. }
  // Scenario: after the peer's ClusterConfig and an empty Index have been processed,
  // a local update for "a.txt" is applied; exactly one IndexUpdate containing that
  // single file is expected to be delivered to the peer.
  1330. void test_sending_index_updates() {
  1331. struct F : fixture_t {
  1332. using fixture_t::fixture_t;
  1333. void main(diff_builder_t &) noexcept override {
  1334. auto &folder_infos = folder_1->get_folder_infos();
  1335. auto folder_my = folder_infos.by_device(*my_device);
  // ClusterConfig mirroring the currently stored sequences/index ids of both devices.
  1336. auto cc = proto::ClusterConfig{};
  1337. auto folder = cc.add_folders();
  1338. folder->set_id(std::string(folder_1->get_id()));
  1339. auto d_peer = folder->add_devices();
  1340. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  1341. d_peer->set_max_sequence(folder_1_peer->get_max_sequence());
  1342. d_peer->set_index_id(folder_1_peer->get_index());
  1343. auto d_my = folder->add_devices();
  1344. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  1345. d_my->set_max_sequence(folder_my->get_max_sequence());
  1346. d_my->set_index_id(folder_my->get_index());
  // Index for folder_1 with no files.
  1347. auto index = proto::Index{};
  1348. auto folder_id = std::string(folder_1->get_id());
  1349. index.set_folder(folder_id);
  1350. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  1351. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  1352. sup->do_process();
  1353. auto builder = diff_builder_t(*cluster);
  1354. auto pr_file = proto::FileInfo();
  1355. pr_file.set_name("a.txt");
  // Clear earlier traffic so the size check below counts only the update's message.
  1356. peer_actor->messages.clear();
  1357. builder.local_update(folder_id, pr_file).apply(*sup);
  1358. REQUIRE(peer_actor->messages.size() == 1);
  1359. auto &msg = peer_actor->messages.front();
  // std::get throws if the payload holds anything other than an IndexUpdate.
  1360. auto &index_update = *std::get<proto::message::IndexUpdate>(msg->payload);
  1361. REQUIRE(index_update.files_size() == 1);
  1362. CHECK(index_update.files(0).name() == "a.txt");
  1363. }
  1364. };
  // Fixture constructor arguments are defined outside this view.
  1365. F(true, 10).run();
  1366. }
  // Scenario: the peer requests a block of a locally known file. A one-block file
  // "data.bin" is added to the local folder, the peer sends a Request (id 1, offset 0,
  // size 5), and a prepared block_response_t carrying "12345" is queued in
  // `block_responses`. The peer is expected to receive one response with the same id,
  // NO_BEP_ERROR and the data "12345".
  1367. void test_uploading() {
  1368. struct F : fixture_t {
  1369. using fixture_t::fixture_t;
  1370. void main(diff_builder_t &) noexcept override {
  1371. auto &folder_infos = folder_1->get_folder_infos();
  1372. auto folder_my = folder_infos.by_device(*my_device);
  // ClusterConfig mirroring the currently stored sequences/index ids of both devices.
  1373. auto cc = proto::ClusterConfig{};
  1374. auto folder = cc.add_folders();
  1375. folder->set_id(std::string(folder_1->get_id()));
  1376. auto d_peer = folder->add_devices();
  1377. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  1378. d_peer->set_max_sequence(folder_1_peer->get_max_sequence());
  1379. d_peer->set_index_id(folder_1_peer->get_index());
  1380. auto d_my = folder->add_devices();
  1381. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  1382. d_my->set_max_sequence(folder_my->get_max_sequence());
  1383. d_my->set_index_id(folder_my->get_index());
  // Description of the local file: 5 bytes, one block hashed from "12345".
  1384. auto pr_fi = proto::FileInfo{};
  1385. pr_fi.set_name("data.bin");
  1386. pr_fi.set_type(proto::FileInfoType::FILE);
  // NOTE(review): this sequence value is overwritten a few lines below, before pr_fi
  // is used to create the file_info.
  1387. pr_fi.set_sequence(folder_1_peer->get_max_sequence());
  1388. pr_fi.set_block_size(5);
  1389. pr_fi.set_size(5);
  1390. auto version = pr_fi.mutable_version();
  1391. auto counter = version->add_counters();
  1392. counter->set_id(1);
  1393. counter->set_value(my_device->device_id().get_uint());
  1394. auto b1 = pr_fi.add_blocks();
  1395. b1->set_hash(utils::sha256_digest("12345").value());
  1396. b1->set_offset(0);
  1397. b1->set_size(5);
  1398. auto b = model::block_info_t::create(*b1).value();
  1399. auto uuid = sup->sequencer->next_uuid();
  1400. pr_fi.set_sequence(folder_my->get_max_sequence() + 1);
  // Register the file (with its block assigned at index 0) in the local folder.
  1401. auto file_info = model::file_info_t::create(uuid, pr_fi, folder_my).value();
  1402. file_info->assign_block(b, 0);
  1403. REQUIRE(folder_my->add_strict(file_info));
  // Request template used by the section below; no hash is set on it.
  1404. auto req = proto::Request();
  1405. req.set_id(1);
  1406. req.set_folder(std::string(folder_1->get_id()));
  1407. req.set_name("data.bin");
  1408. req.set_offset(0);
  1409. req.set_size(5);
  1410. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  1411. SECTION("upload regular file, no hash") {
  1412. peer_actor->forward(proto::message::Request(new proto::Request(req)));
  // Prepared reply addressed to the target: no error, payload "12345".
  // NOTE(review): `block_responses` / `block_requests` belong to the fixture, which
  // is outside this view - presumably it replays queued responses for captured
  // block requests; confirm.
  1413. auto req_ptr = proto::message::Request(new proto::Request(req));
  1414. auto res = r::make_message<fs::payload::block_response_t>(target->get_address(), std::move(req_ptr),
  1415. sys::error_code{}, std::string("12345"));
  1416. block_responses.push_back(res);
  1417. sup->do_process();
  1418. REQUIRE(block_requests.size() == 1);
  1419. CHECK(block_requests[0]->payload.remote_request->id() == 1);
  1420. CHECK(block_requests[0]->payload.remote_request->name() == "data.bin");
  1421. REQUIRE(peer_actor->uploaded_blocks.size() == 1);
  1422. auto &peer_res = *peer_actor->uploaded_blocks.front();
  1423. CHECK(peer_res.id() == 1);
  1424. CHECK(peer_res.code() == proto::ErrorCode::NO_BEP_ERROR);
  1425. CHECK(peer_res.data() == "12345");
  1426. }
  1427. }
  1428. };
  // Fixture constructor arguments are defined outside this view.
  1429. F(true, 10).run();
  1430. }
  1431. void test_peer_removal() {
  1432. struct F : fixture_t {
  1433. using fixture_t::fixture_t;
  1434. void main(diff_builder_t &builder) noexcept override {
  1435. builder.remove_peer(*peer_device).apply(*sup);
  1436. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  1437. CHECK(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  1438. CHECK(target->get_shutdown_reason()->root()->ec == utils::error_code_t::peer_has_been_removed);
  1439. }
  1440. };
  1441. F(true, 10).run();
  1442. }
  1443. void test_conflicts() {
  1444. struct F : fixture_t {
  1445. using fixture_t::fixture_t;
  1446. void main(diff_builder_t &) noexcept override {
  1447. sup->do_process();
  1448. auto builder = diff_builder_t(*cluster);
  1449. auto sha256 = peer_device->device_id().get_sha256();
  1450. auto cc = proto::ClusterConfig{};
  1451. auto folder = cc.add_folders();
  1452. folder->set_id(std::string(folder_1->get_id()));
  1453. auto d_peer = folder->add_devices();
  1454. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  1455. d_peer->set_max_sequence(15);
  1456. d_peer->set_index_id(12345);
  1457. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  1458. sup->do_process();
  1459. builder.share_folder(sha256, folder_1->get_id()).apply(*sup);
  1460. auto folder_peer = folder_1->get_folder_infos().by_device(*peer_device);
  1461. REQUIRE(folder_peer->get_index() == d_peer->index_id());
  1462. auto index = proto::Index{};
  1463. index.set_folder(std::string(folder_1->get_id()));
  1464. auto file = index.add_files();
  1465. file->set_name("some-file.txt");
  1466. file->set_type(proto::FileInfoType::FILE);
  1467. file->set_sequence(154);
  1468. file->set_block_size(5);
  1469. file->set_size(5);
  1470. auto version = file->mutable_version();
  1471. auto c1 = version->add_counters();
  1472. c1->set_id(1ul);
  1473. c1->set_value(1ul);
  1474. auto b1 = file->add_blocks();
  1475. b1->set_hash(utils::sha256_digest("12345").value());
  1476. b1->set_offset(0);
  1477. b1->set_size(5);
  1478. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  1479. peer_actor->push_block("12345", 0, file->name());
  1480. sup->do_process();
  1481. auto &folder_infos = folder_1->get_folder_infos();
  1482. auto local_folder = folder_infos.by_device(*my_device);
  1483. auto local_file = local_folder->get_file_infos().by_name(file->name());
  1484. auto pr_file = local_file->as_proto(false);
  1485. auto bi_2 = pr_file.mutable_blocks()->Add();
  1486. bi_2->set_hash(utils::sha256_digest("67890").value());
  1487. bi_2->set_size(5);
  1488. pr_file.set_modified_s(1734680000);
  1489. builder.local_update(folder_1->get_id(), pr_file);
  1490. builder.apply(*sup);
  1491. file->clear_blocks();
  1492. file->set_sequence(155);
  1493. c1->set_id(peer_device->device_id().get_uint());
  1494. auto b3 = file->add_blocks();
  1495. b3->set_hash(utils::sha256_digest("12346").value());
  1496. b3->set_offset(0);
  1497. b3->set_size(5);
  1498. auto index_update = proto::IndexUpdate{};
  1499. index_update.set_folder(std::string(folder_1->get_id()));
  1500. SECTION("local win") {
  1501. file->set_modified_s(1734670000);
  1502. c1->set_value(local_file->get_version()->get_best().value() - 1);
  1503. auto local_seq = local_file->get_sequence();
  1504. *index_update.add_files() = *file;
  1505. peer_actor->messages.clear();
  1506. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index_update)));
  1507. sup->do_process();
  1508. REQUIRE(local_folder->get_file_infos().size() == 1);
  1509. auto lf = local_folder->get_file_infos().by_sequence(local_seq);
  1510. REQUIRE(local_seq == lf->get_sequence());
  1511. CHECK(cluster->get_blocks().size() == 2);
  1512. CHECK(peer_actor->messages.size() == 0);
  1513. }
  1514. SECTION("remote win") {
  1515. file->set_modified_s(1734690000);
  1516. c1->set_value(local_file->get_version()->get_best().value() + 1);
  1517. *index_update.add_files() = *file;
  1518. peer_actor->push_block("12346", 0, file->name());
  1519. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index_update)));
  1520. sup->do_process();
  1521. auto local_folder = folder_infos.by_device(*my_device);
  1522. auto local_conflict = local_folder->get_file_infos().by_name(local_file->make_conflicting_name());
  1523. REQUIRE(local_conflict);
  1524. CHECK(local_conflict->get_size() == 5);
  1525. REQUIRE(local_conflict->get_blocks().size() == 1);
  1526. CHECK(local_conflict->get_blocks()[0]->get_hash() == bi_2->hash());
  1527. auto file = local_folder->get_file_infos().by_name(local_file->get_name());
  1528. REQUIRE(file);
  1529. CHECK(file->get_size() == 5);
  1530. REQUIRE(file->get_blocks().size() == 1);
  1531. CHECK(file->get_blocks()[0]->get_hash() == b3->hash());
  1532. CHECK(cluster->get_blocks().size() == 2);
  1533. auto &msg = peer_actor->messages.back();
  1534. auto &index_update_sent = *std::get<proto::message::IndexUpdate>(msg->payload);
  1535. REQUIRE(index_update_sent.files_size() == 2);
  1536. CHECK(index_update_sent.files(0).name() == local_conflict->get_name());
  1537. CHECK(index_update_sent.files(1).name() == file->get_name());
  1538. }
  1539. }
  1540. };
  1541. F(false, 10, false).run();
  1542. }
  1543. void test_download_interrupting() {
  1544. struct F : fixture_t {
  1545. using fixture_t::fixture_t;
  1546. void create_hasher() noexcept override {
  1547. hasher = sup->create_actor<managed_hasher_t>()
  1548. .index(1)
  1549. .auto_reply(hasher_auto_reply)
  1550. .timeout(timeout)
  1551. .finish()
  1552. .get();
  1553. }
  1554. void main(diff_builder_t &) noexcept override {
  1555. sup->do_process();
  1556. auto builder = diff_builder_t(*cluster);
  1557. auto sha256 = peer_device->device_id().get_sha256();
  1558. auto cc = proto::ClusterConfig{};
  1559. auto folder = cc.add_folders();
  1560. folder->set_id(std::string(folder_1->get_id()));
  1561. auto d_peer = folder->add_devices();
  1562. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  1563. d_peer->set_max_sequence(15);
  1564. d_peer->set_index_id(12345);
  1565. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  1566. sup->do_process();
  1567. builder.share_folder(sha256, folder_1->get_id()).apply(*sup);
  1568. auto folder_peer = folder_1->get_folder_infos().by_device(*peer_device);
  1569. REQUIRE(folder_peer->get_index() == d_peer->index_id());
  1570. auto index = proto::Index{};
  1571. index.set_folder(std::string(folder_1->get_id()));
  1572. auto file = index.add_files();
  1573. file->set_name("some-file");
  1574. file->set_type(proto::FileInfoType::FILE);
  1575. file->set_sequence(154);
  1576. file->set_block_size(5);
  1577. file->set_size(10);
  1578. auto version = file->mutable_version();
  1579. auto counter = version->add_counters();
  1580. counter->set_id(1ul);
  1581. counter->set_value(1ul);
  1582. auto b1 = file->add_blocks();
  1583. b1->set_hash(utils::sha256_digest("12345").value());
  1584. b1->set_offset(0);
  1585. b1->set_size(5);
  1586. auto b2 = file->add_blocks();
  1587. b2->set_hash(utils::sha256_digest("67890").value());
  1588. b2->set_offset(5);
  1589. b2->set_size(5);
  1590. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  1591. sup->do_process();
  1592. SECTION("block from peer") {
  1593. SECTION("folder is kept") {
  1594. SECTION("suspend folder") { builder.suspend(*folder_1).apply(*sup); }
  1595. SECTION("unshare folder") { builder.unshare_folder(*folder_peer).apply(*sup); }
  1596. peer_actor->push_block("12345", 0, file->name());
  1597. peer_actor->push_block("67890", 1, file->name());
  1598. peer_actor->process_block_requests();
  1599. sup->do_process();
  1600. auto folder_my = folder_1->get_folder_infos().by_device(*my_device);
  1601. CHECK(folder_my->get_file_infos().size() == 0);
  1602. }
  1603. SECTION("remove folder") {
  1604. sup->auto_ack_blocks = false;
  1605. peer_actor->push_block("67890", 1, file->name());
  1606. peer_actor->process_block_requests();
  1607. sup->do_process();
  1608. builder.remove_folder(*folder_1).apply(*sup);
  1609. sup->do_process();
  1610. hasher->process_requests();
  1611. sup->do_process();
  1612. peer_actor->push_block("12345", 0, file->name());
  1613. peer_actor->process_block_requests();
  1614. sup->do_process();
  1615. CHECK(peer_actor->blocks_requested == file->blocks_size());
  1616. CHECK(!cluster->get_folders().by_id(folder->id()));
  1617. }
  1618. }
  1619. SECTION("hash validation replies") {
  1620. SECTION("folder is kept") {
  1621. peer_actor->push_block("12345", 0, file->name());
  1622. peer_actor->process_block_requests();
  1623. sup->do_process();
  1624. SECTION("suspend folder") { builder.suspend(*folder_1).apply(*sup); }
  1625. SECTION("unshare folder") { builder.unshare_folder(*folder_peer).apply(*sup); }
  1626. hasher->process_requests();
  1627. auto folder_my = folder_1->get_folder_infos().by_device(*my_device);
  1628. CHECK(folder_my->get_file_infos().size() == 0);
  1629. }
  1630. SECTION("remove folder") {
  1631. builder.remove_folder(*folder_1).apply(*sup);
  1632. peer_actor->push_block("12345", 0, file->name());
  1633. peer_actor->process_block_requests();
  1634. hasher->process_requests();
  1635. sup->do_process();
  1636. CHECK(!cluster->get_folders().by_id(folder->id()));
  1637. }
  1638. }
  1639. SECTION("block acks from fs") {
  1640. sup->auto_ack_blocks = false;
  1641. hasher->auto_reply = true;
  1642. peer_actor->push_block("67890", 1, file->name());
  1643. peer_actor->push_block("12345", 0, file->name());
  1644. peer_actor->process_block_requests();
  1645. sup->do_process();
  1646. auto diff = sup->delayed_ack_holder;
  1647. REQUIRE(diff);
  1648. SECTION("suspend") {
  1649. builder.suspend(*folder_1);
  1650. sup->send<model::payload::model_update_t>(sup->get_address(), std::move(diff));
  1651. builder.apply(*sup);
  1652. auto folder_my = folder_1->get_folder_infos().by_device(*my_device);
  1653. CHECK(folder_my->get_file_infos().size() == 0);
  1654. }
  1655. SECTION("remove") {
  1656. builder.remove_folder(*folder_1).apply(*sup);
  1657. sup->send<model::payload::model_update_t>(sup->get_address(), std::move(diff));
  1658. sup->do_process();
  1659. CHECK(!cluster->get_folders().by_id(folder->id()));
  1660. }
  1661. }
  1662. }
  1663. bool hasher_auto_reply = false;
  1664. managed_hasher_t *hasher;
  1665. };
  1666. F(false, 10, false).run();
  1667. }
  1668. int _init() {
  1669. REGISTER_TEST_CASE(test_startup, "test_startup", "[net]");
  1670. REGISTER_TEST_CASE(test_overwhelm, "test_overwhelm", "[net]");
  1671. REGISTER_TEST_CASE(test_index_receiving, "test_index_receiving", "[net]");
  1672. REGISTER_TEST_CASE(test_index_sending, "test_index_sending", "[net]");
  1673. REGISTER_TEST_CASE(test_downloading, "test_downloading", "[net]");
  1674. REGISTER_TEST_CASE(test_downloading_errors, "test_downloading_errors", "[net]");
  1675. REGISTER_TEST_CASE(test_download_from_scratch, "test_download_from_scratch", "[net]");
  1676. REGISTER_TEST_CASE(test_download_resuming, "test_download_resuming", "[net]");
  1677. REGISTER_TEST_CASE(test_initiate_my_sharing, "test_initiate_my_sharing", "[net]");
  1678. REGISTER_TEST_CASE(test_initiate_peer_sharing, "test_initiate_peer_sharing", "[net]");
  1679. REGISTER_TEST_CASE(test_sending_index_updates, "test_sending_index_updates", "[net]");
  1680. REGISTER_TEST_CASE(test_uploading, "test_uploading", "[net]");
  1681. REGISTER_TEST_CASE(test_peer_removal, "test_peer_removal", "[net]");
  1682. REGISTER_TEST_CASE(test_conflicts, "test_conflicts", "[net]");
  1683. REGISTER_TEST_CASE(test_download_interrupting, "test_download_interrupting", "[net]");
  1684. return 1;
  1685. }
  1686. static int v = _init();