017-req-res.cpp 27 KB


  1. //
  2. // Copyright (c) 2019-2021 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include "rotor.hpp"
  7. #include "supervisor_test.h"
  8. #include "access.h"
  9. namespace r = rotor;
  10. namespace rt = r::test;
  // Plain response payload used by the request_sample_t request/response pair.
  // `value` is zero-initialised so a default-constructed response never
  // exposes an indeterminate integer.
  struct response_sample_t {
      int value = 0;
  };
  14. struct request_sample_t {
  15. using response_t = response_sample_t;
  16. int value;
  17. };
  18. struct res2_t : r::arc_base_t<res2_t> {
  19. int value;
  20. explicit res2_t(int value_) : value{value_} {}
  21. virtual ~res2_t() {}
  22. };
  23. struct req2_t {
  24. using response_t = r::intrusive_ptr_t<res2_t>;
  25. int value;
  26. };
  // Empty payload of a plain (non request/response) message.
  struct notify_t {};
  28. struct res3_t : r::arc_base_t<res3_t> {
  29. int value;
  30. explicit res3_t(int value_) : value{value_} {}
  31. res3_t(const res3_t &) = delete;
  32. res3_t(res3_t &&) = delete;
  33. virtual ~res3_t() {}
  34. };
  35. struct req3_t : r::arc_base_t<req3_t> {
  36. using response_t = r::intrusive_ptr_t<res3_t>;
  37. int value;
  38. explicit req3_t(int value_) : value{value_} {}
  39. req3_t(const req3_t &) = delete;
  40. req3_t(req3_t &&) = delete;
  41. virtual ~req3_t() {}
  42. };
  43. static_assert(std::is_base_of_v<r::arc_base_t<req3_t>, req3_t>, "zzz");
  44. using traits_t = r::request_traits_t<request_sample_t>;
  45. using req_ptr_t = r::intrusive_ptr_t<traits_t::request::message_t>;
  46. using res_ptr_t = r::intrusive_ptr_t<traits_t::response::message_t>;
  47. using notify_msg_t = r::message_t<notify_t>;
  48. struct good_actor_t : public r::actor_base_t {
  49. using r::actor_base_t::actor_base_t;
  50. int req_val = 0;
  51. int res_val = 0;
  52. r::extended_error_ptr_t ee;
  53. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  54. plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) {
  55. p.subscribe_actor(&good_actor_t::on_request);
  56. p.subscribe_actor(&good_actor_t::on_response);
  57. });
  58. }
  59. void on_start() noexcept override {
  60. r::actor_base_t::on_start();
  61. request<request_sample_t>(address, 4).send(r::pt::seconds(1));
  62. }
  63. void on_request(traits_t::request::message_t &msg) noexcept { reply_to(msg, 5); }
  64. void on_response(traits_t::response::message_t &msg) noexcept {
  65. req_val += msg.payload.req->payload.request_payload.value;
  66. res_val += msg.payload.res.value;
  67. ee = msg.payload.ee;
  68. }
  69. };
  70. struct bad_actor_t : public r::actor_base_t {
  71. using r::actor_base_t::actor_base_t;
  72. int req_val = 0;
  73. int res_val = 0;
  74. r::extended_error_ptr_t ee;
  75. r::intrusive_ptr_t<traits_t::request::message_t> req_msg;
  76. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  77. plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) {
  78. p.subscribe_actor(&bad_actor_t::on_request);
  79. p.subscribe_actor(&bad_actor_t::on_response);
  80. });
  81. }
  82. void shutdown_start() noexcept override {
  83. req_msg.reset();
  84. r::actor_base_t::shutdown_start();
  85. }
  86. void on_start() noexcept override {
  87. r::actor_base_t::on_start();
  88. request<request_sample_t>(address, 4).send(rt::default_timeout);
  89. }
  90. void on_request(traits_t::request::message_t &msg) noexcept { req_msg.reset(&msg); }
  91. void on_response(traits_t::response::message_t &msg) noexcept {
  92. req_val += msg.payload.req->payload.request_payload.value;
  93. ee = msg.payload.ee;
  94. if (!ee) {
  95. res_val += 9;
  96. }
  97. }
  98. };
  99. struct bad_actor2_t : public r::actor_base_t {
  100. using r::actor_base_t::actor_base_t;
  101. int req_val = 0;
  102. int res_val = 0;
  103. r::extended_error_ptr_t ee;
  104. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  105. plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) {
  106. p.subscribe_actor(&bad_actor2_t::on_request);
  107. p.subscribe_actor(&bad_actor2_t::on_response);
  108. });
  109. }
  110. void on_start() noexcept override {
  111. r::actor_base_t::on_start();
  112. request<request_sample_t>(address, 4).send(rt::default_timeout);
  113. }
  114. void on_request(traits_t::request::message_t &msg) noexcept {
  115. auto ec = r::make_error_code(r::error_code_t::request_timeout);
  116. reply_with_error(msg, make_error(ec));
  117. }
  118. void on_response(traits_t::response::message_t &msg) noexcept {
  119. req_val += msg.payload.req->payload.request_payload.value;
  120. ee = msg.payload.ee;
  121. if (!ee) {
  122. res_val += 9;
  123. }
  124. }
  125. };
  126. struct good_supervisor_t : rt::supervisor_test_t {
  127. int req_val = 0;
  128. int res_val = 0;
  129. r::extended_error_ptr_t ee;
  130. using rt::supervisor_test_t::supervisor_test_t;
  131. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  132. plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) {
  133. p.subscribe_actor(&good_supervisor_t::on_request);
  134. p.subscribe_actor(&good_supervisor_t::on_response);
  135. });
  136. }
  137. void on_start() noexcept override {
  138. rt::supervisor_test_t::on_start();
  139. request<request_sample_t>(this->address, 4).send(rt::default_timeout);
  140. }
  141. void on_request(traits_t::request::message_t &msg) noexcept { reply_to(msg, 5); }
  142. void on_response(traits_t::response::message_t &msg) noexcept {
  143. req_val += msg.payload.req->payload.request_payload.value;
  144. res_val += msg.payload.res.value;
  145. ee = msg.payload.ee;
  146. }
  147. };
  148. struct good_actor2_t : public r::actor_base_t {
  149. using traits2_t = r::request_traits_t<req2_t>;
  150. using r::actor_base_t::actor_base_t;
  151. int req_val = 0;
  152. int res_val = 0;
  153. r::address_ptr_t reply_addr;
  154. r::extended_error_ptr_t ee;
  155. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  156. plugin.with_casted<r::plugin::starter_plugin_t>([this](auto &p) {
  157. reply_addr = create_address();
  158. p.subscribe_actor(&good_actor2_t::on_response, reply_addr);
  159. p.subscribe_actor(&good_actor2_t::on_request);
  160. });
  161. }
  162. void on_start() noexcept override {
  163. r::actor_base_t::on_start();
  164. request_via<req2_t>(address, reply_addr, 4).send(rt::default_timeout);
  165. }
  166. void on_request(traits2_t::request::message_t &msg) noexcept { reply_to(msg, 5); }
  167. void on_response(traits2_t::response::message_t &msg) noexcept {
  168. req_val += msg.payload.req->payload.request_payload.value;
  169. res_val += msg.payload.res->value;
  170. ee = msg.payload.ee;
  171. }
  172. };
  173. struct good_actor3_t : public r::actor_base_t {
  174. using traits2_t = r::request_traits_t<req2_t>;
  175. using r::actor_base_t::actor_base_t;
  176. int req_left = 1;
  177. int req_val = 0;
  178. int res_val = 0;
  179. r::extended_error_ptr_t ee;
  180. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  181. plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) {
  182. p.subscribe_actor(&good_actor3_t::on_response);
  183. p.subscribe_actor(&good_actor3_t::on_request);
  184. });
  185. }
  186. void on_start() noexcept override {
  187. r::actor_base_t::on_start();
  188. request<req2_t>(address, 4).send(rt::default_timeout);
  189. }
  190. void on_request(traits2_t::request::message_t &msg) noexcept { reply_to(msg, 5); }
  191. void on_response(traits2_t::response::message_t &msg) noexcept {
  192. req_val += msg.payload.req->payload.request_payload.value;
  193. res_val += msg.payload.res->value;
  194. ee = msg.payload.ee;
  195. if (req_left) {
  196. --req_left;
  197. request<req2_t>(address, 4).send(rt::default_timeout);
  198. }
  199. }
  200. };
  201. struct request_forwarder_t : public r::actor_base_t {
  202. using traits2_t = r::request_traits_t<req2_t>;
  203. using req_ptr_t = traits2_t::request::message_ptr_t;
  204. using r::actor_base_t::actor_base_t;
  205. int req_val = 0;
  206. int res_val = 0;
  207. r::address_ptr_t back_addr;
  208. r::request_id_t back_req1_id = 0;
  209. r::request_id_t back_req2_id = 0;
  210. req_ptr_t req_ptr;
  211. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  212. plugin.with_casted<r::plugin::starter_plugin_t>([this](auto &p) {
  213. back_addr = supervisor->create_address();
  214. p.subscribe_actor(&request_forwarder_t::on_request_front);
  215. p.subscribe_actor(&request_forwarder_t::on_response_front);
  216. p.subscribe_actor(&request_forwarder_t::on_request_back, back_addr);
  217. p.subscribe_actor(&request_forwarder_t::on_response_back, back_addr);
  218. });
  219. }
  220. void shutdown_start() noexcept override {
  221. req_ptr.reset();
  222. r::actor_base_t::shutdown_start();
  223. }
  224. void on_start() noexcept override {
  225. r::actor_base_t::on_start();
  226. request<req2_t>(address, 4).send(rt::default_timeout);
  227. }
  228. void on_request_front(traits2_t::request::message_t &msg) noexcept {
  229. auto &payload = msg.payload.request_payload;
  230. back_req1_id = request_via<req2_t>(back_addr, back_addr, payload).send(r::pt::seconds(1));
  231. req_ptr = &msg;
  232. }
  233. void on_response_front(traits2_t::response::message_t &msg) noexcept {
  234. req_val += msg.payload.req->payload.request_payload.value;
  235. res_val += msg.payload.res->value;
  236. }
  237. void on_request_back(traits2_t::request::message_t &msg) noexcept { reply_to(msg, 5); }
  238. void on_response_back(traits2_t::response::message_t &msg) noexcept {
  239. req_val += msg.payload.req->payload.request_payload.value * 2;
  240. res_val += msg.payload.res->value * 2;
  241. back_req2_id = msg.payload.request_id();
  242. reply_to(*req_ptr, msg.payload.ee, std::move(msg.payload.res));
  243. }
  244. };
  245. struct intrusive_actor_t : public r::actor_base_t {
  246. using traits3_t = r::request_traits_t<req3_t>;
  247. using req_ptr_t = traits3_t::request::message_ptr_t;
  248. using r::actor_base_t::actor_base_t;
  249. int req_val = 0;
  250. int res_val = 0;
  251. r::address_ptr_t back_addr;
  252. req_ptr_t req_ptr;
  253. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  254. plugin.with_casted<r::plugin::starter_plugin_t>([this](auto &p) {
  255. back_addr = supervisor->create_address();
  256. p.subscribe_actor(&intrusive_actor_t::on_request_front);
  257. p.subscribe_actor(&intrusive_actor_t::on_response_front);
  258. p.subscribe_actor(&intrusive_actor_t::on_request_back, back_addr);
  259. p.subscribe_actor(&intrusive_actor_t::on_response_back, back_addr);
  260. });
  261. }
  262. void shutdown_start() noexcept override {
  263. req_ptr.reset();
  264. r::actor_base_t::shutdown_start();
  265. }
  266. void on_start() noexcept override {
  267. r::actor_base_t::on_start();
  268. request<req3_t>(address, 4).send(r::pt::seconds(1));
  269. }
  270. void on_request_front(traits3_t::request::message_t &msg) noexcept {
  271. auto &payload = msg.payload.request_payload;
  272. request_via<req3_t>(back_addr, back_addr, payload).send(r::pt::seconds(1));
  273. req_ptr = &msg;
  274. }
  275. void on_response_front(traits3_t::response::message_t &msg) noexcept {
  276. req_val += msg.payload.req->payload.request_payload->value;
  277. res_val += msg.payload.res->value;
  278. }
  279. void on_request_back(traits3_t::request::message_t &msg) noexcept { reply_to(msg, 5); }
  280. void on_response_back(traits3_t::response::message_t &msg) noexcept {
  281. req_val += msg.payload.req->payload.request_payload->value * 2;
  282. res_val += msg.payload.res->value * 2;
  283. reply_to(*req_ptr, msg.payload.ee, std::move(msg.payload.res));
  284. }
  285. };
  286. struct duplicating_actor_t : public r::actor_base_t {
  287. using r::actor_base_t::actor_base_t;
  288. int req_val = 0;
  289. int res_val = 0;
  290. r::extended_error_ptr_t ee;
  291. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  292. plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) {
  293. p.subscribe_actor(&duplicating_actor_t::on_request);
  294. p.subscribe_actor(&duplicating_actor_t::on_response);
  295. });
  296. }
  297. void on_start() noexcept override {
  298. r::actor_base_t::on_start();
  299. request<request_sample_t>(address, 4).send(rt::default_timeout);
  300. }
  301. void on_request(traits_t::request::message_t &msg) noexcept {
  302. reply_to(msg, 5);
  303. reply_to(msg, 5);
  304. }
  305. void on_response(traits_t::response::message_t &msg) noexcept {
  306. req_val += msg.payload.req->payload.request_payload.value;
  307. res_val += msg.payload.res.value;
  308. ee = msg.payload.ee;
  309. }
  310. };
  311. struct req_actor_t : r::actor_base_t {
  312. using r::actor_base_t::actor_base_t;
  313. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  314. plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) { p.subscribe_actor(&req_actor_t::on_response); });
  315. }
  316. void do_request() { request<request_sample_t>(target, 4).send(rt::default_timeout); }
  317. void on_response(traits_t::response::message_t &msg) noexcept { res = &msg; }
  318. auto &get_state() noexcept { return state; }
  319. r::address_ptr_t target;
  320. res_ptr_t res;
  321. };
  322. struct res_actor_t : r::actor_base_t {
  323. using r::actor_base_t::actor_base_t;
  324. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  325. plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) { p.subscribe_actor(&res_actor_t::on_request); });
  326. }
  327. void on_request(traits_t::request::message_t &msg) noexcept { req = &msg; }
  328. auto &get_state() noexcept { return state; }
  329. req_ptr_t req;
  330. };
  331. struct order_actor_t : r::actor_base_t {
  332. using r::actor_base_t::actor_base_t;
  333. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  334. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  335. order = 5;
  336. p.subscribe_actor(&order_actor_t::on_request);
  337. p.subscribe_actor(&order_actor_t::on_response);
  338. p.subscribe_actor(&order_actor_t::on_notify);
  339. });
  340. }
  341. void on_start() noexcept override { request<request_sample_t>(address).send(rt::default_timeout); }
  342. void on_request(traits_t::request::message_t &msg) noexcept {
  343. reply_to(msg);
  344. send<notify_t>(address);
  345. }
  346. void on_response(traits_t::response::message_t &) noexcept { order *= 10; }
  347. void on_notify(notify_msg_t &) noexcept { order += 3; }
  348. int order;
  349. };
  350. TEST_CASE("request-response successful delivery", "[actor]") {
  351. r::system_context_t system_context;
  352. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  353. sup->do_process();
  354. auto init_subs_count = sup->get_subscription().access<rt::to::mine_handlers>().size();
  355. auto init_pts_count = sup->get_points().size();
  356. auto actor = sup->create_actor<good_actor_t>().timeout(rt::default_timeout).finish();
  357. sup->do_process();
  358. REQUIRE(sup->active_timers.size() == 0);
  359. REQUIRE(actor->req_val == 4);
  360. REQUIRE(actor->res_val == 5);
  361. CHECK(!actor->ee);
  362. actor->do_shutdown();
  363. sup->do_process();
  364. REQUIRE(sup->active_timers.size() == 0);
  365. std::size_t delta = 1; /* + shutdown confirmation triggered on self */
  366. REQUIRE(sup->get_points().size() == init_pts_count + delta);
  367. REQUIRE(sup->get_subscription().access<rt::to::mine_handlers>().size() == init_subs_count + delta);
  368. sup->do_shutdown();
  369. sup->do_process();
  370. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  371. REQUIRE(sup->get_leader_queue().size() == 0);
  372. REQUIRE(sup->get_points().size() == 0);
  373. CHECK(rt::empty(sup->get_subscription()));
  374. REQUIRE(sup->get_children_count() == 0);
  375. REQUIRE(sup->get_requests().size() == 0);
  376. REQUIRE(sup->active_timers.size() == 0);
  377. }
  378. TEST_CASE("request-response successful delivery identical message to 2 actors", "[actor]") {
  379. r::system_context_t system_context;
  380. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  381. auto actor1 = sup->create_actor<good_actor_t>().timeout(rt::default_timeout).finish();
  382. auto actor2 = sup->create_actor<good_actor_t>().timeout(rt::default_timeout).finish();
  383. sup->do_process();
  384. REQUIRE(sup->active_timers.size() == 0);
  385. REQUIRE(actor1->req_val == 4);
  386. REQUIRE(actor1->res_val == 5);
  387. CHECK(!actor1->ee);
  388. REQUIRE(actor2->req_val == 4);
  389. REQUIRE(actor2->res_val == 5);
  390. CHECK(!actor2->ee);
  391. sup->do_shutdown();
  392. sup->do_process();
  393. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  394. REQUIRE(sup->get_leader_queue().size() == 0);
  395. REQUIRE(sup->get_points().size() == 0);
  396. CHECK(rt::empty(sup->get_subscription()));
  397. REQUIRE(sup->get_children_count() == 0);
  398. REQUIRE(sup->get_requests().size() == 0);
  399. REQUIRE(sup->active_timers.size() == 0);
  400. }
  401. TEST_CASE("request-response timeout", "[actor]") {
  402. r::system_context_t system_context;
  403. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  404. auto actor = sup->create_actor<bad_actor_t>().timeout(rt::default_timeout).finish();
  405. sup->do_process();
  406. REQUIRE(actor->req_val == 0);
  407. REQUIRE(actor->res_val == 0);
  408. REQUIRE(sup->active_timers.size() == 1);
  409. REQUIRE(!actor->ee);
  410. auto timer_it = *sup->active_timers.begin();
  411. ((r::actor_base_t *)sup.get())
  412. ->access<rt::to::on_timer_trigger, r::request_id_t, bool>(timer_it->request_id, false);
  413. sup->do_process();
  414. REQUIRE(actor->req_msg);
  415. REQUIRE(actor->req_val == 4);
  416. REQUIRE(actor->res_val == 0);
  417. REQUIRE(actor->ee);
  418. REQUIRE(actor->ee->ec == r::error_code_t::request_timeout);
  419. REQUIRE(actor->ee->ec.message() == std::string("request timeout"));
  420. REQUIRE(actor->ee->request);
  421. sup->active_timers.clear();
  422. actor->reply_to(*actor->req_msg, 1);
  423. sup->do_process();
  424. // nothing should be changed, i.e. reply should just be dropped
  425. REQUIRE(actor->req_val == 4);
  426. REQUIRE(actor->res_val == 0);
  427. REQUIRE(actor->ee->ec == r::error_code_t::request_timeout);
  428. sup->do_shutdown();
  429. sup->do_process();
  430. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  431. REQUIRE(sup->get_leader_queue().size() == 0);
  432. REQUIRE(sup->get_points().size() == 0);
  433. CHECK(rt::empty(sup->get_subscription()));
  434. REQUIRE(sup->active_timers.size() == 0);
  435. }
  436. TEST_CASE("response with custom error", "[actor]") {
  437. r::system_context_t system_context;
  438. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  439. auto actor = sup->create_actor<bad_actor2_t>().timeout(rt::default_timeout).finish();
  440. sup->do_process();
  441. REQUIRE(actor->req_val == 4);
  442. REQUIRE(actor->res_val == 0);
  443. REQUIRE(actor->ee);
  444. REQUIRE(actor->ee->ec == r::error_code_t::request_timeout);
  445. REQUIRE(sup->active_timers.size() == 0);
  446. sup->do_shutdown();
  447. sup->do_process();
  448. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  449. REQUIRE(sup->get_leader_queue().size() == 0);
  450. REQUIRE(sup->get_points().size() == 0);
  451. CHECK(rt::empty(sup->get_subscription()));
  452. }
  453. TEST_CASE("request-response successful delivery (supervisor)", "[supervisor]") {
  454. r::system_context_t system_context;
  455. auto sup = system_context.create_supervisor<good_supervisor_t>().timeout(rt::default_timeout).finish();
  456. sup->do_process();
  457. REQUIRE(sup->active_timers.size() == 0);
  458. REQUIRE(sup->req_val == 4);
  459. REQUIRE(sup->res_val == 5);
  460. CHECK(!sup->ee);
  461. sup->do_shutdown();
  462. sup->do_process();
  463. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  464. REQUIRE(sup->get_leader_queue().size() == 0);
  465. REQUIRE(sup->get_points().size() == 0);
  466. CHECK(rt::empty(sup->get_subscription()));
  467. REQUIRE(sup->get_children_count() == 0);
  468. REQUIRE(sup->get_requests().size() == 0);
  469. REQUIRE(sup->active_timers.size() == 0);
  470. }
  471. TEST_CASE("request-response successful delivery, ref-counted response", "[actor]") {
  472. r::system_context_t system_context;
  473. auto sup = system_context.create_supervisor<good_supervisor_t>().timeout(rt::default_timeout).finish();
  474. auto actor = sup->create_actor<good_actor2_t>().timeout(rt::default_timeout).finish();
  475. sup->do_process();
  476. REQUIRE(sup->active_timers.size() == 0);
  477. REQUIRE(actor->req_val == 4);
  478. REQUIRE(actor->res_val == 5);
  479. CHECK(!actor->ee);
  480. sup->do_shutdown();
  481. sup->do_process();
  482. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  483. REQUIRE(sup->get_leader_queue().size() == 0);
  484. REQUIRE(sup->get_points().size() == 0);
  485. CHECK(rt::empty(sup->get_subscription()));
  486. REQUIRE(sup->get_children_count() == 0);
  487. REQUIRE(sup->get_requests().size() == 0);
  488. REQUIRE(sup->active_timers.size() == 0);
  489. }
  490. TEST_CASE("request-response successful delivery, twice", "[actor]") {
  491. r::system_context_t system_context;
  492. auto sup = system_context.create_supervisor<good_supervisor_t>().timeout(rt::default_timeout).finish();
  493. auto actor = sup->create_actor<good_actor3_t>().timeout(rt::default_timeout).finish();
  494. sup->do_process();
  495. REQUIRE(sup->active_timers.size() == 0);
  496. REQUIRE(actor->req_val == 4 * 2);
  497. REQUIRE(actor->res_val == 5 * 2);
  498. CHECK(!actor->ee);
  499. sup->do_shutdown();
  500. sup->do_process();
  501. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  502. REQUIRE(sup->get_leader_queue().size() == 0);
  503. REQUIRE(sup->get_points().size() == 0);
  504. CHECK(rt::empty(sup->get_subscription()));
  505. REQUIRE(sup->get_children_count() == 0);
  506. REQUIRE(sup->get_requests().size() == 0);
  507. REQUIRE(sup->active_timers.size() == 0);
  508. }
  509. TEST_CASE("response is sent twice, but received once", "[supervisor]") {
  510. r::system_context_t system_context;
  511. auto sup = system_context.create_supervisor<good_supervisor_t>().timeout(rt::default_timeout).finish();
  512. auto actor = sup->create_actor<duplicating_actor_t>().timeout(rt::default_timeout).finish();
  513. sup->do_process();
  514. REQUIRE(sup->active_timers.size() == 0);
  515. REQUIRE(actor->req_val == 4);
  516. REQUIRE(actor->res_val == 5);
  517. CHECK(!actor->ee);
  518. sup->do_shutdown();
  519. sup->do_process();
  520. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  521. REQUIRE(sup->get_leader_queue().size() == 0);
  522. REQUIRE(sup->get_points().size() == 0);
  523. CHECK(rt::empty(sup->get_subscription()));
  524. REQUIRE(sup->get_children_count() == 0);
  525. REQUIRE(sup->get_requests().size() == 0);
  526. REQUIRE(sup->active_timers.size() == 0);
  527. }
  528. TEST_CASE("ref-counted response forwarding", "[actor]") {
  529. r::system_context_t system_context;
  530. auto sup = system_context.create_supervisor<good_supervisor_t>().timeout(rt::default_timeout).finish();
  531. auto actor = sup->create_actor<request_forwarder_t>().timeout(rt::default_timeout).finish();
  532. sup->do_process();
  533. REQUIRE(sup->active_timers.size() == 0);
  534. REQUIRE(actor->req_val == 4 + 4 * 2);
  535. REQUIRE(actor->res_val == 5 + 5 * 2);
  536. REQUIRE(actor->back_req1_id == actor->back_req2_id);
  537. sup->do_shutdown();
  538. sup->do_process();
  539. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  540. REQUIRE(sup->get_leader_queue().size() == 0);
  541. REQUIRE(sup->get_points().size() == 0);
  542. CHECK(rt::empty(sup->get_subscription()));
  543. REQUIRE(sup->get_children_count() == 0);
  544. REQUIRE(sup->get_requests().size() == 0);
  545. REQUIRE(sup->active_timers.size() == 0);
  546. }
  547. TEST_CASE("intrusive pointer request/response", "[actor]") {
  548. r::system_context_t system_context;
  549. auto sup = system_context.create_supervisor<good_supervisor_t>().timeout(rt::default_timeout).finish();
  550. auto actor = sup->create_actor<intrusive_actor_t>().timeout(rt::default_timeout).finish();
  551. sup->do_process();
  552. REQUIRE(sup->active_timers.size() == 0);
  553. REQUIRE(actor->req_val == 4 + 4 * 2);
  554. REQUIRE(actor->res_val == 5 + 5 * 2);
  555. sup->do_shutdown();
  556. sup->do_process();
  557. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  558. REQUIRE(sup->get_leader_queue().size() == 0);
  559. REQUIRE(sup->get_points().size() == 0);
  560. CHECK(rt::empty(sup->get_subscription()));
  561. REQUIRE(sup->get_children_count() == 0);
  562. REQUIRE(sup->get_requests().size() == 0);
  563. REQUIRE(sup->active_timers.size() == 0);
  564. }
  565. TEST_CASE("response arrives after requestee shutdown", "[actor]") {
  566. r::system_context_t system_context;
  567. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  568. auto req = sup->create_actor<req_actor_t>().timeout(rt::default_timeout).finish();
  569. auto res = sup->create_actor<res_actor_t>().timeout(rt::default_timeout).finish();
  570. sup->do_process();
  571. REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
  572. req->target = res->get_address();
  573. req->do_request();
  574. sup->do_process();
  575. REQUIRE(!req->res);
  576. REQUIRE(res->req);
  577. req->do_shutdown();
  578. sup->do_process();
  579. REQUIRE(req->get_state() == r::state_t::SHUT_DOWN);
  580. res->reply_to(*res->req, 5);
  581. sup->do_process();
  582. REQUIRE(!req->res);
  583. sup->do_shutdown();
  584. sup->do_process();
  585. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  586. }
  587. TEST_CASE("response arrives after requestee shutdown (on the same localities)", "[actor]") {
  588. r::system_context_t system_context;
  589. auto sup1 = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  590. auto sup2 = sup1->create_actor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  591. auto req = sup2->create_actor<req_actor_t>().timeout(rt::default_timeout).finish();
  592. auto res = sup1->create_actor<res_actor_t>().timeout(rt::default_timeout).finish();
  593. sup1->do_process();
  594. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  595. req->target = res->get_address();
  596. req->do_request();
  597. sup1->do_process();
  598. REQUIRE(!req->res);
  599. REQUIRE(res->req);
  600. req->do_shutdown();
  601. sup1->do_process();
  602. REQUIRE(req->get_state() == r::state_t::SHUT_DOWN);
  603. res->reply_to(*res->req, 5);
  604. sup1->do_process();
  605. REQUIRE(!req->res);
  606. sup1->do_shutdown();
  607. sup1->do_process();
  608. REQUIRE(sup1->get_state() == r::state_t::SHUT_DOWN);
  609. }
  610. TEST_CASE("request timer should not outlive requestee", "[actor]") {
  611. r::system_context_t system_context;
  612. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  613. auto req = sup->create_actor<req_actor_t>().timeout(rt::default_timeout).finish();
  614. sup->do_process();
  615. auto act = sup->create_actor<r::actor_base_t>().timeout(rt::default_timeout).finish();
  616. sup->do_process();
  617. act->request<request_sample_t>(sup->get_address(), 5).send(rt::default_timeout);
  618. CHECK(!act->access<rt::to::active_requests>().empty());
  619. act->do_shutdown();
  620. sup->do_process();
  621. CHECK(act->access<rt::to::active_requests>().empty());
  622. sup->do_shutdown();
  623. sup->do_process();
  624. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  625. }
  626. TEST_CASE("response and regular messages keep send order", "[actor]") {
  627. r::system_context_t system_context;
  628. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  629. auto act = sup->create_actor<order_actor_t>().timeout(rt::default_timeout).finish();
  630. sup->do_process();
  631. CHECK(act->order == 53);
  632. sup->do_shutdown();
  633. sup->do_process();
  634. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  635. }