Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support safe mode to return checksum error during iteration #169

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions include/libjungle/db_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
#pragma once

#include "record.h"
#include "status.h"

#include <functional>
#include <vector>
Expand Down Expand Up @@ -141,6 +142,7 @@ class DBConfig {
, purgeDeletedDocImmediately(true)
, fastIndexScan(false)
, seqLoadingDelayFactor(0)
, safeMode(false)
{
tableSizeRatio.push_back(2.5);
levelSizeRatio.push_back(10.0);
Expand Down Expand Up @@ -565,6 +567,14 @@ class DBConfig {
* The custom manifest files should be created by `cloneManifest()` API.
*/
std::string customManifestPath;

/**
* If DB files are corrupted, Jungle will try to avoid the process crash
* as much as possible.
* This mode will slow down operations, thus should not be used in
* real production environment.
*/
bool safeMode;
};

class GlobalConfig {
Expand Down
3 changes: 3 additions & 0 deletions src/db_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ class Iterator::IteratorInternal {
avl_tree curWindow;
avl_node* windowCursor;
Iterator* parent;

// Intolerable error detected. If set, we cannot proceed iteration.
Status fatalError;
};

struct GlobalBatchStatus {
Expand Down
35 changes: 33 additions & 2 deletions src/iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ Status Iterator::init(DB* dd,
p->db->p->tableMgr,
start_key,
end_key );
if (s == Status::CHECKSUM_ERROR || s == Status::FILE_CORRUPTION) {
// Intolerable error.
DELETE(ctx_table->tableItr);
DELETE(ctx_table);
close();
return s;
}

if (s) s = ctx_table->tableItr->get(ctx_table->lastRec);
if (s) {
avl_node* avl_ret =
Expand Down Expand Up @@ -144,6 +152,7 @@ Status Iterator::initSN(DB* db,
}

Status Iterator::get(Record& rec_out) {
if (!p->fatalError.ok()) return p->fatalError;
if (!p || !p->db) return Status::NOT_INITIALIZED;
if (!p->windowCursor) return Status::KEY_NOT_FOUND;
if (p && p->db) p->db->p->updateOpHistory();
Expand All @@ -160,6 +169,7 @@ Status Iterator::get(Record& rec_out) {
}

Status Iterator::prev() {
if (!p->fatalError.ok()) return p->fatalError;
if (!p || !p->db) return Status::NOT_INITIALIZED;
if (!p->windowCursor) return Status::OUT_OF_RANGE;
if (p && p->db) p->db->p->updateOpHistory();
Expand Down Expand Up @@ -196,12 +206,22 @@ Status Iterator::prev() {
if (item->tableItr) {
s = item->tableItr->get(item->lastRec);
}
assert(s);

avl_cmp_func* cmp_func = (p->type == ItrInt::BY_SEQ)
? (ItrInt::ItrItem::cmpSeq)
: (ItrInt::ItrItem::cmpKey);
avl_node* avl_ret = avl_insert(&p->curWindow, &item->an, cmp_func);

if (s == Status::CHECKSUM_ERROR || s == Status::FILE_CORRUPTION) {
// Intolerable error.
p->fatalError = s;
cur_key.free();

// To make next `get()` call return error,
// return this function without error.
return Status();
}

assert(avl_ret == &item->an);
(void)avl_ret;
cursor = avl_last(&p->curWindow);
Expand Down Expand Up @@ -257,6 +277,7 @@ Status Iterator::prev() {
}

Status Iterator::next() {
if (!p->fatalError.ok()) return p->fatalError;
if (!p || !p->db) return Status::NOT_INITIALIZED;
if (!p->windowCursor) return Status::OUT_OF_RANGE;
if (p && p->db) p->db->p->updateOpHistory();
Expand Down Expand Up @@ -293,12 +314,22 @@ Status Iterator::next() {
if (item->tableItr) {
s = item->tableItr->get(item->lastRec);
}
assert(s);

avl_cmp_func* cmp_func = (p->type == ItrInt::BY_SEQ)
? (ItrInt::ItrItem::cmpSeq)
: (ItrInt::ItrItem::cmpKey);
avl_node* avl_ret = avl_insert(&p->curWindow, &item->an, cmp_func);

if (s == Status::CHECKSUM_ERROR || s == Status::FILE_CORRUPTION) {
// Intolerable error.
p->fatalError = s;
cur_key.free();

// To make next `get()` call return error,
// return this function without error.
return Status();
}

assert(avl_ret == &item->an);
(void)avl_ret;
cursor = avl_first(&p->curWindow);
Expand Down
14 changes: 11 additions & 3 deletions src/jungle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -731,12 +731,20 @@ Status DB::getNearestRecordByKey(const SizedBuf& key,
Record::Holder h_rec_from_log(rec_from_log);
uint64_t chknum = (sn)?(sn->chkNum):(NOT_INITIALIZED);
std::list<LogFileInfo*>* l_list = (sn)?(sn->logList):(nullptr);
p->logMgr->getNearest(chknum, l_list, key, rec_from_log, opt);
Status ls = p->logMgr->getNearest(chknum, l_list, key, rec_from_log, opt);
if (!ls.ok() && ls != Status::KEY_NOT_FOUND) {
// Intolerable error.
return ls;
}

Record rec_from_table;
Record::Holder h_rec_from_table(rec_from_table);
DB* snap_handle = (this->sn)?(this):(nullptr);
p->tableMgr->getNearest(snap_handle, key, rec_from_table, opt, meta_only);
Status ts = p->tableMgr->getNearest(snap_handle, key, rec_from_table, opt, meta_only);
if (!ts.ok() && ts != Status::KEY_NOT_FOUND) {
// Intolerable error.
return ts;
}

if (rec_from_log.empty() && rec_from_table.empty()) {
// Not found from both.
Expand Down Expand Up @@ -793,7 +801,7 @@ Status DB::getNearestRecordByKey(const SizedBuf& key,
// Choose the one from table.
rec_from_table.moveTo(rec_out);
}
return s;
return Status::OK;
}

Status DB::getRecordsByPrefix(const SizedBuf& prefix,
Expand Down
32 changes: 29 additions & 3 deletions src/table_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,17 @@ TableFile::~TableFile() {
DELETE(tlbByKey);
}

Status TableFile::toJungleStatus(fdb_status fdb_s) {
switch (fdb_s) {
case FDB_RESULT_CHECKSUM_ERROR:
return Status::CHECKSUM_ERROR;
case FDB_RESULT_FILE_CORRUPTION:
return Status::FILE_CORRUPTION;
default:
return Status::ERROR;
}
}

std::string TableFile::getTableFileName(const std::string& path,
uint64_t prefix_num,
uint64_t table_file_num)
Expand Down Expand Up @@ -1146,7 +1157,12 @@ Status TableFile::get(DB* snap_handle,
}
}
if (fs != FDB_RESULT_SUCCESS) {
return Status::KEY_NOT_FOUND;
if (fs == FDB_RESULT_KEY_NOT_FOUND) {
return Status::KEY_NOT_FOUND;
}
// Otherwise, error.
_log_err(myLog, "fdb_get failed: %d", fs);
return toJungleStatus(fs);
}

try {
Expand Down Expand Up @@ -1274,7 +1290,12 @@ Status TableFile::getNearest(DB* snap_handle,
}
}
if (fs != FDB_RESULT_SUCCESS) {
return Status::KEY_NOT_FOUND;
if (fs == FDB_RESULT_KEY_NOT_FOUND) {
return Status::KEY_NOT_FOUND;
}
// Otherwise, error.
_log_err(myLog, "fdb_get failed: %d", fs);
return toJungleStatus(fs);
}

try {
Expand Down Expand Up @@ -1367,7 +1388,12 @@ Status TableFile::getPrefix(DB* snap_handle,
doc,
nearest_opt);
if (fs != FDB_RESULT_SUCCESS) {
return Status::KEY_NOT_FOUND;
if (fs == FDB_RESULT_KEY_NOT_FOUND) {
return Status::KEY_NOT_FOUND;
}
// Otherwise, error.
_log_err(myLog, "fdb_get failed: %d", fs);
return toJungleStatus(fs);
}

// Find next greater key.
Expand Down
4 changes: 4 additions & 0 deletions src/table_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class TableFile {
static uint64_t getBfSizeByWss(const DBConfig* db_config,
uint64_t wss);

static Status toJungleStatus(fdb_status fdb_s);

FdbHandle* getIdleHandle();

void returnHandle(FdbHandle* f_handle);
Expand Down Expand Up @@ -287,6 +289,8 @@ class TableFile {
fdb_iterator* fdbItr;
uint64_t minSeq;
uint64_t maxSeq;

bool safeMode;
};

Status updateSnapshot();
Expand Down
32 changes: 28 additions & 4 deletions src/table_file_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ TableFile::Iterator::Iterator()
, fdbItr(nullptr)
, minSeq(NOT_INITIALIZED)
, maxSeq(NOT_INITIALIZED)
, safeMode(false)
{}

TableFile::Iterator::~Iterator() {
Expand All @@ -41,6 +42,9 @@ Status TableFile::Iterator::init(DB* snap_handle,
const SizedBuf& end_key)
{
tFile = t_file;
safeMode = (tFile && tFile->tableMgr)
? tFile->tableMgr->getDbConfig()->safeMode : false;

fdb_status fs;
Status s;

Expand Down Expand Up @@ -167,10 +171,15 @@ Status TableFile::Iterator::get(Record& rec_out) {
fdb_doc tmp_doc;
memset(&tmp_doc, 0x0, sizeof(tmp_doc));

fdb_doc *doc = &tmp_doc;
fdb_doc *doc = safeMode ? nullptr : &tmp_doc;
fs = fdb_iterator_get(fdbItr, &doc);
if (fs != FDB_RESULT_SUCCESS) {
return Status::ERROR;
if (fs == FDB_RESULT_KEY_NOT_FOUND) {
return Status::KEY_NOT_FOUND;
}
// Otherwise, error.
_log_err(tFile->myLog, "fdb_iterator_get failed: %d", fs);
return toJungleStatus(fs);
}

rec_out.kv.key.set(doc->keylen, doc->key);
Expand Down Expand Up @@ -201,11 +210,17 @@ Status TableFile::Iterator::get(Record& rec_out) {
? Record::DELETION
: Record::INSERTION;

if (safeMode && doc) {
free(doc);
}
return Status();

} catch (Status s) {
rec_out.kv.free();
rec_out.meta.free();
if (safeMode && doc) {
free(doc);
}
return s;
}
}
Expand All @@ -221,10 +236,15 @@ Status TableFile::Iterator::getMeta(Record& rec_out,
fdb_doc tmp_doc;
memset(&tmp_doc, 0x0, sizeof(tmp_doc));

fdb_doc *doc = &tmp_doc;
fdb_doc *doc = safeMode ? nullptr : &tmp_doc;
fs = fdb_iterator_get_metaonly(fdbItr, &doc);
if (fs != FDB_RESULT_SUCCESS) {
return Status::ERROR;
if (fs == FDB_RESULT_KEY_NOT_FOUND) {
return Status::KEY_NOT_FOUND;
}
// Otherwise, error.
_log_err(tFile->myLog, "fdb_iterator_get failed: %d", fs);
return toJungleStatus(fs);
}

rec_out.kv.key.set(doc->keylen, doc->key);
Expand All @@ -248,6 +268,10 @@ Status TableFile::Iterator::getMeta(Record& rec_out,
? Record::DELETION
: Record::INSERTION;

if (safeMode && doc) {
free(doc);
}

return Status();
}

Expand Down
Loading
Loading