Skip to content

Commit

Permalink
[ECO-2105] Add ws server (#37)
Browse files Browse the repository at this point in the history
* Add websocket server

* Small fixes

* Use combined events
  • Loading branch information
CRBl69 authored Aug 21, 2024
1 parent b153e65 commit 1000b77
Show file tree
Hide file tree
Showing 10 changed files with 634 additions and 58 deletions.
264 changes: 222 additions & 42 deletions rust/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions rust/processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ tiny-keccak = { workspace = true }
tokio-postgres = { workspace = true }
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
dotenvy = "0.15.7"
axum = { version = "0.7.5", features = ["ws"] }

[target.'cfg(unix)'.dependencies]
jemallocator = { workspace = true }
Expand Down
14 changes: 11 additions & 3 deletions rust/processor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
gap_detectors::DEFAULT_GAP_DETECTION_BATCH_SIZE, processors::ProcessorConfig,
transaction_filter::TransactionFilter, worker::Worker,
emojicoin_dot_fun::EmojicoinDbEvent, gap_detectors::DEFAULT_GAP_DETECTION_BATCH_SIZE, processors::ProcessorConfig, transaction_filter::TransactionFilter, worker::Worker, ws_server
};
use ahash::AHashMap;
use anyhow::{Context, Result};
Expand Down Expand Up @@ -85,6 +84,7 @@ impl IndexerGrpcProcessorConfig {
#[async_trait::async_trait]
impl RunnableConfig for IndexerGrpcProcessorConfig {
async fn run(&self) -> Result<()> {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<EmojicoinDbEvent>();
let mut worker = Worker::new(
self.processor_config.clone(),
self.postgres_connection_string.clone(),
Expand All @@ -103,10 +103,18 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {
self.transaction_filter.clone(),
self.grpc_response_item_timeout_in_secs,
self.deprecated_tables.clone(),
sender,
)
.await
.context("Failed to build worker")?;
worker.run().await;
tokio::select! {
_ = worker.run() => {
tracing::error!("Worker error.")
}
_ = ws_server::start(receiver) => {
tracing::error!("WebSocket server error")
}
};
Ok(())
}

Expand Down
118 changes: 114 additions & 4 deletions rust/processor/src/db/common/models/emojicoin_models/enums.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::constants::{
use super::{constants::{
CHAT_EVENT, GLOBAL_STATE_EVENT, LIQUIDITY_EVENT, MARKET_REGISTRATION_EVENT, MARKET_RESOURCE,
PERIODIC_STATE_EVENT, STATE_EVENT, SWAP_EVENT,
};
use serde::{Deserialize, Deserializer, Serialize};
}, json_types::{EventWithMarket, GlobalStateEvent}, models::{chat_event::ChatEventModel, global_state_event::GlobalStateEventModel, liquidity_event::LiquidityEventModel, market_latest_state_event::MarketLatestStateEventModel, market_registration_event::MarketRegistrationEventModel, periodic_state_event::PeriodicStateEventModel, swap_event::SwapEventModel}};
use serde::{Deserialize, Deserializer, Serialize, Serializer};

#[derive(
Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, diesel_derive_enum::DbEnum,
Expand Down Expand Up @@ -33,7 +33,30 @@ impl Trigger {
}
}

pub fn deserialize_state_trigger<'de, D>(deserializer: D) -> core::result::Result<Trigger, D::Error>
impl From<&Trigger> for i16 {
fn from(i: &Trigger) -> Self {
match i {
Trigger::PackagePublication => 0,
Trigger::MarketRegistration => 1,
Trigger::SwapBuy => 2,
Trigger::SwapSell => 3,
Trigger::ProvideLiquidity => 4,
Trigger::RemoveLiquidity => 5,
Trigger::Chat => 6,
}
}
}

pub fn serialize_state_trigger<S>(element: &Trigger, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
s.serialize_i16(element.into())
}

pub fn deserialize_state_trigger<'de, D>(
deserializer: D,
) -> core::result::Result<Trigger, D::Error>
where
D: Deserializer<'de>,
{
Expand Down Expand Up @@ -69,6 +92,22 @@ pub enum Period {
OneDay,
}

pub fn serialize_state_period<S>(element: &Period, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let r = match element {
Period::OneMinute => "60000000",
Period::FiveMinutes => "300000000",
Period::FifteenMinutes => "900000000",
Period::ThirtyMinutes => "1800000000",
Period::OneHour => "3600000000",
Period::FourHours => "14400000000",
Period::OneDay => "86400000000",
};
s.serialize_str(r)
}

pub fn deserialize_state_period<'de, D>(deserializer: D) -> core::result::Result<Period, D::Error>
where
D: Deserializer<'de>,
Expand Down Expand Up @@ -101,6 +140,77 @@ pub enum EmojicoinTypeTag {
Market,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum EmojicoinEvent {
EventWithMarket(EventWithMarket),
EventWithoutMarket(GlobalStateEvent),
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum EmojicoinDbEvent {
Swap(SwapEventModel),
Chat(ChatEventModel),
MarketRegistration(MarketRegistrationEventModel),
PeriodicState(PeriodicStateEventModel),
MarketLatestState(MarketLatestStateEventModel),
GlobalState(GlobalStateEventModel),
Liquidity(LiquidityEventModel),
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub enum EmojicoinEventType {
Swap,
Chat,
MarketRegistration,
PeriodicState,
State,
GlobalState,
Liquidity,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub enum EmojicoinDbEventType {
Swap,
Chat,
MarketRegistration,
PeriodicState,
MarketLatestState,
GlobalState,
Liquidity,
}

impl From<&EmojicoinEvent> for EmojicoinEventType {
fn from(value: &EmojicoinEvent) -> Self {
match value {
EmojicoinEvent::EventWithMarket(e) => {
match e {
EventWithMarket::PeriodicState(_) => EmojicoinEventType::PeriodicState,
EventWithMarket::State(_) => EmojicoinEventType::State,
EventWithMarket::Swap(_) => EmojicoinEventType::Swap,
EventWithMarket::Chat(_) => EmojicoinEventType::Chat,
EventWithMarket::Liquidity(_) => EmojicoinEventType::Liquidity,
EventWithMarket::MarketRegistration(_) => EmojicoinEventType::MarketRegistration,
}
},
EmojicoinEvent::EventWithoutMarket(_) => EmojicoinEventType::GlobalState
}
}
}

impl From<&EmojicoinDbEvent> for EmojicoinDbEventType {
fn from(value: &EmojicoinDbEvent) -> Self {
match value {
EmojicoinDbEvent::Swap(_) => Self::Swap,
EmojicoinDbEvent::Chat(_) => Self::Chat,
EmojicoinDbEvent::MarketRegistration(_) => Self::MarketRegistration,
EmojicoinDbEvent::PeriodicState(_) => Self::PeriodicState,
EmojicoinDbEvent::MarketLatestState(_) => Self::MarketLatestState,
EmojicoinDbEvent::GlobalState(_) => Self::GlobalState,
EmojicoinDbEvent::Liquidity(_) => Self::Liquidity,
}
}
}

impl EmojicoinTypeTag {
pub fn from_type_str(type_str: &str) -> Option<Self> {
match type_str {
Expand Down
Loading

0 comments on commit 1000b77

Please sign in to comment.