Skip to content
This repository has been archived by the owner on Aug 19, 2019. It is now read-only.

Streaming JSON parsing cleanup. #115

Merged
merged 8 commits into from
Apr 2, 2018
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 56 additions & 70 deletions src/json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,19 +217,6 @@ class Context {
Context* parent_;
};

class TopLevelContext : public Context {
public:
TopLevelContext() : Context(nullptr) {}
void AddValue(std::unique_ptr<Value> value) override {
values_.emplace_back(std::move(value));
}
std::vector<std::unique_ptr<Value>> values() {
return std::move(values_);
}
private:
std::vector<std::unique_ptr<Value>> values_;
};

class ArrayContext : public Context {
public:
ArrayContext(Context* parent) : Context(parent) {}
Expand Down Expand Up @@ -276,7 +263,6 @@ class CallbackContext : public Context {
// A builder context that allows building up a JSON object.
class JSONBuilder {
public:
JSONBuilder() : context_(new TopLevelContext()) {}
JSONBuilder(std::function<void(std::unique_ptr<Value>)> callback)
: context_(new CallbackContext(callback)) {}
~JSONBuilder() { delete context_; }
Expand Down Expand Up @@ -326,15 +312,6 @@ class JSONBuilder {
return true;
}

// Top-level context only.
std::vector<std::unique_ptr<Value>> values() throw(Exception) {
TopLevelContext* top_level = dynamic_cast<TopLevelContext*>(context_);
if (top_level == nullptr) {
throw Exception("values() called for an inner context");
}
return top_level->values();
}

private:
Context* context_;
};
Expand Down Expand Up @@ -414,7 +391,7 @@ int handle_end_map(void* arg) {
return 1;
}

yajl_callbacks callbacks = {
const yajl_callbacks callbacks = {
.yajl_null = &handle_null,
.yajl_boolean = &handle_bool,
.yajl_integer = &handle_integer,
Expand All @@ -428,38 +405,51 @@ yajl_callbacks callbacks = {
.yajl_end_array = &handle_end_array,
};

class YajlHandle {
public:
YajlHandle(JSONBuilder* builder)
: handle_(yajl_alloc(&callbacks, NULL, builder)) {
yajl_config(handle_, yajl_allow_comments, 1);
yajl_config(handle_, yajl_allow_multiple_values, 1);
//yajl_config(handle_, yajl_allow_partial_values, 1);
//yajl_config(handle_, yajl_allow_trailing_garbage, 1);
//yajl_config(handle_, yajl_dont_validate_strings, 1);
}
~YajlHandle() {
yajl_free(handle_);
}
operator yajl_handle() { return handle_; }
private:
yajl_handle handle_;
};

class YajlError {
public:
YajlError(yajl_handle handle, bool verbose,
const unsigned char* json_text, size_t json_len)
: handle_(handle),
str_(yajl_get_error(handle_, (int)verbose, json_text, json_len)) {}
~YajlError() {
yajl_free_error(handle_, str_);
}
const char* c_str() { return reinterpret_cast<const char*>(str_); }
private:
yajl_handle handle_;
unsigned char* str_;
};

} // namespace

std::vector<std::unique_ptr<Value>> Parser::AllFromStream(std::istream& stream)
throw(Exception)
{
JSONBuilder builder;

const int kMax = 65536;
unsigned char data[kMax];
yajl_handle handle = yajl_alloc(&callbacks, NULL, (void*) &builder);
yajl_config(handle, yajl_allow_comments, 1);
yajl_config(handle, yajl_allow_multiple_values, 1);
//yajl_config(handle, yajl_allow_trailing_garbage, 1);
//yajl_config(handle, yajl_dont_validate_strings, 1);

while (!stream.eof()) {
stream.read(reinterpret_cast<char*>(&data[0]), kMax);
size_t count = stream.gcount();
yajl_parse(handle, data, count);
}

yajl_status stat = yajl_complete_parse(handle);

if (stat != yajl_status_ok) {
unsigned char* str = yajl_get_error(handle, 1, data, kMax);
std::string error_str((const char*)str);
yajl_free_error(handle, str);
throw Exception(error_str);
}

yajl_free(handle);
return builder.values();
std::vector<std::unique_ptr<Value>> values;
Parser p([&values](std::unique_ptr<Value> r){
values.emplace_back(std::move(r));
});
p.ParseStream(stream);
p.NotifyEOF();
return values;
}

std::unique_ptr<Value> Parser::FromStream(std::istream& stream)
Expand All @@ -480,45 +470,39 @@ std::unique_ptr<Value> Parser::FromStream(std::istream& stream)
std::vector<std::unique_ptr<Value>> Parser::AllFromString(
const std::string& input) throw(Exception)
{
std::istringstream stream(input);
return AllFromStream(stream);
return AllFromStream(std::istringstream(input));
}

std::unique_ptr<Value> Parser::FromString(const std::string& input)
throw(Exception)
{
std::istringstream stream(input);
return FromStream(stream);
return FromStream(std::istringstream(input));
}

class Parser::ParseState {
public:
ParseState(std::function<void(std::unique_ptr<Value>)> callback)
: builder_(callback),
handle_(yajl_alloc(&callbacks, NULL, (void*) &builder_)) {
: builder_(callback), handle_(&builder_) {
yajl_config(handle_, yajl_allow_comments, 1);
yajl_config(handle_, yajl_allow_multiple_values, 1);
//yajl_config(handle_, yajl_allow_partial_values, 1);
//yajl_config(handle_, yajl_allow_trailing_garbage, 1);
//yajl_config(handle_, yajl_dont_validate_strings, 1);
}

~ParseState() {
void Done() throw(Exception) {
yajl_status stat = yajl_complete_parse(handle_);
if (stat != yajl_status_ok) {
unsigned char* str = yajl_get_error(handle_, 0, nullptr, 0);
std::string error_str((const char*)str);
yajl_free_error(handle_, str);
throw Exception(error_str);
YajlError err(handle_, 0, nullptr, 0);
throw Exception(err.c_str());
}
yajl_free(handle_);
}

yajl_handle& handle() { return handle_; }
yajl_handle handle() { return handle_; }

private:
JSONBuilder builder_;
yajl_handle handle_;
YajlHandle handle_;
};

Parser::Parser(std::function<void(std::unique_ptr<Value>)> callback)
Expand All @@ -530,18 +514,16 @@ std::size_t Parser::ParseStream(std::istream& stream) throw(Exception) {
const int kMax = 65536;
unsigned char data[kMax];
size_t total_bytes_consumed = 0;
yajl_handle& handle = state_->handle();
yajl_handle handle = state_->handle();

while (!stream.eof()) {
stream.read(reinterpret_cast<char*>(&data[0]), kMax);
size_t count = stream.gcount();

yajl_status stat = yajl_parse(handle, data, count);
if (stat != yajl_status_ok) {
unsigned char* str = yajl_get_error(handle, 1, data, kMax);
std::string error_str((const char*)str);
yajl_free_error(handle, str);
throw Exception(error_str);
YajlError err(handle, 1, data, kMax);
throw Exception(err.c_str());
}

total_bytes_consumed += yajl_get_bytes_consumed(handle);
Expand All @@ -550,4 +532,8 @@ std::size_t Parser::ParseStream(std::istream& stream) throw(Exception) {
return total_bytes_consumed;
}

void Parser::NotifyEOF() throw(Exception) {
state_->Done();
}

} // json
11 changes: 11 additions & 0 deletions src/json.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,18 @@ class Parser {
throw(Exception);

size_t ParseStream(std::istream& stream) throw(Exception);
// Notifies the parser that no more data is available.
void NotifyEOF() throw(Exception);

// Used to accept inline construction of streams.
static std::vector<std::unique_ptr<Value>> AllFromStream(
std::istream&& stream) throw(Exception) {
return AllFromStream(stream);
}
static std::unique_ptr<Value> FromStream(std::istream&& stream)
throw(Exception) {
return FromStream(stream);
}
size_t ParseStream(std::istream&& stream) throw(Exception) {
return ParseStream(stream);
}
Expand Down
7 changes: 6 additions & 1 deletion src/kubernetes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -617,10 +617,10 @@ struct Watcher {
const boost::system::error_code& error) {
const std::string body(std::begin(range), std::end(range));
if (!body.empty()) {
try {
#ifdef VERBOSE
LOG(DEBUG) << name_ << " => Parsing '" << body << "'";
#endif
try {
std::istringstream input(body);
event_parser_.ParseStream(input);
} catch (const json::Exception& e) {
Expand All @@ -636,6 +636,11 @@ struct Watcher {
#ifdef VERBOSE
LOG(DEBUG) << name_ << " => Watch callback: EOF";
#endif
try {
event_parser_.NotifyEOF();
} catch (const json::Exception& e) {
LOG(ERROR) << "Error while processing last event: " << e.what();
}
} else {
LOG(ERROR) << name_ << " => Callback got error " << error;
}
Expand Down
Loading