Skip to content

Commit

Permalink
JobManager related fix (vesoft-inc#4742)
Browse files Browse the repository at this point in the history
Co-authored-by: Sophie <[email protected]>
  • Loading branch information
2 people authored and caton-hpg committed Oct 19, 2022
1 parent 0f1d6d4 commit 7da33c1
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 15 deletions.
6 changes: 5 additions & 1 deletion src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -918,9 +918,13 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("Stop job failure!");
case nebula::cpp2::ErrorCode::E_SAVE_JOB_FAILURE:
return Status::Error("Save job failure!");
case nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE:
case nebula::cpp2::ErrorCode::E_JOB_ALREADY_FINISH:
return Status::Error(
"Finished job or failed job can not be stopped, please start another job instead");
case nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE:
return Status::Error(
"The job type do not support stopping, either wait previous job done or restart the "
"cluster to start another job");
case nebula::cpp2::ErrorCode::E_BALANCER_FAILURE:
return Status::Error("Balance failure!");
case nebula::cpp2::ErrorCode::E_NO_INVALID_BALANCE_PLAN:
Expand Down
4 changes: 3 additions & 1 deletion src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,10 @@ enum ErrorCode {
E_TASK_REPORT_OUT_DATE = -2049, // Task report failed
E_JOB_NOT_IN_SPACE = -2050, // The current task is not in the graph space
E_JOB_NEED_RECOVER = -2051, // The current task needs to be resumed
E_JOB_NOT_STOPPABLE = -2052, // Failed or finished job could not be stopped
E_JOB_ALREADY_FINISH = -2052, // The job status has already been failed or finished
E_JOB_SUBMITTED = -2053, // Job default status.
E_JOB_NOT_STOPPABLE = -2054, // The given job does not support stopping
E_JOB_HAS_NO_TARGET_STORAGE = -2055, // The leader distribution has not been reported, so can't send task to storage
E_INVALID_JOB = -2065, // Invalid task

// Backup Failure
Expand Down
15 changes: 9 additions & 6 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(

if (!optJobDesc.setStatus(jobStatus, force)) {
// job already been set as finished, failed or stopped
return nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE;
return nebula::cpp2::ErrorCode::E_JOB_ALREADY_FINISH;
}

// If the job is marked as FAILED, one of the following will be triggered
Expand Down Expand Up @@ -373,17 +373,13 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(
optJobDesc.getStartTime(),
optJobDesc.getStopTime(),
optJobDesc.getErrorCode());
auto rc = save(jobKey, jobVal);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rc;
}

auto it = runningJobs_.find(jobId);
// Job has not started yet
if (it == runningJobs_.end()) {
// TODO job not existing in runningJobs_ also means leader changed, we handle it later
cleanJob(jobId);
return nebula::cpp2::ErrorCode::SUCCEEDED;
return save(jobKey, jobVal);
}

// Job has been started
Expand All @@ -398,12 +394,19 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(
cleanJob(jobId);
resetSpaceRunning(optJobDesc.getSpace());
}
// job has been stopped successfully, so update the job status
return save(jobKey, jobVal);
}
// job could not be stopped, so do not update the job status
return code;
} else {
// If the job is failed or finished, clean and call finish. We clean the job at first, no
// matter `finish` return SUCCEEDED or not. Because the job has already come to the end.
cleanJob(jobId);
auto rc = save(jobKey, jobVal);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rc;
}
return jobExec->finish(jobStatus == cpp2::JobStatus::FINISHED);
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/meta/processors/job/StorageJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ ErrOrHosts StorageJobExecutor::getLeaderHost(GraphSpaceID space) {
it->second.emplace_back(partId);
}
}
// If storage has not reported the leader distribution to meta and we don't report an error
// here, JobManager will think the job consists of 0 tasks, so no task will be sent to any
// storage, and the job will stay RUNNING forever.
if (hosts.empty()) {
return nebula::cpp2::ErrorCode::E_JOB_HAS_NO_TARGET_STORAGE;
}
return hosts;
}

Expand Down
11 changes: 11 additions & 0 deletions src/meta/test/GetStatsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ class GetStatsTest : public ::testing::Test {
mock::MockCluster cluster;
kv_ = cluster.initMetaKV(rootPath_->path());

// write some random leader key into kv, make sure that job will find a target storage
std::vector<nebula::kvstore::KV> data{
std::make_pair(MetaKeyUtils::leaderKey(1, 1), MetaKeyUtils::leaderValV3(HostAddr(), 1))};
folly::Baton<true, std::atomic> baton;
kv_->asyncMultiPut(
kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) {
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
baton.post();
});
baton.wait();

