Skip to content

Commit

Permalink
perf(be): cache parsed avro schema
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewinci committed Nov 4, 2022
1 parent b95b2b2 commit b229189
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 13 deletions.
9 changes: 3 additions & 6 deletions src-tauri/src/lib/parser/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ where

let id = get_schema_id(raw)?;

let raw_schema = self
let schema = self
.schema_registry_client
.get_schema_by_id(id)
.await
Expand All @@ -45,9 +45,6 @@ where
err.to_string()
),
})?;
let schema = Schema::parse_str(raw_schema.as_str()).map_err(|err| Error::AvroParse {
message: format!("{}\n{}", "Unable to parse the schema from schema registry", err),
})?;
let mut data = Cursor::new(&raw[5..]);
let record = from_avro_datum(&schema, &mut data, None).map_err(|err| Error::AvroParse {
message: format!("{}\n{}", "Unable to parse the avro record", err),
Expand Down Expand Up @@ -193,8 +190,8 @@ mod tests {
// Test-double stub: subject lookup is not implemented here, so any call panics via `todo!()`.
// The subject-name argument is ignored (`_`).
async fn get_subject(&self, _: &str) -> Result<Subject> {
todo!()
}
// NOTE(review): rendered diff hunk. The first signature/body pair is the pre-change version
// (returned a clone of the raw schema string); the second pair is the post-change version.
async fn get_schema_by_id(&self, _: i32) -> Result<String> {
Ok(self.schema.clone())
// Test double: ignores the requested id and parses the `schema` string held by `self` on every call.
// `unwrap()` panics if `parse_str` returns an error for that string.
async fn get_schema_by_id(&self, _: i32) -> Result<ApacheAvroSchema> {
Ok(ApacheAvroSchema::parse_str(&self.schema).unwrap())
}
}
fn get_sut(schema: String) -> AvroParser<MockSchemaRegistry> {
Expand Down
2 changes: 1 addition & 1 deletion src-tauri/src/lib/record_store/app_store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::lib::{types::ParsedKafkaRecord, Error, Result};
use parking_lot::{FairMutex};
use parking_lot::FairMutex;
use rusqlite::{named_params, Connection};
use std::sync::Arc;

Expand Down
20 changes: 14 additions & 6 deletions src-tauri/src/lib/schema_registry/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ use std::collections::HashMap;
use std::sync::Arc;
use url::Url;

use apache_avro::Schema as AvroSchema;

use crate::lib::Error;

use super::error::Result;
use super::http_client::{HttpClient, ReqwestClient};
use super::types::{BasicAuth, Schema, Subject};
Expand All @@ -19,7 +23,7 @@ struct GetSchemaByIdResult {
/// Async client interface for a schema registry.
///
/// NOTE(review): this span is a rendered diff hunk, so `get_schema_by_id` appears twice:
/// first with its previous `Result<String>` return type, then with the new
/// `Result<AvroSchema>` return type.
pub trait SchemaRegistryClient {
/// Returns a list of subject names (presumably every subject known to the registry;
/// the implementation is not visible here, confirm).
async fn list_subjects(&self) -> Result<Vec<String>>;
/// Returns the `Subject` identified by `subject_name`.
async fn get_subject(&self, subject_name: &str) -> Result<Subject>;
/// Previous signature: returned the schema for `id` as a raw string.
async fn get_schema_by_id(&self, id: i32) -> Result<String>;
/// New signature: returns the schema for `id` as an already-parsed `AvroSchema`.
async fn get_schema_by_id(&self, id: i32) -> Result<AvroSchema>;
}

#[derive(Clone)]
Expand All @@ -29,7 +33,7 @@ where
{
http_client: C,
endpoint: String,
schema_cache_by_id: Arc<Mutex<HashMap<i32, String>>>,
schema_cache_by_id: Arc<Mutex<HashMap<i32, AvroSchema>>>,
}

impl CachedSchemaRegistry<ReqwestClient> {
Expand Down Expand Up @@ -83,19 +87,23 @@ where
})
}

/// Returns the Avro schema registered under `id`, using `schema_cache_by_id` as a cache.
///
/// On a cache hit a clone of the cached value is returned. On a miss the schema is fetched
/// from `<endpoint>/schemas/ids/{id}` through the HTTP client, parsed with
/// `AvroSchema::parse_str`, stored in the cache and returned.
///
/// # Errors
/// Propagates URL parsing/joining errors and HTTP client errors via `?`.
///
/// # Panics
/// Panics (via `.expect("todo")`) when the fetched schema text cannot be parsed.
///
/// NOTE(review): this span is a rendered diff hunk. Where a line appears in two versions,
/// the first is the pre-change one (`Result<String>`, caching the raw string) and the
/// second is the post-change one (`Result<AvroSchema>`, caching the parsed schema).
async fn get_schema_by_id(&self, id: i32) -> Result<String> {
async fn get_schema_by_id(&self, id: i32) -> Result<AvroSchema> {
// The lock guard lives until the end of the function, so it is also held across the
// HTTP request below; concurrent callers wait for it even when they ask for other ids.
let mut cache = self.schema_cache_by_id.lock().await;
trace!("Getting schema {} by id.", id);

if let Some(cached) = cache.get(&id) {
trace!("Schema found in cache");
Ok(cached.clone())
} else {
trace!("Schema not found in cache, retrieving");
let url = Url::parse(&self.endpoint)?.join(format!("/schemas/ids/{}", id).as_str())?;
let schema: GetSchemaByIdResult = self.http_client.get(url.as_ref()).await?;
// Pre-change lines: cached and returned the raw schema string.
cache.insert(id, schema.schema.clone());
Ok(schema.schema)
// Post-change lines: parse once here so cache hits return an already-parsed schema.
let schema = AvroSchema::parse_str(schema.schema.as_str())
.map_err(|err| Error::AvroParse {
message: format!("{}\n{}", "Unable to parse the schema from schema registry", err),
})
// NOTE(review): `expect` panics on a parse failure, so the `Error::AvroParse` built above
// never reaches the caller. Presumably `crate::lib::Error` does not convert into this
// module's `Result` error type; confirm, then propagate the error instead of panicking.
.expect("todo");
cache.insert(id, schema.clone());
Ok(schema)
}
}
}
Expand Down

0 comments on commit b229189

Please sign in to comment.