sha512.cpp 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. //
  2. // Copyright (c) 2019-2024 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. /*
  7. * This is an example how to implement interruptible blocking operations using
  8. * std::thread backend. Here, as blocking I/O operation the the reading disk
  9. * file and calculating its sha512 digest is used.
  10. *
  11. * The whole work is split into pieces, and once a piece is complete the
  12. * continuation message (with the whole job state) is send to the next
  13. * piece to be processed and so on. Between continuation messages other
  14. * messages might appear (in the case, the shutdown message) or timers
  15. * might be triggered.
  16. *
  17. * This is an example of blocking messages multiplexing pattern.
  18. *
  19. * The "ctrl+c" can be anytime pressed on the terminal, and the program
  20. * will correctly shutdown (including sanitizer build). Try it!
  21. *
  22. * As the additional std::threads are not spawned, this example is
  23. * ok to compile it with BUILD_THREAD_UNSAFE mode.
  24. *
  25. */
  26. #include "rotor.hpp"
  27. #include "rotor/thread.hpp"
  28. #include <cstdint>
  29. #include <string>
  30. #include <iostream>
  31. #include <fstream>
  32. #include <memory>
  33. #include <atomic>
  34. #include <openssl/sha.h>
  35. #include <openssl/evp.h>
  36. #ifndef _WIN32
  37. #include <signal.h>
  38. #else
  39. #include <windows.h>
  40. #endif
  41. namespace r = rotor;
  42. namespace rth = rotor::thread;
  43. using buffer_t = std::vector<std::byte>;
  44. template <typename T> using guard_t = std::unique_ptr<T, std::function<void(T *)>>;
  45. enum class work_result_t { done, completed, errored };
  46. struct work_t {
  47. using evp_ctx_t = guard_t<EVP_MD_CTX>;
  48. work_t(std::ifstream &&in_, size_t file_size_, size_t buff_sz_)
  49. : in(std::move(in_)), file_size{file_size_}, buff(buff_sz_) {
  50. evp_ctx = evp_ctx_t(EVP_MD_CTX_new(), [](auto ptr) { EVP_MD_CTX_free(ptr); });
  51. if (EVP_DigestInit_ex(evp_ctx.get(), EVP_sha512(), NULL) != 1) {
  52. error = "fail to init sha";
  53. }
  54. }
  55. std::string get_error() const noexcept {
  56. assert(error.size() && "has error");
  57. return error;
  58. }
  59. std::string get_result() const noexcept {
  60. assert(error.size() == 0 && "has no error");
  61. assert(result.size() != 0 && "has result");
  62. return result;
  63. }
  64. work_result_t io() noexcept {
  65. if (error.size()) {
  66. return work_result_t::errored;
  67. }
  68. auto bytes_left = file_size - bytes_read;
  69. auto final = bytes_left < buff.size();
  70. auto bytes_to_read = final ? bytes_left : buff.size();
  71. in.read(reinterpret_cast<char *>(buff.data()), static_cast<std::streamsize>(bytes_to_read));
  72. if (!in) {
  73. error = "reading file error";
  74. return work_result_t::errored;
  75. }
  76. // printf("read %llu bytes\n", bytes_to_read);
  77. bytes_read += bytes_to_read;
  78. auto r = EVP_DigestUpdate(evp_ctx.get(), buff.data(), bytes_to_read);
  79. if (r != 1) {
  80. error = "sha update failed";
  81. return work_result_t::errored;
  82. }
  83. if (!final) {
  84. return work_result_t::done;
  85. }
  86. unsigned int trailing_bytes = SHA512_DIGEST_LENGTH;
  87. unsigned char digest[SHA512_DIGEST_LENGTH];
  88. r = EVP_DigestFinal_ex(evp_ctx.get(), digest, &trailing_bytes);
  89. if (r != 1) {
  90. error = "sha final failed";
  91. return work_result_t::errored;
  92. }
  93. result = std::string((char *)digest, trailing_bytes);
  94. return work_result_t::completed;
  95. }
  96. private:
  97. evp_ctx_t evp_ctx;
  98. std::ifstream in;
  99. size_t file_size;
  100. buffer_t buff;
  101. size_t bytes_read = 0;
  102. std::string error;
  103. std::string result;
  104. };
  105. namespace payload {
  106. struct work_progress_t {
  107. std::unique_ptr<work_t> work;
  108. };
  109. } // namespace payload
  110. namespace message {
  111. using work_progress_t = r::message_t<payload::work_progress_t>;
  112. }
  113. struct sah_actor_config : r::actor_config_t {
  114. std::string path = "";
  115. std::size_t block_size = 0;
  116. };
  117. template <typename Actor> struct sah_actor_config_builder_t : r::actor_config_builder_t<Actor> {
  118. using builder_t = typename Actor::template config_builder_t<Actor>;
  119. using parent_t = r::actor_config_builder_t<Actor>;
  120. using parent_t::parent_t;
  121. builder_t &&path(const std::string &value) &&noexcept {
  122. parent_t::config.path = value;
  123. return std::move(*static_cast<typename parent_t::builder_t *>(this));
  124. }
  125. builder_t &&block_size(std::size_t value) &&noexcept {
  126. parent_t::config.block_size = value;
  127. return std::move(*static_cast<typename parent_t::builder_t *>(this));
  128. }
  129. };
  130. struct sha_actor_t : public r::actor_base_t {
  131. using config_t = sah_actor_config;
  132. template <typename Actor> using config_builder_t = sah_actor_config_builder_t<Actor>;
  133. explicit sha_actor_t(config_t &cfg) : r::actor_base_t{cfg}, path{cfg.path}, block_size{cfg.block_size} {}
  134. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  135. r::actor_base_t::configure(plugin);
  136. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  137. p.subscribe_actor(&sha_actor_t::on_process)->tag_io(); // important
  138. });
  139. }
  140. void on_start() noexcept override {
  141. rotor::actor_base_t::on_start();
  142. std::ifstream in(path, std::ifstream::ate | std::ifstream::binary);
  143. if (!in.is_open()) {
  144. std::cout << "failed to open " << path << '\n';
  145. return do_shutdown();
  146. }
  147. auto sz = in.tellg();
  148. in = std::ifstream(path, std::ifstream::binary);
  149. auto work = std::make_unique<work_t>(std::move(in), sz, block_size);
  150. send<payload::work_progress_t>(address, payload::work_progress_t{std::move(work)});
  151. }
  152. private:
  153. std::string path;
  154. std::size_t block_size;
  155. void print_result(const work_t &work) noexcept {
  156. auto r = work.get_result();
  157. const std::byte *buff = reinterpret_cast<const std::byte *>(r.data());
  158. for (size_t i = 0; i < r.size(); ++i) {
  159. std::cout << std::hex << std::setfill('0') << std::setw(2) << (unsigned)buff[i];
  160. }
  161. std::cout << "\n";
  162. }
  163. void on_process(message::work_progress_t &msg) noexcept {
  164. auto &work = msg.payload.work;
  165. auto result = msg.payload.work->io();
  166. switch (result) {
  167. case work_result_t::done:
  168. send<payload::work_progress_t>(address, payload::work_progress_t{std::move(work)});
  169. break;
  170. case work_result_t::errored:
  171. std::cout << "error: " << work->get_error() << "\n";
  172. supervisor->do_shutdown();
  173. break;
  174. case work_result_t::completed:
  175. print_result(*work);
  176. supervisor->do_shutdown();
  177. break;
  178. }
  179. }
  180. };
  181. std::atomic_bool shutdown_flag = false;
  182. #ifdef _WIN32
  183. BOOL WINAPI consoleHandler(DWORD signal) {
  184. if (signal == CTRL_C_EVENT) {
  185. shutdown_flag = true;
  186. }
  187. return TRUE; /* ignore */
  188. }
  189. #endif
  190. int main(int argc, char **argv) {
  191. std::string path = argv[0];
  192. if (argc < 2) {
  193. std::cout << "usage:: " << argv[0] << " /path/to/file [block_size = 1048576]\n";
  194. std::cout << "will calculate for " << argv[0] << "\n";
  195. } else {
  196. path = argv[1];
  197. }
  198. size_t block_size = 1048576;
  199. if (argc == 3) {
  200. try {
  201. block_size = static_cast<size_t>(std::stoll(argv[2]));
  202. } catch (...) {
  203. std::cout << "can't convert '" << argv[2] << "', using default one\n";
  204. }
  205. }
  206. rth::system_context_thread_t ctx;
  207. auto timeout = r::pt::milliseconds{100};
  208. auto sup = ctx.create_supervisor<rth::supervisor_thread_t>()
  209. .timeout(timeout)
  210. .shutdown_flag(shutdown_flag, timeout / 2)
  211. .finish();
  212. sup->create_actor<sha_actor_t>()
  213. .block_size(block_size)
  214. .path(path)
  215. .timeout(timeout)
  216. .autoshutdown_supervisor()
  217. .finish();
  218. #ifndef _WIN32
  219. struct sigaction action;
  220. memset(&action, 0, sizeof(action));
  221. action.sa_handler = [](int) { shutdown_flag = true; };
  222. if (sigaction(SIGINT, &action, nullptr) != 0) {
  223. std::cout << "sigaction failed\n";
  224. return -1;
  225. }
  226. #else
  227. if (!SetConsoleCtrlHandler(consoleHandler, true)) {
  228. std::cout << "SetConsoleCtrlHandler failed\n";
  229. return -1;
  230. }
  231. #endif
  232. ctx.run();
  233. if (shutdown_flag) {
  234. std::cout << "terminated due to ctrl+c press\n";
  235. }
  236. std::cout << "normal exit\n";
  237. return 0;
  238. }