parsebyparts.cpp 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. // Example of parsing JSON to document by parts.
  2. // Using C++11 threads
  3. // Temporarily disable for clang (older version) due to incompatibility with libstdc++
  4. #if (__cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1700)) && !defined(__clang__)
  5. #include "rapidjson/document.h"
  6. #include "rapidjson/error/en.h"
  7. #include "rapidjson/writer.h"
  8. #include "rapidjson/ostreamwrapper.h"
  9. #include <condition_variable>
  10. #include <iostream>
  11. #include <mutex>
  12. #include <thread>
  13. using namespace rapidjson;
  14. template<unsigned parseFlags = kParseDefaultFlags>
  15. class AsyncDocumentParser {
  16. public:
  17. AsyncDocumentParser(Document& d)
  18. : stream_(*this)
  19. , d_(d)
  20. , parseThread_(&AsyncDocumentParser::Parse, this)
  21. , mutex_()
  22. , notEmpty_()
  23. , finish_()
  24. , completed_()
  25. {}
  26. ~AsyncDocumentParser() {
  27. if (!parseThread_.joinable())
  28. return;
  29. {
  30. std::unique_lock<std::mutex> lock(mutex_);
  31. // Wait until the buffer is read up (or parsing is completed)
  32. while (!stream_.Empty() && !completed_)
  33. finish_.wait(lock);
  34. // Automatically append '\0' as the terminator in the stream.
  35. static const char terminator[] = "";
  36. stream_.src_ = terminator;
  37. stream_.end_ = terminator + 1;
  38. notEmpty_.notify_one(); // unblock the AsyncStringStream
  39. }
  40. parseThread_.join();
  41. }
  42. void ParsePart(const char* buffer, size_t length) {
  43. std::unique_lock<std::mutex> lock(mutex_);
  44. // Wait until the buffer is read up (or parsing is completed)
  45. while (!stream_.Empty() && !completed_)
  46. finish_.wait(lock);
  47. // Stop further parsing if the parsing process is completed.
  48. if (completed_)
  49. return;
  50. // Set the buffer to stream and unblock the AsyncStringStream
  51. stream_.src_ = buffer;
  52. stream_.end_ = buffer + length;
  53. notEmpty_.notify_one();
  54. }
  55. private:
  56. void Parse() {
  57. d_.ParseStream<parseFlags>(stream_);
  58. // The stream may not be fully read, notify finish anyway to unblock ParsePart()
  59. std::unique_lock<std::mutex> lock(mutex_);
  60. completed_ = true; // Parsing process is completed
  61. finish_.notify_one(); // Unblock ParsePart() or destructor if they are waiting.
  62. }
  63. struct AsyncStringStream {
  64. typedef char Ch;
  65. AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {}
  66. char Peek() const {
  67. std::unique_lock<std::mutex> lock(parser_.mutex_);
  68. // If nothing in stream, block to wait.
  69. while (Empty())
  70. parser_.notEmpty_.wait(lock);
  71. return *src_;
  72. }
  73. char Take() {
  74. std::unique_lock<std::mutex> lock(parser_.mutex_);
  75. // If nothing in stream, block to wait.
  76. while (Empty())
  77. parser_.notEmpty_.wait(lock);
  78. count_++;
  79. char c = *src_++;
  80. // If all stream is read up, notify that the stream is finish.
  81. if (Empty())
  82. parser_.finish_.notify_one();
  83. return c;
  84. }
  85. size_t Tell() const { return count_; }
  86. // Not implemented
  87. char* PutBegin() { return 0; }
  88. void Put(char) {}
  89. void Flush() {}
  90. size_t PutEnd(char*) { return 0; }
  91. bool Empty() const { return src_ == end_; }
  92. AsyncDocumentParser& parser_;
  93. const char* src_; //!< Current read position.
  94. const char* end_; //!< End of buffer
  95. size_t count_; //!< Number of characters taken so far.
  96. };
  97. AsyncStringStream stream_;
  98. Document& d_;
  99. std::thread parseThread_;
  100. std::mutex mutex_;
  101. std::condition_variable notEmpty_;
  102. std::condition_variable finish_;
  103. bool completed_;
  104. };
  105. int main() {
  106. Document d;
  107. {
  108. AsyncDocumentParser<> parser(d);
  109. const char json1[] = " { \"hello\" : \"world\", \"t\" : tr";
  110. //const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // Fot test parsing error
  111. const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14";
  112. const char json3[] = "16, \"a\":[1, 2, 3, 4] } ";
  113. parser.ParsePart(json1, sizeof(json1) - 1);
  114. parser.ParsePart(json2, sizeof(json2) - 1);
  115. parser.ParsePart(json3, sizeof(json3) - 1);
  116. }
  117. if (d.HasParseError()) {
  118. std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl;
  119. return EXIT_FAILURE;
  120. }
  121. // Stringify the JSON to cout
  122. OStreamWrapper os(std::cout);
  123. Writer<OStreamWrapper> writer(os);
  124. d.Accept(writer);
  125. std::cout << std::endl;
  126. return EXIT_SUCCESS;
  127. }
  128. #else // Not supporting C++11
  129. #include <iostream>
  130. int main() {
  131. std::cout << "This example requires C++11 compiler" << std::endl;
  132. }
  133. #endif