sha512.cpp 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. //
  2. // Copyright (c) 2019-2022 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. /*
  7. * This is an example how to implement interruptible blocking operations using
  8. * std::thread backend. Here, as blocking I/O operation the the reading disk
  9. * file and calculating its sha512 digest is used.
  10. *
  11. * The whole work is split into pieces, and once a piece is complete the
  12. * continuation message (with the whole job state) is send to the next
  13. * piece to be processed and so on. Between continuation messages other
  14. * messages might appear (in the case, the shutdown message) or timers
  15. * might be triggered.
  16. *
  17. * This is an example of blocking messages multiplexing pattern.
  18. *
  19. * The "ctrl+c" can be anytime pressed on the terminal, and the program
  20. * will correctly shutdown (including sanitizer build). Try it!
  21. *
  22. * As the additional std::threads are not spawned, this example is
  23. * ok to compile it with BUILD_THREAD_UNSAFE mode.
  24. *
  25. */
  26. #include "rotor.hpp"
  27. #include "rotor/thread.hpp"
  28. #include <cstdint>
  29. #include <string>
  30. #include <iostream>
  31. #include <fstream>
  32. #include <memory>
  33. #include <atomic>
  34. #include <openssl/sha.h>
  35. #ifndef _WIN32
  36. #include <signal.h>
  37. #else
  38. #include <windows.h>
  39. #endif
  40. namespace r = rotor;
  41. namespace rth = rotor::thread;
  42. using buffer_t = std::vector<std::byte>;
  43. enum class work_result_t { done, completed, errored };
  44. struct work_t {
  45. work_t(std::ifstream &&in_, size_t file_size_, size_t buff_sz_)
  46. : in(std::move(in_)), file_size{file_size_}, buff(buff_sz_) {
  47. if (SHA512_Init(&sha_ctx) != 1) {
  48. error = "fail to init sha";
  49. }
  50. }
  51. std::string get_error() const noexcept {
  52. assert(error.size() && "has error");
  53. return error;
  54. }
  55. std::string get_result() const noexcept {
  56. assert(error.size() == 0 && "has no error");
  57. assert(result.size() != 0 && "has result");
  58. return result;
  59. }
  60. work_result_t io() noexcept {
  61. if (error.size()) {
  62. return work_result_t::errored;
  63. }
  64. auto bytes_left = file_size - bytes_read;
  65. auto final = bytes_left < buff.size();
  66. auto bytes_to_read = final ? bytes_left : buff.size();
  67. in.read(reinterpret_cast<char *>(buff.data()), bytes_to_read);
  68. if (!in) {
  69. error = "reading file error";
  70. return work_result_t::errored;
  71. }
  72. // printf("read %llu bytes\n", bytes_to_read);
  73. bytes_read += bytes_to_read;
  74. auto r = SHA512_Update(&sha_ctx, buff.data(), bytes_to_read);
  75. if (r != 1) {
  76. error = "sha update failed";
  77. return work_result_t::errored;
  78. }
  79. if (!final) {
  80. return work_result_t::done;
  81. }
  82. unsigned char digest[SHA512_DIGEST_LENGTH];
  83. r = SHA512_Final(digest, &sha_ctx);
  84. if (r != 1) {
  85. error = "sha final failed";
  86. return work_result_t::errored;
  87. }
  88. result = std::string((char *)digest, SHA512_DIGEST_LENGTH);
  89. return work_result_t::completed;
  90. }
  91. private:
  92. std::ifstream in;
  93. size_t file_size;
  94. buffer_t buff;
  95. size_t bytes_read = 0;
  96. SHA512_CTX sha_ctx;
  97. std::string error;
  98. std::string result;
  99. };
  100. namespace payload {
  101. struct work_progress_t {
  102. std::unique_ptr<work_t> work;
  103. };
  104. } // namespace payload
  105. namespace message {
  106. using work_progress_t = r::message_t<payload::work_progress_t>;
  107. }
  108. struct sah_actor_config : r::actor_config_t {
  109. std::string path = "";
  110. std::size_t block_size = 0;
  111. };
  112. template <typename Actor> struct sah_actor_config_builder_t : r::actor_config_builder_t<Actor> {
  113. using builder_t = typename Actor::template config_builder_t<Actor>;
  114. using parent_t = r::actor_config_builder_t<Actor>;
  115. using parent_t::parent_t;
  116. builder_t &&path(const std::string &value) &&noexcept {
  117. parent_t::config.path = value;
  118. return std::move(*static_cast<typename parent_t::builder_t *>(this));
  119. }
  120. builder_t &&block_size(std::size_t value) &&noexcept {
  121. parent_t::config.block_size = value;
  122. return std::move(*static_cast<typename parent_t::builder_t *>(this));
  123. }
  124. };
  125. struct sha_actor_t : public r::actor_base_t {
  126. using config_t = sah_actor_config;
  127. template <typename Actor> using config_builder_t = sah_actor_config_builder_t<Actor>;
  128. explicit sha_actor_t(config_t &cfg) : r::actor_base_t{cfg}, path{cfg.path}, block_size{cfg.block_size} {}
  129. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  130. r::actor_base_t::configure(plugin);
  131. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  132. p.subscribe_actor(&sha_actor_t::on_process)->tag_io(); // important
  133. });
  134. }
  135. void on_start() noexcept override {
  136. rotor::actor_base_t::on_start();
  137. std::ifstream in(path, std::ifstream::ate | std::ifstream::binary);
  138. if (!in.is_open()) {
  139. std::cout << "failed to open " << path << '\n';
  140. return do_shutdown();
  141. }
  142. auto sz = in.tellg();
  143. in = std::ifstream(path, std::ifstream::binary);
  144. auto work = std::make_unique<work_t>(std::move(in), sz, block_size);
  145. send<payload::work_progress_t>(address, payload::work_progress_t{std::move(work)});
  146. }
  147. private:
  148. std::string path;
  149. std::size_t block_size;
  150. void print_result(const work_t &work) noexcept {
  151. auto r = work.get_result();
  152. const std::byte *buff = reinterpret_cast<const std::byte *>(r.data());
  153. for (size_t i = 0; i < r.size(); ++i) {
  154. std::cout << std::hex << std::setfill('0') << std::setw(2) << (unsigned)buff[i];
  155. }
  156. std::cout << "\n";
  157. }
  158. void on_process(message::work_progress_t &msg) noexcept {
  159. auto &work = msg.payload.work;
  160. auto result = msg.payload.work->io();
  161. switch (result) {
  162. case work_result_t::done:
  163. send<payload::work_progress_t>(address, payload::work_progress_t{std::move(work)});
  164. break;
  165. case work_result_t::errored:
  166. std::cout << "error: " << work->get_error() << "\n";
  167. supervisor->do_shutdown();
  168. break;
  169. case work_result_t::completed:
  170. print_result(*work);
  171. supervisor->do_shutdown();
  172. break;
  173. }
  174. }
  175. };
  176. std::atomic_bool shutdown_flag = false;
  177. #ifdef _WIN32
  178. BOOL WINAPI consoleHandler(DWORD signal) {
  179. if (signal == CTRL_C_EVENT) {
  180. shutdown_flag = true;
  181. }
  182. return TRUE; /* ignore */
  183. }
  184. #endif
  185. int main(int argc, char **argv) {
  186. std::string path = argv[0];
  187. if (argc < 2) {
  188. std::cout << "usage:: " << argv[0] << " /path/to/file [block_size = 1048576]\n";
  189. std::cout << "will calculate for " << argv[0] << "\n";
  190. } else {
  191. path = argv[1];
  192. }
  193. size_t block_size = 1048576;
  194. if (argc == 3) {
  195. try {
  196. block_size = static_cast<size_t>(std::stoll(argv[2]));
  197. } catch (...) {
  198. std::cout << "can't convert '" << argv[2] << "', using default one\n";
  199. }
  200. }
  201. rth::system_context_thread_t ctx;
  202. auto timeout = r::pt::milliseconds{100};
  203. auto sup = ctx.create_supervisor<rth::supervisor_thread_t>()
  204. .timeout(timeout)
  205. .shutdown_flag(shutdown_flag, timeout / 2)
  206. .finish();
  207. auto act = sup->create_actor<sha_actor_t>()
  208. .block_size(block_size)
  209. .path(path)
  210. .timeout(timeout)
  211. .autoshutdown_supervisor()
  212. .finish();
  213. #ifndef _WIN32
  214. struct sigaction action;
  215. memset(&action, 0, sizeof(action));
  216. action.sa_handler = [](int) { shutdown_flag = true; };
  217. if (sigaction(SIGINT, &action, nullptr) != 0) {
  218. std::cout << "sigaction failed\n";
  219. return -1;
  220. }
  221. #else
  222. if (!SetConsoleCtrlHandler(consoleHandler, true)) {
  223. std::cout << "SetConsoleCtrlHandler failed\n";
  224. return -1;
  225. }
  226. #endif
  227. ctx.run();
  228. if (shutdown_flag) {
  229. std::cout << "terminated due to ctrl+c press\n";
  230. }
  231. std::cout << "normal exit\n";
  232. return 0;
  233. }