Skip to content

Commit

Permalink
feat: add connection_string column
Browse files Browse the repository at this point in the history
  • Loading branch information
EvolveArt committed Nov 24, 2024
1 parent 5b6fb03 commit 9bf4ad2
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
ALTER TABLE indexers DROP COLUMN custom_connection_string;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Your SQL goes here
-- Add custom connection string column
ALTER TABLE indexers
ADD COLUMN custom_connection_string VARCHAR;
1 change: 1 addition & 0 deletions src/domain/models/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct IndexerModel {
pub target_url: Option<String>,
pub table_name: Option<String>,
pub status_server_port: Option<i32>,
pub custom_connection_string: Option<String>,
}

#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
Expand Down
40 changes: 29 additions & 11 deletions src/handlers/indexers/create_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use axum::Json;
use diesel::SelectableHelper;
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::{AsyncConnection, RunQueryDsl};
use serde::Deserialize;
use uuid::Uuid;

use crate::config::config;
Expand All @@ -19,13 +20,29 @@ use crate::publishers::indexers::publish_start_indexer;
use crate::utils::env::get_environment_variable;
use crate::AppState;

#[derive(Default)]
struct CreateIndexerRequest {
target_url: Option<String>,
data: Bytes,
table_name: Option<String>,
indexer_type: IndexerType,
status_server_port: i32,
#[derive(Debug, Deserialize)]
pub struct CreateIndexerRequest {
pub indexer_type: IndexerType,
pub target_url: Option<String>,
pub table_name: Option<String>,
pub custom_connection_string: Option<String>,
#[serde(skip)]
pub data: Bytes,
#[serde(skip)]
pub status_server_port: i32,
}

impl Default for CreateIndexerRequest {
fn default() -> Self {
Self {
indexer_type: IndexerType::default(),
target_url: None,
table_name: None,
custom_connection_string: None,
data: Bytes::new(),
status_server_port: 1234,
}
}
}

impl CreateIndexerRequest {
Expand Down Expand Up @@ -103,12 +120,13 @@ pub async fn create_indexer(
let id = Uuid::new_v4();
let create_indexer_request = build_create_indexer_request(&mut request).await?;
let new_indexer_db = indexer_repository::NewIndexerDb {
id,
status: IndexerStatus::Created.to_string(),
type_: create_indexer_request.indexer_type.to_string(),
id,
target_url: create_indexer_request.target_url,
table_name: create_indexer_request.table_name,
status_server_port: Some(create_indexer_request.status_server_port),
target_url: create_indexer_request.target_url.clone(),
table_name: create_indexer_request.table_name.clone(),
status_server_port: None,
custom_connection_string: create_indexer_request.custom_connection_string.clone(),
};

let config = config().await;
Expand Down
1 change: 1 addition & 0 deletions src/handlers/indexers/indexer_types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ mod tests {
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: Some(1234),
custom_connection_string: None,
};

// clear the sqs queue
Expand Down
5 changes: 4 additions & 1 deletion src/handlers/indexers/indexer_types/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ pub struct PostgresIndexer;
impl Indexer for PostgresIndexer {
async fn start(&self, indexer: &IndexerModel, attempt: u32) -> Result<u32, IndexerError> {
let binary_file = format!("{}/{}", get_environment_variable("BINARY_BASE_PATH"), "sink-postgres");
let postgres_connection_string = get_environment_variable("APIBARA_POSTGRES_CONNECTION_STRING");
let postgres_connection_string = indexer
.custom_connection_string
.clone()
.unwrap_or_else(|| get_environment_variable("APIBARA_POSTGRES_CONNECTION_STRING"));
let table_name = indexer.table_name.as_ref().expect("`table_name` not set for postgres indexer");
let id = self.start_common(
binary_file,
Expand Down
1 change: 1 addition & 0 deletions src/infra/db/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ diesel::table! {
target_url -> Nullable<Varchar>,
table_name -> Nullable<Varchar>,
status_server_port -> Nullable<Int4>,
custom_connection_string -> Nullable<Varchar>,
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/infra/repositories/indexer_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct IndexerDb {
pub target_url: Option<String>,
pub table_name: Option<String>,
pub status_server_port: Option<i32>,
pub custom_connection_string: Option<String>,
}

#[derive(Deserialize)]
Expand All @@ -39,6 +40,7 @@ pub struct NewIndexerDb {
pub target_url: Option<String>,
pub table_name: Option<String>,
pub status_server_port: Option<i32>,
pub custom_connection_string: Option<String>,
}

#[derive(Deserialize, Insertable)]
Expand Down Expand Up @@ -220,6 +222,7 @@ impl TryFrom<NewIndexerDb> for IndexerModel {
process_id: None,
table_name: value.table_name,
status_server_port: value.status_server_port,
custom_connection_string: value.custom_connection_string,
}
.try_into()?;
Ok(model)
Expand All @@ -237,6 +240,7 @@ impl TryFrom<IndexerDb> for IndexerModel {
target_url: value.target_url,
table_name: value.table_name,
status_server_port: value.status_server_port,
custom_connection_string: value.custom_connection_string,
};
Ok(model)
}
Expand Down Expand Up @@ -273,6 +277,7 @@ mod tests {
target_url: Some(target_url.to_string()),
table_name: Some(table_name.into()),
status_server_port: Some(1234),
custom_connection_string: None,
};

let indexer_model: Result<IndexerModel, ParseError> = indexer_db.try_into();
Expand Down Expand Up @@ -313,6 +318,7 @@ mod tests {
target_url: Some(target_url.to_string()),
table_name: Some(table_name.into()),
status_server_port: Some(1234),
custom_connection_string: None,
};

let indexer_model: Result<IndexerModel, ParseError> = indexer_db.try_into();
Expand Down
5 changes: 5 additions & 0 deletions src/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl MockRepository {
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: None,
custom_connection_string: None,
},
IndexerModel {
id: uuid::Uuid::new_v4(),
Expand All @@ -39,6 +40,7 @@ impl MockRepository {
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: None,
custom_connection_string: None,
},
IndexerModel {
id: uuid::Uuid::new_v4(),
Expand All @@ -48,6 +50,7 @@ impl MockRepository {
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: None,
custom_connection_string: None,
},
IndexerModel {
id: uuid::Uuid::new_v4(),
Expand All @@ -57,6 +60,7 @@ impl MockRepository {
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: None,
custom_connection_string: None,
},
IndexerModel {
id: uuid::Uuid::new_v4(),
Expand All @@ -66,6 +70,7 @@ impl MockRepository {
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: None,
custom_connection_string: None,
},
],
}
Expand Down
6 changes: 6 additions & 0 deletions src/tests/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async fn test_get_indexer() {
target_url: Some("https://example.com".to_string()), // TODO: Mock webhook and test its behavior
table_name: None,
status_server_port: None,
custom_connection_string: None,
})
.await
.unwrap();
Expand Down Expand Up @@ -54,6 +55,7 @@ async fn test_insert_indexer() {
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: None,
custom_connection_string: None,
})
.await
.unwrap();
Expand Down Expand Up @@ -82,6 +84,7 @@ async fn test_update_status() {
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: None,
custom_connection_string: None,
})
.await
.unwrap();
Expand Down Expand Up @@ -109,6 +112,7 @@ async fn test_update_status_and_process_id() {
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: None,
custom_connection_string: None,
})
.await
.unwrap();
Expand Down Expand Up @@ -144,6 +148,7 @@ async fn test_get_all_indexers() {
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: None,
custom_connection_string: None,
})
.await
.unwrap();
Expand All @@ -158,6 +163,7 @@ async fn test_get_all_indexers() {
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: None,
custom_connection_string: None,
})
.await
.unwrap();
Expand Down

0 comments on commit 9bf4ad2

Please sign in to comment.