ping-pong-thread.cpp 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. //
  2. // Copyright (c) 2021-2022 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. /*
  7. * This is an example of how to do a simple ping-pong between actors running on different threads
  8. *
  9. */
  10. #include "rotor.hpp"
  11. #include "rotor/thread.hpp"
  12. #include <boost/lexical_cast.hpp>
  13. #include <chrono>
  14. #include <functional>
  15. #include <iomanip>
  16. #include <iostream>
  17. #include <memory>
  18. #include <type_traits>
  19. #include <utility>
  20. #include <vector>
  21. namespace pt = boost::posix_time;
  22. namespace r = rotor;
  23. namespace rth = rotor::thread;
  24. namespace payload {
  25. struct ping_t {};
  26. struct pong_t {};
  27. using announce_t = r::address_ptr_t;
  28. } // namespace payload
  29. namespace message {
  30. using ping_t = r::message_t<payload::ping_t>;
  31. using pong_t = r::message_t<payload::pong_t>;
  32. using announce_t = r::message_t<payload::announce_t>;
  33. } // namespace message
  34. struct pinger_t : public r::actor_base_t {
  35. using timepoint_t = std::chrono::time_point<std::chrono::high_resolution_clock>;
  36. using r::actor_base_t::actor_base_t;
  37. void set_pings(std::size_t pings) { pings_left = pings_count = pings; }
  38. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  39. r::actor_base_t::configure(plugin);
  40. plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) {
  41. std::cout << "pinger_t::configure(), subscribing to on_pong\n";
  42. p.subscribe_actor(&pinger_t::on_pong);
  43. });
  44. plugin.with_casted<rotor::plugin::registry_plugin_t>(
  45. [&](auto &p) { p.discover_name("ponger", ponger_addr, true).link(true); });
  46. }
  47. void on_start() noexcept override {
  48. r::actor_base_t::on_start();
  49. send<payload::announce_t>(ponger_addr, get_address());
  50. std::cout << "pings start (" << pings_left << ")\n";
  51. start = std::chrono::high_resolution_clock::now();
  52. send_ping();
  53. }
  54. void on_pong(message::pong_t &) noexcept {
  55. // std::cout << "pinger_t::on_pong\n";
  56. send_ping();
  57. }
  58. private:
  59. void send_ping() {
  60. if (pings_left) {
  61. send<payload::ping_t>(ponger_addr);
  62. --pings_left;
  63. } else {
  64. using namespace std::chrono;
  65. auto end = high_resolution_clock::now();
  66. std::chrono::duration<double> diff = end - start;
  67. double freq = ((double)pings_count) / diff.count();
  68. std::cout << "pings finishes (" << pings_left << ") in " << diff.count() << "s"
  69. << ", freq = " << std::fixed << std::setprecision(10) << freq << ", real freq = " << std::fixed
  70. << std::setprecision(10) << freq * 2 << "\n";
  71. do_shutdown();
  72. }
  73. }
  74. timepoint_t start;
  75. r::address_ptr_t ponger_addr;
  76. std::size_t pings_left;
  77. std::size_t pings_count;
  78. };
  79. struct ponger_t : public r::actor_base_t {
  80. using r::actor_base_t::actor_base_t;
  81. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  82. r::actor_base_t::configure(plugin);
  83. plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) {
  84. std::cout << "pinger_t::configure, subscribing on_ping\n";
  85. p.subscribe_actor(&ponger_t::on_announce);
  86. p.subscribe_actor(&ponger_t::on_ping);
  87. });
  88. plugin.with_casted<rotor::plugin::registry_plugin_t>(
  89. [&](auto &p) { p.register_name("ponger", get_address()); });
  90. }
  91. void on_announce(message::announce_t &message) noexcept { pinger_addr = message.payload; }
  92. void on_ping(message::ping_t &) noexcept { send<payload::pong_t>(pinger_addr); }
  93. private:
  94. r::address_ptr_t pinger_addr;
  95. };
  96. int main(int argc, char **argv) {
  97. try {
  98. std::uint32_t count = 10000;
  99. if (argc > 1) {
  100. boost::conversion::try_lexical_convert(argv[1], count);
  101. }
  102. rth::system_context_thread_t ctx_ping;
  103. rth::system_context_thread_t ctx_pong;
  104. auto timeout = boost::posix_time::milliseconds{100};
  105. auto sup_ping =
  106. ctx_ping.create_supervisor<rth::supervisor_thread_t>().timeout(timeout).create_registry().finish();
  107. auto pinger = sup_ping->create_actor<pinger_t>().autoshutdown_supervisor().timeout(timeout).finish();
  108. pinger->set_pings(count);
  109. auto sup_pong = ctx_pong.create_supervisor<rth::supervisor_thread_t>()
  110. .timeout(timeout)
  111. .registry_address(sup_ping->get_registry_address())
  112. .finish();
  113. auto ponger = sup_pong->create_actor<ponger_t>().timeout(timeout).finish();
  114. sup_ping->start();
  115. sup_pong->start();
  116. auto pong_thread = std::thread([&] { ctx_pong.run(); });
  117. ctx_ping.run();
  118. sup_pong->shutdown();
  119. pong_thread.join();
  120. } catch (const std::exception &ex) {
  121. std::cout << "exception : " << ex.what();
  122. }
  123. std::cout << "exiting...\n";
  124. return 0;
  125. }