Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensures the affinity function is the same as in Quickwit 0.8 #5580

Merged
merged 3 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub mod tower;
pub mod type_map;
pub mod uri;

mod socket_addr_legacy_hash;

use std::env;
use std::fmt::{Debug, Display};
use std::future::Future;
Expand All @@ -58,6 +60,7 @@ pub use coolid::new_coolid;
pub use kill_switch::KillSwitch;
pub use path_hasher::PathHasher;
pub use progress::{Progress, ProtectedZoneGuard};
pub use socket_addr_legacy_hash::SocketAddrLegacyHash;
pub use stream_utils::{BoxStream, ServiceStream};
use tracing::{error, info};

Expand Down
34 changes: 28 additions & 6 deletions quickwit/quickwit-common/src/rendezvous_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ mod tests {
use std::net::SocketAddr;

use super::*;
use crate::SocketAddrLegacyHash;

fn test_socket_addr(last_byte: u8) -> SocketAddr {
([127, 0, 0, last_byte], 10_000u16).into()
Expand All @@ -55,17 +56,38 @@ mod tests {
let socket3 = test_socket_addr(3);
let socket4 = test_socket_addr(4);

let mut socket_set1 = vec![socket4, socket3, socket1, socket2];
let legacy_socket1 = SocketAddrLegacyHash(&socket1);
let legacy_socket2 = SocketAddrLegacyHash(&socket2);
let legacy_socket3 = SocketAddrLegacyHash(&socket3);
let legacy_socket4 = SocketAddrLegacyHash(&socket4);

let mut socket_set1 = vec![
legacy_socket4,
legacy_socket3,
legacy_socket1,
legacy_socket2,
];
sort_by_rendez_vous_hash(&mut socket_set1, "key");

let mut socket_set2 = vec![socket1, socket2, socket4];
let mut socket_set2 = vec![legacy_socket1, legacy_socket2, legacy_socket4];
sort_by_rendez_vous_hash(&mut socket_set2, "key");

let mut socket_set3 = vec![socket1, socket4];
let mut socket_set3 = vec![legacy_socket1, legacy_socket4];
sort_by_rendez_vous_hash(&mut socket_set3, "key");

assert_eq!(socket_set1, &[socket1, socket3, socket2, socket4]);
assert_eq!(socket_set2, &[socket1, socket2, socket4]);
assert_eq!(socket_set3, &[socket1, socket4]);
assert_eq!(
socket_set1,
&[
legacy_socket1,
legacy_socket2,
legacy_socket3,
legacy_socket4
]
);
assert_eq!(
socket_set2,
&[legacy_socket1, legacy_socket2, legacy_socket4]
);
assert_eq!(socket_set3, &[legacy_socket1, legacy_socket4]);
}
}
100 changes: 100 additions & 0 deletions quickwit/quickwit-common/src/socket_addr_legacy_hash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::hash::Hasher;
use std::net::SocketAddr;

/// Computes the hash of socket addr, the way it was done before Rust 1.81
///
/// In <https://github.com/rust-lang/rust/commit/ba620344301aaa3b2733575a0696cdfd877edbdf>
/// rustc change the implementation of Hash for IpAddr v4 and v6.
///
/// The idea was to not hash an array of bytes but instead interpret it as a register
/// and hash this.
///
/// This was done for performance reason, but this change the result of the hash function
/// used to compute affinity in quickwit. As a result, the switch would invalidate all
/// existing cache.
///
/// In order to avoid this, we introduce the following function that reproduces the old
/// behavior.
#[repr(transparent)]
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub struct SocketAddrLegacyHash<'a>(pub &'a SocketAddr);

impl<'a> std::hash::Hash for SocketAddrLegacyHash<'a> {
fn hash<H: Hasher>(&self, state: &mut H) {
std::mem::discriminant(self.0).hash(state);
match self.0 {
SocketAddr::V4(socket_addr_v4) => {
socket_addr_v4.ip().octets().hash(state);
socket_addr_v4.port().hash(state);
}
SocketAddr::V6(socket_addr_v6) => {
socket_addr_v6.ip().octets().hash(state);
socket_addr_v6.port().hash(state);
socket_addr_v6.flowinfo().hash(state);
socket_addr_v6.scope_id().hash(state);
}
}
}
}

