
Commit

WIP
guilload committed Nov 16, 2023
1 parent a65997a commit a6c844f
Showing 8 changed files with 240 additions and 114 deletions.
174 changes: 107 additions & 67 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -20,7 +20,6 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::iter::once;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
@@ -49,8 +48,9 @@ use tracing::{error, info, warn};

use super::fetch::FetchStreamTask;
use super::models::IngesterShard;
use super::mrecord::MRecord;
use super::mrecordlog_utils::{append_eof_record_if_necessary, check_enough_capacity};
use super::mrecordlog_utils::{
append_doc_batch, append_eof_record_if_necessary, check_enough_capacity, AppendDocBatchError,
};
use super::rate_limiter::{RateLimiter, RateLimiterSettings};
use super::replication::{
ReplicationStreamTask, ReplicationStreamTaskHandle, ReplicationTask, ReplicationTaskHandle,
@@ -101,6 +101,49 @@ pub(super) struct IngesterState {
pub observation_tx: watch::Sender<IngestV2Result<ObservationMessage>>,
}

impl IngesterState {
async fn close_shards(&mut self, queue_ids: &[QueueId]) {
for queue_id in queue_ids {
append_eof_record_if_necessary(&mut self.mrecordlog, queue_id).await;

if let Some(shard) = self.shards.get_mut(queue_id) {
shard.shard_state = ShardState::Closed;
shard.notify_new_records();
}
}
// TODO: Handle replicated shards.
}

pub async fn decommission(&mut self) {
let queue_ids: Vec<QueueId> = self.shards.keys().cloned().collect();
self.close_shards(&queue_ids).await;

self.status = IngesterStatus::Decommissioning;
self.check_decommissioning_status();
}

/// Checks whether the ingester is fully decommissioned and updates its status accordingly.
fn check_decommissioning_status(&mut self) {
if self.status != IngesterStatus::Decommissioning {
return;
}
if self.shards.values().all(|shard| {
shard.shard_state.is_closed() && shard.truncation_position_inclusive == Position::Eof
}) {
info!("ingester fully decommissioned");
self.status = IngesterStatus::Decommissioned;

self.observation_tx.send_if_modified(|observation_result| {
if let Ok(observation) = observation_result {
observation.status = IngesterStatus::Decommissioned as i32;
return true;
}
false
});
}
}
}

impl Ingester {
pub async fn try_new(
self_node_id: NodeId,
@@ -158,27 +201,6 @@ impl Ingester {
Ok(ingester)
}

/// Checks whether the ingester is fully decommissioned and updates its status accordingly.
fn check_decommissioning_status(&self, state: &mut IngesterState) {
if state.status != IngesterStatus::Decommissioning {
return;
}
if state.shards.values().all(|shard| {
shard.shard_state.is_closed() && shard.truncation_position_inclusive == Position::Eof
}) {
info!("ingester fully decommissioned");
state.status = IngesterStatus::Decommissioned;

state.observation_tx.send_if_modified(|observation_result| {
if let Ok(observation) = observation_result {
observation.status = IngesterStatus::Decommissioned as i32;
return true;
}
false
});
}
}

async fn init(&self) -> IngestV2Result<()> {
let mut state_guard = self.state.write().await;

@@ -246,18 +268,6 @@ impl Ingester {
Ok(entry.or_insert(shard))
}

async fn close_shards_inner(&self, state: &mut IngesterState, queue_ids: &[QueueId]) {
for queue_id in queue_ids {
append_eof_record_if_necessary(&mut state.mrecordlog, queue_id).await;

if let Some(shard) = state.shards.get_mut(queue_id) {
shard.shard_state = ShardState::Closed;
shard.notify_new_records();
}
}
// TODO: Handle replicated shards.
}

async fn init_replication_stream(
&self,
state: &mut IngesterState,
@@ -322,6 +332,7 @@ impl IngesterService for Ingester {
let mut persist_successes = Vec::with_capacity(persist_request.subrequests.len());
let mut persist_failures = Vec::new();
let mut replicate_subrequests: HashMap<NodeId, Vec<ReplicateSubrequest>> = HashMap::new();
let mut should_decommission = false;

let commit_type = persist_request.commit_type();
let force_commit = commit_type == CommitTypeV2::Force;
@@ -350,6 +361,17 @@ impl IngesterService for Ingester {
return Ok(persist_response);
}
for subrequest in persist_request.subrequests {
if should_decommission {
let persist_failure = PersistFailure {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::ShardClosed as i32,
};
persist_failures.push(persist_failure);
continue;
}
let queue_id = subrequest.queue_id();
let follower_id_opt: Option<NodeId> = subrequest.follower_id.map(Into::into);
let shard = if let Some(shard) = state_guard.shards.get_mut(&queue_id) {
@@ -435,25 +457,41 @@ impl IngesterService for Ingester {
persist_failures.push(persist_failure);
continue;
}
let current_position_inclusive: Position = if force_commit {
let encoded_mrecords = doc_batch
.docs()
.map(|doc| MRecord::Doc(doc).encode())
.chain(once(MRecord::Commit.encode()));
state_guard
.mrecordlog
.append_records(&queue_id, None, encoded_mrecords)
.await
.expect("TODO") // TODO: Io error, close shard?
} else {
let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode());
state_guard
.mrecordlog
.append_records(&queue_id, None, encoded_mrecords)
.await
.expect("TODO") // TODO: Io error, close shard?
}
.into();
let append_result = append_doc_batch(
&mut state_guard.mrecordlog,
&queue_id,
&doc_batch,
force_commit,
)
.await;

let current_position_inclusive = match append_result {
Ok(current_position_inclusive) => current_position_inclusive,
Err(append_error) => {
let reason = match &append_error {
AppendDocBatchError::Io(_) => {
error!(
"failed to append records to shard `{queue_id}`: {append_error}"
);
should_decommission = true;
PersistFailureReason::ShardClosed
}
AppendDocBatchError::QueueNotFound(_) => {
warn!("{append_error}");
PersistFailureReason::ShardNotFound
}
};
let persist_failure = PersistFailure {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: reason as i32,
};
persist_failures.push(persist_failure);
continue;
}
};
let batch_num_bytes = doc_batch.num_bytes() as u64;
let batch_num_docs = doc_batch.num_docs() as u64;

@@ -549,6 +587,7 @@ impl IngesterService for Ingester {
// already.
let persist_failure_reason = match replicate_failure.reason() {
ReplicateFailureReason::Unspecified => PersistFailureReason::Unspecified,
ReplicateFailureReason::ShardNotFound => PersistFailureReason::ShardNotFound,
ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed,
ReplicateFailureReason::ResourceExhausted => {
PersistFailureReason::ResourceExhausted
@@ -564,7 +603,13 @@ impl IngesterService for Ingester {
persist_failures.push(persist_failure);
}
}
if should_decommission {
error!("decommissioning ingester after IO error");
let mut state_guard = self.state.write().await;
state_guard.decommission().await;
}
let leader_id = self.self_node_id.to_string();

let persist_response = PersistResponse {
leader_id,
successes: persist_successes,
@@ -683,7 +728,7 @@ impl IngesterService for Ingester {
.flat_map(|shards| shards.queue_ids())
.collect();

self.close_shards_inner(&mut state_guard, &queue_ids).await;
state_guard.close_shards(&queue_ids).await;

Ok(CloseShardsResponse {})
}
Expand Down Expand Up @@ -733,7 +778,7 @@ impl IngesterService for Ingester {
}
}
}
self.check_decommissioning_status(&mut state_guard);
state_guard.check_decommissioning_status();
let truncate_response = TruncateShardsResponse {};
Ok(truncate_response)
}
@@ -744,12 +789,7 @@ impl IngesterService for Ingester {
) -> IngestV2Result<DecommissionResponse> {
info!("decommissioning ingester");
let mut state_guard = self.state.write().await;

let queue_ids: Vec<QueueId> = state_guard.shards.keys().cloned().collect();
self.close_shards_inner(&mut state_guard, &queue_ids).await;

state_guard.status = IngesterStatus::Decommissioning;
self.check_decommissioning_status(&mut state_guard);
state_guard.decommission().await;

Ok(DecommissionResponse {})
}
@@ -1836,15 +1876,15 @@ mod tests {
let (_ingester_ctx, ingester) = IngesterForTest::default().build().await;
let mut state_guard = ingester.state.write().await;

ingester.check_decommissioning_status(&mut state_guard);
state_guard.check_decommissioning_status();
assert_eq!(state_guard.status, IngesterStatus::Ready);
assert_eq!(
ingester.observation_rx.borrow().as_ref().unwrap().status(),
IngesterStatus::Ready
);

state_guard.status = IngesterStatus::Decommissioning;
ingester.check_decommissioning_status(&mut state_guard);
state_guard.check_decommissioning_status();
assert_eq!(state_guard.status, IngesterStatus::Decommissioned);

state_guard.status = IngesterStatus::Decommissioning;
@@ -1855,13 +1895,13 @@ mod tests {
queue_id_01.clone(),
IngesterShard::new_solo(ShardState::Closed, Position::Eof, Position::Beginning),
);
ingester.check_decommissioning_status(&mut state_guard);
state_guard.check_decommissioning_status();
assert_eq!(state_guard.status, IngesterStatus::Decommissioning);

let shard = state_guard.shards.get_mut(&queue_id_01).unwrap();
shard.truncation_position_inclusive = Position::Eof;

ingester.check_decommissioning_status(&mut state_guard);
state_guard.check_decommissioning_status();
assert_eq!(state_guard.status, IngesterStatus::Decommissioned);
assert_eq!(
ingester.observation_rx.borrow().as_ref().unwrap().status(),
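For readers skimming the diff, here is a minimal, self-contained sketch of the persist error-handling pattern this file's changes introduce: an IO error while appending to the WAL fails the current subrequest, flags the ingester for decommissioning, and rejects every remaining subrequest, while a missing queue only fails the offending subrequest. All types below are simplified stand-ins for illustration, not the actual quickwit-proto or mrecordlog types.

use std::io;

#[derive(Debug)]
enum AppendDocBatchError {
    Io(io::Error),
    QueueNotFound(String),
}

#[derive(Debug)]
enum PersistFailureReason {
    ShardClosed,
    ShardNotFound,
}

struct Subrequest {
    queue_id: String,
    docs: Vec<Vec<u8>>,
}

// Simulated WAL append standing in for `append_doc_batch`.
fn append(queue_id: &str, _docs: &[Vec<u8>]) -> Result<u64, AppendDocBatchError> {
    if queue_id == "missing" {
        Err(AppendDocBatchError::QueueNotFound(queue_id.to_string()))
    } else {
        Ok(0)
    }
}

// Mirrors the control flow of the reworked `persist` handler: IO errors poison
// the rest of the request batch and request decommissioning; missing queues
// only fail the one subrequest.
fn persist(subrequests: Vec<Subrequest>) -> (Vec<(String, PersistFailureReason)>, bool) {
    let mut failures = Vec::new();
    let mut should_decommission = false;

    for subrequest in subrequests {
        if should_decommission {
            // Once an IO error was observed, reject every remaining subrequest.
            failures.push((subrequest.queue_id, PersistFailureReason::ShardClosed));
            continue;
        }
        match append(&subrequest.queue_id, &subrequest.docs) {
            Ok(_position) => {}
            Err(AppendDocBatchError::Io(_)) => {
                should_decommission = true;
                failures.push((subrequest.queue_id, PersistFailureReason::ShardClosed));
            }
            Err(AppendDocBatchError::QueueNotFound(_)) => {
                failures.push((subrequest.queue_id, PersistFailureReason::ShardNotFound));
            }
        }
    }
    (failures, should_decommission)
}

fn main() {
    let subrequests = vec![
        Subrequest { queue_id: "shard-1".into(), docs: vec![b"doc".to_vec()] },
        Subrequest { queue_id: "missing".into(), docs: vec![b"doc".to_vec()] },
    ];
    let (failures, should_decommission) = persist(subrequests);
    println!("failures: {failures:?}, decommission: {should_decommission}");
}

In the actual handler, a set `should_decommission` flag triggers `state_guard.decommission().await` after the loop, which closes all shards, appends EOF records to their queues, and moves the ingester status to Decommissioning.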
50 changes: 50 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs
@@ -17,15 +17,65 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::io;
use std::iter::once;

use bytesize::ByteSize;
use mrecordlog::error::{AppendError, MissingQueue};
use mrecordlog::MultiRecordLog;
use quickwit_proto::ingest::DocBatchV2;
use quickwit_proto::types::{Position, QueueId};
use tracing::warn;

use super::mrecord::is_eof_mrecord;
use crate::MRecord;

#[derive(Debug, thiserror::Error)]
pub(super) enum AppendDocBatchError {
#[error("IO error: {0}")]
Io(#[from] io::Error),
#[error("WAL queue `{0}` not found")]
QueueNotFound(QueueId),
}

/// Appends a document batch to the WAL queue.
///
/// # Panics
///
/// Panics if `doc_batch` is empty.
pub(super) async fn append_doc_batch(
mrecordlog: &mut MultiRecordLog,
queue_id: &QueueId,
doc_batch: &DocBatchV2,
force_commit: bool,
) -> Result<Position, AppendDocBatchError> {
let append_result = if force_commit {
let encoded_mrecords = doc_batch
.docs()
.map(|doc| MRecord::Doc(doc).encode())
.chain(once(MRecord::Commit.encode()));
mrecordlog
.append_records(&queue_id, None, encoded_mrecords)
.await
} else {
let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode());
mrecordlog
.append_records(&queue_id, None, encoded_mrecords)
.await
};
match append_result {
Ok(Some(offset)) => Ok(Position::from(offset)),
Ok(None) => panic!("`doc_batch` should not be empty"),
Err(AppendError::IoError(io_error)) => Err(AppendDocBatchError::Io(io_error)),
Err(AppendError::MissingQueue(queue_id)) => {
Err(AppendDocBatchError::QueueNotFound(queue_id))
}
Err(AppendError::Past) => {
panic!("`append_records` should be called with `None`")
}
}
}

/// Appends an EOF record to the queue if it is empty or the last record is not an EOF
/// record.
pub(super) async fn append_eof_record_if_necessary(
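As a usage illustration of the new helper, the sketch below mimics how `append_doc_batch` builds its record iterator: each document is encoded as a doc record, and a single commit record is chained onto the end only when `force_commit` is set. The `MRecord` type and its byte encoding here are simplified placeholders, not Quickwit's real wire format.

use std::iter::once;

// Placeholder record type; the real `MRecord` and its encoding live in
// `ingest_v2/mrecord.rs`.
enum MRecord {
    Doc(Vec<u8>),
    Commit,
}

impl MRecord {
    fn encode(&self) -> Vec<u8> {
        match self {
            MRecord::Doc(doc) => {
                let mut buf = vec![b'D'];
                buf.extend_from_slice(doc);
                buf
            }
            MRecord::Commit => vec![b'C'],
        }
    }
}

// Builds the payloads that would be handed to the WAL: one record per
// document, plus a trailing commit record when `force_commit` is requested.
fn encode_batch(docs: &[Vec<u8>], force_commit: bool) -> Vec<Vec<u8>> {
    let doc_records = docs.iter().cloned().map(MRecord::Doc).map(|record| record.encode());
    if force_commit {
        doc_records.chain(once(MRecord::Commit.encode())).collect()
    } else {
        doc_records.collect()
    }
}

fn main() {
    let docs = vec![b"hello".to_vec(), b"world".to_vec()];
    assert_eq!(encode_batch(&docs, false).len(), 2);
    assert_eq!(encode_batch(&docs, true).len(), 3);
    println!("ok");
}

The real helper then maps `AppendError::IoError` and `AppendError::MissingQueue` returned by `append_records` into `AppendDocBatchError::Io` and `AppendDocBatchError::QueueNotFound`, as shown in the diff above.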