Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Market: fix rest responses #765

Merged
merged 19 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a84b785
[mkt] REST endpoints to comply with yaml spec
tworec Nov 4, 2020
d687289
Merge branch 'master' into market/fix-rest-responses
tworec Nov 4, 2020
d31d948
[mkt] REST to return ApprovalStatus as string
tworec Nov 4, 2020
7436ef8
Merge branch 'master' into market/fix-rest-responses
tworec Nov 10, 2020
8aa12fe
bump ya-client to align with golemfactory/ya-client#59
tworec Nov 10, 2020
3969a49
Merge branch 'master' into market/fix-rest-responses
tworec Dec 10, 2020
6b18463
bump ya-client reference
tworec Dec 10, 2020
fd8182c
Compatibility with updated ya-client
nieznanysprawiciel Dec 14, 2020
c5cf32f
Reference master branch on ya-client
nieznanysprawiciel Dec 14, 2020
ef57d0a
Merge branch 'event-api/master' into market/fix-rest-responses
tworec Dec 14, 2020
4b79090
Adjust panic event messages in tests
nieznanysprawiciel Dec 14, 2020
b602a44
Activity Event API [1/2]
mfranciszkiewicz Dec 14, 2020
7a061dd
Increase negotiation proposal timeouts in tests
nieznanysprawiciel Dec 14, 2020
30f8bbf
bump ya-client to newest event-api/master
tworec Dec 14, 2020
f3a51fb
Merge remote-tracking branch 'origin/mf/activity/event-api' into mark…
tworec Dec 14, 2020
ff68555
Merge branch 'market/agreement-events-extract-common-fields' into mar…
tworec Dec 14, 2020
2af6ab5
[mkt] refactor TakeEventsError to fix flaky test
tworec Dec 14, 2020
eba2dad
Merge branch 'market/agreement-events-extract-common-fields' into mar…
tworec Dec 14, 2020
89a34b8
Fix ClientAgreement::app_session_id
mfranciszkiewicz Dec 15, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ ya-sb-router = { path = "service-bus/router" }
ya-sb-util = { path = "service-bus/util" }

## CLIENT
ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "e09070085f9f6b9f681a4fbc0ce039abe5211365"}
ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "e09070085f9f6b9f681a4fbc0ce039abe5211365"}
ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "142cbf39d5a0ed631e3e03dea7529133dd0d30ec"}
ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "142cbf39d5a0ed631e3e03dea7529133dd0d30ec"}

#ya-client = { path = "../ya-client" }
#ya-client-model = { path = "../ya-client/model" }
Expand Down
131 changes: 62 additions & 69 deletions agent/provider/src/execution/task_runner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use actix::prelude::*;
use anyhow::{anyhow, bail, Error, Result};
use chrono::{DateTime, Utc};
use derive_more::Display;
use futures::future::join_all;
use futures::{FutureExt, TryFutureExt};
Expand All @@ -15,6 +16,7 @@ use structopt::StructOpt;

use ya_agreement_utils::{AgreementView, OfferTemplate};
use ya_client::activity::ActivityProviderApi;
use ya_client_model::activity::provider_event::ProviderEventType;
use ya_client_model::activity::{ActivityState, ProviderEvent, State};
use ya_core_model::activity;
use ya_utils_actix::actix_handler::ResultTypeGetter;
Expand Down Expand Up @@ -122,6 +124,8 @@ pub struct TaskRunnerConfig {
pub process_termination_timeout: Duration,
#[structopt(long, env, parse(try_from_str = humantime::parse_duration), default_value = "10s")]
pub exeunit_state_retry_interval: Duration,
#[structopt(skip = "you-forgot-to-set-session-id")]
pub session_id: String,
}

// =========================================== //
Expand All @@ -144,6 +148,7 @@ pub struct TaskRunner {

config: Arc<TaskRunnerConfig>,

event_ts: DateTime<Utc>,
tasks_dir: PathBuf,
cache_dir: PathBuf,
}
Expand Down Expand Up @@ -192,6 +197,7 @@ impl TaskRunner {
activity_created: SignalSlot::<ActivityCreated>::new(),
activity_destroyed: SignalSlot::<ActivityDestroyed>::new(),
config: Arc::new(config),
event_ts: Utc::now(),
tasks_dir,
cache_dir,
})
Expand All @@ -205,25 +211,11 @@ impl TaskRunner {
self.registry.find_exeunit(&msg.name)
}

pub async fn collect_events(
client: Arc<ActivityProviderApi>,
addr: Addr<TaskRunner>,
) -> Result<()> {
match TaskRunner::query_events(client).await {
Err(error) => log::error!("Can't query activity events. Error: {:?}", error),
Ok(activity_events) => {
TaskRunner::dispatch_events(&activity_events, addr).await;
}
}

Ok(())
}

// =========================================== //
// TaskRunner internals - events dispatching
// =========================================== //

