ping-pong-timer.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. //
  2. // Copyright (c) 2019-2021 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include "rotor.hpp"
  7. #include "rotor/asio.hpp"
  8. #include <boost/asio.hpp>
  9. #include <boost/lexical_cast.hpp>
  10. #include <chrono>
  11. #include <functional>
  12. #include <iomanip>
  13. #include <iostream>
  14. #include <memory>
  15. #include <random>
  16. #include <type_traits>
  17. #include <utility>
  18. #include <optional>
  19. #include <unordered_map>
  20. #include <thread>
  21. namespace asio = boost::asio;
  22. namespace pt = boost::posix_time;
  23. namespace ra = rotor::asio;
  24. namespace constants {
  25. static float failure_probability = 0.70f;
  26. static pt::time_duration ping_timeout = pt::milliseconds{100};
  27. static pt::time_duration ping_reply_base = pt::milliseconds{50};
  28. static pt::time_duration check_interval = pt::milliseconds{3000};
  29. static std::uint32_t ping_reply_scale = 70;
  30. } // namespace constants
  31. namespace resource {
  32. rotor::plugin::resource_id_t timer = 0;
  33. rotor::plugin::resource_id_t ping = 1;
  34. } // namespace resource
  35. namespace payload {
  36. struct pong_t {};
  37. struct ping_t {
  38. using response_t = pong_t;
  39. };
  40. } // namespace payload
  41. namespace message {
  42. using ping_t = rotor::request_traits_t<payload::ping_t>::request::message_t;
  43. using pong_t = rotor::request_traits_t<payload::ping_t>::response::message_t;
  44. using cancel_t = rotor::request_traits_t<payload::ping_t>::cancel::message_t;
  45. } // namespace message
  46. struct pinger_t : public rotor::actor_base_t {
  47. using rotor::actor_base_t::actor_base_t;
  48. void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
  49. rotor::actor_base_t::configure(plugin);
  50. plugin.with_casted<rotor::plugin::starter_plugin_t>([](auto &p) { p.subscribe_actor(&pinger_t::on_pong); });
  51. plugin.with_casted<rotor::plugin::registry_plugin_t>(
  52. [&](auto &p) { p.discover_name("ponger", ponger_addr, true).link(); });
  53. }
  54. void on_start() noexcept override {
  55. rotor::actor_base_t::on_start();
  56. do_ping();
  57. timer_id =
  58. start_timer(constants::check_interval, *this, [](pinger_t *pinger, rotor::request_id_t, bool cancelled) {
  59. pinger->resources->release(resource::timer);
  60. pinger->timer_id.reset();
  61. std::cout << "pinger_t, (" << (void *)pinger << "), on_custom_timeout, cancelled: " << cancelled
  62. << "\n";
  63. if (!cancelled) {
  64. pinger->do_shutdown();
  65. }
  66. });
  67. resources->acquire(resource::timer);
  68. }
  69. void do_ping() noexcept {
  70. resources->acquire(resource::ping);
  71. request_id = request<payload::ping_t>(ponger_addr).send(constants::ping_timeout);
  72. ++attempts;
  73. }
  74. void on_custom_timeout(rotor::request_id_t, bool) {}
  75. void shutdown_start() noexcept override {
  76. std::cout << "pinger_t, (" << (void *)this << ") shutdown_start() \n";
  77. if (request_id)
  78. send<message::cancel_t>(ponger_addr, get_address());
  79. if (timer_id) {
  80. cancel_timer(*timer_id);
  81. timer_id.reset();
  82. }
  83. rotor::actor_base_t::shutdown_start();
  84. }
  85. void shutdown_finish() noexcept override {
  86. std::cout << "pinger_t, (" << (void *)this << ") finished attempts done " << attempts << "\n";
  87. rotor::actor_base_t::shutdown_finish();
  88. }
  89. void on_pong(message::pong_t &msg) noexcept {
  90. resources->release(resource::ping);
  91. request_id.reset();
  92. auto &ec = msg.payload.ee;
  93. if (!ec) {
  94. std::cout << "pinger_t, (" << (void *)this << ") success!, pong received, attempts : " << attempts << "\n";
  95. do_shutdown();
  96. } else {
  97. std::cout << "pinger_t, (" << (void *)this << ") pong failed (" << attempts << ")\n";
  98. if (timer_id) {
  99. do_ping();
  100. }
  101. }
  102. }
  103. std::optional<rotor::request_id_t> timer_id;
  104. std::optional<rotor::request_id_t> request_id;
  105. std::uint32_t attempts = 0;
  106. rotor::address_ptr_t ponger_addr;
  107. };
  108. struct ponger_t : public rotor::actor_base_t {
  109. using generator_t = std::mt19937;
  110. using distribution_t = std::uniform_real_distribution<double>;
  111. using message_ptr_t = rotor::intrusive_ptr_t<message::ping_t>;
  112. using requests_map_t = std::unordered_map<rotor::request_id_t, message_ptr_t>;
  113. std::random_device rd;
  114. generator_t gen;
  115. distribution_t dist;
  116. requests_map_t requests;
  117. explicit ponger_t(config_t &cfg) : rotor::actor_base_t(cfg), gen(rd()) {}
  118. void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
  119. rotor::actor_base_t::configure(plugin);
  120. plugin.with_casted<rotor::plugin::starter_plugin_t>([](auto &p) {
  121. p.subscribe_actor(&ponger_t::on_ping);
  122. p.subscribe_actor(&ponger_t::on_cancel);
  123. });
  124. plugin.with_casted<rotor::plugin::registry_plugin_t>(
  125. [&](auto &p) { p.register_name("ponger", get_address()); });
  126. }
  127. void on_ping(message::ping_t &req) noexcept {
  128. if (state != rotor::state_t::OPERATIONAL) {
  129. auto ec = rotor::make_error_code(rotor::error_code_t::cancelled);
  130. reply_with_error(req, make_error(ec));
  131. return;
  132. }
  133. auto dice = constants::ping_reply_scale * dist(gen);
  134. pt::time_duration reply_after = constants::ping_reply_base + pt::millisec{(int)dice};
  135. auto timer_id = start_timer(reply_after, *this, &ponger_t::on_ping_timer);
  136. resources->acquire(resource::timer);
  137. requests.emplace(timer_id, message_ptr_t(&req));
  138. }
  139. void on_cancel(message::cancel_t &notify) noexcept {
  140. auto request_id = notify.payload.id;
  141. auto &source = notify.payload.source;
  142. std::cout << "cancellation notify\n";
  143. auto predicate = [&](auto &it) {
  144. return it.second->payload.id == request_id && it.second->payload.origin == source;
  145. };
  146. auto it = std::find_if(requests.begin(), requests.end(), predicate);
  147. if (it != requests.end()) {
  148. cancel_timer(it->first);
  149. }
  150. }
  151. void on_ping_timer(rotor::request_id_t timer_id, bool cancelled) noexcept {
  152. resources->release(resource::timer);
  153. auto it = requests.find(timer_id);
  154. if (!cancelled) {
  155. auto dice = dist(gen);
  156. if (dice > constants::failure_probability) {
  157. auto &msg = it->second;
  158. reply_to(*msg);
  159. }
  160. } else {
  161. auto ec = rotor::make_error_code(rotor::error_code_t::cancelled);
  162. reply_with_error(*it->second, make_error(ec));
  163. }
  164. requests.erase(it);
  165. }
  166. void shutdown_start() noexcept override {
  167. while (!requests.empty()) {
  168. auto &timer_id = requests.begin()->first;
  169. cancel_timer(timer_id);
  170. }
  171. rotor::actor_base_t::shutdown_start();
  172. }
  173. void shutdown_finish() noexcept override {
  174. std::cout << "ponger_t, shutdown_finish\n";
  175. rotor::actor_base_t::shutdown_finish();
  176. }
  177. };
  178. struct custom_supervisor_t : ra::supervisor_asio_t {
  179. using ra::supervisor_asio_t::supervisor_asio_t;
  180. void on_child_shutdown(actor_base_t *) noexcept override {
  181. if (state < rotor::state_t::SHUTTING_DOWN) {
  182. do_shutdown();
  183. }
  184. }
  185. void shutdown_finish() noexcept override {
  186. ra::supervisor_asio_t::shutdown_finish();
  187. strand->context().stop();
  188. }
  189. };
  190. std::atomic_bool shutdown_flag = false;
  191. int main() {
  192. asio::io_context io_context;
  193. auto system_context = rotor::asio::system_context_asio_t::ptr_t{new rotor::asio::system_context_asio_t(io_context)};
  194. auto strand = std::make_shared<asio::io_context::strand>(io_context);
  195. auto timeout = pt::milliseconds{50};
  196. auto sup = system_context->create_supervisor<custom_supervisor_t>()
  197. .strand(strand)
  198. .create_registry()
  199. .timeout(timeout)
  200. .guard_context(false)
  201. .finish();
  202. // sup->create_actor<pinger_t>().timeout(timeout).finish();
  203. sup->create_actor<pinger_t>().timeout(timeout).finish();
  204. sup->create_actor<ponger_t>().timeout(timeout).finish();
  205. sup->start();
  206. #ifndef _WIN32
  207. struct sigaction act;
  208. act.sa_handler = [](int) { shutdown_flag = true; };
  209. if (sigaction(SIGINT, &act, nullptr) != 0) {
  210. std::cout << "critical :: cannot set signal handler\n";
  211. return -1;
  212. }
  213. auto console_thread = std::thread([&] {
  214. while (!shutdown_flag) {
  215. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  216. }
  217. sup->shutdown();
  218. });
  219. #endif
  220. io_context.run();
  221. #ifndef _WIN32
  222. shutdown_flag = true;
  223. console_thread.join();
  224. #endif
  225. return 0;
  226. }
  227. /* output samples:
  228. (all ping failed)
  229. ./examples/boost-asio/ping-pong-timer
  230. pinger_t, (0x556d13bbd8a0) pong failed (1)
  231. pinger_t, (0x556d13bbd8a0) pong failed (2)
  232. pinger_t, (0x556d13bbd8a0) pong failed (3)
  233. pinger_t, (0x556d13bbd8a0) pong failed (4)
  234. pinger_t, (0x556d13bbd8a0) pong failed (5)
  235. pinger_t, (0x556d13bbd8a0) pong failed (6)
  236. pinger_t, (0x556d13bbd8a0) pong failed (7)
  237. pinger_t, (0x556d13bbd8a0) pong failed (8)
  238. pinger_t, (0x556d13bbd8a0) pong failed (9)
  239. pinger_t, (0x556d13bbd8a0) pong failed (10)
  240. pinger_t, (0x556d13bbd8a0) pong failed (11)
  241. pinger_t, (0x556d13bbd8a0) pong failed (12)
  242. pinger_t, (0x556d13bbd8a0) pong failed (13)
  243. pinger_t, (0x556d13bbd8a0) pong failed (14)
  244. pinger_t, (0x556d13bbd8a0) pong failed (15)
  245. pinger_t, (0x556d13bbd8a0) pong failed (16)
  246. pinger_t, (0x556d13bbd8a0) pong failed (17)
  247. pinger_t, (0x556d13bbd8a0) pong failed (18)
  248. pinger_t, (0x556d13bbd8a0) pong failed (19)
  249. pinger_t, (0x556d13bbd8a0) pong failed (20)
  250. pinger_t, (0x556d13bbd8a0) pong failed (21)
  251. pinger_t, (0x556d13bbd8a0) pong failed (22)
  252. pinger_t, (0x556d13bbd8a0) pong failed (23)
  253. pinger_t, (0x556d13bbd8a0) pong failed (24)
  254. pinger_t, (0x556d13bbd8a0) pong failed (25)
  255. pinger_t, (0x556d13bbd8a0) pong failed (26)
  256. pinger_t, (0x556d13bbd8a0) pong failed (27)
  257. pinger_t, (0x556d13bbd8a0) pong failed (28)
  258. pinger_t, (0x556d13bbd8a0) pong failed (29)
  259. pinger_t, (0x556d13bbd8a0), on_custom_timeout, cancelled: 0
  260. pinger_t, (0x556d13bbd8a0) shutdown_start()
  261. pinger_t, (0x556d13bbd8a0) pong failed (30)
  262. pinger_t, (0x556d13bbd8a0) finished attempts done 30
  263. ponger_t, shutdown_finish
  264. (11-th ping was successful)
  265. ./examples/boost-asio/ping-pong-timer
  266. pinger_t, (0x55f9f90048a0) pong failed (1)
  267. pinger_t, (0x55f9f90048a0) pong failed (2)
  268. pinger_t, (0x55f9f90048a0) pong failed (3)
  269. pinger_t, (0x55f9f90048a0) pong failed (4)
  270. pinger_t, (0x55f9f90048a0) pong failed (5)
  271. pinger_t, (0x55f9f90048a0) pong failed (6)
  272. pinger_t, (0x55f9f90048a0) pong failed (7)
  273. pinger_t, (0x55f9f90048a0) pong failed (8)
  274. pinger_t, (0x55f9f90048a0) pong failed (9)
  275. pinger_t, (0x55f9f90048a0) pong failed (10)
  276. pinger_t, (0x55f9f90048a0) success!, pong received, attempts : 11
  277. pinger_t, (0x55f9f90048a0) shutdown_start()
  278. pinger_t, (0x55f9f90048a0), on_custom_timeout, cancelled: 1
  279. pinger_t, (0x55f9f90048a0) finished attempts done 11
  280. ponger_t, shutdown_finish
  281. (premature termination via CTRL+C pressing)
  282. ./examples/boost-asio/ping-pong-timer
  283. pinger_t, (0x55d5d95d98a0) pong failed (1)
  284. pinger_t, (0x55d5d95d98a0) pong failed (2)
  285. pinger_t, (0x55d5d95d98a0) pong failed (3)
  286. pinger_t, (0x55d5d95d98a0) pong failed (4)
  287. pinger_t, (0x55d5d95d98a0) pong failed (5)
  288. pinger_t, (0x55d5d95d98a0) pong failed (6)
  289. ^Cpinger_t, (0x55d5d95d98a0) shutdown_start()
  290. pinger_t, (0x55d5d95d98a0), on_custom_timeout, cancelled: 1
  291. pinger_t, (0x55d5d95d98a0) pong failed (7)
  292. pinger_t, (0x55d5d95d98a0) finished attempts done 7
  293. ponger_t, shutdown_finish
  294. */