ping-pong-spawner.cpp 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. //
  2. // Copyright (c) 2022 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  /*
   * This is an example of how to use a spawner for simple ping-pong cases.
   * The idea is simple: every time the pinger fails to receive a pong, it shuts
   * itself down and its spawner spawns a new pinger instance, and so on until
   * a successful pong reply.
   *
   */
  13. #include "rotor.hpp"
  14. #include "rotor/thread.hpp"
  15. #include <random>
  16. #include <iostream>
  17. namespace r = rotor;
  18. namespace rth = rotor::thread;
  19. namespace pt = boost::posix_time;
  // Payload types exchanged between the pinger and the ponger.
  namespace payload {

  // Reply payload; it is an empty type, i.e. a pong carries no data.
  struct pong_t {};

  // Request payload; also empty. The nested `response_t` alias names
  // pong_t as the payload type of the matching reply.
  struct ping_t {
      using response_t = pong_t;
  };

  } // namespace payload
  26. namespace message {
  27. using ping_t = rotor::request_traits_t<payload::ping_t>::request::message_t;
  28. using pong_t = rotor::request_traits_t<payload::ping_t>::response::message_t;
  29. } // namespace message
  30. struct pinger_t : public rotor::actor_base_t {
  31. using rotor::actor_base_t::actor_base_t;
  32. void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
  33. rotor::actor_base_t::configure(plugin);
  34. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) {
  35. static int generation = 0;
  36. std::string id = "pinger #";
  37. id += std::to_string(++generation);
  38. p.set_identity(id, true);
  39. });
  40. plugin.with_casted<rotor::plugin::starter_plugin_t>([](auto &p) { p.subscribe_actor(&pinger_t::on_pong); });
  41. plugin.with_casted<rotor::plugin::registry_plugin_t>(
  42. [&](auto &p) { p.discover_name("ponger", ponger, true).link(); });
  43. }
  44. void on_start() noexcept override {
  45. rotor::actor_base_t::on_start();
  46. request<payload::ping_t>(ponger).send(init_timeout);
  47. std::cout << "ping (" << identity << ")\n";
  48. }
  49. void on_pong(message::pong_t &reply) noexcept {
  50. auto &ee = reply.payload.ee;
  51. // fail branch, handled by spawner
  52. if (ee) {
  53. std::cout << "err: " << ee->message() << "\n";
  54. return do_shutdown(ee);
  55. }
  56. std::cout << "pong received\n";
  57. // succesfull branch: manually shutdown supervisor
  58. supervisor->do_shutdown();
  59. }
  60. rotor::address_ptr_t ponger;
  61. };
  62. struct ponger_t : public rotor::actor_base_t {
  63. using rotor::actor_base_t::actor_base_t;
  64. void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
  65. rotor::actor_base_t::configure(plugin);
  66. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) { p.set_identity("ponger", true); });
  67. plugin.with_casted<rotor::plugin::starter_plugin_t>([](auto &p) { p.subscribe_actor(&ponger_t::on_ping); });
  68. plugin.with_casted<rotor::plugin::registry_plugin_t>(
  69. [&](auto &p) { p.register_name("ponger", get_address()); });
  70. }
  71. void on_ping(message::ping_t &request) noexcept {
  72. using generator_t = std::mt19937;
  73. using distribution_t = std::uniform_real_distribution<double>;
  74. std::random_device rd;
  75. generator_t gen(rd());
  76. distribution_t dist;
  77. auto dice = dist(gen);
  78. auto failure_probability = 0.925;
  79. bool ok = dice > failure_probability;
  80. std::cout << "pong, dice = " << dice << ", passes threshold : " << (ok ? "yes" : "no") << "\n";
  81. if (ok) {
  82. reply_to(request);
  83. } else {
  84. auto ec = r::make_error_code(r::error_code_t::request_timeout);
  85. auto ee = make_error(ec);
  86. reply_with_error(request, ee);
  87. }
  88. }
  89. };
  90. int main(int, char **) {
  91. rth::system_context_thread_t ctx;
  92. auto timeout = pt::milliseconds{20};
  93. auto sup = ctx.create_supervisor<rth::supervisor_thread_t>().timeout(timeout).create_registry().finish();
  94. sup->create_actor<ponger_t>().timeout(timeout).finish();
  95. auto pinger_factory = [&](r::supervisor_t &sup, const r::address_ptr_t &spawner) -> r::actor_ptr_t {
  96. return sup.create_actor<pinger_t>().timeout(timeout).spawner_address(spawner).finish();
  97. };
  98. sup->spawn(pinger_factory)
  99. .max_attempts(15) /* don't do that endlessly */
  100. .restart_period(timeout)
  101. .restart_policy(r::restart_policy_t::fail_only) /* case: respawn on single ping fail */
  102. .escalate_failure() /* case: when all pings fail */
  103. .spawn();
  104. ctx.run();
  105. std::cout << "shutdown reason: " << sup->get_shutdown_reason()->message() << "\n";
  106. return 0;
  107. }
  108. /*
  109. sample output
  110. ping (pinger #1 0x55d3b09ab130)
  111. pong, dice = 0.000809254, passes threshold : no
  112. err: ponger 0x55d3b09abe80 request timeout
  113. ping (pinger #2 0x55d3b09af090)
  114. pong, dice = 0.446941, passes threshold : no
  115. err: ponger 0x55d3b09abe80 request timeout
  116. ping (pinger #3 0x55d3b09ae670)
  117. pong, dice = 0.809191, passes threshold : no
  118. err: ponger 0x55d3b09abe80 request timeout
  119. ping (pinger #4 0x55d3b09adfd0)
  120. pong, dice = 0.955792, passes threshold : yes
  121. pong received
  122. shutdown reason: supervisor 0x55d3b09a4350 normal shutdown
  123. */