#[cfg(test)]
mod tests {
use std::net::SocketAddrV6;

use super::*;

fn sample_socket_addr_v4() -> SocketAddr {
"17.12.15.3:1834".parse().unwrap()
}

fn sample_socket_addr_v6() -> SocketAddr {
let mut socket_addr_v6: SocketAddrV6 = "[fe80::240:63ff:fede:3c19]:8080".parse().unwrap();
socket_addr_v6.set_scope_id(4047u32);
socket_addr_v6.set_flowinfo(303u32);
socket_addr_v6.into()
}

fn compute_hash(hashable: impl std::hash::Hash) -> u64 {
// I wish I could have used the sip hasher but we don't have the deps here and I did
// not want to move that code to quickwit-common.
//
// If test break because rust changed its default hasher, we can just update the tests in
// this file with the new values.
let mut hasher = siphasher::sip::SipHasher::default();
hashable.hash(&mut hasher);
hasher.finish()
}

#[test]
fn test_legacy_hash_socket_addr_v4() {
let h = compute_hash(SocketAddrLegacyHash(&sample_socket_addr_v4()));
// This value is coming from using rust 1.80 to hash socket addr
assert_eq!(h, 8725442259486497862);
}

#[test]
fn test_legacy_hash_socket_addr_v6() {
let h = compute_hash(SocketAddrLegacyHash(&sample_socket_addr_v6()));
// This value is coming from using rust 1.80 to hash socket addr
assert_eq!(h, 14277248675058176752);
}
}
30 changes: 15 additions & 15 deletions quickwit/quickwit-search/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,30 +746,30 @@ mod tests {
#[tokio::test]
async fn test_put_kv_happy_path() {
// 3 servers 1, 2, 3
// Targeted key has affinity [3, 2, 1].
// Targeted key has affinity [2, 3, 1].
//
// Put on 2 and 3 is successful
// Get succeeds on 3.
// Get succeeds on 2.
let mock_search_service_1 = MockSearchService::new();
let mut mock_search_service_2 = MockSearchService::new();
// Due to the buffered call it is possible for the
// put request to 2 to be emitted too.
mock_search_service_2
.expect_put_kv()
.returning(|_put_req: quickwit_proto::search::PutKvRequest| {});
let mut mock_search_service_3 = MockSearchService::new();
mock_search_service_3.expect_put_kv().once().returning(
mock_search_service_2.expect_put_kv().once().returning(
|put_req: quickwit_proto::search::PutKvRequest| {
assert_eq!(put_req.key, b"my_key");
assert_eq!(put_req.payload, b"my_payload");
},
);
mock_search_service_3.expect_get_kv().once().returning(
mock_search_service_2.expect_get_kv().once().returning(
|get_req: quickwit_proto::search::GetKvRequest| {
assert_eq!(get_req.key, b"my_key");
Some(b"my_payload".to_vec())
},
);
let mut mock_search_service_3 = MockSearchService::new();
// Due to the buffered call it is possible for the
// put request to 3 to be emitted too.
mock_search_service_3
.expect_put_kv()
.returning(|_put_req: quickwit_proto::search::PutKvRequest| {});
let searcher_pool = searcher_pool_for_test([
("127.0.0.1:1001", mock_search_service_1),
("127.0.0.1:1002", mock_search_service_2),
Expand All @@ -791,11 +791,11 @@ mod tests {
#[tokio::test]
async fn test_put_kv_failing_get() {
// 3 servers 1, 2, 3
// Targeted key has affinity [3, 2, 1].
// Targeted key has affinity [2, 3, 1].
//
// Put on 2 and 3 is successful
// Get fails on 3.
// Get succeeds on 2.
// Get fails on 2.
// Get succeeds on 3.
let mock_search_service_1 = MockSearchService::new();
let mut mock_search_service_2 = MockSearchService::new();
mock_search_service_2.expect_put_kv().once().returning(
Expand All @@ -807,7 +807,7 @@ mod tests {
mock_search_service_2.expect_get_kv().once().returning(
|get_req: quickwit_proto::search::GetKvRequest| {
assert_eq!(get_req.key, b"my_key");
Some(b"my_payload".to_vec())
None
},
);
let mut mock_search_service_3 = MockSearchService::new();
Expand All @@ -820,7 +820,7 @@ mod tests {
mock_search_service_3.expect_get_kv().once().returning(
|get_req: quickwit_proto::search::GetKvRequest| {
assert_eq!(get_req.key, b"my_key");
None
Some(b"my_payload".to_vec())
},
);
let searcher_pool = searcher_pool_for_test([
Expand Down
19 changes: 11 additions & 8 deletions quickwit/quickwit-search/src/search_job_placer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use anyhow::bail;
use async_trait::async_trait;
use quickwit_common::pubsub::EventSubscriber;
use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash};
use quickwit_common::SocketAddrLegacyHash;
use quickwit_proto::search::{ReportSplit, ReportSplitsRequest};
use tracing::{info, warn};

Expand Down Expand Up @@ -77,7 +78,9 @@ impl EventSubscriber<ReportSplitsRequest> for SearchJobPlacer {
for report_split in evt.report_splits {
let node_addr = nodes
.keys()
.max_by_key(|node_addr| node_affinity(*node_addr, &report_split.split_id))
.max_by_key(|node_addr| {
node_affinity(SocketAddrLegacyHash(node_addr), &report_split.split_id)
})
// This actually never happens thanks to the if-condition at the
// top of this function.
.expect("`nodes` should not be empty");
Expand Down Expand Up @@ -115,7 +118,7 @@ struct SocketAddrAndClient {

impl Hash for SocketAddrAndClient {
fn hash<H: Hasher>(&self, hasher: &mut H) {
self.socket_addr.hash(hasher);
SocketAddrLegacyHash(&self.socket_addr).hash(hasher);
}
}

Expand Down Expand Up @@ -174,7 +177,7 @@ impl SearchJobPlacer {
all_nodes.len()
);
}
let mut candidate_nodes: Vec<_> = all_nodes
let mut candidate_nodes: Vec<CandidateNode> = all_nodes
.into_iter()
.map(|(grpc_addr, client)| CandidateNode {
grpc_addr,
Expand Down Expand Up @@ -259,7 +262,7 @@ struct CandidateNode {

impl Hash for CandidateNode {
fn hash<H: Hasher>(&self, state: &mut H) {
self.grpc_addr.hash(state);
SocketAddrLegacyHash(&self.grpc_addr).hash(state);
}
}

Expand Down Expand Up @@ -432,17 +435,17 @@ mod tests {
(
expected_searcher_addr_1,
vec![
SearchJob::for_test("split5", 5),
SearchJob::for_test("split4", 4),
SearchJob::for_test("split3", 3),
SearchJob::for_test("split2", 2),
SearchJob::for_test("split1", 1),
],
),
(
expected_searcher_addr_2,
vec![
SearchJob::for_test("split6", 6),
SearchJob::for_test("split5", 5),
SearchJob::for_test("split4", 4),
SearchJob::for_test("split2", 2),
SearchJob::for_test("split1", 1),
],
),
];
Expand Down
Loading