Skip to content

Commit

Permalink
Optimize parallel use of JSONSchema
Browse files (browse the repository at this point in the history)
Some more optimizations to avoid lock contention (on SharedKeys)
when multiple threads validate against the same schema.
Also added the travel-sample schema document to the repo.
  • Loading branch information
snej committed Jan 28, 2025
1 parent 8afccec commit 19ea9fc
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 23 deletions.
20 changes: 11 additions & 9 deletions Experimental/JSONSchema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ namespace fleece {
AnyOf,
Const,
Contains,
Else,
Enum,
ExclusiveMaximum,
ExclusiveMinimum,
Expand All @@ -253,6 +254,7 @@ namespace fleece {
PropertyNames,
Ref,
Required,
Then,
Type,
UniqueItems,
NKeys_
Expand All @@ -265,6 +267,7 @@ namespace fleece {
"anyOf",
"const",
"contains",
"else",
"enum",
"exclusiveMaximum",
"exclusiveMinimum",
Expand All @@ -290,6 +293,7 @@ namespace fleece {
"propertyNames",
"$ref",
"required",
"then",
"type",
"uniqueItems",
};
Expand Down Expand Up @@ -663,16 +667,14 @@ namespace fleece {
,_value(value)
{
_result = check(value, _schema.schema(), _schema.schema().asDict());
if (ok()) {
if (ok())
_result = {}; // ensure _result.value is nullptr
_value = nullptr; // release my reference
}
}


using Result = JSONSchema::Validation::Result;

static Result mkResult(JSONSchema::Error error, Value value, Dict schema, slice schemaKey) {
static Result mkResult(JSONSchema::Error error, Value value, Value schema, slice schemaKey) {
// cerr << "\tError: " << JSONSchema::errorString(error) << " for " << value.toJSONString() << " failed " << string_view(schemaKey)
// << ": " << schema[schemaKey].toJSONString() << endl;
return Result{error, value, schema, schemaKey};
Expand Down Expand Up @@ -705,7 +707,7 @@ namespace fleece {
}
} else if (schemaVal.type() == kFLBoolean) [[likely]] {
// `true` matches anything, `false` matches nothing:
return mkResult(schemaVal.asBool() ? Error::ok : Error::invalid, value, nullptr, nullslice);
return mkResult(schemaVal.asBool() ? Error::ok : Error::invalid, value, schemaVal, nullslice);
} else {
fail<invalid_schema>("invalid value type in schema");
}
Expand Down Expand Up @@ -778,7 +780,7 @@ namespace fleece {

// "if", "then", "else":
if (Value ifSchema = schema[SHARED_KEY(If)]) {
Value thenSchema = schema["then"], elseSchema = schema["else"];
Value thenSchema = schema[SHARED_KEY(Then)], elseSchema = schema[SHARED_KEY(Else)];
if (thenSchema || elseSchema) {
bool ifOK = ok(check(value, ifSchema, schemaBase));
if (Value nextSchema = ifOK ? thenSchema : elseSchema) {
Expand Down Expand Up @@ -957,7 +959,7 @@ namespace fleece {
}

// "properties": Specific property names with their own sub-schemas
for (Dict::iterator i(properties); i; ++i) {
for (Dict::iterator i(properties, sSchemaSharedKeys); i; ++i) {
slice key = i.keyString();
if (Value val = dict[key]) {
if (auto err = check(val, i.value(), schemaBase); !ok(err)) [[unlikely]]
Expand All @@ -969,7 +971,7 @@ namespace fleece {

// "patternProperties": Sub-schemas to apply to properties whose names match patterns
if (patternProperties) {
for (Dict::iterator i(patternProperties); i; ++i) {
for (Dict::iterator i(patternProperties, sSchemaSharedKeys); i; ++i) {
slice pattern = i.keyString();
for (Dict::iterator j(dict); j; ++j) {
slice dictKey = j.keyString();
Expand Down Expand Up @@ -1047,7 +1049,7 @@ namespace fleece {
"outOfRange",
"notMultiple",
"tooShort",
"tooLong"
"tooLong",
"patternMismatch",
"missingProperty",
"unknownProperty",
Expand Down
4 changes: 2 additions & 2 deletions Experimental/JSONSchema.hh
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ namespace fleece {
/// to register it, then call \ref validate again to retry.
std::string const& unknownSchemaID() const noexcept {return _unknownSchema;}

struct Result {Error error; Value value; Dict schema; slice schemaKey;};
struct Result {Error error; Value value; Value schema; slice schemaKey;};
static bool ok(Result const& e) noexcept {return e.error == Error::ok;}
private:
friend class JSONSchema;
Expand All @@ -164,7 +164,7 @@ namespace fleece {
struct pathItem { slice key; int index = -1; };

JSONSchema const& _schema; // Schema (unused after constructor)
RetainedValue _value; // The root Value being validated
Value _value; // The root Value being validated
Result _result {}; // Result of last check
std::string _unknownSchema; // Unknown schema ID found during validation
};
Expand Down
53 changes: 41 additions & 12 deletions Tests/SchemaTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "JSONSchema.hh"
#include "JSON5.hh"
#include "FleeceTests.hh"
#include <future>
#include <iostream>
#include <optional>
#include <string_view>
Expand Down Expand Up @@ -177,10 +178,11 @@ TEST_CASE_METHOD(SchemaTest,"JSON Schema Test Suite", "[Schema]") {


TEST_CASE("JSON Schema benchmark", "[.Perf]") {
static constexpr const char* kDataFile = "/Users/snej/Couchbase/DataSets/travel-sample/travel.json";
vector<Doc> database;
{
Benchmark bench;
FILE* in = fopen("/Users/snej/Couchbase/DataSets/travel-sample/travel.json", "r");
FILE* in = fopen(kDataFile, "r");
REQUIRE(in);
char* lineBuf = nullptr;
size_t bufSize = 0;
Expand All @@ -195,6 +197,9 @@ TEST_CASE("JSON Schema benchmark", "[.Perf]") {
bench.stop();
REQUIRE(doc);
database.push_back(std::move(doc));
#ifndef NDEBUG
//if (database.size() > 2000) {break;} // speeds up debugging
#endif
}
free(lineBuf);
fclose(in);
Expand All @@ -203,19 +208,43 @@ TEST_CASE("JSON Schema benchmark", "[.Perf]") {
bench.printReport(1.0, "document");
}

JSONSchema schema(readFile("/Users/snej/Couchbase/DataSets/travel-sample/travel-schema.json"));
JSONSchema schema(readFile((string(kTestFilesDir) + "travel-schema.json").c_str()));

Benchmark bench;
for (auto& doc : database) {
SECTION("Single-threaded") {
Benchmark bench;
for (auto& doc : database) {
bench.start();
auto result = schema.validate(doc.root());
bench.stop();
if (!result) {
slice id = doc.asDict()["_id"].asString();
FAIL("Doc " << id << " failed: " << result.errorString() << " at " << result.errorPath()
<< " (" << result.errorValue().toJSONString() << "), schema at " << result.errorSchemaURI());
}
}
fprintf(stderr, "Checked %zu documents: ", database.size());
bench.printReport(1.0, "document");
}

SECTION("Parallel") {
static const size_t kBatchSize = (database.size() + 15) / 16;
size_t const n = database.size();
vector<future<void>> futures;
Benchmark bench;
bench.start();
auto result = schema.validate(doc.root());
bench.stop();
if (!result) {
slice id = doc.asDict()["_id"].asString();
FAIL("Doc " << id << " failed: " << result.errorString() << " at " << result.errorPath()
<< ", schema at " << result.errorSchemaURI());
for (size_t taskFirst = 0; taskFirst < n; taskFirst += kBatchSize) {
futures.emplace_back( async(function([&](size_t first) {
size_t last = std::min(first + kBatchSize, n);
for (size_t i = first; i < last; ++i) {
auto result = schema.validate(database[i].root());
if (!result)
throw runtime_error("Validation failed!");
}
}), taskFirst));
}
for (auto& f : futures) f.wait();
bench.stop();
fprintf(stderr, "Checked %zu documents: ", database.size());
bench.printReport(1.0 / n, "document");
}
fprintf(stderr, "Checked %zu documents: ", database.size());
bench.printReport(1.0, "document");
}
Loading

0 comments on commit 19ea9fc

Please sign in to comment.