
report error to PD server #43

Merged · 7 commits · Apr 19, 2022
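Implements the previously stubbed `on_fatal_error` in the backup-stream endpoint: on a fatal error the task is first unregistered locally, then paused through the metadata client, and a `StreamBackupError` (error code, message, store id, timestamp) is persisted under a new `/last-error` meta key via `MetadataClient::report_last_error`. If reporting fails, the `Task::FatalError` is re-scheduled after 5 seconds. `MetadataClient` also gains `get_last_error` and `check_task_paused`, and the endpoint's task dispatch is extracted into a reusable `run_task` method.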
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

66 changes: 50 additions & 16 deletions components/backup-stream/src/endpoint.rs
@@ -11,7 +11,9 @@ use concurrency_manager::ConcurrencyManager;
use dashmap::DashMap;
use engine_traits::KvEngine;

use error_code::ErrorCodeExt;
use futures::executor::block_on;
use kvproto::brpb::StreamBackupError;
use kvproto::metapb::Region;
use pd_client::PdClient;
use raftstore::router::RaftStoreRouter;
@@ -30,10 +32,10 @@ use crate::errors::Error;
use crate::event_loader::InitialDataLoader;
use crate::metadata::store::{EtcdStore, MetaStore};
use crate::metadata::{MetadataClient, MetadataEvent, StreamTask};
use crate::metrics;
use crate::router::{ApplyEvents, Router, FLUSH_STORAGE_INTERVAL};
use crate::utils::{self, StopWatch};
use crate::{errors::Result, observer::BackupStreamObserver};
use crate::{metrics, try_send};

use online_config::ConfigChange;
use raftstore::coprocessor::{CmdBatch, ObserveHandle, RegionInfoProvider};
@@ -212,10 +214,38 @@ where
RT: RaftStoreRouter<E> + 'static,
PDC: PdClient + 'static,
{
fn on_fatal_error(&self, _task: String, _err: Box<Error>) {
// This is a stub.
// TODO: implement the feature of reporting fatal error to the meta storage,
// and pause the task then.
fn get_meta_client(&self) -> MetadataClient<S> {
self.meta_client.as_ref().unwrap().clone()
}

fn on_fatal_error(&self, task: String, err: Box<Error>) {
// Let's pause the task locally first.
self.on_unregister(&task);

let meta_cli = self.get_meta_client();
let store_id = self.store_id;
let sched = self.scheduler.clone();
self.pool.block_on(async move {
// Pause the task in the meta store, then record the error that triggered the pause.
let err_fut = async {
meta_cli.pause(&task).await?;
let mut last_error = StreamBackupError::new();
last_error.set_error_code(err.error_code().code.to_owned());
last_error.set_error_message(err.to_string());
last_error.set_store_id(store_id);
last_error.set_happen_at(TimeStamp::physical_now());
meta_cli.report_last_error(&task, last_error).await?;
Result::Ok(())
};
if let Err(err_report) = err_fut.await {
err_report.report(format_args!("failed to upload error {}", err_report));
// Let's retry reporting after 5s.
tokio::task::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
try_send!(sched, Task::FatalError(task, err));
});
}
})
}

async fn starts_flush_ticks(router: Router) {
@@ -719,7 +749,20 @@ where
}
}

pub fn do_backup(&mut self, events: Vec<CmdBatch>) {
pub fn run_task(&self, task: Task) {
debug!("run backup stream task"; "task" => ?task);
match task {
Task::WatchTask(op) => self.handle_watch_task(op),
Task::BatchEvent(events) => self.do_backup(events),
Task::Flush(task) => self.on_flush(task, self.store_id),
Task::ModifyObserve(op) => self.on_modify_observe(op),
Task::ForceFlush(task) => self.on_force_flush(task, self.store_id),
Task::FatalError(task, err) => self.on_fatal_error(task, err),
_ => (),
}
}

pub fn do_backup(&self, events: Vec<CmdBatch>) {
for batch in events {
self.backup_batch(batch)
}
@@ -821,15 +864,6 @@ where
type Task = Task;

fn run(&mut self, task: Task) {
debug!("run backup stream task"; "task" => ?task);
match task {
Task::WatchTask(op) => self.handle_watch_task(op),
Task::BatchEvent(events) => self.do_backup(events),
Task::Flush(task) => self.on_flush(task, self.store_id),
Task::ModifyObserve(op) => self.on_modify_observe(op),
Task::ForceFlush(task) => self.on_force_flush(task, self.store_id),
Task::FatalError(task, err) => self.on_fatal_error(task, err),
_ => (),
}
self.run_task(task)
}
}
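For context (not part of this diff): any component holding a clone of the endpoint's scheduler can route an unrecoverable error into `on_fatal_error` through `Task::FatalError`, which `run_task` dispatches. A minimal sketch, assuming it lives inside the backup-stream crate and that `Task`, `Error`, and `try_send!` are the crate items used above; the helper name is hypothetical:

```rust
use tikv_util::worker::Scheduler;

use crate::{endpoint::Task, errors::Error, try_send};

/// Hypothetical helper: hand an unrecoverable error over to the endpoint.
/// `run_task` dispatches `Task::FatalError` to `on_fatal_error`, which pauses
/// the task and reports the error to the meta store as shown above.
fn surface_fatal_error(sched: Scheduler<Task>, task_name: &str, err: Error) {
    try_send!(sched, Task::FatalError(task_name.to_owned(), Box::new(err)));
}
```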
42 changes: 41 additions & 1 deletion components/backup-stream/src/metadata/client.rs
@@ -8,7 +8,7 @@ use super::{
},
};

use kvproto::brpb::StreamBackupTaskInfo;
use kvproto::brpb::{StreamBackupError, StreamBackupTaskInfo};

use tikv_util::{defer, time::Instant, warn};
use tokio_stream::StreamExt;
@@ -127,13 +127,53 @@ impl<Store: MetaStore> MetadataClient<Store> {
}
}

pub async fn report_last_error(&self, name: &str, last_error: StreamBackupError) -> Result<()> {
use protobuf::Message;
let now = Instant::now();
defer! {
super::metrics::METADATA_OPERATION_LATENCY.with_label_values(&["task_report_error"]).observe(now.saturating_elapsed_secs())
}

let key = MetaKey::last_error_of(name, self.store_id);
let mut value = Vec::with_capacity(last_error.compute_size() as _);
last_error.write_to_vec(&mut value)?;
self.meta_store.set(KeyValue(key, value)).await?;

Ok(())
}

pub async fn get_last_error(
&self,
name: &str,
store_id: u64,
) -> Result<Option<StreamBackupError>> {
let key = MetaKey::last_error_of(name, store_id);

let s = self.meta_store.snapshot().await?;
let r = s.get(Keys::Key(key)).await?;
if r.len() < 1 {
return Ok(None);
}
let r = &r[0];
let err = protobuf::parse_from_bytes(r.value())?;
Ok(Some(err))
}

/// check whether the task is paused.
pub async fn check_task_paused(&self, name: &str) -> Result<bool> {
let snap = self.meta_store.snapshot().await?;
let kvs = snap.get(Keys::Key(MetaKey::pause_of(name))).await?;
Ok(!kvs.is_empty())
}

/// pause a task.
pub async fn pause(&self, name: &str) -> Result<()> {
Ok(self
.meta_store
.set(KeyValue(MetaKey::pause_of(name), vec![]))
.await?)
}

pub async fn get_tasks_pause_status(&self) -> Result<HashMap<Vec<u8>, bool>> {
let snap = self.meta_store.snapshot().await?;
let kvs = snap.get(Keys::Prefix(MetaKey::pause_prefix())).await?;
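Not part of this PR, but for context: anything that can construct a `MetadataClient` against the same meta store can read the report back. A minimal sketch of such a consumer (the helper name and wiring are hypothetical; the field getters assume the usual rust-protobuf accessors on `StreamBackupError`):

```rust
use kvproto::brpb::StreamBackupError;

use crate::errors::Result;
use crate::metadata::{store::MetaStore, MetadataClient};

/// Hypothetical helper: report whether a task is paused and return the error
/// (if any) that a given store last recorded for it.
async fn inspect_task<S: MetaStore>(
    cli: &MetadataClient<S>,
    task_name: &str,
    store_id: u64,
) -> Result<Option<StreamBackupError>> {
    if cli.check_task_paused(task_name).await? {
        // The pause key is written by `on_fatal_error` before the error report.
        println!("task {} is currently paused", task_name);
    }
    // Reads the value written by `report_last_error` for this (task, store) pair.
    cli.get_last_error(task_name, store_id).await
}
```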
5 changes: 5 additions & 0 deletions components/backup-stream/src/metadata/keys.rs
@@ -6,6 +6,7 @@ const PATH_INFO: &str = "/info";
const PATH_NEXT_BACKUP_TS: &str = "/checkpoint";
const PATH_RANGES: &str = "/ranges";
const PATH_PAUSE: &str = "/pause";
const PATH_LAST_ERROR: &str = "/last-error";
// Note: maybe use something like `const_fmt` for concatenating constant strings?
const TASKS_PREFIX: &str = "/tidb/br-stream/info/";

@@ -123,6 +124,10 @@ impl MetaKey {
Self(format!("{}{}/{}", PREFIX, PATH_PAUSE, name).into_bytes())
}

pub fn last_error_of(name: &str, store: u64) -> Self {
Self(format!("{}{}/{}/{}", PREFIX, PATH_LAST_ERROR, name, store).into_bytes())
}

/// return the key that keeps the range [self, self.next()) contains only
/// `self`.
pub fn next(&self) -> Self {
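For reference, the key layout produced by `last_error_of`. `PREFIX` is not visible in this hunk; `/tidb/br-stream` is assumed below by analogy with `TASKS_PREFIX`, so treat the exact prefix as illustrative:

```rust
fn main() {
    // Assumption: PREFIX = "/tidb/br-stream" (inferred from TASKS_PREFIX; not shown in this diff).
    const PREFIX: &str = "/tidb/br-stream";
    const PATH_LAST_ERROR: &str = "/last-error";

    // Mirrors MetaKey::last_error_of(name, store) from the hunk above.
    let (name, store) = ("my-task", 1u64);
    let key = format!("{}{}/{}/{}", PREFIX, PATH_LAST_ERROR, name, store);
    assert_eq!(key, "/tidb/br-stream/last-error/my-task/1");
}
```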
4 changes: 1 addition & 3 deletions components/backup-stream/src/metadata/test.rs
@@ -103,9 +103,7 @@ async fn test_watch() -> Result<()> {
assert_eq!(
events,
vec![
MetadataEvent::AddTask {
task: "simple_2".to_owned()
},
MetadataEvent::AddTask { task: task2 },
MetadataEvent::RemoveTask {
task: "simple_1".to_owned()
}