Skip to content

Commit

Permalink
- revert removal of identity filtering
Browse files Browse the repository at this point in the history
- event dao: create timestamp within the transaction fn
- fix comments in common.rs
  • Loading branch information
mfranciszkiewicz committed Jan 12, 2021
1 parent 67206ec commit 4de3b2c
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 7 deletions.
3 changes: 2 additions & 1 deletion core/activity/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ pub struct QueryTimeoutCommandIndex {

#[derive(Deserialize, Debug)]
pub struct QueryEvents {
/// number of milliseconds to wait
/// application session identifier
#[serde(rename = "appSessionId")]
pub app_session_id: Option<String>,
/// number of milliseconds to wait
#[serde(rename = "pollTimeout", default = "default_query_timeout")]
pub poll_timeout: Option<f32>,
/// select events past the specified point in time
#[serde(rename = "afterTimestamp")]
pub after_timestamp: DateTime<Utc>,
/// maximum count of events to return
Expand Down
12 changes: 8 additions & 4 deletions core/activity/src/dao/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ impl<'c> EventDao<'c> {
use schema::activity::dsl;
use schema::activity_event::dsl as dsl_event;

let now = Utc::now().naive_utc();
log::trace!("creating event_type: {:?}", event_type);

let app_session_id = app_session_id.to_owned();
let activity_id = activity_id.to_owned();
let identity_id = identity_id.to_owned();

do_with_transaction(self.pool, move |conn| {
let now = Utc::now().naive_utc();
diesel::insert_into(dsl_event::activity_event)
.values(
dsl::activity
Expand Down Expand Up @@ -106,13 +106,15 @@ impl<'c> EventDao<'c> {

pub async fn get_events(
&self,
after_timestamp: DateTime<Utc>,
identity_id: &NodeId,
app_session_id: &Option<String>,
after_timestamp: DateTime<Utc>,
max_events: Option<u32>,
) -> Result<Option<Vec<ProviderEvent>>> {
use schema::activity::dsl;
use schema::activity_event::dsl as dsl_event;

let identity_id = identity_id.to_string();
let app_session_id = app_session_id.to_owned();
let limit = match max_events {
Some(val) => MAX_EVENTS.min(val as i64),
Expand All @@ -123,6 +125,7 @@ impl<'c> EventDao<'c> {
readonly_transaction(self.pool, move |conn| {
let mut query = dsl_event::activity_event
.inner_join(schema::activity::table)
.filter(dsl_event::identity_id.eq(identity_id))
.select((
dsl_event::id,
dsl_event::event_date,
Expand Down Expand Up @@ -151,15 +154,16 @@ impl<'c> EventDao<'c> {

pub async fn get_events_wait(
&self,
after_timestamp: DateTime<Utc>,
identity_id: &NodeId,
app_session_id: &Option<String>,
after_timestamp: DateTime<Utc>,
max_events: Option<u32>,
) -> Result<Vec<ProviderEvent>> {
let duration = Duration::from_millis(750);

loop {
if let Some(events) = self
.get_events(after_timestamp, &app_session_id, max_events)
.get_events(identity_id, app_session_id, after_timestamp, max_events)
.await?
{
if events.len() > 0 {
Expand Down
9 changes: 7 additions & 2 deletions core/activity/src/provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,18 @@ async fn set_activity_state(

/// Fetch Requestor command events.
#[actix_web::get("/events")]
async fn get_events(db: web::Data<DbExecutor>, query: web::Query<QueryEvents>) -> impl Responder {
async fn get_events(
db: web::Data<DbExecutor>,
query: web::Query<QueryEvents>,
id: Identity,
) -> impl Responder {
log::trace!("getting events {:?}", query);
let events = db
.as_dao::<EventDao>()
.get_events_wait(
query.after_timestamp,
&id.identity,
&query.app_session_id,
query.after_timestamp,
query.max_events,
)
.timeout(query.poll_timeout)
Expand Down

0 comments on commit 4de3b2c

Please sign in to comment.