parsebyparts.cpp

// Example of parsing JSON to document by parts.

// Using C++11 threads
// Temporarily disable for clang (older version) due to incompatibility with libstdc++
#if (__cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1700)) && !defined(__clang__)

#include "rapidjson/document.h"
#include "rapidjson/error/en.h"
#include "rapidjson/writer.h"
#include "rapidjson/ostreamwrapper.h"
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

using namespace rapidjson;

template<unsigned parseFlags = kParseDefaultFlags>
class AsyncDocumentParser {
public:
    AsyncDocumentParser(Document& d)
        : stream_(*this)
        , d_(d)
        , parseThread_()
        , mutex_()
        , notEmpty_()
        , finish_()
        , completed_()
    {
        // Create and execute thread after all member variables are initialized.
        parseThread_ = std::thread(&AsyncDocumentParser::Parse, this);
    }

    ~AsyncDocumentParser() {
        if (!parseThread_.joinable())
            return;

        {        
            std::unique_lock<std::mutex> lock(mutex_);

            // Wait until the buffer is read up (or parsing is completed)
            while (!stream_.Empty() && !completed_)
                finish_.wait(lock);

            // Automatically append '\0' as the terminator in the stream.
            static const char terminator[] = "";
            stream_.src_ = terminator;
            stream_.end_ = terminator + 1;
            notEmpty_.notify_one(); // unblock the AsyncStringStream
        }

        parseThread_.join();
    }

    void ParsePart(const char* buffer, size_t length) {
        std::unique_lock<std::mutex> lock(mutex_);
        
        // Wait until the buffer is read up (or parsing is completed)
        while (!stream_.Empty() && !completed_)
            finish_.wait(lock);

        // Stop further parsing if the parsing process is completed.
        if (completed_)
            return;

        // Set the buffer to stream and unblock the AsyncStringStream
        stream_.src_ = buffer;
        stream_.end_ = buffer + length;
        notEmpty_.notify_one();
    }

private:
    void Parse() {
        d_.ParseStream<parseFlags>(stream_);

        // The stream may not be fully read, notify finish anyway to unblock ParsePart()
        std::unique_lock<std::mutex> lock(mutex_);
        completed_ = true;      // Parsing process is completed
        finish_.notify_one();   // Unblock ParsePart() or destructor if they are waiting.
    }

    struct AsyncStringStream {
        typedef char Ch;

        AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {}

        char Peek() const {
            std::unique_lock<std::mutex> lock(parser_.mutex_);

            // If nothing in stream, block to wait.
            while (Empty())
                parser_.notEmpty_.wait(lock);

            return *src_;
        }

        char Take() {
            std::unique_lock<std::mutex> lock(parser_.mutex_);

            // If nothing in stream, block to wait.
            while (Empty())
                parser_.notEmpty_.wait(lock);

            count_++;
            char c = *src_++;

            // If all stream is read up, notify that the stream is finish.
            if (Empty())
                parser_.finish_.notify_one();

            return c;
        }

        size_t Tell() const { return count_; }

        // Not implemented
        char* PutBegin() { return 0; }
        void Put(char) {}
        void Flush() {}
        size_t PutEnd(char*) { return 0; }

        bool Empty() const { return src_ == end_; }

        AsyncDocumentParser& parser_;
        const char* src_;     //!< Current read position.
        const char* end_;     //!< End of buffer
        size_t count_;        //!< Number of characters taken so far.
    };

    AsyncStringStream stream_;
    Document& d_;
    std::thread parseThread_;
    std::mutex mutex_;
    std::condition_variable notEmpty_;
    std::condition_variable finish_;
    bool completed_;
};

int main() {
    Document d;

    {
        AsyncDocumentParser<> parser(d);

        const char json1[] = " { \"hello\" : \"world\", \"t\" : tr";
        //const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // For test parsing error
        const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14";
        const char json3[] = "16, \"a\":[1, 2, 3, 4] } ";

        parser.ParsePart(json1, sizeof(json1) - 1);
        parser.ParsePart(json2, sizeof(json2) - 1);
        parser.ParsePart(json3, sizeof(json3) - 1);
    }

    if (d.HasParseError()) {
        std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl;
        return EXIT_FAILURE;
    }
    
    // Stringify the JSON to cout
    OStreamWrapper os(std::cout);
    Writer<OStreamWrapper> writer(os);
    d.Accept(writer);
    std::cout << std::endl;

    return EXIT_SUCCESS;
}

#else // Not supporting C++11 

#include <iostream>
int main() {
    std::cout << "This example requires C++11 compiler" << std::endl;
}

#endif





Add Discussion as Guest

Log in