Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscribe to wildcard #688

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 10 additions & 22 deletions kuksa_databroker/databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub struct Entry {
pub metadata: Metadata,
}

#[derive(Debug, PartialEq, Eq, Hash)]
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum Field {
Datapoint,
ActuatorTarget,
Expand Down Expand Up @@ -706,9 +706,13 @@ impl ChangeSubscription {
}
notifications
};
match self.sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
if notifications.updates.is_empty() {
Ok(())
} else {
match self.sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
}
}
} else {
Ok(())
Expand Down Expand Up @@ -1328,21 +1332,8 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {

pub async fn subscribe(
&self,
entries: HashMap<String, HashSet<Field>>,
valid_entries: HashMap<i32, HashSet<Field>>,
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> {
let valid_entries = {
let mut valid_entries = HashMap::new();
for (path, fields) in entries {
match self.get_id_by_path(path.as_ref()).await {
Some(id) => {
valid_entries.insert(id, fields);
}
None => return Err(SubscriptionError::NotFound),
}
}
valid_entries
};

if valid_entries.is_empty() {
return Err(SubscriptionError::InvalidInput);
}
Expand Down Expand Up @@ -2803,10 +2794,7 @@ mod tests {
.expect("Register datapoint should succeed");

let mut stream = broker
.subscribe(HashMap::from([(
"test.datapoint1".into(),
HashSet::from([Field::Datapoint]),
)]))
.subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))]))
.await
.expect("subscription should succeed");

Expand Down
84 changes: 62 additions & 22 deletions kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,31 +375,71 @@ impl proto::val_server::Val for broker::DataBroker {
));
}

let mut entries = HashMap::new();

for entry in request.entries {
let mut fields = HashSet::new();
for id in entry.fields {
match proto::Field::from_i32(id) {
Some(field) => match field {
proto::Field::Value => {
fields.insert(broker::Field::Datapoint);
}
proto::Field::ActuatorTarget => {
fields.insert(broker::Field::ActuatorTarget);
let mut valid_requests: HashMap<String, (regex::Regex, HashSet<broker::Field>)> =
HashMap::new();

for entry in &request.entries {
if entry.path.contains('*') && !glob::is_valid_pattern(&entry.path) {
tonic::Status::new(tonic::Code::InvalidArgument, "Invalid Pattern Argument");
continue;
}

let regex_exp = glob::to_regex(&entry.path);
if let Ok(regex) = regex_exp {
let mut fields = HashSet::new();
for id in &entry.fields {
if let Some(field) = proto::Field::from_i32(*id) {
match field {
proto::Field::Value => {
fields.insert(broker::Field::Datapoint);
}
proto::Field::ActuatorTarget => {
fields.insert(broker::Field::ActuatorTarget);
}
_ => {
// Just ignore other fields for now
}
}
_ => {
// Just ignore other fields for now
};
}
valid_requests.insert(entry.path.clone(), (regex, fields));
}
}

let mut entries: HashMap<i32, HashSet<broker::Field>> = HashMap::new();

if !valid_requests.is_empty() {
for (path, (regex, fields)) in valid_requests {
let mut requested_path_found = false;
let mut permission_error = false;
broker
.for_each_entry(|entry| {
let entry_path = &entry.metadata().path;
if regex.is_match(entry_path) {
requested_path_found = true;
entries
.entry(entry.metadata().id)
.and_modify(|existing_fields| {
existing_fields.extend(fields.clone());
})
.or_insert(fields.clone());

match entry.datapoint() {
Ok(_) => {}
Err(_) => permission_error = true,
}
}
},
None => {
return Err(tonic::Status::invalid_argument(format!(
"Invalid Field (id: {id})"
)))
}
};
})
.await;
if !requested_path_found {
let message = format!("No entries found for the provided. Path: {}", path);
return Err(tonic::Status::new(tonic::Code::NotFound, message));
}
if permission_error {
let message = format!("Permission denied for some entries. Path: {}", path);
return Err(tonic::Status::new(tonic::Code::PermissionDenied, message));
}
}
entries.insert(entry.path.clone(), fields);
}

match broker.subscribe(entries).await {
Expand Down