Skip to content

Commit

Permalink
Introduced NegotiatorComponent trait; Implement Composite negotiator …
Browse files Browse the repository at this point in the history
…which uses dynamic negotiation logic
  • Loading branch information
nieznanysprawiciel committed Dec 23, 2020
1 parent f935201 commit 0a2757b
Show file tree
Hide file tree
Showing 19 changed files with 521 additions and 33 deletions.
2 changes: 1 addition & 1 deletion agent/provider/src/execution/task_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ impl TaskRunner {
_ctx: &mut Context<Self>,
) -> Result<()> {
// Agreement waits for first create activity event.
let agreement_id = msg.agreement.agreement_id.clone();
let agreement_id = msg.agreement.id.clone();
self.active_agreements.insert(agreement_id, msg.agreement);
Ok(())
}
Expand Down
6 changes: 6 additions & 0 deletions agent/provider/src/market/negotiator.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
mod accept_all;
mod builtin;
mod common;
mod component;
mod composite;
pub mod factory;
mod limit_agreements;

pub use accept_all::AcceptAllNegotiator;
pub use composite::CompositeNegotiator;
pub use limit_agreements::LimitAgreementsNegotiator;

pub use common::{
AgreementResponse, AgreementResult, Negotiator, NegotiatorAddr, ProposalResponse,
};

pub use component::{NegotiationResult, NegotiatorComponent, NegotiatorsPack, ProposalView};
5 changes: 5 additions & 0 deletions agent/provider/src/market/negotiator/builtin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod expiration;
mod max_agreements;

pub use expiration::LimitExpiration;
pub use max_agreements::MaxAgreements;
85 changes: 85 additions & 0 deletions agent/provider/src/market/negotiator/builtin/expiration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use anyhow::Result;
use chrono::{DateTime, Duration, TimeZone, Utc};

use ya_agreement_utils::OfferDefinition;
use ya_client_model::market::Reason;

use crate::market::negotiator::factory::LimitAgreementsNegotiatorConfig;
use crate::market::negotiator::{
AgreementResult, NegotiationResult, NegotiatorComponent, ProposalView,
};

/// Negotiator that can limit number of running agreements.
pub struct LimitExpiration {
min_expiration: Duration,
max_expiration: Duration,
}

impl LimitExpiration {
pub fn new(_config: &LimitAgreementsNegotiatorConfig) -> LimitExpiration {
LimitExpiration {
min_expiration: Duration::minutes(5),
max_expiration: Duration::minutes(30),
}
}
}

fn proposal_expiration_from(proposal: &ProposalView) -> Result<DateTime<Utc>> {
let expiration_key_str = "/golem/srv/comp/expiration";
let value = proposal
.pointer(expiration_key_str)
.ok_or_else(|| anyhow::anyhow!("Missing expiration key in Proposal"))?
.clone();
let timestamp: i64 = serde_json::from_value(value)?;
Ok(Utc.timestamp_millis(timestamp))
}

impl NegotiatorComponent for LimitExpiration {
fn negotiate_step(&mut self, demand: &ProposalView, offer: ProposalView) -> NegotiationResult {
let min_expiration = Utc::now() + self.min_expiration;
let max_expiration = Utc::now() + self.max_expiration;

let expiration = match proposal_expiration_from(&demand) {
Ok(expiration) => expiration,
Err(e) => {
return NegotiationResult::Reject {
reason: Some(Reason::new(e)),
}
}
};

if expiration > max_expiration || expiration < min_expiration {
log::info!(
"Negotiator: Reject proposal [{}] due to expiration limits.",
demand.id
);
NegotiationResult::Reject {
reason: Some(Reason::new(format!(
"Proposal expires at: {} which is less than 5 min or more than 30 min from now",
expiration
))),
}
} else {
NegotiationResult::Ready { offer }
}
}

fn fill_template(
&mut self,
offer_template: OfferDefinition,
) -> anyhow::Result<OfferDefinition> {
Ok(offer_template)
}

fn on_agreement_terminated(
&mut self,
_agreement_id: &str,
_result: &AgreementResult,
) -> anyhow::Result<()> {
Ok(())
}

fn on_agreement_approved(&mut self, _agreement_id: &str) -> anyhow::Result<()> {
Ok(())
}
}
80 changes: 80 additions & 0 deletions agent/provider/src/market/negotiator/builtin/max_agreements.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use anyhow::bail;
use std::collections::HashSet;

