Skip to content

Commit

Permalink
add /{table}/{id} (#14)
Browse files Browse the repository at this point in the history
* add `/{table}/{id}`

* move to connection pool for db, and make heartbeat check db access.

* alembic was failing

* wow

---------

Co-authored-by: Sanchit Ram Arvind <[email protected]>
  • Loading branch information
jhheider and sanchitram1 authored Oct 22, 2024
1 parent 8c2b87f commit 8f9b926
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 55 deletions.
3 changes: 2 additions & 1 deletion alembic/load-values.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ ON CONFLICT (type) DO NOTHING;
INSERT INTO "package_managers" ("source_id")
SELECT id
FROM "sources"
WHERE "type" IN ('crates', 'npm', 'pypi', 'rubygems', 'github', 'homebrew');
WHERE "type" IN ('crates', 'npm', 'pypi', 'rubygems', 'github', 'homebrew')
ON CONFLICT (source_id) DO NOTHING;
4 changes: 4 additions & 0 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ license = "MIT"
repository = "https://github.com/teaxyz/chai-oss"

[dependencies]
uuid = { version = "1.11.0", features = ["serde", "v4"] }
actix-web = "4.3"
dotenv = "0.15"
tokio = { version = "1", features = ["full"] }
Expand All @@ -20,4 +21,7 @@ chrono = { version = "0.4", features = ["serde"] }
tokio-postgres = { version = "0.7", features = [
"with-serde_json-1",
"with-chrono-0_4",
"with-uuid-1",
] }
deadpool-postgres = "0.10.0"
url = "2.5.2"
4 changes: 2 additions & 2 deletions api/src/app_state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use deadpool_postgres::Pool;
use std::sync::Arc;
use tokio_postgres::Client;

pub struct AppState {
pub client: Arc<Client>,
pub pool: Pool,
pub tables: Arc<Vec<String>>,
}
34 changes: 23 additions & 11 deletions api/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
use deadpool_postgres::{Config, Pool, Runtime};
use std::env;
use std::sync::Arc;
use tokio_postgres::{Client, NoTls};
use url::Url;

pub async fn create_db_client(database_url: &str) -> Arc<Client> {
let (client, connection) = tokio_postgres::connect(database_url, NoTls)
.await
.expect("Failed to connect to PostgreSQL");
pub async fn create_pool() -> Pool {
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let db_url = Url::parse(&database_url).expect("Invalid database URL");

tokio::spawn(async move {
if let Err(e) = connection.await {
log::error!("Database connection error: {}", e);
}
});
let mut config = Config::new();
config.host = db_url.host_str().map(ToOwned::to_owned);
config.port = db_url.port();
config.user = Some(db_url.username().to_owned());
config.password = db_url.password().map(ToOwned::to_owned);
config.dbname = db_url.path().strip_prefix('/').map(ToOwned::to_owned);

Arc::new(client)
config
.create_pool(Some(Runtime::Tokio1), NoTls)
.expect("Failed to create pool")
}

pub async fn get_tables(client: &Arc<Client>) -> Vec<String> {
pub async fn get_tables(client: &Client) -> Vec<String> {
let rows = client
.query(
"SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'",
Expand All @@ -28,3 +33,10 @@ pub async fn get_tables(client: &Arc<Client>) -> Vec<String> {
.map(|row| row.get::<_, String>("table_name"))
.collect()
}

pub async fn initialize_db() -> (Pool, Arc<Vec<String>>) {
let pool = create_pool().await;
let client = pool.get().await.expect("Failed to get client from pool");
let tables = Arc::new(get_tables(&client).await);
(pool, tables)
}
126 changes: 98 additions & 28 deletions api/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use actix_web::{get, web, HttpResponse, Responder};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio_postgres::error::SqlState;
use uuid::Uuid;

use crate::app_state::AppState;
use crate::utils::{get_column_names, rows_to_json};
Expand Down Expand Up @@ -28,8 +30,20 @@ pub async fn list_tables(data: web::Data<AppState>) -> impl Responder {
}

#[get("/heartbeat")]
pub async fn heartbeat() -> impl Responder {
HttpResponse::Ok().body("OK")
pub async fn heartbeat(data: web::Data<AppState>) -> impl Responder {
match data.pool.get().await {
Ok(client) => match client.query_one("SELECT 1", &[]).await {
Ok(_) => HttpResponse::Ok().body("OK - Database connection is healthy"),
Err(e) => {
log::error!("Database query failed: {}", e);
HttpResponse::InternalServerError().body("Database query failed")
}
},
Err(e) => {
log::error!("Failed to get database connection: {}", e);
HttpResponse::InternalServerError().body("Failed to get database connection")
}
}
}

#[get("/{table}")]
Expand All @@ -52,39 +66,95 @@ pub async fn get_table(
let count_query = format!("SELECT COUNT(*) FROM {}", table);
let data_query = format!("SELECT * FROM {} LIMIT $1 OFFSET $2", table);

match data.client.query_one(&count_query, &[]).await {
Ok(count_row) => {
let total_count: i64 = count_row.get(0);
let total_pages = (total_count as f64 / limit as f64).ceil() as i64;
match data.pool.get().await {
Ok(client) => match client.query_one(&count_query, &[]).await {
Ok(count_row) => {
let total_count: i64 = count_row.get(0);
let total_pages = (total_count as f64 / limit as f64).ceil() as i64;

match data.client.query(&data_query, &[&limit, &offset]).await {
Ok(rows) => {
let columns = get_column_names(&rows);
let data = rows_to_json(&rows);
let response = PaginatedResponse {
table,
total_count,
page,
limit,
total_pages,
columns,
data,
};
HttpResponse::Ok().json(response)
match client.query(&data_query, &[&limit, &offset]).await {
Ok(rows) => {
let columns = get_column_names(&rows);
let data = rows_to_json(&rows);
let response = PaginatedResponse {
table,
total_count,
page,
limit,
total_pages,
columns,
data,
};
HttpResponse::Ok().json(response)
}
Err(e) => {
log::error!("Database query error: {}", e);
HttpResponse::InternalServerError().json(json!({
"error": "An error occurred while querying the database"
}))
}
}
Err(e) => {
log::error!("Database query error: {}", e);
}
Err(e) => {
log::error!("Database count query error: {}", e);
HttpResponse::InternalServerError().json(json!({
"error": "An error occurred while counting rows in the database"
}))
}
},
Err(e) => {
log::error!("Failed to get database connection: {}", e);
HttpResponse::InternalServerError().body("Failed to get database connection")
}
}
}

#[get("/{table}/{id}")]
pub async fn get_table_row(
path: web::Path<(String, Uuid)>,
data: web::Data<AppState>,
) -> impl Responder {
let (table_name, id) = path.into_inner();

if !data.tables.contains(&table_name) {
return HttpResponse::NotFound().json(json!({
"error": format!("Table '{}' not found", table_name)
}));
}

let query = format!("SELECT * FROM {} WHERE id = $1", table_name);

match data.pool.get().await {
Ok(client) => match client.query_one(&query, &[&id]).await {
Ok(row) => {
let json = rows_to_json(&[row]);
let value = json.first().unwrap();
HttpResponse::Ok().json(value)
}
Err(e) => {
if e.as_db_error()
.map_or(false, |e| e.code() == &SqlState::UNDEFINED_TABLE)
{
HttpResponse::NotFound().json(json!({
"error": format!("Table '{}' not found", table_name)
}))
} else if e
.as_db_error()
.map_or(false, |e| e.code() == &SqlState::NO_DATA_FOUND)
{
HttpResponse::NotFound().json(json!({
"error": format!("No row found with id '{}' in table '{}'", id, table_name)
}))
} else {
HttpResponse::InternalServerError().json(json!({
"error": "An error occurred while querying the database"
"error": format!("Database error: {}", e)
}))
}
}
}
},
Err(e) => {
log::error!("Database count query error: {}", e);
HttpResponse::InternalServerError().json(json!({
"error": "An error occurred while counting rows in the database"
}))
log::error!("Failed to get database connection: {}", e);
HttpResponse::InternalServerError().body("Failed to get database connection")
}
}
}
10 changes: 4 additions & 6 deletions api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,19 @@ use std::env;
use std::sync::Arc;

use crate::app_state::AppState;
use crate::db::create_db_client;
use crate::handlers::{get_table, heartbeat, list_tables};
use crate::handlers::{get_table, get_table_row, heartbeat, list_tables};
use crate::logging::setup_logger;

#[actix_web::main]
async fn main() -> std::io::Result<()> {
dotenv().ok();
setup_logger();

let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let host = env::var("HOST").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("PORT").unwrap_or_else(|_| "8080".to_string());
let bind_address = format!("{}:{}", host, port);

let client = create_db_client(&database_url).await;
let tables = Arc::new(db::get_tables(&client).await);
let (pool, tables) = db::initialize_db().await;

log::info!("Available tables: {:?}", tables);
log::info!("Starting server at http://{}", bind_address);
Expand All @@ -34,12 +31,13 @@ async fn main() -> std::io::Result<()> {
App::new()
.wrap(logging::Logger::default())
.app_data(web::Data::new(AppState {
client: Arc::clone(&client),
pool: pool.clone(),
tables: Arc::clone(&tables),
}))
.service(list_tables)
.service(heartbeat)
.service(get_table)
.service(get_table_row)
})
.bind(&bind_address)?
.run()
Expand Down
19 changes: 12 additions & 7 deletions api/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
use serde_json::{json, Value};
use tokio_postgres::types::{Json, Type};
use tokio_postgres::Row;
use tokio_postgres::{types::Type, Row};
use uuid::Uuid;

pub fn get_column_names(rows: &[Row]) -> Vec<String> {
if let Some(row) = rows.first() {
Expand All @@ -27,20 +28,24 @@ pub fn rows_to_json(rows: &[Row]) -> Vec<Value> {
Type::BOOL => json!(row.get::<_, bool>(i)),
Type::VARCHAR | Type::TEXT | Type::BPCHAR => json!(row.get::<_, String>(i)),
Type::TIMESTAMP => {
let ts: chrono::NaiveDateTime = row.get(i);
let ts: NaiveDateTime = row.get(i);
json!(ts.to_string())
}
Type::TIMESTAMPTZ => {
let ts: chrono::DateTime<chrono::Utc> = row.get(i);
let ts: DateTime<Utc> = row.get(i);
json!(ts.to_rfc3339())
}
Type::DATE => {
let date: chrono::NaiveDate = row.get(i);
let date: NaiveDate = row.get(i);
json!(date.to_string())
}
Type::JSON | Type::JSONB => {
let json_value: Json<Value> = row.get(i);
json_value.0
let json_value: serde_json::Value = row.get(i);
json_value
}
Type::UUID => {
let uuid: Uuid = row.get(i);
json!(uuid.to_string())
}
_ => Value::Null,
};
Expand Down

0 comments on commit 8f9b926

Please sign in to comment.