diff --git a/be/src/common/greplog.cpp b/be/src/common/greplog.cpp index ec5bb1d25ab63..991ecab8da4fe 100644 --- a/be/src/common/greplog.cpp +++ b/be/src/common/greplog.cpp @@ -24,6 +24,7 @@ #include "gutil/strings/substitute.h" #include "hs/hs_compile.h" #include "hs/hs_runtime.h" +#include "service/backend_options.h" #include "util/defer_op.h" using namespace std; @@ -33,16 +34,18 @@ namespace starrocks { static std::vector list_log_files_in_dir(const string& log_dir, char level) { std::vector files; // if level in WARNING, ERROR, FATAL, use logging logs, else use info logs - const std::string pattern = string("WEF").find(level) == string::npos ? "be.INFO.log." : "be.WARNING.log."; + const std::string process = BackendOptions::is_cn() ? "cn" : "be"; + const std::string pattern = process + (string("WEF").find(level) == string::npos ? ".INFO.log." : ".WARNING.log."); for (const auto& entry : filesystem::directory_iterator(log_dir)) { if (entry.is_regular_file()) { auto name = entry.path().filename().string(); - if (name.length() > pattern.length() && name.substr(0, pattern.length()) == pattern) { + if (name.length() > pattern.length() && name.find(pattern) != string::npos) { files.push_back(entry.path().string()); } } } std::sort(files.begin(), files.end(), std::greater()); + LOG_IF(WARNING, files.empty()) << "list_log_files_in_dir failed, no log files in " << log_dir; return files; } @@ -180,7 +183,7 @@ Status grep_log_single_file(const string& path, int64_t start_ts, int64_t end_ts ctx.line_len = read; if (database == nullptr) { // no pattern, add all lines - scan_by_line_handler(0, 0, 0, 0, &ctx); + scan_by_line_handler(0, 0, read, 0, &ctx); } else { if (hs_scan(database, line, read, 0, scratch, scan_by_line_handler, &ctx) != HS_SUCCESS) { break; @@ -195,6 +198,7 @@ Status grep_log_single_file(const string& path, int64_t start_ts, int64_t end_ts Status grep_log(int64_t start_ts, int64_t end_ts, char level, const std::string& pattern, size_t limit, std::deque& entries) { + level = std::toupper(level); const string log_dir = config::sys_log_dir; if (log_dir.empty()) { return Status::InternalError(strings::Substitute("grep log failed $0 is empty", log_dir)); @@ -206,7 +210,8 @@ Status grep_log(int64_t start_ts, int64_t end_ts, char level, const std::string& hs_database_t* database = nullptr; if (!pattern.empty()) { hs_compile_error_t* compile_err; - if (hs_compile(pattern.c_str(), 0, HS_MODE_BLOCK, NULL, &database, &compile_err) != HS_SUCCESS) { + if (hs_compile(pattern.c_str(), HS_FLAG_SINGLEMATCH, HS_MODE_BLOCK, nullptr, &database, &compile_err) != + HS_SUCCESS) { hs_free_compile_error(compile_err); return Status::InternalError( strings::Substitute("grep log failed compile pattern $0 failed $1", pattern, compile_err->message)); @@ -253,10 +258,11 @@ Status grep_log(int64_t start_ts, int64_t end_ts, char level, const std::string& return Status::OK(); } -std::string grep_log_as_string(int64_t start_ts, int64_t end_ts, char level, const std::string& pattern, size_t limit) { +std::string grep_log_as_string(int64_t start_ts, int64_t end_ts, const std::string& level, const std::string& pattern, + size_t limit) { std::ostringstream ss; std::deque entries; - auto st = grep_log(start_ts, end_ts, level, pattern, limit, entries); + auto st = grep_log(start_ts, end_ts, level[0], pattern, limit, entries); if (!st.ok()) { ss << strings::Substitute("grep log failed $0 start_ts:$1 end_ts:$2 level:$3 pattern:$4 limit:$5\n", st.to_string(), start_ts, end_ts, level, pattern, limit); diff --git a/be/src/common/greplog.h b/be/src/common/greplog.h index 005878d80102a..71884efeb1cc6 100644 --- a/be/src/common/greplog.h +++ b/be/src/common/greplog.h @@ -48,6 +48,7 @@ Status grep_log(int64_t start_ts, int64_t end_ts, char level, const std::string& * Grep log file and return all line as whole string, parameters are same as grep_log * @return log string */ -std::string grep_log_as_string(int64_t start_ts, int64_t end_ts, char level, const std::string& pattern, size_t limit); +std::string grep_log_as_string(int64_t start_ts, int64_t end_ts, const std::string& level, const std::string& pattern, + size_t limit); } // namespace starrocks diff --git a/be/src/exec/pipeline/pipeline_driver_executor.cpp b/be/src/exec/pipeline/pipeline_driver_executor.cpp index b234f076dc337..77c7a917d2933 100644 --- a/be/src/exec/pipeline/pipeline_driver_executor.cpp +++ b/be/src/exec/pipeline/pipeline_driver_executor.cpp @@ -4,6 +4,7 @@ #include +#include "agent/master_info.h" #include "exec/workgroup/work_group.h" #include "gutil/strings/substitute.h" #include "runtime/current_thread.h" @@ -135,6 +136,9 @@ void GlobalDriverExecutor::_worker_thread() { } if (!status.ok()) { + auto o_id = get_backend_id(); + int64_t be_id = o_id.has_value() ? o_id.value() : -1; + status = status.clone_and_append(fmt::format("BE:{}", be_id)); LOG(WARNING) << "[Driver] Process error, query_id=" << print_id(driver->query_ctx()->query_id()) << ", instance_id=" << print_id(driver->fragment_ctx()->fragment_instance_id()) << ", status=" << status; diff --git a/be/src/exec/vectorized/schema_scanner/schema_be_logs_scanner.cpp b/be/src/exec/vectorized/schema_scanner/schema_be_logs_scanner.cpp index a3095981f10e3..340781f6508ee 100644 --- a/be/src/exec/vectorized/schema_scanner/schema_be_logs_scanner.cpp +++ b/be/src/exec/vectorized/schema_scanner/schema_be_logs_scanner.cpp @@ -45,7 +45,7 @@ Status SchemaBeLogsScanner::start(RuntimeState* state) { if (_param->log_end_ts > 0) { end_ts = _param->log_end_ts; } - string level; + string level = "I"; string pattern; if (_param->log_level != nullptr) { level = *_param->log_level; diff --git a/be/src/http/action/greplog_action.cpp b/be/src/http/action/greplog_action.cpp index 214b189926a42..2399afe43641c 100644 --- a/be/src/http/action/greplog_action.cpp +++ b/be/src/http/action/greplog_action.cpp @@ -66,7 +66,7 @@ void GrepLogAction::handle(HttpRequest* req) { return; } - auto ret = grep_log_as_string(start_ts, end_ts, std::toupper(level[0]), pattern, limit); + auto ret = grep_log_as_string(start_ts, end_ts, level, pattern, limit); HttpChannel::send_reply(req, HttpStatus::OK, ret); } diff --git a/be/src/script/script.cpp b/be/src/script/script.cpp index 9140334754cfd..d34d812fb2cc0 100644 --- a/be/src/script/script.cpp +++ b/be/src/script/script.cpp @@ -27,6 +27,7 @@ #include "io/io_profiler.h" #include "runtime/exec_env.h" #include "runtime/mem_tracker.h" +#include "storage/del_vector.h" #include "storage/storage_engine.h" #include "storage/tablet.h" #include "storage/tablet_manager.h" @@ -275,6 +276,16 @@ class StorageEngineRef { } } + // this method is specifically used to recover "no delete vector found" error caused by corrupt pk tablet metadata + static std::string reset_delvec(int64_t tablet_id, int64_t segment_id, int64_t version) { + auto tablet = get_tablet(tablet_id); + RETURN_IF_UNLIKELY_NULL(tablet, "tablet not found"); + DelVector dv; + dv.init(version, nullptr, 0); + auto st = TabletMetaManager::set_del_vector(tablet->data_dir()->get_meta(), tablet_id, segment_id, dv); + return st.to_string(); + } + static size_t submit_manual_compaction_task_for_table(int64_t table_id, int64_t rowset_size_threshold) { auto infos = get_tablet_infos(table_id, -1); for (auto& info : infos) { @@ -452,6 +463,7 @@ class StorageEngineRef { REG_STATIC_METHOD(StorageEngineRef, get_tablet_info); REG_STATIC_METHOD(StorageEngineRef, get_tablet_infos); REG_STATIC_METHOD(StorageEngineRef, get_tablet_meta_json); + REG_STATIC_METHOD(StorageEngineRef, reset_delvec); REG_STATIC_METHOD(StorageEngineRef, get_tablet); REG_STATIC_METHOD(StorageEngineRef, drop_tablet); REG_STATIC_METHOD(StorageEngineRef, get_data_dirs); diff --git a/be/src/service/backend_options.cpp b/be/src/service/backend_options.cpp index 649c670882419..46a9970990fef 100644 --- a/be/src/service/backend_options.cpp +++ b/be/src/service/backend_options.cpp @@ -34,7 +34,14 @@ std::string BackendOptions::_s_localhost; std::vector BackendOptions::_s_priority_cidrs; TBackend BackendOptions::_backend; -bool BackendOptions::init() { +bool BackendOptions::_is_cn = false; + +bool BackendOptions::is_cn() { + return _is_cn; +} + +bool BackendOptions::init(bool is_cn) { + _is_cn = is_cn; if (!analyze_priority_cidrs()) { return false; } diff --git a/be/src/service/backend_options.h b/be/src/service/backend_options.h index 5f2c648383447..54eb6c667ed06 100644 --- a/be/src/service/backend_options.h +++ b/be/src/service/backend_options.h @@ -30,10 +30,11 @@ class CIDR; class BackendOptions { public: - static bool init(); + static bool init(bool is_cn); static std::string get_localhost(); static TBackend get_localBackend(); static void set_localhost(const std::string& host); + static bool is_cn(); private: static bool analyze_priority_cidrs(); @@ -42,6 +43,7 @@ class BackendOptions { static std::string _s_localhost; static std::vector _s_priority_cidrs; static TBackend _backend; + static bool _is_cn; BackendOptions(const BackendOptions&) = delete; const BackendOptions& operator=(const BackendOptions&) = delete; diff --git a/be/src/service/starrocks_main.cpp b/be/src/service/starrocks_main.cpp index 234e706c23d1c..7e06abe58e0a3 100644 --- a/be/src/service/starrocks_main.cpp +++ b/be/src/service/starrocks_main.cpp @@ -254,7 +254,7 @@ int main(int argc, char** argv) { EXIT_IF_ERROR(starrocks::JDBCDriverManager::getInstance()->init(std::string(getenv("STARROCKS_HOME")) + "/lib/jdbc_drivers")); - if (!starrocks::BackendOptions::init()) { + if (!starrocks::BackendOptions::init(as_cn)) { exit(-1); } diff --git a/be/test/storage/tablet_updates_test.cpp b/be/test/storage/tablet_updates_test.cpp index 9f6c480f7fa54..55478259adddd 100644 --- a/be/test/storage/tablet_updates_test.cpp +++ b/be/test/storage/tablet_updates_test.cpp @@ -14,6 +14,7 @@ #include "fs/fs.h" #include "gutil/strings/substitute.h" #include "runtime/runtime_state.h" +#include "script/script.h" #include "storage/chunk_helper.h" #include "storage/empty_iterator.h" #include "storage/kv_store.h" @@ -1004,6 +1005,12 @@ void TabletUpdatesTest::test_writeread(bool enable_persistent_index) { auto rs0 = create_rowset(_tablet, keys); ASSERT_TRUE(_tablet->rowset_commit(2, rs0).ok()); ASSERT_EQ(2, _tablet->updates()->max_version()); + + string o; + ASSERT_TRUE(execute_script(fmt::format("StorageEngine.reset_delvec({}, {}, 2)", _tablet->tablet_id(), 0), o).ok()); + ASSERT_TRUE(execute_script("System.print(ExecEnv.grep_log_as_string(0,0,\"I\",\"tablet_manager\",1))", o).ok()); + LOG(INFO) << "grep log: " << o; + auto rs1 = create_rowset(_tablet, keys); ASSERT_TRUE(_tablet->rowset_commit(3, rs1).ok()); ASSERT_EQ(3, _tablet->updates()->max_version());