DefaultValue<folly::Future<Status>>::SetFactory(
[] { return folly::Future<Status>(Status::OK()); });
DefaultValue<folly::Future<StatusOr<bool>>>::SetFactory(
Expand Down
41 changes: 34 additions & 7 deletions src/meta/test/JobManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,20 @@ class JobManagerTest : public ::testing::Test {
kv_ = cluster.initMetaKV(rootPath_->path());
GraphSpaceID spaceId = 1;
int32_t partitionNum = 1;
PartitionID partId = 1;
ASSERT_TRUE(TestUtils::createSomeHosts(kv_.get()));

// write some random leader key into kv, make sure that job will find a target storage
std::vector<nebula::kvstore::KV> data{std::make_pair(MetaKeyUtils::leaderKey(spaceId, partId),
MetaKeyUtils::leaderValV3(HostAddr(), 1))};
folly::Baton<true, std::atomic> baton;
kv_->asyncMultiPut(
kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) {
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
baton.post();
});
baton.wait();

TestUtils::assembleSpace(kv_.get(), spaceId, partitionNum);

// Make sure the rebuild job could find the index name.
Expand Down Expand Up @@ -748,18 +760,14 @@ TEST_F(JobManagerTest, NotStoppableJob) {
cpp2::JobType::REBUILD_FULLTEXT_INDEX,
// cpp2::JobType::DOWNLOAD, // download need hdfs command, it is unstoppable as well
cpp2::JobType::INGEST,
cpp2::JobType::LEADER_BALANCE};
// JobManagerTest has only 1 storage replica, and it won't trigger leader balance
// cpp2::JobType::LEADER_BALANCE
};
for (const auto& type : notStoppableJob) {
if (type != cpp2::JobType::LEADER_BALANCE) {
EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _))
.WillOnce(Return(
ByMove(folly::makeFuture<StatusOr<bool>>(true).delayed(std::chrono::seconds(1)))));
} else {
HostLeaderMap dist;
dist[HostAddr("0", 0)][1] = {1, 2, 3, 4, 5};
EXPECT_CALL(*adminClient_, getLeaderDist(_))
.WillOnce(testing::DoAll(SetArgPointee<0>(dist),
Return(ByMove(folly::Future<Status>(Status::OK())))));
}

JobDescription jobDesc(spaceId, jobId, type);
Expand All @@ -778,11 +786,21 @@ TEST_F(JobManagerTest, NotStoppableJob) {
auto tup = MetaKeyUtils::parseJobVal(value);
status = std::get<2>(tup);
}
EXPECT_EQ(cpp2::JobStatus::RUNNING, status);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
code = jobMgr->stopJob(spaceId, jobId);
ASSERT_EQ(code, nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE);
jobId++;

// check job status again, it still should be running
{
code = kv_->get(kDefaultSpaceId, kDefaultPartId, jobKey, &value);
ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED);
auto tup = MetaKeyUtils::parseJobVal(value);
status = std::get<2>(tup);
EXPECT_EQ(cpp2::JobStatus::RUNNING, status);
}

// If the jobExecutor is still executing, resetSpaceRunning is not set in stopJob
// When the jobExecutor completes, set the resetSpaceRunning of the meta job
// The resetSpaceRunning of the storage job is done in reportTaskFinish
Expand Down Expand Up @@ -853,6 +871,15 @@ TEST_F(JobManagerTest, StoppableJob) {
code = jobMgr->stopJob(spaceId, jobId);
ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED);

// check job status again, it should be stopped now
{
code = kv_->get(kDefaultSpaceId, kDefaultPartId, jobKey, &value);
ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED);
auto tup = MetaKeyUtils::parseJobVal(value);
status = std::get<2>(tup);
EXPECT_EQ(cpp2::JobStatus::STOPPED, status);
}

jobId++;
// If the jobExecutor is still executing, resetSpaceRunning is not set in stopJob
// When the jobExecutor completes, set the resetSpaceRunning of the meta job
Expand Down

0 comments on commit 7da33c1

Please sign in to comment.