beast-scrapper.cpp 34 KB


  1. //
  2. // Copyright (c) 2019-2022 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. /*
  7. The current example in some extent mimics curl usage example in CAF
  8. ( https://github.com/actor-framework/actor-framework/blob/master/examples/curl/curl_fuse.cpp ),
  9. but instead of CAF + curl pair, the rotor + beast pair is used.
  10. There is a single client (can be multiple), there is a single http-manager (supervisor),
  11. which holds a pool of multiple http-workers and a caching resolver-actor.
  12. client -> request_1 -> http-manager -> request_1 -> http-worker_1
  13. request_2 -> -> request_1 -> http-worker_2
  14. ... ... ...
  15. request_n -> -> request_1 -> http-worker_n
  16. 1. The client makes an request, which contains the URL of the remote resource, and the buffer
  17. where the parsed reply is put.
  18. 2. The reply is intrusive pointer to the real http-response, i.e. to avoid unnecessary copying.
  19. 3. The client makes as many requests as it needs, and the manager just for forwards them
  20. to free http-workers. The http-manager spawns workers on demand, i.e. it will be naturally
  21. coordinated with the load concurrency. If it is not desirable, it can be impoved:
  22. 3.1. If MAX_FAILURES != 1, then it might be the case when timeout timer just has triggered
  23. on the client, but corresponding timer wasn't triggered on the manager. In the current code
  24. there is just an assert, but the situation where client asks more then http-manager is
  25. capable to serve can be handled, i.e. via queueing.
  26. 3.2. If there is no need of spawning a lot of http workers, the http-manager can queue requests.
  27. However, some *back-pressure* mechanisms should be imposed into http-manager to prevent the
  28. queue to grow infinitely.
  29. 3.3. The http-requests are stateless (i.e. no http/1.1). This cannot be improved whitout
  30. internal protocol change, as the socket should not be closed, the same http-client
  31. should continue serving the requests on the same host from the client etc.
  32. 3.4. The http-requests do not contain headers/cookies etc. It can be improved via additional
  33. fields in the http-requests.
  34. 3.5. Since v0.11 there are cancellation facilities. You can press ctrl+c on console,
  35. and everything will be cleanly shutted down, no leaks etc.
  36. 3.6. Currenlty only single thread is used. However, it is not a problem to use all threads,
  37. just the actors architecture should be redesigend a litte bit, to let them be launched
  38. on different threads.
  39. 4. There are 2 timers per request: the 1st one validates manager's responsibilities
  40. on client, the 2nd one validates worker's responsibilities on manager.
  41. Technically, there could be only one timer per request, however, the main question
  42. is how reliable do you treat your actors. If they are unreliable (i.e. might have
  43. bugs etc.), then there should be 2 timers. On the later stage of the development,
  44. it might be switched to one timer per request if reliability has been proven
  45. and it is desirable to get rid of additional timer from performance point of view.
  46. 5. There should be no crashes, no memory leaks, including on emergency shutdown (ctrl+c)
  47. The output sample for the localhost is:
  48. ./beast-scrapper --workers_count=50 --timeout=5000 --max_requests=50000 --url=http://127.0.0.1:80/index.html
  49. using 50 workers for 127.0.0.1:80/index.html, timeout: 00:00:05
  50. starting shutdown
  51. client_t::shutdown_start
  52. client_t::shutdown_finish, stats: 50000/50000 requests, in 2.96149s, rps = 16883.4
  53. client_t::shutdown_finish
  54. http_manager_t::shutdown_finish()
  55. */
  56. #include "rotor.hpp"
  57. #include "rotor/asio.hpp"
  58. #include <iostream>
  59. #include <chrono>
  60. #include <regex>
  61. #include <unordered_set>
  62. #include <unordered_map>
  63. #include <deque>
  64. #include <memory>
  65. #include <optional>
  66. #include <boost/program_options.hpp>
  67. #include <boost/beast/core.hpp>
  68. #include <boost/beast/http.hpp>
  69. #include <boost/beast/version.hpp>
  70. #include <boost/lexical_cast.hpp>
  71. namespace po = boost::program_options;
  72. namespace http = boost::beast::http;
  73. namespace sys = boost::system;
  74. namespace asio = boost::asio;
  75. namespace pt = boost::posix_time;
  76. namespace ra = rotor::asio;
  77. namespace r = rotor;
  78. using tcp = asio::ip::tcp;
  79. struct URL {
  80. std::string host;
  81. std::string port;
  82. std::string path;
  83. };
  84. using raw_http_response_t = http::response<http::string_body>;
  85. constexpr const std::uint32_t RX_BUFF_SZ = 10 * 1024;
  86. constexpr const std::uint32_t MAX_HTTP_FAILURES = 1;
  87. struct endpoint_t {
  88. std::string host;
  89. std::string port;
  90. inline bool operator==(const endpoint_t &other) const noexcept { return host == other.host && port == other.port; }
  91. };
  92. namespace std {
  93. template <> struct hash<endpoint_t> {
  94. inline size_t operator()(const endpoint_t &endpoint) const noexcept {
  95. auto h1 = std::hash<std::string>()(endpoint.host);
  96. auto h2 = std::hash<std::string>()(endpoint.port);
  97. return h1 ^ (h2 << 4);
  98. }
  99. };
  100. } // namespace std
  101. namespace payload {
  102. struct http_response_t : public r::arc_base_t<http_response_t> {
  103. raw_http_response_t response;
  104. std::size_t bytes;
  105. http_response_t(raw_http_response_t &&response_, std::size_t bytes_)
  106. : response(std::move(response_)), bytes{bytes_} {}
  107. };
  108. struct http_request_t : public r::arc_base_t<http_request_t> {
  109. using rx_buff_t = boost::beast::flat_buffer;
  110. using rx_buff_ptr_t = std::shared_ptr<rx_buff_t>;
  111. using duration_t = r::pt::time_duration;
  112. using response_t = r::intrusive_ptr_t<http_response_t>;
  113. http_request_t(const URL &url_, rx_buff_ptr_t rx_buff_, std::size_t rx_buff_size_, duration_t timeout_)
  114. : url{url_}, rx_buff{rx_buff_}, rx_buff_size{rx_buff_size_}, timeout{timeout_} {}
  115. URL url;
  116. rx_buff_ptr_t rx_buff;
  117. std::size_t rx_buff_size;
  118. duration_t timeout;
  119. };
  120. struct address_response_t : public r::arc_base_t<address_response_t> {
  121. using resolve_results_t = tcp::resolver::results_type;
  122. explicit address_response_t(resolve_results_t results_) : results{results_} {};
  123. resolve_results_t results;
  124. };
  125. struct address_request_t : public r::arc_base_t<address_request_t> {
  126. using response_t = r::intrusive_ptr_t<address_response_t>;
  127. endpoint_t endpoint;
  128. explicit address_request_t(const endpoint_t &endpoint_) : endpoint{endpoint_} {}
  129. };
  130. } // namespace payload
  131. namespace message {
  132. using http_request_t = r::request_traits_t<payload::http_request_t>::request::message_t;
  133. using http_response_t = r::request_traits_t<payload::http_request_t>::response::message_t;
  134. using http_cancel_t = r::request_traits_t<payload::http_request_t>::cancel::message_t;
  135. using resolve_request_t = r::request_traits_t<payload::address_request_t>::request::message_t;
  136. using resolve_response_t = r::request_traits_t<payload::address_request_t>::response::message_t;
  137. using resolve_cancel_t = r::request_traits_t<payload::address_request_t>::cancel::message_t;
  138. } // namespace message
  139. namespace resource {
  140. static const constexpr r::plugin::resource_id_t timer = 0;
  141. static const constexpr r::plugin::resource_id_t io = 1;
  142. static const constexpr r::plugin::resource_id_t resolve = 2;
  143. static const constexpr r::plugin::resource_id_t request = 3;
  144. } // namespace resource
  145. namespace service {
  146. static const char *resolver = "service:resolver";
  147. static const char *manager = "service:manager";
  148. } // namespace service
  149. static_assert(r::details::is_constructible_v<payload::http_response_t, raw_http_response_t, std::size_t>, "valid");
  150. static_assert(std::is_constructible_v<payload::http_response_t, raw_http_response_t, std::size_t>, "valid");
  151. struct resolver_worker_config_t : public r::actor_config_t {
  152. r::pt::time_duration resolve_timeout;
  153. using r::actor_config_t::actor_config_t;
  154. };
  155. struct http_worker_config_t : public r::actor_config_t {
  156. r::pt::time_duration resolve_timeout;
  157. r::pt::time_duration request_timeout;
  158. using r::actor_config_t::actor_config_t;
  159. };
  160. template <typename Actor> struct resolver_worker_config_builder_t : r::actor_config_builder_t<Actor> {
  161. using builder_t = typename Actor::template config_builder_t<Actor>;
  162. using parent_t = r::actor_config_builder_t<Actor>;
  163. using parent_t::parent_t;
  164. builder_t &&resolve_timeout(const pt::time_duration &value) &&noexcept {
  165. parent_t::config.resolve_timeout = value;
  166. return std::move(*static_cast<typename parent_t::builder_t *>(this));
  167. }
  168. };
  169. template <typename Actor> struct http_worker_config_builder_t : r::actor_config_builder_t<Actor> {
  170. using builder_t = typename Actor::template config_builder_t<Actor>;
  171. using parent_t = r::actor_config_builder_t<Actor>;
  172. using parent_t::parent_t;
  173. builder_t &&resolve_timeout(const pt::time_duration &value) &&noexcept {
  174. parent_t::config.resolve_timeout = value;
  175. return std::move(*static_cast<typename parent_t::builder_t *>(this));
  176. }
  177. builder_t &&request_timeout(const pt::time_duration &value) &&noexcept {
  178. parent_t::config.request_timeout = value;
  179. return std::move(*static_cast<typename parent_t::builder_t *>(this));
  180. }
  181. };
  182. struct resolver_worker_t : public r::actor_base_t {
  183. using r::actor_base_t::actor_base_t;
  184. using request_ptr_t = r::intrusive_ptr_t<message::resolve_request_t>;
  185. using resolve_results_t = payload::address_response_t::resolve_results_t;
  186. using Queue = std::list<request_ptr_t>;
  187. using Cache = std::unordered_map<endpoint_t, resolve_results_t>;
  188. using config_t = resolver_worker_config_t;
  189. template <typename Actor> using config_builder_t = resolver_worker_config_builder_t<Actor>;
  190. explicit resolver_worker_t(config_t &config)
  191. : r::actor_base_t{config}, io_timeout{config.resolve_timeout},
  192. strand{static_cast<ra::supervisor_asio_t *>(config.supervisor)->get_strand()}, backend{strand.context()} {}
  193. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  194. r::actor_base_t::configure(plugin);
  195. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  196. p.subscribe_actor(&resolver_worker_t::on_request);
  197. p.subscribe_actor(&resolver_worker_t::on_cancel);
  198. });
  199. plugin.with_casted<r::plugin::registry_plugin_t>(
  200. [&](auto &p) { p.register_name(service::resolver, get_address()); });
  201. }
  202. void on_request(message::resolve_request_t &req) noexcept {
  203. if (state == r::state_t::SHUTTING_DOWN) {
  204. auto ec = r::make_error_code(r::error_code_t::cancelled);
  205. reply_with_error(req, make_error(ec));
  206. return;
  207. }
  208. queue.emplace_back(&req);
  209. if (!resources->has_any())
  210. process();
  211. }
  212. void on_cancel(message::resolve_cancel_t &message) noexcept {
  213. auto &request_id = message.payload.id;
  214. auto &origin = message.payload.source;
  215. auto predicate = [&](request_ptr_t &req) -> bool {
  216. return req->payload.id == request_id && req->payload.origin == origin;
  217. };
  218. if (request && predicate(request)) {
  219. cancel_request_timer();
  220. if (resources->has(resource::io)) {
  221. backend.cancel();
  222. }
  223. } else {
  224. auto ec = r::make_error_code(r::error_code_t::cancelled);
  225. for (auto &it : queue) {
  226. if (predicate(it)) {
  227. reply_with_error(*it, make_error(ec));
  228. it.reset();
  229. }
  230. }
  231. }
  232. }
  233. template <typename ReplyFn> void reply(const endpoint_t &endpoint, ReplyFn &&fn) noexcept {
  234. if (request) {
  235. fn(*request);
  236. auto it = queue.begin();
  237. while (it != queue.end()) {
  238. auto &message_ptr = *it;
  239. if (message_ptr->payload.request_payload->endpoint == endpoint) {
  240. fn(*message_ptr);
  241. it = queue.erase(it);
  242. } else {
  243. ++it;
  244. }
  245. }
  246. request.reset();
  247. }
  248. }
  249. void mass_reply(const endpoint_t &endpoint, const resolve_results_t &results) noexcept {
  250. reply(endpoint, [&](auto &message) { reply_to(message, results); });
  251. }
  252. void mass_reply(const endpoint_t &endpoint, const r::extended_error_ptr_t &ee) noexcept {
  253. reply(endpoint, [&](auto &message) { reply_with_error(message, ee); });
  254. }
  255. void process() noexcept {
  256. if (resources->has_any())
  257. return;
  258. if (queue.empty())
  259. return;
  260. auto queue_it = queue.begin();
  261. auto endpoint = (*queue_it)->payload.request_payload->endpoint;
  262. auto cache_it = cache.find(endpoint);
  263. if (cache_it != cache.end()) {
  264. mass_reply(endpoint, cache_it->second);
  265. }
  266. while (!queue.empty() && !request) {
  267. request = queue.front();
  268. queue.pop_front();
  269. }
  270. if (!request) {
  271. return;
  272. }
  273. resolve_start();
  274. }
  275. void resolve_start() noexcept {
  276. auto &endpoint = request->payload.request_payload->endpoint;
  277. auto fwd_resolver =
  278. ra::forwarder_t(*this, &resolver_worker_t::on_resolve, &resolver_worker_t::on_resolve_error);
  279. backend.async_resolve(endpoint.host, endpoint.port, std::move(fwd_resolver));
  280. resources->acquire(resource::io);
  281. timer_request = start_timer(io_timeout, *this, &resolver_worker_t::on_timer);
  282. resources->acquire(resource::timer);
  283. }
  284. void on_resolve(resolve_results_t results) noexcept {
  285. resources->release(resource::io);
  286. if (request) {
  287. auto &endpoint = request->payload.request_payload->endpoint;
  288. auto pair = cache.emplace(endpoint, results);
  289. auto &it = pair.first;
  290. mass_reply(it->first, it->second);
  291. }
  292. cancel_request_timer();
  293. process();
  294. }
  295. void on_resolve_error(const sys::error_code &ec) noexcept {
  296. resources->release(resource::io);
  297. if (request) {
  298. auto endpoint = request->payload.request_payload->endpoint;
  299. mass_reply(endpoint, make_error(ec));
  300. }
  301. cancel_request_timer();
  302. process();
  303. }
  304. void on_timer(r::request_id_t, bool cancelled) noexcept {
  305. resources->release(resource::timer);
  306. if (request) {
  307. auto ec = r::make_error_code(cancelled ? r::error_code_t::cancelled : r::error_code_t::request_timeout);
  308. auto endpoint = request->payload.request_payload->endpoint;
  309. mass_reply(endpoint, make_error(ec));
  310. }
  311. process();
  312. }
  313. void cancel_request_timer() noexcept {
  314. if (timer_request) {
  315. cancel_timer(*timer_request);
  316. timer_request.reset();
  317. }
  318. }
  319. std::optional<r::request_id_t> timer_request;
  320. pt::time_duration io_timeout;
  321. asio::io_context::strand &strand;
  322. tcp::resolver backend;
  323. request_ptr_t request;
  324. Queue queue;
  325. Cache cache;
  326. };
  327. struct http_worker_t : public r::actor_base_t {
  328. using request_ptr_t = r::intrusive_ptr_t<message::http_request_t>;
  329. using tcp_socket_ptr_t = std::unique_ptr<tcp::socket>;
  330. using resolve_it_t = payload::address_response_t::resolve_results_t::iterator;
  331. using config_t = http_worker_config_t;
  332. template <typename Actor> using config_builder_t = http_worker_config_builder_t<Actor>;
  333. explicit http_worker_t(config_t &config)
  334. : r::actor_base_t{config}, resolve_timeout(config.resolve_timeout), request_timeout(config.request_timeout),
  335. strand{static_cast<ra::supervisor_asio_t *>(config.supervisor)->get_strand()} {}
  336. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  337. r::actor_base_t::configure(plugin);
  338. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  339. p.subscribe_actor(&http_worker_t::on_request);
  340. p.subscribe_actor(&http_worker_t::on_resolve);
  341. p.subscribe_actor(&http_worker_t::on_cancel);
  342. });
  343. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  344. p.discover_name(service::resolver, resolver).link(false).callback([](auto phase, auto &ee) {
  345. if (ee) {
  346. auto p = (phase == rotor::plugin::registry_plugin_t::phase_t::linking) ? "link" : "discovery";
  347. std::cout << "cannot link with resolver (" << p << "): " << ee->message() << "\n";
  348. }
  349. });
  350. });
  351. }
  352. void make_response(const r::extended_error_ptr_t &ee) noexcept {
  353. if (!response) {
  354. response = r::actor_base_t::make_response(*orig_req, ee);
  355. }
  356. }
  357. void make_response() noexcept {
  358. if (!response) {
  359. response = r::actor_base_t::make_response(*orig_req, std::move(http_response), response_size);
  360. }
  361. }
  362. void finish_response() {
  363. if (!resources->has_any()) {
  364. supervisor->put(std::move(response));
  365. orig_req.reset();
  366. }
  367. }
  368. void on_request(message::http_request_t &req) noexcept {
  369. if (state == r::state_t::SHUTTING_DOWN) {
  370. make_response(make_error(r::make_error_code(r::error_code_t::cancelled)));
  371. return finish_response();
  372. }
  373. assert(!sock);
  374. orig_req.reset(&req);
  375. http_response.clear();
  376. response_size = 0;
  377. auto &url = req.payload.request_payload->url;
  378. endpoint_t endpoint{url.host, url.port};
  379. resolve_request = request<payload::address_request_t>(resolver, std::move(endpoint)).send(resolve_timeout);
  380. resources->acquire(resource::resolve);
  381. }
  382. void on_cancel(message::http_cancel_t &) noexcept {
  383. if (!orig_req) {
  384. cancel_timer();
  385. cancel_sock();
  386. }
  387. }
  388. void on_resolve(message::resolve_response_t &res) noexcept {
  389. resources->release(resource::resolve);
  390. auto &ee = res.payload.ee;
  391. if (ee) {
  392. make_response(ee);
  393. return finish_response();
  394. }
  395. if (state == r::state_t::SHUTTING_DOWN) {
  396. make_response(make_error(r::make_error_code(r::error_code_t::cancelled)));
  397. return finish_response();
  398. }
  399. sys::error_code ec_sock;
  400. sock = std::make_unique<tcp::socket>(strand.context());
  401. sock->open(tcp::v4(), ec_sock);
  402. if (ec_sock) {
  403. make_response(make_error(ec_sock));
  404. return finish_response();
  405. }
  406. auto &addresses = res.payload.res->results;
  407. auto fwd_connect = ra::forwarder_t(*this, &http_worker_t::on_connect, &http_worker_t::on_tcp_error);
  408. asio::async_connect(*sock, addresses.begin(), addresses.end(), std::move(fwd_connect));
  409. resources->acquire(resource::io);
  410. timer_request = start_timer(request_timeout, *this, &http_worker_t::on_timer);
  411. resources->acquire(resource::timer);
  412. }
  413. void on_connect(resolve_it_t) noexcept {
  414. if (response) {
  415. resources->release(resource::io);
  416. return finish_response();
  417. }
  418. auto &url = orig_req->payload.request_payload->url;
  419. http_request.method(http::verb::get);
  420. http_request.version(11);
  421. http_request.target(url.path);
  422. http_request.set(http::field::host, url.host);
  423. http_request.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
  424. auto fwd = ra::forwarder_t(*this, &http_worker_t::on_request_sent, &http_worker_t::on_tcp_error);
  425. http::async_write(*sock, http_request, std::move(fwd));
  426. }
  427. void on_request_sent(std::size_t /* bytes */) noexcept {
  428. if (response) {
  429. resources->release(resource::io);
  430. return finish_response();
  431. }
  432. auto fwd = ra::forwarder_t(*this, &http_worker_t::on_request_read, &http_worker_t::on_tcp_error);
  433. auto &rx_buff = orig_req->payload.request_payload->rx_buff;
  434. rx_buff->prepare(orig_req->payload.request_payload->rx_buff_size);
  435. http::async_read(*sock, *rx_buff, http_response, std::move(fwd));
  436. }
  437. void on_request_read(std::size_t bytes) noexcept {
  438. resources->release(resource::io);
  439. response_size = bytes;
  440. make_response();
  441. sock.reset();
  442. finish_response();
  443. cancel_timer();
  444. }
  445. void on_tcp_error(const sys::error_code &ec) noexcept {
  446. resources->release(resource::io);
  447. if (!response) {
  448. make_response(make_error(ec));
  449. }
  450. finish_response();
  451. cancel_timer();
  452. }
  453. void on_timer(r::request_id_t, bool cancelled) noexcept {
  454. resources->release(resource::timer);
  455. timer_request.reset();
  456. if (cancelled && !response) {
  457. make_response(make_error(r::make_error_code(r::error_code_t::cancelled)));
  458. }
  459. cancel_sock();
  460. finish_response();
  461. }
  462. void cancel_sock() noexcept {
  463. if (sock) {
  464. sys::error_code ec;
  465. sock->cancel(ec);
  466. if (ec) {
  467. get_supervisor().do_shutdown();
  468. }
  469. }
  470. }
  471. void cancel_timer() noexcept {
  472. if (timer_request) {
  473. r::actor_base_t::cancel_timer(*timer_request);
  474. }
  475. }
  476. std::optional<r::request_id_t> timer_request;
  477. std::optional<r::request_id_t> resolve_request;
  478. pt::time_duration resolve_timeout;
  479. pt::time_duration request_timeout;
  480. asio::io_context::strand &strand;
  481. r::address_ptr_t resolver;
  482. request_ptr_t orig_req;
  483. r::message_ptr_t response;
  484. tcp_socket_ptr_t sock;
  485. http::request<http::empty_body> http_request;
  486. http::response<http::string_body> http_response;
  487. size_t response_size = 0;
  488. };
  489. struct http_manager_config_t : public ra::supervisor_config_asio_t {
  490. r::pt::time_duration worker_timeout;
  491. r::pt::time_duration resolve_timeout;
  492. r::pt::time_duration request_timeout;
  493. using ra::supervisor_config_asio_t::supervisor_config_asio_t;
  494. };
  495. template <typename Supervisor> struct http_manager_asio_builder_t : ra::supervisor_config_asio_builder_t<Supervisor> {
  496. using builder_t = typename Supervisor::template config_builder_t<Supervisor>;
  497. using parent_t = ra::supervisor_config_asio_builder_t<Supervisor>;
  498. using parent_t::parent_t;
  499. constexpr static const std::uint32_t WORKER_TIMEOUT = 1 << 3;
  500. constexpr static const std::uint32_t REQUEST_TIMEOUT = 1 << 4;
  501. constexpr static const std::uint32_t RESOLVE_TIMEOUT = 1 << 5;
  502. constexpr static const std::uint32_t requirements_mask =
  503. parent_t::requirements_mask | WORKER_TIMEOUT | REQUEST_TIMEOUT | RESOLVE_TIMEOUT;
  504. builder_t &&worker_timeout(const r::pt::time_duration &value) && {
  505. parent_t::config.worker_timeout = value;
  506. parent_t::mask = (parent_t::mask & ~WORKER_TIMEOUT);
  507. return std::move(*static_cast<builder_t *>(this));
  508. }
  509. builder_t &&resolve_timeout(const r::pt::time_duration &value) && {
  510. parent_t::config.resolve_timeout = value;
  511. parent_t::mask = (parent_t::mask & ~RESOLVE_TIMEOUT);
  512. return std::move(*static_cast<builder_t *>(this));
  513. }
  514. builder_t &&request_timeout(const r::pt::time_duration &value) && {
  515. parent_t::config.request_timeout = value;
  516. parent_t::mask = (parent_t::mask & ~REQUEST_TIMEOUT);
  517. return std::move(*static_cast<builder_t *>(this));
  518. }
  519. };
  520. struct http_manager_t : public ra::supervisor_asio_t {
  521. using request_ptr_t = r::intrusive_ptr_t<message::http_request_t>;
  522. struct job_t {
  523. request_ptr_t request;
  524. r::address_ptr_t worker;
  525. };
  526. using workers_set_t = std::unordered_set<r::address_ptr_t>;
  527. using req_mapping_t = std::unordered_map<r::request_id_t, job_t>;
  528. using delayed_queue_t = std::deque<request_ptr_t>;
  529. using config_t = http_manager_config_t;
  530. template <typename Supervisor> using config_builder_t = http_manager_asio_builder_t<Supervisor>;
  531. explicit http_manager_t(http_manager_config_t &config_)
  532. : ra::supervisor_asio_t{config_}, worker_timeout{config_.worker_timeout},
  533. request_timeout{config_.request_timeout}, resolve_timeout{config_.resolve_timeout} {}
  534. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  535. r::actor_base_t::configure(plugin);
  536. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  537. p.subscribe_actor(&http_manager_t::on_request);
  538. p.subscribe_actor(&http_manager_t::on_response);
  539. p.subscribe_actor(&http_manager_t::on_cancel);
  540. });
  541. plugin.with_casted<r::plugin::registry_plugin_t>(
  542. [&](auto &p) { p.register_name(service::manager, get_address()); });
  543. }
  544. void shutdown_finish() noexcept override {
  545. std::cerr << "http_manager_t::shutdown_finish()\n";
  546. ra::supervisor_asio_t::shutdown_finish();
  547. }
  548. void on_child_init(actor_base_t *actor, const r::extended_error_ptr_t &ee) noexcept override {
  549. ra::supervisor_asio_t::on_child_init(actor, ee);
  550. assert(!ee);
  551. auto &req = delayed_queue.front();
  552. forward(*req, actor->get_address());
  553. delayed_queue.pop_front();
  554. }
  555. r::address_ptr_t get_worker() noexcept {
  556. auto it = free_workers.begin();
  557. if (it == free_workers.end()) {
  558. return r::address_ptr_t{};
  559. }
  560. auto worker_addr = *it;
  561. free_workers.erase(it);
  562. return worker_addr;
  563. }
  564. void on_cancel(message::http_cancel_t &req) noexcept {
  565. auto request_id = req.payload.id;
  566. auto it = req_mapping.find(request_id);
  567. if (it != req_mapping.end()) {
  568. send<message::http_cancel_t::payload_t>(it->second.worker);
  569. }
  570. }
  571. void forward(message::http_request_t &req, const r::address_ptr_t &worker_addr) noexcept {
  572. auto &payload = req.payload.request_payload;
  573. auto request_id = request<payload::http_request_t>(worker_addr, payload).send(payload->timeout);
  574. req_mapping.emplace(request_id, job_t{&req, std::move(worker_addr)});
  575. }
  576. void on_request(message::http_request_t &req) noexcept {
  577. auto worker_addr = get_worker();
  578. if (worker_addr) {
  579. forward(req, worker_addr);
  580. } else {
  581. delayed_queue.emplace_back(&req);
  582. create_actor<http_worker_t>()
  583. .timeout(worker_timeout)
  584. .resolve_timeout(resolve_timeout)
  585. .request_timeout(request_timeout)
  586. .finish();
  587. }
  588. }
  589. void on_response(message::http_response_t &res) noexcept {
  590. auto it = req_mapping.find(res.payload.request_id());
  591. auto worker_addr = res.payload.req->address;
  592. free_workers.emplace(std::move(worker_addr));
  593. reply_to(*it->second.request, res.payload.ee, std::move(res.payload.res));
  594. req_mapping.erase(it);
  595. }
  596. r::pt::time_duration worker_timeout;
  597. r::pt::time_duration request_timeout;
  598. r::pt::time_duration resolve_timeout;
  599. workers_set_t free_workers;
  600. req_mapping_t req_mapping;
  601. delayed_queue_t delayed_queue;
  602. };
  603. struct client_t : r::actor_base_t {
  604. using r::actor_base_t::actor_base_t;
  605. using timepoint_t = std::chrono::time_point<std::chrono::high_resolution_clock>;
  606. void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
  607. rotor::actor_base_t::configure(plugin);
  608. plugin.with_casted<rotor::plugin::starter_plugin_t>([](auto &p) { p.subscribe_actor(&client_t::on_response); });
  609. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  610. p.discover_name(service::manager, manager_addr, true).link(false).callback([](auto phase, auto &ee) {
  611. auto p = (phase == rotor::plugin::registry_plugin_t::phase_t::linking) ? "link" : "discovery";
  612. if (!ee) {
  613. std::cerr << "manager has been found (" << p << ")\n";
  614. } else {
  615. std::cerr << "manager has not been found (" << ee->message() << " (" << p << ")\n";
  616. }
  617. });
  618. });
  619. }
  620. void shutdown_start() noexcept override {
  621. r::actor_base_t::shutdown_start();
  622. std::cerr << "client_t, shutdown_start\n";
  623. for (auto &req : active_requests) {
  624. send<message::http_cancel_t::payload_t>(manager_addr, req, get_address());
  625. }
  626. }
  627. void shutdown_finish() noexcept override {
  628. auto end = std::chrono::high_resolution_clock::now();
  629. std::chrono::duration<double> diff = end - start;
  630. r::actor_base_t::shutdown_finish();
  631. double rps = success_requests / diff.count();
  632. std::cerr << "client_t::shutdown_finish, stats: " << success_requests << "/" << total_requests
  633. << " requests, in " << diff.count() << "s, rps = " << rps << "\n";
  634. manager_addr.reset();
  635. }
  636. void make_request() noexcept {
  637. if (!shutdown_request) {
  638. if (active_requests.size() < concurrency && total_requests < max_requests) {
  639. auto rx_buff = std::make_shared<payload::http_request_t::rx_buff_t>();
  640. auto request_id =
  641. request<payload::http_request_t>(manager_addr, url, std::move(rx_buff), RX_BUFF_SZ, timeout)
  642. .send(timeout);
  643. active_requests.emplace(request_id);
  644. ++total_requests;
  645. resources->acquire(resource::request);
  646. }
  647. }
  648. }
  649. void on_start() noexcept override {
  650. r::actor_base_t::on_start();
  651. start = std::chrono::high_resolution_clock::now();
  652. for (std::size_t i = 0; i < concurrency; ++i) {
  653. make_request();
  654. }
  655. }
  656. void on_response(message::http_response_t &msg) noexcept {
  657. auto it = active_requests.find(msg.payload.req->payload.id);
  658. assert(it != active_requests.end());
  659. active_requests.erase(it);
  660. resources->release(resource::request);
  661. bool err = false;
  662. if (!msg.payload.ee) {
  663. auto &res = msg.payload.res->response;
  664. if (res.result() == http::status::ok) {
  665. // std::cerr << "." << std::flush;
  666. ++success_requests;
  667. } else {
  668. std::cerr << "http error: " << res.result_int() << "\n";
  669. err = true;
  670. }
  671. } else {
  672. std::cerr << "request error: " << msg.payload.ee->message() << "\n";
  673. err = true;
  674. }
  675. if (err) {
  676. ++http_errors;
  677. }
  678. if (http_errors < MAX_HTTP_FAILURES) {
  679. make_request();
  680. }
  681. if (active_requests.empty()) {
  682. std::cerr << "starting shutdown\n";
  683. do_shutdown();
  684. }
  685. }
  686. std::size_t poll_attempts = 0;
  687. std::size_t http_errors = 0;
  688. std::unordered_set<r::request_id_t> active_requests;
  689. std::size_t success_requests = 0;
  690. std::size_t total_requests = 0;
  691. std::size_t max_requests = 0;
  692. r::address_ptr_t manager_addr;
  693. URL url;
  694. r::pt::time_duration timeout;
  695. std::size_t concurrency;
  696. timepoint_t start;
  697. };
  698. std::atomic_bool shutdown_flag = false;
  699. int main(int argc, char **argv) {
  700. using url_ptr_t = std::unique_ptr<URL>;
  701. // clang-format off
  702. /* parse command-line & config options */
  703. po::options_description cmdline_descr("Allowed options");
  704. cmdline_descr.add_options()
  705. ("help", "show this help message")
  706. ("url", po::value<std::string>()->default_value("http://www.example.com:80/index.html"), "URL to poll")
  707. ("workers_count", po::value<std::size_t>()->default_value(10), "concurrency (number of http workers)")
  708. ("timeout", po::value<std::size_t>()->default_value(5000), "generic timeout (in milliseconds)")
  709. ("resolve_timeout", po::value<std::size_t>()->default_value(1000), "resolve timeout (in milliseconds)")
  710. ("rotor_timeout", po::value<std::size_t>()->default_value(50), "rotor timeout (in milliseconds)")
  711. ("max_requests", po::value<std::size_t>()->default_value(20), "maximum amount of requests before shutting down");
  712. // clang-format on
  713. po::variables_map vm;
  714. po::store(po::parse_command_line(argc, argv, cmdline_descr), vm);
  715. po::notify(vm);
  716. bool show_help = vm.count("help");
  717. if (show_help) {
  718. std::cout << cmdline_descr << "\n";
  719. return 1;
  720. }
  721. url_ptr_t url;
  722. auto url_str = vm["url"].as<std::string>();
  723. std::regex re("(\\w+)://([^/ :]+):(\\d+)(/.+)");
  724. std::smatch what;
  725. if (regex_match(url_str, what, re)) {
  726. auto host = std::string(what[2].first, what[2].second);
  727. auto port = std::string(what[3].first, what[3].second);
  728. auto path = std::string(what[4].first, what[4].second);
  729. url = std::make_unique<URL>(URL{std::move(host), std::move(port), std::move(path)});
  730. } else {
  731. std::cout << "wrong url format. It should be like http://www.example.com:80/index.html"
  732. << "\n";
  733. return 1;
  734. }
  735. auto request_timeout = r::pt::milliseconds{vm["timeout"].as<std::size_t>()};
  736. auto resolve_timeout = r::pt::milliseconds{vm["resolve_timeout"].as<std::size_t>()};
  737. auto rotor_timeout = r::pt::milliseconds{vm["rotor_timeout"].as<std::size_t>()};
  738. auto workers_count = vm["workers_count"].as<std::size_t>();
  739. std::cerr << "using " << workers_count << " workers for " << url->host << ":" << url->port << url->path
  740. << ", timeout: " << request_timeout << "\n";
  741. asio::io_context io_context;
  742. auto system_context = ra::system_context_asio_t::ptr_t{new ra::system_context_asio_t(io_context)};
  743. auto strand = std::make_shared<asio::io_context::strand>(io_context);
  744. auto man_timeout = request_timeout + (rotor_timeout * static_cast<int>(workers_count) * 2);
  745. auto sup = system_context->create_supervisor<ra::supervisor_asio_t>()
  746. .timeout(man_timeout)
  747. .strand(strand)
  748. .create_registry()
  749. .synchronize_start()
  750. .finish();
  751. sup->create_actor<resolver_worker_t>()
  752. .timeout(resolve_timeout + rotor_timeout)
  753. .resolve_timeout(resolve_timeout)
  754. .finish();
  755. auto worker_timeout = request_timeout * 2;
  756. sup->create_actor<http_manager_t>()
  757. .strand(strand)
  758. .timeout(man_timeout)
  759. .worker_timeout(worker_timeout)
  760. .request_timeout(request_timeout)
  761. .resolve_timeout(resolve_timeout + rotor_timeout * 2)
  762. .synchronize_start()
  763. .shutdown_flag(shutdown_flag, rotor_timeout / 2)
  764. .finish();
  765. auto client = sup->create_actor<client_t>().timeout(worker_timeout).autoshutdown_supervisor().finish();
  766. client->timeout = request_timeout + rotor_timeout * 2;
  767. client->concurrency = workers_count;
  768. client->url = *url;
  769. client->max_requests = vm["max_requests"].as<std::size_t>();
  770. sup->start();
  771. #ifndef _WIN32
  772. struct sigaction act;
  773. act.sa_handler = [](int) { shutdown_flag = true; };
  774. if (sigaction(SIGINT, &act, nullptr) != 0) {
  775. std::cout << "critical :: cannot set signal handler\n";
  776. return -1;
  777. }
  778. #endif
  779. io_context.run();
  780. #ifndef _WIN32
  781. shutdown_flag = true;
  782. #endif
  783. return 0;
  784. }