use ya_agreement_utils::OfferDefinition;
use ya_client_model::market::Reason;

use crate::market::negotiator::factory::LimitAgreementsNegotiatorConfig;
use crate::market::negotiator::{
AgreementResult, NegotiationResult, NegotiatorComponent, ProposalView,
};

/// Negotiator that can limit number of running agreements.
pub struct MaxAgreements {
active_agreements: HashSet<String>,
max_agreements: u32,
}

impl MaxAgreements {
pub fn new(config: &LimitAgreementsNegotiatorConfig) -> MaxAgreements {
MaxAgreements {
max_agreements: config.max_simultaneous_agreements,
active_agreements: HashSet::new(),
}
}

pub fn has_free_slot(&self) -> bool {
self.active_agreements.len() < self.max_agreements as usize
}
}

impl NegotiatorComponent for MaxAgreements {
fn negotiate_step(&mut self, demand: &ProposalView, offer: ProposalView) -> NegotiationResult {
if self.has_free_slot() {
NegotiationResult::Ready { offer }
} else {
log::info!(
"'MaxAgreements' negotiator: Reject proposal [{}] due to limit.",
demand.id, // TODO: Should be just `id`, but I reuse AgreementView struct.
);
NegotiationResult::Reject {
reason: Some(Reason::new(format!(
"No capacity available. Reached Agreements limit: {}",
self.max_agreements
))),
}
}
}

fn fill_template(
&mut self,
offer_template: OfferDefinition,
) -> anyhow::Result<OfferDefinition> {
Ok(offer_template)
}

fn on_agreement_terminated(
&mut self,
agreement_id: &str,
_result: &AgreementResult,
) -> anyhow::Result<()> {
self.active_agreements.remove(agreement_id);

let free_slots = self.max_agreements as usize - self.active_agreements.len();
log::info!("Negotiator: {} free slot(s) for agreements.", free_slots);
Ok(())
}

fn on_agreement_approved(&mut self, agreement_id: &str) -> anyhow::Result<()> {
if self.has_free_slot() {
self.active_agreements.insert(agreement_id.to_string());
Ok(())
} else {
self.active_agreements.insert(agreement_id.to_string());
bail!(
"Agreement [{}] approved despite not available capacity.",
agreement_id
)
}
}
}
3 changes: 3 additions & 0 deletions agent/provider/src/market/negotiator/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub struct CreateOffer {
#[derive(Message)]
#[rtype(result = "Result<ProposalResponse>")]
pub struct ReactToProposal {
pub offer_id: String,
pub offer: NewOffer,
pub demand: Proposal,
}
Expand Down Expand Up @@ -143,12 +144,14 @@ impl NegotiatorAddr {
pub async fn react_to_proposal(
&self,
offer: &NewOffer,
offer_id: &str,
demand: &Proposal,
) -> Result<ProposalResponse> {
self.on_proposal
.send(ReactToProposal {
demand: demand.clone(),
offer: offer.clone(),
offer_id: offer_id.to_string(),
})
.await?
}
Expand Down
165 changes: 165 additions & 0 deletions agent/provider/src/market/negotiator/component.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use anyhow::anyhow;
use std::collections::HashMap;

use ya_agreement_utils::{AgreementView, OfferDefinition};
use ya_client::model::market::Reason;

use crate::market::negotiator::AgreementResult;

pub type ProposalView = AgreementView;

/// Result returned by `NegotiatorComponent` during Proposals evaluation.
pub enum NegotiationResult {
/// `NegotiatorComponent` fully negotiated his part of Proposal,
/// and it can be turned into valid Agreement. Provider will send
/// counter Proposal.
Ready { offer: ProposalView },
/// Proposal is not ready to become Agreement, but negotiations
/// are in progress.
Negotiating { offer: ProposalView },
/// Proposal is not acceptable and should be rejected.
/// Negotiations can't be continued.
Reject { reason: Option<Reason> },
}

/// `NegotiatorComponent` implements negotiation logic for part of Agreement
/// specification. Components should be as granular as possible to allow composition
/// with other Components.
///
/// Future goal is to allow developers to create their own specifications and implement
/// components, that are able to negotiate this specification.
/// It would be useful to have `NegotiatorComponenta`, that can be loaded from shared library
/// or can communicate with negotiation logic in external process (maybe RPC or TCP??).
pub trait NegotiatorComponent {
/// Push forward negotiations as far as you can.
/// `NegotiatorComponent` should modify only properties in his responsibility
/// and return remaining part of Proposal unchanged.
fn negotiate_step(&mut self, demand: &ProposalView, offer: ProposalView) -> NegotiationResult;

/// Called during Offer creation. `NegotiatorComponent` should add properties
/// and constraints for which it is responsible during future negotiations.
fn fill_template(&mut self, offer_template: OfferDefinition)
-> anyhow::Result<OfferDefinition>;

/// Called when Agreement was finished. `NegotiatorComponent` can use termination
/// result to adjust his future negotiation strategy.
fn on_agreement_terminated(
&mut self,
agreement_id: &str,
result: &AgreementResult,
) -> anyhow::Result<()>;

/// Called when Negotiator decided to approve Agreement. It's only notification,
/// `NegotiatorComponent` can't reject Agreement anymore.
fn on_agreement_approved(&mut self, agreement_id: &str) -> anyhow::Result<()>;
}

pub struct NegotiatorsPack {
components: HashMap<String, Box<dyn NegotiatorComponent>>,
}

impl NegotiatorsPack {
pub fn new() -> NegotiatorsPack {
NegotiatorsPack {
components: HashMap::new(),
}
}

pub fn add_component(
mut self,
name: &str,
component: Box<dyn NegotiatorComponent>,
) -> NegotiatorsPack {
self.components.insert(name.to_string(), component);
self
}
}

impl NegotiatorComponent for NegotiatorsPack {
fn negotiate_step(
&mut self,
demand: &ProposalView,
mut offer: ProposalView,
) -> NegotiationResult {
let mut all_ready = true;
for (name, component) in &mut self.components {
let result = component.negotiate_step(demand, offer);
offer = match result {
NegotiationResult::Ready { offer } => offer,
NegotiationResult::Negotiating { offer } => {
log::info!(
"Negotiator component '{}' is still negotiating Proposal [{}].",
name,
demand.id
);
all_ready = false;
offer
}
NegotiationResult::Reject { reason } => {
return NegotiationResult::Reject { reason }
}
}
}

// Full negotiations is ready only, if all `NegotiatorComponent` returned
// ready state. Otherwise we must still continue negotiations.
match all_ready {
true => NegotiationResult::Ready { offer },
false => NegotiationResult::Negotiating { offer },
}
}

fn fill_template(
&mut self,
mut offer_template: OfferDefinition,
) -> anyhow::Result<OfferDefinition> {
for (name, component) in &mut self.components {
offer_template = component.fill_template(offer_template).map_err(|e| {
anyhow!(
"Negotiator component '{}' failed filling Offer template. {}",
name,
e
)
})?;
}
Ok(offer_template)
}

fn on_agreement_terminated(
&mut self,
agreement_id: &str,
result: &AgreementResult,
) -> anyhow::Result<()> {
for (name, component) in &mut self.components {
component
.on_agreement_terminated(agreement_id, result)
.map_err(|e| {
log::warn!(
"Negotiator component '{}' failed handling Agreement [{}] termination. {}",
name,
agreement_id,
e
)
})
.ok();
}
Ok(())
}

fn on_agreement_approved(&mut self, agreement_id: &str) -> anyhow::Result<()> {
for (name, component) in &mut self.components {
component
.on_agreement_approved(agreement_id)
.map_err(|e| {
log::warn!(
"Negotiator component '{}' failed handling Agreement [{}] approval. {}",
name,
agreement_id,
e
)
})
.ok();
}
Ok(())
}
}
Loading

0 comments on commit 0a2757b

Please sign in to comment.