Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forbid adding schema properties if they existed before #5130

Merged
merged 8 commits into from
Dec 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,16 +573,17 @@ SchemaVer MetaKeyUtils::parseEdgeVersion(folly::StringPiece key) {
*reinterpret_cast<const SchemaVer*>(key.begin() + offset);
}

SchemaVer MetaKeyUtils::getLatestEdgeScheInfo(kvstore::KVIterator* iter, folly::StringPiece& val) {
SchemaVer MetaKeyUtils::getLatestEdgeScheInfo(
kvstore::KVIterator* iter, std::unordered_map<SchemaVer, folly::StringPiece>& schemasRaw) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename to schemaHistory or something else?

SchemaVer maxVer = MetaKeyUtils::parseEdgeVersion(iter->key());
val = iter->val();
schemasRaw.emplace(maxVer, iter->val());
iter->next();
while (iter->valid()) {
SchemaVer curVer = MetaKeyUtils::parseEdgeVersion(iter->key());
if (curVer > maxVer) {
maxVer = curVer;
val = iter->val();
}
schemasRaw.emplace(curVer, iter->val());
iter->next();
}
return maxVer;
Expand Down Expand Up @@ -613,16 +614,17 @@ SchemaVer MetaKeyUtils::parseTagVersion(folly::StringPiece key) {
*reinterpret_cast<const SchemaVer*>(key.begin() + offset);
}

SchemaVer MetaKeyUtils::getLatestTagScheInfo(kvstore::KVIterator* iter, folly::StringPiece& val) {
SchemaVer MetaKeyUtils::getLatestTagScheInfo(
kvstore::KVIterator* iter, std::unordered_map<SchemaVer, folly::StringPiece>& schemasRaw) {
SchemaVer maxVer = MetaKeyUtils::parseTagVersion(iter->key());
val = iter->val();
schemasRaw.emplace(maxVer, iter->val());
iter->next();
while (iter->valid()) {
SchemaVer curVer = MetaKeyUtils::parseTagVersion(iter->key());
if (curVer > maxVer) {
maxVer = curVer;
val = iter->val();
}
schemasRaw.emplace(curVer, iter->val());
iter->next();
}
return maxVer;
Expand Down
6 changes: 4 additions & 2 deletions src/common/utils/MetaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,17 @@ class MetaKeyUtils final {

static SchemaVer parseEdgeVersion(folly::StringPiece key);

static SchemaVer getLatestEdgeScheInfo(kvstore::KVIterator* iter, folly::StringPiece& val);
static SchemaVer getLatestEdgeScheInfo(
kvstore::KVIterator* iter, std::unordered_map<SchemaVer, folly::StringPiece>& schemasRaw);

static std::string schemaTagKey(GraphSpaceID spaceId, TagID tagId, SchemaVer version);

static TagID parseTagId(folly::StringPiece key);

static SchemaVer parseTagVersion(folly::StringPiece key);

static SchemaVer getLatestTagScheInfo(kvstore::KVIterator* iter, folly::StringPiece& val);
static SchemaVer getLatestTagScheInfo(
kvstore::KVIterator* iter, std::unordered_map<SchemaVer, folly::StringPiece>& schemasRaw);

static std::string schemaTagPrefix(GraphSpaceID spaceId, TagID tagId);

Expand Down
22 changes: 13 additions & 9 deletions src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,21 @@ bool isLegalTypeConversion(cpp2::ColumnTypeDef from, cpp2::ColumnTypeDef to) {
}
} // namespace

nebula::cpp2::ErrorCode MetaServiceUtils::alterColumnDefs(std::vector<cpp2::ColumnDef>& cols,
cpp2::SchemaProp& prop,
const cpp2::ColumnDef col,
const cpp2::AlterSchemaOp op,
bool isEdge) {
nebula::cpp2::ErrorCode MetaServiceUtils::alterColumnDefs(
std::vector<cpp2::ColumnDef>& cols,
cpp2::SchemaProp& prop,
const cpp2::ColumnDef col,
const cpp2::AlterSchemaOp op,
const std::vector<std::vector<cpp2::ColumnDef>>& allVersionedCols,
bool isEdge) {
switch (op) {
case cpp2::AlterSchemaOp::ADD:
for (auto it = cols.begin(); it != cols.end(); ++it) {
if (it->get_name() == col.get_name()) {
LOG(INFO) << "Column existing: " << col.get_name();
return nebula::cpp2::ErrorCode::E_EXISTED;
for (auto& versionedCols : allVersionedCols) {
for (auto it = versionedCols.begin(); it != versionedCols.end(); ++it) {
if (it->get_name() == col.get_name()) {
LOG(ERROR) << "Column currently or previously existing: " << col.get_name();
return nebula::cpp2::ErrorCode::E_EXISTED;
}
}
}
cols.emplace_back(std::move(col));
Expand Down
13 changes: 8 additions & 5 deletions src/meta/MetaServiceUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@ class MetaServiceUtils final {
* ADD: add col to cols
* CHANGE: replace the column in cols with col
* DROP: remove the col from cols
* @param allVersionedCols All columns from all versioned schemas
* @param isEdge Is edge or tag
* @return
*/
static nebula::cpp2::ErrorCode alterColumnDefs(std::vector<cpp2::ColumnDef>& cols,
cpp2::SchemaProp& prop,
const cpp2::ColumnDef col,
const cpp2::AlterSchemaOp op,
bool isEdge = false);
static nebula::cpp2::ErrorCode alterColumnDefs(
std::vector<cpp2::ColumnDef>& cols,
cpp2::SchemaProp& prop,
const cpp2::ColumnDef col,
const cpp2::AlterSchemaOp op,
const std::vector<std::vector<cpp2::ColumnDef>>& allVersionedCols,
bool isEdge = false);

/**
* @brief Change schema property, mainly set ttl_col
Expand Down
20 changes: 13 additions & 7 deletions src/meta/processors/schema/AlterEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,18 @@ void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) {
}

// Parse version from edge type key
folly::StringPiece iterVal;
auto version = MetaKeyUtils::getLatestEdgeScheInfo(iter, iterVal) + 1;
auto schema = MetaKeyUtils::parseSchema(iterVal);
std::unordered_map<SchemaVer, folly::StringPiece> schemasRaw;
auto latestVersion = MetaKeyUtils::getLatestEdgeScheInfo(iter, schemasRaw);
auto newVersion = latestVersion + 1;
auto schema = MetaKeyUtils::parseSchema(schemasRaw[latestVersion]);
auto columns = schema.get_columns();
auto prop = schema.get_schema_prop();

std::vector<std::vector<cpp2::ColumnDef>> allVersionedColumns;
for (auto entry : schemasRaw) {
allVersionedColumns.emplace_back(MetaKeyUtils::parseSchema(entry.second).get_columns());
}

// Update schema column
auto& edgeItems = req.get_edge_items();

Expand Down Expand Up @@ -118,8 +124,8 @@ void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) {
for (auto& edgeItem : edgeItems) {
auto& cols = edgeItem.get_schema().get_columns();
for (auto& col : cols) {
auto retCode =
MetaServiceUtils::alterColumnDefs(columns, prop, col, *edgeItem.op_ref(), true);
auto retCode = MetaServiceUtils::alterColumnDefs(
columns, prop, col, *edgeItem.op_ref(), std::move(allVersionedColumns), true);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Alter edge column error " << apache::thrift::util::enumNameSafe(retCode);
handleErrorCode(retCode);
Expand Down Expand Up @@ -150,8 +156,8 @@ void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) {

std::vector<kvstore::KV> data;
LOG(INFO) << "Alter edge " << edgeName << ", edgeType " << edgeType << ", new version "
<< version;
data.emplace_back(MetaKeyUtils::schemaEdgeKey(spaceId, edgeType, version),
<< newVersion;
data.emplace_back(MetaKeyUtils::schemaEdgeKey(spaceId, edgeType, newVersion),
MetaKeyUtils::schemaVal(edgeName, schema));
resp_.id_ref() = to(edgeType, EntryType::EDGE);
auto timeInMilliSec = time::WallClock::fastNowInMilliSec();
Expand Down
19 changes: 13 additions & 6 deletions src/meta/processors/schema/AlterTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,18 @@ void AlterTagProcessor::process(const cpp2::AlterTagReq& req) {
return;
}

folly::StringPiece iterVal;
auto version = MetaKeyUtils::getLatestTagScheInfo(iter, iterVal) + 1;
auto schema = MetaKeyUtils::parseSchema(iterVal);
std::unordered_map<SchemaVer, folly::StringPiece> schemasRaw;
auto latestVersion = MetaKeyUtils::getLatestTagScheInfo(iter, schemasRaw);
auto newVersion = latestVersion + 1;
auto schema = MetaKeyUtils::parseSchema(schemasRaw[latestVersion]);
auto columns = schema.get_columns();
auto prop = schema.get_schema_prop();

std::vector<std::vector<cpp2::ColumnDef>> allVersionedColumns;
for (auto entry : schemasRaw) {
allVersionedColumns.emplace_back(MetaKeyUtils::parseSchema(entry.second).get_columns());
}

// Update schema column
auto& tagItems = req.get_tag_items();

Expand Down Expand Up @@ -115,7 +121,8 @@ void AlterTagProcessor::process(const cpp2::AlterTagReq& req) {
for (auto& tagItem : tagItems) {
auto& cols = tagItem.get_schema().get_columns();
for (auto& col : cols) {
auto retCode = MetaServiceUtils::alterColumnDefs(columns, prop, col, *tagItem.op_ref());
auto retCode = MetaServiceUtils::alterColumnDefs(
columns, prop, col, *tagItem.op_ref(), std::move(allVersionedColumns));
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Alter tag column error " << apache::thrift::util::enumNameSafe(retCode);
handleErrorCode(retCode);
Expand Down Expand Up @@ -144,8 +151,8 @@ void AlterTagProcessor::process(const cpp2::AlterTagReq& req) {
schema.columns_ref() = std::move(columns);

std::vector<kvstore::KV> data;
LOG(INFO) << "Alter Tag " << tagName << ", tagId " << tagId << ", new version " << version;
data.emplace_back(MetaKeyUtils::schemaTagKey(spaceId, tagId, version),
LOG(INFO) << "Alter Tag " << tagName << ", tagId " << tagId << ", new version " << newVersion;
data.emplace_back(MetaKeyUtils::schemaTagKey(spaceId, tagId, newVersion),
MetaKeyUtils::schemaVal(tagName, schema));
resp_.id_ref() = to(tagId, EntryType::TAG);
auto timeInMilliSec = time::WallClock::fastNowInMilliSec();
Expand Down
6 changes: 3 additions & 3 deletions src/meta/processors/schema/GetEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ void GetEdgeProcessor::process(const cpp2::GetEdgeReq& req) {
onFinished();
return;
}
folly::StringPiece iterVal;
MetaKeyUtils::getLatestEdgeScheInfo(iter, iterVal);
schemaValue = iterVal.str();
std::unordered_map<SchemaVer, folly::StringPiece> schemasRaw;
auto latestVersion = MetaKeyUtils::getLatestEdgeScheInfo(iter, schemasRaw);
schemaValue = schemasRaw[latestVersion].str();
} else { // Get given version
auto edgeKey = MetaKeyUtils::schemaEdgeKey(spaceId, edgeType, ver);
auto ret = doGet(edgeKey);
Expand Down
6 changes: 3 additions & 3 deletions src/meta/processors/schema/GetTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ void GetTagProcessor::process(const cpp2::GetTagReq& req) {
onFinished();
return;
}
folly::StringPiece iterVal;
MetaKeyUtils::getLatestTagScheInfo(iter, iterVal);
schemaValue = iterVal.str();
std::unordered_map<SchemaVer, folly::StringPiece> schemasRaw;
auto latestVersion = MetaKeyUtils::getLatestTagScheInfo(iter, schemasRaw);
schemaValue = schemasRaw[latestVersion].str();
} else {
auto tagKey = MetaKeyUtils::schemaTagKey(spaceId, tagId, ver);
auto ret = doGet(tagKey);
Expand Down
Loading