async fn dispatch_events(events: &Vec<ProviderEvent>, myself: Addr<TaskRunner>) {
async fn dispatch_events(events: Vec<ProviderEvent>, myself: &Addr<TaskRunner>) {
if events.len() == 0 {
return;
};
Expand All @@ -235,26 +227,22 @@ impl TaskRunner {
.into_iter()
.zip(iter::repeat(myself))
.map(|(event, myself)| async move {
let _ = match event {
ProviderEvent::CreateActivity {
activity_id,
agreement_id,
requestor_pub_key,
} => {
let _ = match event.event_type {
ProviderEventType::CreateActivity { requestor_pub_key } => {
myself
.send(CreateActivity::new(
activity_id,
agreement_id,
requestor_pub_key.as_ref(),
))
.send(CreateActivity {
activity_id: event.activity_id,
agreement_id: event.agreement_id,
requestor_pub_key,
})
.await?
}
ProviderEvent::DestroyActivity {
activity_id,
agreement_id,
} => {
ProviderEventType::DestroyActivity {} => {
myself
.send(DestroyActivity::new(activity_id, agreement_id))
.send(DestroyActivity {
activity_id: event.activity_id,
agreement_id: event.agreement_id,
})
.await?
}
}
Expand All @@ -266,10 +254,6 @@ impl TaskRunner {
let _ = join_all(futures).await;
}

async fn query_events(client: Arc<ActivityProviderApi>) -> Result<Vec<ProviderEvent>> {
Ok(client.get_activity_events(Some(3.), None).await?)
}

// =========================================== //
// TaskRunner internals - activity reactions
// =========================================== //
Expand Down Expand Up @@ -552,7 +536,10 @@ async fn remove_remaining_tasks(
agreement_id,
);
myself
.send(DestroyActivity::new(&activity_id, &agreement_id))
.send(DestroyActivity {
activity_id: activity_id.clone(),
agreement_id,
})
.await
})
.collect::<Vec<_>>();
Expand Down Expand Up @@ -609,13 +596,43 @@ forward_actix_handler!(
impl Handler<UpdateActivity> for TaskRunner {
type Result = ActorResponse<Self, (), Error>;

fn handle(&mut self, _msg: UpdateActivity, ctx: &mut Context<Self>) -> Self::Result {
let client = self.api.clone();
fn handle(&mut self, _: UpdateActivity, ctx: &mut Context<Self>) -> Self::Result {
let addr = ctx.address();
let client = self.api.clone();

let mut event_ts = self.event_ts.clone();
let app_session_id = self.config.session_id.clone();
let poll_timeout = Duration::from_secs(3);

let fut = async move {
let result = client
.get_activity_events(
Some(event_ts.clone()),
Some(app_session_id),
Some(poll_timeout),
None,
)
.await;

match result {
Ok(events) => {
events
.iter()
.max_by_key(|e| e.event_date)
.map(|e| event_ts = event_ts.max(e.event_date));
Self::dispatch_events(events, &addr).await;
}
Err(error) => log::error!("Can't query activity events: {:?}", error),
};
event_ts
}
.into_actor(self)
.map(|event_ts, actor, _| {
actor.event_ts = actor.event_ts.max(event_ts);
Ok(())
});

ActorResponse::r#async(
async move { TaskRunner::collect_events(client, addr).await }.into_actor(self),
)
ActorResponse::r#async(fut)
}
}

Expand Down Expand Up @@ -741,7 +758,10 @@ impl Handler<Shutdown> for TaskRunner {
let fut = async move {
for (activity_id, agreement_id) in ids {
if let Err(e) = addr
.send(DestroyActivity::new(&activity_id, &agreement_id))
.send(DestroyActivity {
activity_id,
agreement_id,
})
.await?
{
log::error!("Unable to destroy activity: {}", e);
Expand All @@ -753,30 +773,3 @@ impl Handler<Shutdown> for TaskRunner {
ActorResponse::r#async(fut.into_actor(self))
}
}

// =========================================== //
// Messages creation
// =========================================== //

impl CreateActivity {
pub fn new<S: AsRef<str>>(
activity_id: S,
agreement_id: S,
requestor_pub_key: Option<S>,
) -> CreateActivity {
CreateActivity {
activity_id: activity_id.as_ref().to_string(),
agreement_id: agreement_id.as_ref().to_string(),
requestor_pub_key: requestor_pub_key.map(|s| s.as_ref().to_string()),
}
}
}

impl DestroyActivity {
pub fn new(activity_id: &str, agreement_id: &str) -> DestroyActivity {
DestroyActivity {
activity_id: activity_id.to_string(),
agreement_id: agreement_id.to_string(),
}
}
}
29 changes: 11 additions & 18 deletions agent/provider/src/market/provider_market.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ use std::sync::Arc;
use ya_agreement_utils::{AgreementView, OfferDefinition};
use ya_client::market::MarketProviderApi;
use ya_client_model::market::{
agreement_event::AgreementTerminator, Agreement, AgreementOperationEvent as AgreementEvent,
NewOffer, Proposal, ProviderEvent, Reason,
agreement_event::AgreementTerminator, Agreement, NewOffer, Proposal, ProviderEvent, Reason,
};
use ya_utils_actix::{
actix_handler::ResultTypeGetter,
Expand All @@ -29,6 +28,7 @@ use crate::market::config::MarketConfig;
use crate::market::mock_negotiator::LimitAgreementsNegotiator;
use crate::market::termination_reason::GolemReason;
use crate::tasks::{AgreementBroken, AgreementClosed, CloseAgreement};
use ya_client_model::market::agreement_event::AgreementEventType;

// =========================================== //
// Public exposed messages
Expand Down Expand Up @@ -227,9 +227,7 @@ impl ProviderMarket {
) -> Result<()> {
log::info!("Got approved agreement [{}].", msg.agreement.agreement_id,);
// At this moment we only forward agreement to outside world.
self.agreement_signed_signal.send_signal(AgreementApproved {
agreement: msg.agreement,
})
self.agreement_signed_signal.send_signal(msg)
}

// =========================================== //
Expand Down Expand Up @@ -351,7 +349,7 @@ async fn process_proposal(
ProposalResponse::IgnoreProposal => log::info!("Ignoring proposal {:?}", proposal_id),
ProposalResponse::RejectProposal { reason } => {
ctx.api
.reject_proposal_with_reason(&subscription.id, proposal_id, &reason)
.reject_proposal(&subscription.id, proposal_id, &reason)
.await?;
}
},
Expand Down Expand Up @@ -463,13 +461,12 @@ async fn collect_agreement_events(ctx: AsyncCtx) {
};

for event in events {
match event {
AgreementEvent::AgreementTerminatedEvent {
agreement_id,
reason,
terminator,
event_date,
..
last_timestamp = event.event_date;
let agreement_id = event.agreement_id.clone();

match event.event_type {
AgreementEventType::AgreementTerminatedEvent {
reason, terminator, ..
} => {
// Ignore events sent in reaction to termination by us.
if terminator == AgreementTerminator::Requestor {
Expand All @@ -480,13 +477,9 @@ async fn collect_agreement_events(ctx: AsyncCtx) {
};
ctx.market.send(msg).await.ok();
}
last_timestamp = event_date
}
AgreementEvent::AgreementApprovedEvent { event_date, .. }
| AgreementEvent::AgreementRejectedEvent { event_date, .. }
| AgreementEvent::AgreementCancelledEvent { event_date, .. } => {
_ => {
log::trace!("Got: {:?}", event);
last_timestamp = event_date;
continue;
}
}
Expand Down
4 changes: 3 additions & 1 deletion agent/provider/src/provider_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ impl ProviderAgent {
.node_name
.clone()
.unwrap_or("provider".to_string());
args.market.session_id = format!("{}-[{}]", name, std::process::id());
let session_id = format!("{}-[{}]", name, std::process::id());
args.market.session_id = session_id.clone();
args.runner.session_id = session_id;

let mut globals = GlobalsManager::try_new(&config.globals_file, args.node)?;
globals.spawn_monitor(&config.globals_file)?;
Expand Down
17 changes: 17 additions & 0 deletions core/activity/migrations/2020-12-03-092313_events/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
CREATE TABLE activity_event_migrate (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
activity_id INTEGER NOT NULL,
identity_id VARCHAR(50) NOT NULL,
event_date DATETIME NOT NULL,
event_type_id INTEGER NOT NULL,
requestor_pub_key BLOB,
FOREIGN KEY(activity_id) REFERENCES activity (id),
FOREIGN KEY(event_type_id) REFERENCES activity_event_type (id)
);

INSERT INTO activity_event_migrate(activity_id, identity_id, event_date, event_type_id, requestor_pub_key)
SELECT activity_id, identity_id, event_date, event_type_id, requestor_pub_key
FROM activity_event;

DROP TABLE activity_event;
ALTER TABLE activity_event_migrate RENAME TO activity_event;
4 changes: 4 additions & 0 deletions core/activity/migrations/2020-12-03-092313_events/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE activity_event ADD COLUMN app_session_id VARCHAR(100);

CREATE INDEX idx_app_session_id
ON activity_event(app_session_id);
2 changes: 1 addition & 1 deletion core/activity/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub fn web_scope(db: &DbExecutor) -> Scope {
mod common {
use actix_web::{web, Responder};

use ya_core_model::{activity, market::Role};
use ya_core_model::{activity, Role};
use ya_persistence::executor::DbExecutor;
use ya_service_api_web::middleware::Identity;
use ya_service_bus::{timeout::IntoTimeoutFuture, RpcEndpoint};
Expand Down
Loading