Commit

Merge pull request #69 from vertexclique/ansrivas-tokio-pg
wip - tokio-postgres
vertexclique authored Apr 26, 2022
2 parents c9f15f5 + 46afbbf commit 359a7d6
Showing 7 changed files with 282 additions and 33 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cicd.yml
@@ -49,7 +49,7 @@ jobs:
timeout-minutes: 40
with:
command: test
args: --all --features sink_elastic -- --test-threads 1 --nocapture
args: --all --features sink_postgres,sink_elastic -- --test-threads 1 --nocapture

- name: Check build
uses: actions-rs/cargo@v1
76 changes: 44 additions & 32 deletions Cargo.toml
@@ -6,8 +6,19 @@ authors = [
"Theo B. <[email protected]>",
"Ankur S. <[email protected]>",
]
keywords = ["stream", "stream-processing", "microservice", "distributed", "database"]
categories = ["concurrency", "asynchronous", "database", "database-implementations"]
keywords = [
"stream",
"stream-processing",
"microservice",
"distributed",
"database",
]
categories = [
"concurrency",
"asynchronous",
"database",
"database-implementations",
]
edition = "2021"
rust-version = "1.60"
license = "Apache-2.0"
@@ -34,10 +45,7 @@ exclude = [

[features]
default = ["epoll", "asyncexec"]
docs = [
"store_rocksdb",
"sink_elastic"
]
docs = ["store_rocksdb", "sink_elastic", "sink_postgres"]
# IO systems
iouring = ["nuclei/iouring"]
epoll = ["nuclei/epoll"]
@@ -49,44 +57,48 @@ tokio02 = ["nuclei/tokio02"]
# Storage systems
store_rocksdb = ["rocksdb"]
# Sinks
sink_elastic = [
"tokio",
"elasticsearch"
]
sink_elastic = ["tokio", "elasticsearch"]
sink_postgres = ["tokio", "deadpool-postgres", "deadpool"]

[dependencies]
#nuclei = { version = "0.2", default-features = false, features = ["epoll", "async-exec"] }
nuclei = "0.2"
nuclei = "0.2.0"
lightproc = "0.3.5"
lever = "0.1.3"
thiserror = "1.0"
async-trait = "0.1"
futures = { version = "0.3", default-features = false, features = ["std", "async-await"] }
futures-timer = "3.0"
crossbeam-channel = "0.5"
rdkafka = { version = "0.28.0", default-features = false, features = ["libz"]}
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.2"
libc = "0.2.123"
cuneiform-fields = "0.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
bincode = "1.3"
http-types = "2.12"
async-h1 = "2.3"
thiserror = "1.0.30"
async-trait = "0.1.53"
futures = { version = "0.3.21", default-features = false, features = [
"std",
"async-await",
] }
futures-timer = "3.0.2"
crossbeam-channel = "0.5.4"
rdkafka = { version = "0.28.0", default-features = false, features = ["libz"] }
tracing = "0.1.34"
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
url = "2.2.2"
libc = "0.2.124"
cuneiform-fields = "0.1.1"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
bincode = "1.3.3"
http-types = "2.12.0"
async-h1 = "2.3.3"
pin-project-lite = "0.2.8"
futures-lite = "1.12"
futures-lite = "1.12.0"

# Optionals
rocksdb = { version = "0.18", optional = true }
rocksdb = { version = "0.18.0", optional = true }
elasticsearch = { version = "7.14.0-alpha.1", optional = true }

deadpool-postgres = { version = "0.10.2", features = [
"serde",
], optional = true }
deadpool = { version = "0.9.3", optional = true }
async-global-executor = "2.0.4"
[dev-dependencies]
daemonize = "0.4"
daemonize = "0.4.1"
dirs = "4.0.0"

[package.metadata.docs.rs]
features = ["docs"]
rustdoc-args = ["--cfg", "feature=\"docs\""]
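
Note: `sink_postgres` is an additive feature; `tokio`, `deadpool-postgres`, and `deadpool` are only pulled in when it is enabled, and the feature is now part of the `docs` feature set. A minimal sketch of hypothetical downstream code gated on the new flag (not part of this diff; DSN, pool size, and buffer size mirror the example added below):

#[cfg(feature = "sink_postgres")]
use callysto::prelude::postgres::CPostgresSink;

#[cfg(feature = "sink_postgres")]
fn make_sink() -> Result<CPostgresSink<String>, callysto::errors::CallystoError> {
    // Compiled only when the `sink_postgres` feature is enabled.
    CPostgresSink::new("postgres://testuser:testpassword@localhost/testdb", 4, 0)
}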

53 changes: 53 additions & 0 deletions examples/postgres-sink.rs
@@ -0,0 +1,53 @@
use callysto::errors::CallystoError;
use callysto::futures::StreamExt;
use callysto::prelude::message::*;
use callysto::prelude::postgres::{CPostgresRow, CPostgresSink};
use callysto::prelude::*;
use serde::{Deserialize, Serialize};

/// create table spotify ( id bigserial primary key, song_url text );
/// rpk topic create --brokers localhost:9092 example
#[derive(Serialize, Deserialize)]
pub struct SpotifyDocument {
song_url: String,
}

impl SpotifyDocument {
pub fn new(song_url: String) -> Self {
Self { song_url }
}
}

async fn postgres_agent(stream: CStream, _ctx: Context<()>) -> Result<()> {
stream
.enumerate()
.map(|(idx, m)| {
m.map(|e| {
let doc = SpotifyDocument::new(e.payload_view::<str>().unwrap().unwrap().into());
println!("Processing document ID: {}", idx);

let query = "INSERT INTO
spotify (song_url)
VALUES ($1)";

CPostgresRow::new(query.into(), vec![doc.song_url])
})
.ok_or(CallystoError::GeneralError("No payload".into()))
})
.forward(
CPostgresSink::new("postgres://testuser:testpassword@localhost/testdb", 4, 0).unwrap(),
)
.await?;

Ok(())
}

fn main() {
let mut app = Callysto::new();

app.with_name("postgres-app");
app.agent("postgres-agent", app.topic("example"), postgres_agent);

app.run();
}
25 changes: 25 additions & 0 deletions examples/producer_postgres.py
@@ -0,0 +1,25 @@
# pip install aiokafka -U

import aiokafka
import asyncio


async def produce_message(topic: str, message: str):
producer = aiokafka.AIOKafkaProducer(bootstrap_servers="localhost:9092")
await producer.start()
try:
await producer.send_and_wait(topic, message.encode())
finally:
await producer.stop()


async def main():
data = "hello"
for i in range(0, 100):
await produce_message("example", data)
print("Produced on topic `example`")
print("DONE")


if __name__ == "__main__":
asyncio.run(main())
4 changes: 4 additions & 0 deletions src/errors.rs
@@ -40,4 +40,8 @@ pub enum CallystoError {
#[cfg(feature = "sink_elastic")]
#[error("ElasticSearch Error")]
ElasticSearchError(#[from] elasticsearch::Error),

#[cfg(feature = "sink_postgres")]
#[error("Postgres Error")]
PostgresError(#[from] deadpool_postgres::tokio_postgres::error::Error),
}
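
With `#[from]`, thiserror derives `From<deadpool_postgres::tokio_postgres::error::Error>` for `CallystoError`, so `?` inside sink code converts database errors into the new `PostgresError` variant. A small hypothetical helper (not part of this commit) illustrating the conversion, assuming a pooled client has already been checked out:

use crate::errors::CallystoError;

#[cfg(feature = "sink_postgres")]
async fn ping(client: &deadpool_postgres::Client) -> Result<(), CallystoError> {
    // Both calls can fail with tokio_postgres::Error; `?` maps the failure to
    // CallystoError::PostgresError through the derived From impl.
    let stmt = client.prepare("SELECT 1").await?;
    client.execute(&stmt, &[]).await?;
    Ok(())
}
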
5 changes: 5 additions & 0 deletions src/sinks/mod.rs
@@ -2,3 +2,8 @@
#[cfg_attr(feature = "docs", doc(cfg(sink_elastic)))]
/// ElasticSearch Sink for CStream
pub mod elasticsearch;

#[cfg(feature = "sink_postgres")]
#[cfg_attr(feature = "docs", doc(cfg(sink_postgres)))]
/// Postgres Sink for CStream
pub mod postgres;
150 changes: 150 additions & 0 deletions src/sinks/postgres.rs
@@ -0,0 +1,150 @@
use crate::errors::*;
use crate::runtime::async_con::Arc;
use crossbeam_channel::{Receiver, Sender};
use cuneiform_fields::arch::ArchPadding;
use deadpool_postgres::tokio_postgres::types::ToSql;
use deadpool_postgres::{Client, Config, Pool, PoolError, Runtime};
use deadpool_postgres::{Manager, ManagerConfig, RecyclingMethod};
use futures::sink::{drain, With};
use futures::Future;
use futures::{Sink, SinkExt};
use futures_lite::FutureExt;
use nuclei::Task;
use pin_project_lite::pin_project;
use serde::Serialize;
use std::collections::VecDeque;
use std::convert::Infallible;
use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use std::vec::Drain;
use tracing::{debug, trace, warn};
use url::Url;

#[derive(Debug)]
pub struct CPostgresRow<T: ToSql + Sync + 'static + Send> {
pub query: String,
pub args: Vec<T>,
}

impl<T> CPostgresRow<T>
where
T: Send + ToSql + Sync + 'static,
{
pub fn new(query: String, args: Vec<T>) -> Self {
Self { query, args }
}
}

pin_project! {
pub struct CPostgresSink<T>
where
T: ToSql,
T: Sync,
T: Send,
T: 'static
{
client: Arc<deadpool_postgres::Pool>,
tx: ArchPadding<Sender<CPostgresRow<T>>>,
buffer_size: usize,
#[pin]
data_sink: Task<()>
}
}

impl<T> CPostgresSink<T>
where
T: ToSql + Sync + 'static + Send,
{
async fn setup_pg(dsn: &str, tls: bool, pool_size: usize) -> Result<Pool> {
// TODO(ansrivas): Currently only NoTls is supported, will add it later.
let pg_config = deadpool_postgres::tokio_postgres::Config::from_str(dsn)?;
let mgr_config = ManagerConfig {
recycling_method: RecyclingMethod::Fast,
};

let mgr = Manager::from_config(
pg_config,
deadpool_postgres::tokio_postgres::NoTls,
mgr_config,
);
let pool = Pool::builder(mgr)
.max_size(pool_size)
.build()
.map_err(|e| CallystoError::GeneralError(e.to_string()))?;
Ok(pool)
}

pub fn new<S: Into<String>>(pg_dsn: S, pool_size: usize, buffer_size: usize) -> Result<Self> {
let pgpool = nuclei::block_on(async move {
Self::setup_pg(&pg_dsn.into(), false, pool_size)
.await
.unwrap()
});

let (tx, rx) = crossbeam_channel::unbounded::<CPostgresRow<T>>();
let (tx, rx) = (ArchPadding::new(tx), ArchPadding::new(rx));

let inner_client = pgpool.clone();
let client = Arc::new(pgpool);
let data_sink = nuclei::spawn(async move {
while let Ok(item) = rx.recv() {
let mut client = inner_client.get().await.unwrap();
let stmt = client.prepare_cached(&item.query).await.unwrap();
let rows = client.query_raw(&stmt, &item.args).await.unwrap();
trace!("CPostgresSink - Ingestion status:");
}
});

Ok(Self {
client,
tx,
buffer_size,
data_sink,
})
}
}

impl<T> Sink<CPostgresRow<T>> for CPostgresSink<T>
where
T: ToSql + Sync + 'static + Send,
{
type Error = CallystoError;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
if self.buffer_size == 0 {
// Bypass buffering
return Poll::Ready(Ok(()));
}

if self.tx.len() >= self.buffer_size {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
}

fn start_send(mut self: Pin<&mut Self>, item: CPostgresRow<T>) -> Result<()> {
let mut this = &mut *self;
this.tx
.send(item)
.map_err(|e| CallystoError::GeneralError(format!("Failed to send to db: `{}`", e)))
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
if self.tx.len() > 0 {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
if self.tx.len() > 0 {
Poll::Pending
} else {
// TODO: Drop the task `data_sink`.
Poll::Ready(Ok(()))
}
}
}
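
For reference, a minimal standalone usage sketch of the new sink (hypothetical, not part of this commit), driving it with `SinkExt::send` instead of the `forward` call used in examples/postgres-sink.rs and assuming the same test DSN and `spotify` table:

use callysto::errors::CallystoError;
use callysto::prelude::postgres::{CPostgresRow, CPostgresSink};
use futures::SinkExt;

async fn insert_one(song_url: String) -> Result<(), CallystoError> {
    // Pool of 4 connections; buffer_size = 0 bypasses buffering, so poll_ready is always ready.
    let sink: CPostgresSink<String> =
        CPostgresSink::new("postgres://testuser:testpassword@localhost/testdb", 4, 0)?;
    futures::pin_mut!(sink);

    let row = CPostgresRow::new(
        "INSERT INTO spotify (song_url) VALUES ($1)".into(),
        vec![song_url],
    );
    // send() drives poll_ready/start_send/poll_flush; the row itself is executed
    // by the background task spawned in CPostgresSink::new.
    sink.send(row).await
}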
