From 61872dc5c592cca10b372ada48e947808856e248 Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Tue, 29 Sep 2020 08:19:38 -0700 Subject: [PATCH 01/24] update --- .../paypal/aurora/scheduler/offers/Host.java | 6 + .../aurora/scheduler/offers/OfferSetImpl.java | 230 ++++++++++++++++++ .../scheduler/offers/OfferSetModule.java | 42 ++++ .../aurora/scheduler/offers/PluginConfig.java | 19 ++ .../scheduler/offers/PreviousOffer.java | 22 ++ .../aurora/scheduler/offers/Resource.java | 12 + .../scheduler/offers/ScheduleRequest.java | 11 + .../scheduler/offers/ScheduleResponse.java | 8 + 8 files changed, 350 insertions(+) create mode 100644 src/main/java/com/paypal/aurora/scheduler/offers/Host.java create mode 100644 src/main/java/com/paypal/aurora/scheduler/offers/OfferSetImpl.java create mode 100644 src/main/java/com/paypal/aurora/scheduler/offers/OfferSetModule.java create mode 100644 src/main/java/com/paypal/aurora/scheduler/offers/PluginConfig.java create mode 100644 src/main/java/com/paypal/aurora/scheduler/offers/PreviousOffer.java create mode 100644 src/main/java/com/paypal/aurora/scheduler/offers/Resource.java create mode 100644 src/main/java/com/paypal/aurora/scheduler/offers/ScheduleRequest.java create mode 100644 src/main/java/com/paypal/aurora/scheduler/offers/ScheduleResponse.java diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/Host.java b/src/main/java/com/paypal/aurora/scheduler/offers/Host.java new file mode 100644 index 000000000..6eea829bf --- /dev/null +++ b/src/main/java/com/paypal/aurora/scheduler/offers/Host.java @@ -0,0 +1,6 @@ +package com.paypal.aurora.scheduler.offers; + +public class Host { + public String name; + public Resource offer; +} diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetImpl.java b/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetImpl.java new file mode 100644 index 000000000..429f816ba --- /dev/null +++ b/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetImpl.java @@ -0,0 +1,230 @@ +package com.paypal.aurora.scheduler.offers; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.ProtocolException; +import java.net.URL; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Ordering; +import com.google.inject.Inject; +import com.google.gson.Gson; + +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import org.apache.aurora.scheduler.offers.HostOffer; +import org.apache.aurora.scheduler.offers.OfferSet; + +import org.apache.aurora.scheduler.resources.ResourceType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Default implementation for OfferSet. Backed by a ConcurrentSkipListSet. + */ +@VisibleForTesting +public class OfferSetImpl implements OfferSet { + + private final Set offers; + private Map previousOffers; + private static final Logger LOG = LoggerFactory.getLogger(OfferSetImpl.class); + + @Inject + public OfferSetImpl(Ordering ordering) { + offers = new ConcurrentSkipListSet<>(ordering); + previousOffers = new HashMap(); + } + + @Override + public void add(HostOffer offer) { + offers.add(offer); + } + + @Override + public void remove(HostOffer removed) { + offers.remove(removed); + } + + @Override + public int size() { + // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more + // expensive. + // Could track this separately if it turns out to pose problems. + return offers.size(); + } + + @Override + public void clear() { + offers.clear(); + } + + @Override + public Iterable values() { + return offers; + } + + private Timestamp prevTimestamp = new Timestamp(System.currentTimeMillis()); + private long ageInMilliseconds = 60*1000; + @Override + public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) { + LOG.info("getOrdered for task " + groupKey + " with request: " + resourceRequest.getResourceBag()); + Timestamp now = new Timestamp(System.currentTimeMillis()); + // filter out the offers that have just been updated. + List newOffers = new LinkedList(); + List oldOffers = new LinkedList(); + + // if all offers have just recently updated recently within ageInMilliseconds. +// if (previousOffers.size()>0 && now.getTime() - prevTimestamp.getTime() < ageInMilliseconds){ + if (false){ + for (HostOffer offer: offers) { + PreviousOffer previousOffer = this.previousOffers.get(offer.getAttributes().getHost()); + if (previousOffer!=null) { + // if the offer is old enough, skip it. + if (now.getTime() - previousOffer.getTimestamp().getTime() < ageInMilliseconds) { + continue; + } + // if the offer was changed, avoid using it. + if (!offer.getResourceBag(true).equals(previousOffer.getOffer().getResourceBag(true)) + || !offer.getResourceBag(false).equals(previousOffer.getOffer().getResourceBag(false))) { + // update the new offer with the timestamp. + this.previousOffers.put(offer.getAttributes().getHost(), new PreviousOffer(now, offer)); + newOffers.add(offer); + continue; + } + } else { + // if the offer is fresh, add it to the PreviousOffers + this.previousOffers.put(offer.getAttributes().getHost(), new PreviousOffer(now, offer)); + } + oldOffers.add(offer); + } + for (HostOffer offer: offers){ + oldOffers.add(offer); + this.previousOffers.put(offer.getAttributes().getHost(), new PreviousOffer(now, offer)); + } + } else { + // cache the offers + for (HostOffer offer: offers){ + oldOffers.add(offer); + this.previousOffers.put(offer.getAttributes().getHost(), new PreviousOffer(now, offer)); + } + } + // reset the prevTimestamp + this.prevTimestamp = now; + List orderedOffers = pluginSchedule(resourceRequest, oldOffers); + if (orderedOffers != null){ + orderedOffers.addAll(newOffers); + return orderedOffers; + } + // fallback to default scheduler. + LOG.warn("MagicMatch failed to schedule the task. Aurora uses the default scheduler."); + return offers; + } + + private List pluginSchedule(ResourceRequest resourceRequest, List offers){ + PluginConfig plugin = new PluginConfig(); + Gson gson = new Gson(); + List orderedOffers = new ArrayList(); + Map offerMap = new HashMap(); + for(HostOffer offer: offers) { + offerMap.put(offer.getAttributes().getHost(), offer); + } + // send the Rest API request to the scheduler plugin + URL url = null; + try { + url = new URL(plugin.getEndpoint() + "/v1/offerset"); + } catch (MalformedURLException e) { + LOG.error(e.toString()); + return null; + } + + // create json request + ScheduleRequest scheduleRequest = new ScheduleRequest(); + scheduleRequest.request.cpu = resourceRequest.getResourceBag().valueOf(ResourceType.CPUS); + scheduleRequest.request.memory = resourceRequest.getResourceBag().valueOf(ResourceType.RAM_MB); + scheduleRequest.request.disk = resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB); + scheduleRequest.hosts = new Host[offers.size()]; + int i =0; + for (HostOffer offer:offers) { + scheduleRequest.hosts[i] = new Host(); + scheduleRequest.hosts[i].name = offer.getAttributes().getHost(); + double cpu = offer.getResourceBag(true).valueOf(ResourceType.CPUS) + + offer.getResourceBag(false).valueOf(ResourceType.CPUS); + double memory = offer.getResourceBag(true).valueOf(ResourceType.RAM_MB) + + offer.getResourceBag(false).valueOf(ResourceType.RAM_MB) ; + double disk = offer.getResourceBag(true).valueOf(ResourceType.DISK_MB) + + offer.getResourceBag(false).valueOf(ResourceType.DISK_MB); + scheduleRequest.hosts[i].offer = new Resource(cpu, memory, disk); + i++; + } + + // create connection + HttpURLConnection con = null; + try { + con = (HttpURLConnection)url.openConnection(); + con.setRequestMethod("POST"); + con.setRequestProperty("Content-Type", "application/json; utf-8"); + con.setRequestProperty("Accept", "application/json"); + con.setDoOutput(true); + } catch (ProtocolException pe) { + LOG.error(pe.toString()); + return null; + } catch (IOException ioe) { + LOG.error(ioe.toString()); + return null; + } + String jsonStr = gson.toJson(scheduleRequest); + LOG.info("jsonStr: "+jsonStr); + try(OutputStream os = con.getOutputStream()) { + byte[] input = jsonStr.getBytes("utf-8"); + os.write(input, 0, input.length); + } catch (UnsupportedEncodingException uee) { + LOG.error(uee.toString()); + return null; + } catch (IOException ioe){ + LOG.error(ioe.toString()); + return null; + } + // read response + StringBuilder response = new StringBuilder(); + try{ + BufferedReader br = new BufferedReader(new InputStreamReader(con.getInputStream(), "utf-8")); + String responseLine = null; + while ((responseLine = br.readLine()) != null) { + response.append(responseLine.trim()); + } + } catch (UnsupportedEncodingException uee){ + LOG.error(uee.toString()); + return null; + } catch (IOException ioe) { + LOG.error(ioe.toString()); + return null; + } + ScheduleResponse scheduleResponse = gson.fromJson(response.toString(), ScheduleResponse.class); + LOG.info("jsonResponse: "+response.toString()); + // process the scheduleResult + if (scheduleResponse.hosts!=null){ + for(String host: scheduleResponse.hosts) { + HostOffer offer = offerMap.get(host); + if (offer!=null) { + orderedOffers.add(offer); + } else { + LOG.error("Cannot find this host "+host+" in "+offerMap.size()+" offers"); + } + } + } + return orderedOffers; + } +} \ No newline at end of file diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetModule.java b/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetModule.java new file mode 100644 index 000000000..bda2a0f1f --- /dev/null +++ b/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetModule.java @@ -0,0 +1,42 @@ +package com.paypal.aurora.scheduler.offers; + +import javax.inject.Singleton; + +import com.google.common.collect.Ordering; +import com.google.inject.AbstractModule; +import com.google.inject.PrivateModule; + +import com.google.inject.TypeLiteral; + +import org.apache.aurora.scheduler.config.CliOptions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.aurora.scheduler.offers.HostOffer; +import org.apache.aurora.scheduler.offers.OfferOrderBuilder; +import org.apache.aurora.scheduler.offers.OfferSet; + +public class OfferSetModule extends AbstractModule { + private final CliOptions options; + private static final Logger LOG = LoggerFactory.getLogger(OfferSetModule.class); + + public OfferSetModule(CliOptions options) { + this.options = options; + } + + @Override + protected void configure() { + LOG.info("PayPal Offer Set module Enabled."); + install(new PrivateModule() { + @Override + protected void configure() { + bind(new TypeLiteral>() { }) + .toInstance(OfferOrderBuilder.create(options.offer.offerOrder)); + bind(OfferSetImpl.class).in(Singleton.class); + bind(OfferSet.class).to(OfferSetImpl.class); + expose(OfferSet.class); + } + }); + } +} diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/PluginConfig.java b/src/main/java/com/paypal/aurora/scheduler/offers/PluginConfig.java new file mode 100644 index 000000000..f45935259 --- /dev/null +++ b/src/main/java/com/paypal/aurora/scheduler/offers/PluginConfig.java @@ -0,0 +1,19 @@ +package com.paypal.aurora.scheduler.offers; + +public class PluginConfig { + private String endpoint = "http://localhost:9090"; + private String configJson = "/etc/aurora-plugin.json"; + + public PluginConfig() { + } + + public PluginConfig(String endpoint, String configJson) { + this.endpoint = endpoint; + this.configJson = configJson; + //TODO: (nhatle) load the aurora-plugin.json + } + + public String getEndpoint() { + return this.endpoint; + } +} diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/PreviousOffer.java b/src/main/java/com/paypal/aurora/scheduler/offers/PreviousOffer.java new file mode 100644 index 000000000..24e9760f4 --- /dev/null +++ b/src/main/java/com/paypal/aurora/scheduler/offers/PreviousOffer.java @@ -0,0 +1,22 @@ +package com.paypal.aurora.scheduler.offers; + +import java.sql.Timestamp; +import org.apache.aurora.scheduler.offers.HostOffer; + +public class PreviousOffer { + private Timestamp timestamp; + private HostOffer offer; + + public PreviousOffer(Timestamp now, HostOffer offer){ + this.timestamp = now; + this.offer = offer; + } + + public Timestamp getTimestamp() { + return this.timestamp; + } + + public HostOffer getOffer(){ + return this.offer; + } +} diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/Resource.java b/src/main/java/com/paypal/aurora/scheduler/offers/Resource.java new file mode 100644 index 000000000..282db44d2 --- /dev/null +++ b/src/main/java/com/paypal/aurora/scheduler/offers/Resource.java @@ -0,0 +1,12 @@ +package com.paypal.aurora.scheduler.offers; + +public class Resource { + public double cpu; + public double memory; + public double disk; + public Resource(double cpu, double memory, double disk){ + this.cpu = cpu; + this.memory = memory; + this.disk = disk; + } +} diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/ScheduleRequest.java b/src/main/java/com/paypal/aurora/scheduler/offers/ScheduleRequest.java new file mode 100644 index 000000000..cf27ce222 --- /dev/null +++ b/src/main/java/com/paypal/aurora/scheduler/offers/ScheduleRequest.java @@ -0,0 +1,11 @@ +package com.paypal.aurora.scheduler.offers; + +import org.apache.aurora.scheduler.resources.ResourceBag; + +public class ScheduleRequest { + public Resource request; + public Host []hosts; + public ScheduleRequest(){ + request = new Resource(0.0,0.0,0.0); + } +} \ No newline at end of file diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/ScheduleResponse.java b/src/main/java/com/paypal/aurora/scheduler/offers/ScheduleResponse.java new file mode 100644 index 000000000..6f1e98dd1 --- /dev/null +++ b/src/main/java/com/paypal/aurora/scheduler/offers/ScheduleResponse.java @@ -0,0 +1,8 @@ +package com.paypal.aurora.scheduler.offers; + +import org.apache.aurora.scheduler.offers.HostOffer; + +public class ScheduleResponse { + public String error; + public String []hosts; +} \ No newline at end of file From 0b8f1ae8d844f019cc2c085d990058da4d5f2083 Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Mon, 12 Oct 2020 16:50:33 -0700 Subject: [PATCH 02/24] add paypal plugin --- .../paypal/aurora/scheduler/model/Config.java | 7 ++ .../paypal/aurora/scheduler/model/Host.java | 8 +++ .../scheduler/{offers => model}/Resource.java | 2 +- .../{offers => model}/ScheduleRequest.java | 6 +- .../{offers => model}/ScheduleResponse.java | 2 +- .../paypal/aurora/scheduler/offers/Host.java | 6 -- .../aurora/scheduler/offers/OfferSetImpl.java | 72 +++++-------------- .../aurora/scheduler/offers/PluginConfig.java | 51 ++++++++++--- .../scheduler/offers/PreviousOffer.java | 22 ------ 9 files changed, 78 insertions(+), 98 deletions(-) create mode 100644 src/main/java/com/paypal/aurora/scheduler/model/Config.java create mode 100644 src/main/java/com/paypal/aurora/scheduler/model/Host.java rename src/main/java/com/paypal/aurora/scheduler/{offers => model}/Resource.java (83%) rename src/main/java/com/paypal/aurora/scheduler/{offers => model}/ScheduleRequest.java (51%) rename src/main/java/com/paypal/aurora/scheduler/{offers => model}/ScheduleResponse.java (75%) delete mode 100644 src/main/java/com/paypal/aurora/scheduler/offers/Host.java delete mode 100644 src/main/java/com/paypal/aurora/scheduler/offers/PreviousOffer.java diff --git a/src/main/java/com/paypal/aurora/scheduler/model/Config.java b/src/main/java/com/paypal/aurora/scheduler/model/Config.java new file mode 100644 index 000000000..3b47ecbcc --- /dev/null +++ b/src/main/java/com/paypal/aurora/scheduler/model/Config.java @@ -0,0 +1,7 @@ +package com.paypal.aurora.scheduler.model; + +// for aurora plugin configuration +public class Config { + public String host; + public int port; +} diff --git a/src/main/java/com/paypal/aurora/scheduler/model/Host.java b/src/main/java/com/paypal/aurora/scheduler/model/Host.java new file mode 100644 index 000000000..e908994c5 --- /dev/null +++ b/src/main/java/com/paypal/aurora/scheduler/model/Host.java @@ -0,0 +1,8 @@ +package com.paypal.aurora.scheduler.model; + +import com.paypal.aurora.scheduler.model.Resource; + +public class Host { + public String name; + public Resource offer; +} diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/Resource.java b/src/main/java/com/paypal/aurora/scheduler/model/Resource.java similarity index 83% rename from src/main/java/com/paypal/aurora/scheduler/offers/Resource.java rename to src/main/java/com/paypal/aurora/scheduler/model/Resource.java index 282db44d2..72da0301b 100644 --- a/src/main/java/com/paypal/aurora/scheduler/offers/Resource.java +++ b/src/main/java/com/paypal/aurora/scheduler/model/Resource.java @@ -1,4 +1,4 @@ -package com.paypal.aurora.scheduler.offers; +package com.paypal.aurora.scheduler.model; public class Resource { public double cpu; diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/ScheduleRequest.java b/src/main/java/com/paypal/aurora/scheduler/model/ScheduleRequest.java similarity index 51% rename from src/main/java/com/paypal/aurora/scheduler/offers/ScheduleRequest.java rename to src/main/java/com/paypal/aurora/scheduler/model/ScheduleRequest.java index cf27ce222..7df36380b 100644 --- a/src/main/java/com/paypal/aurora/scheduler/offers/ScheduleRequest.java +++ b/src/main/java/com/paypal/aurora/scheduler/model/ScheduleRequest.java @@ -1,10 +1,8 @@ -package com.paypal.aurora.scheduler.offers; - -import org.apache.aurora.scheduler.resources.ResourceBag; +package com.paypal.aurora.scheduler.model; public class ScheduleRequest { public Resource request; - public Host []hosts; + public Host[]hosts; public ScheduleRequest(){ request = new Resource(0.0,0.0,0.0); } diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/ScheduleResponse.java b/src/main/java/com/paypal/aurora/scheduler/model/ScheduleResponse.java similarity index 75% rename from src/main/java/com/paypal/aurora/scheduler/offers/ScheduleResponse.java rename to src/main/java/com/paypal/aurora/scheduler/model/ScheduleResponse.java index 6f1e98dd1..03bcb8bcd 100644 --- a/src/main/java/com/paypal/aurora/scheduler/offers/ScheduleResponse.java +++ b/src/main/java/com/paypal/aurora/scheduler/model/ScheduleResponse.java @@ -1,4 +1,4 @@ -package com.paypal.aurora.scheduler.offers; +package com.paypal.aurora.scheduler.model; import org.apache.aurora.scheduler.offers.HostOffer; diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/Host.java b/src/main/java/com/paypal/aurora/scheduler/offers/Host.java deleted file mode 100644 index 6eea829bf..000000000 --- a/src/main/java/com/paypal/aurora/scheduler/offers/Host.java +++ /dev/null @@ -1,6 +0,0 @@ -package com.paypal.aurora.scheduler.offers; - -public class Host { - public String name; - public Resource offer; -} diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetImpl.java b/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetImpl.java index 429f816ba..1a0efd9d4 100644 --- a/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetImpl.java +++ b/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetImpl.java @@ -13,16 +13,20 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.LinkedList; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.inject.Inject; import com.google.gson.Gson; +import com.paypal.aurora.scheduler.model.Host; +import com.paypal.aurora.scheduler.model.Resource; +import com.paypal.aurora.scheduler.model.ScheduleRequest; +import com.paypal.aurora.scheduler.model.ScheduleResponse; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.offers.HostOffer; @@ -39,13 +43,11 @@ public class OfferSetImpl implements OfferSet { private final Set offers; - private Map previousOffers; private static final Logger LOG = LoggerFactory.getLogger(OfferSetImpl.class); @Inject public OfferSetImpl(Ordering ordering) { offers = new ConcurrentSkipListSet<>(ordering); - previousOffers = new HashMap(); } @Override @@ -80,61 +82,20 @@ public Iterable values() { private long ageInMilliseconds = 60*1000; @Override public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) { - LOG.info("getOrdered for task " + groupKey + " with request: " + resourceRequest.getResourceBag()); - Timestamp now = new Timestamp(System.currentTimeMillis()); - // filter out the offers that have just been updated. - List newOffers = new LinkedList(); - List oldOffers = new LinkedList(); - - // if all offers have just recently updated recently within ageInMilliseconds. -// if (previousOffers.size()>0 && now.getTime() - prevTimestamp.getTime() < ageInMilliseconds){ - if (false){ - for (HostOffer offer: offers) { - PreviousOffer previousOffer = this.previousOffers.get(offer.getAttributes().getHost()); - if (previousOffer!=null) { - // if the offer is old enough, skip it. - if (now.getTime() - previousOffer.getTimestamp().getTime() < ageInMilliseconds) { - continue; - } - // if the offer was changed, avoid using it. - if (!offer.getResourceBag(true).equals(previousOffer.getOffer().getResourceBag(true)) - || !offer.getResourceBag(false).equals(previousOffer.getOffer().getResourceBag(false))) { - // update the new offer with the timestamp. - this.previousOffers.put(offer.getAttributes().getHost(), new PreviousOffer(now, offer)); - newOffers.add(offer); - continue; - } - } else { - // if the offer is fresh, add it to the PreviousOffers - this.previousOffers.put(offer.getAttributes().getHost(), new PreviousOffer(now, offer)); - } - oldOffers.add(offer); - } - for (HostOffer offer: offers){ - oldOffers.add(offer); - this.previousOffers.put(offer.getAttributes().getHost(), new PreviousOffer(now, offer)); - } - } else { - // cache the offers - for (HostOffer offer: offers){ - oldOffers.add(offer); - this.previousOffers.put(offer.getAttributes().getHost(), new PreviousOffer(now, offer)); - } - } - // reset the prevTimestamp - this.prevTimestamp = now; - List orderedOffers = pluginSchedule(resourceRequest, oldOffers); + List orderedOffers = getOffersFromPlugin(resourceRequest, offers); if (orderedOffers != null){ - orderedOffers.addAll(newOffers); return orderedOffers; } // fallback to default scheduler. - LOG.warn("MagicMatch failed to schedule the task. Aurora uses the default scheduler."); + LOG.error("MagicMatch failed to schedule the task. Aurora uses the default scheduler."); return offers; } - - private List pluginSchedule(ResourceRequest resourceRequest, List offers){ - PluginConfig plugin = new PluginConfig(); + PluginConfig plugin = null; + // getOffersFromPlugin gets the offers from MagicMatch. + private List getOffersFromPlugin(ResourceRequest resourceRequest, Iterable offers){ + if (plugin==null){ + plugin = new PluginConfig(); + } Gson gson = new Gson(); List orderedOffers = new ArrayList(); Map offerMap = new HashMap(); @@ -149,13 +110,12 @@ private List pluginSchedule(ResourceRequest resourceRequest, List pluginSchedule(ResourceRequest resourceRequest, List pluginSchedule(ResourceRequest resourceRequest, List Date: Mon, 19 Oct 2020 23:01:40 -0700 Subject: [PATCH 03/24] resolve the review comments --- .../paypal/aurora/scheduler/model/Config.java | 7 - .../paypal/aurora/scheduler/model/Host.java | 8 - .../aurora/scheduler/model/Resource.java | 12 - .../scheduler/model/ScheduleRequest.java | 9 - .../scheduler/model/ScheduleResponse.java | 8 - .../offers/MagicMatchOfferSetImpl.java | 253 ++++++++++++++++++ .../offers/MagicMatchOfferSetModule.java | 51 ++++ .../aurora/scheduler/offers/OfferSetImpl.java | 190 ------------- .../scheduler/offers/OfferSetModule.java | 42 --- .../aurora/scheduler/offers/PluginConfig.java | 64 +++-- 10 files changed, 342 insertions(+), 302 deletions(-) delete mode 100644 src/main/java/com/paypal/aurora/scheduler/model/Config.java delete mode 100644 src/main/java/com/paypal/aurora/scheduler/model/Host.java delete mode 100644 src/main/java/com/paypal/aurora/scheduler/model/Resource.java delete mode 100644 src/main/java/com/paypal/aurora/scheduler/model/ScheduleRequest.java delete mode 100644 src/main/java/com/paypal/aurora/scheduler/model/ScheduleResponse.java create mode 100644 src/main/java/com/paypal/aurora/scheduler/offers/MagicMatchOfferSetImpl.java create mode 100644 src/main/java/com/paypal/aurora/scheduler/offers/MagicMatchOfferSetModule.java delete mode 100644 src/main/java/com/paypal/aurora/scheduler/offers/OfferSetImpl.java delete mode 100644 src/main/java/com/paypal/aurora/scheduler/offers/OfferSetModule.java diff --git a/src/main/java/com/paypal/aurora/scheduler/model/Config.java b/src/main/java/com/paypal/aurora/scheduler/model/Config.java deleted file mode 100644 index 3b47ecbcc..000000000 --- a/src/main/java/com/paypal/aurora/scheduler/model/Config.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.paypal.aurora.scheduler.model; - -// for aurora plugin configuration -public class Config { - public String host; - public int port; -} diff --git a/src/main/java/com/paypal/aurora/scheduler/model/Host.java b/src/main/java/com/paypal/aurora/scheduler/model/Host.java deleted file mode 100644 index e908994c5..000000000 --- a/src/main/java/com/paypal/aurora/scheduler/model/Host.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.paypal.aurora.scheduler.model; - -import com.paypal.aurora.scheduler.model.Resource; - -public class Host { - public String name; - public Resource offer; -} diff --git a/src/main/java/com/paypal/aurora/scheduler/model/Resource.java b/src/main/java/com/paypal/aurora/scheduler/model/Resource.java deleted file mode 100644 index 72da0301b..000000000 --- a/src/main/java/com/paypal/aurora/scheduler/model/Resource.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.paypal.aurora.scheduler.model; - -public class Resource { - public double cpu; - public double memory; - public double disk; - public Resource(double cpu, double memory, double disk){ - this.cpu = cpu; - this.memory = memory; - this.disk = disk; - } -} diff --git a/src/main/java/com/paypal/aurora/scheduler/model/ScheduleRequest.java b/src/main/java/com/paypal/aurora/scheduler/model/ScheduleRequest.java deleted file mode 100644 index 7df36380b..000000000 --- a/src/main/java/com/paypal/aurora/scheduler/model/ScheduleRequest.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.paypal.aurora.scheduler.model; - -public class ScheduleRequest { - public Resource request; - public Host[]hosts; - public ScheduleRequest(){ - request = new Resource(0.0,0.0,0.0); - } -} \ No newline at end of file diff --git a/src/main/java/com/paypal/aurora/scheduler/model/ScheduleResponse.java b/src/main/java/com/paypal/aurora/scheduler/model/ScheduleResponse.java deleted file mode 100644 index 03bcb8bcd..000000000 --- a/src/main/java/com/paypal/aurora/scheduler/model/ScheduleResponse.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.paypal.aurora.scheduler.model; - -import org.apache.aurora.scheduler.offers.HostOffer; - -public class ScheduleResponse { - public String error; - public String []hosts; -} \ No newline at end of file diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/MagicMatchOfferSetImpl.java b/src/main/java/com/paypal/aurora/scheduler/offers/MagicMatchOfferSetImpl.java new file mode 100644 index 000000000..749ae3d14 --- /dev/null +++ b/src/main/java/com/paypal/aurora/scheduler/offers/MagicMatchOfferSetImpl.java @@ -0,0 +1,253 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.paypal.aurora.scheduler.offers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.ProtocolException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.ConcurrentSkipListSet; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; +import com.google.gson.Gson; +import com.google.inject.Inject; + +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import org.apache.aurora.scheduler.offers.HostOffer; +import org.apache.aurora.scheduler.offers.OfferSet; +import org.apache.aurora.scheduler.resources.ResourceType; +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation for OfferSet. + */ +@VisibleForTesting +public class MagicMatchOfferSetImpl implements OfferSet { + + private final Set offers; + private static final Logger LOG = LoggerFactory.getLogger(MagicMatchOfferSetImpl.class); + private PluginConfig plugin = null; + private final Gson gson; + + @Inject + public MagicMatchOfferSetImpl(Ordering ordering) { + offers = new ConcurrentSkipListSet<>(ordering); + gson = new Gson(); + } + + @Override + public void add(HostOffer offer) { + offers.add(offer); + } + + @Override + public void remove(HostOffer removed) { + offers.remove(removed); + } + + @Override + public int size() { + // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more + // expensive. + // Could track this separately if it turns out to pose problems. + return offers.size(); + } + + @Override + public void clear() { + offers.clear(); + } + + @Override + public Iterable values() { + return offers; + } + + @Override + public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) { + List orderedOffers = getOffersFromPlugin(resourceRequest); + if (orderedOffers != null) { + return orderedOffers; + } + // fall back to default scheduler. + LOG.warn("MagicMatch failed to schedule the task. Falling back on default ordering."); + return offers; + } + + // getOffersFromPlugin gets the offers from MagicMatch. + private List getOffersFromPlugin(ResourceRequest resourceRequest) { + if (plugin == null) { + plugin = new PluginConfig(); + } + List orderedOffers = new ArrayList<>(); + Map offerMap = new HashMap<>(); + for (HostOffer offer : offers) { + offerMap.put(offer.getAttributes().getHost(), offer); + } + // send the Rest API request to the scheduler plugin + URL url; + try { + url = new URL(plugin.getEndpoint() + "/v1/offerset"); + } catch (MalformedURLException e) { + LOG.error(e.toString()); + return null; + } + + // create json request + Resource req = new Resource(resourceRequest.getResourceBag().valueOf(ResourceType.CPUS), + resourceRequest.getResourceBag().valueOf(ResourceType.RAM_MB), + resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB)); + Host[] hosts = new Host[Iterables.size(offers)]; + int i = 0; + for (HostOffer offer : offers) { + hosts[i] = new Host(); + hosts[i].name = offer.getAttributes().getHost(); + double cpu = offer.getResourceBag(true).valueOf(ResourceType.CPUS) + + offer.getResourceBag(false).valueOf(ResourceType.CPUS); + double memory = offer.getResourceBag(true).valueOf(ResourceType.RAM_MB) + + offer.getResourceBag(false).valueOf(ResourceType.RAM_MB); + double disk = offer.getResourceBag(true).valueOf(ResourceType.DISK_MB) + + offer.getResourceBag(false).valueOf(ResourceType.DISK_MB); + hosts[i].offer = new Resource(cpu, memory, disk); + i++; + } + ScheduleRequest scheduleRequest = new ScheduleRequest(req, hosts); + LOG.debug(scheduleRequest.toString()); + + // create connection + HttpURLConnection con; + try { + con = (HttpURLConnection) url.openConnection(); + con.setRequestMethod("POST"); + con.setRequestProperty("Content-Type", "application/json; utf-8"); + con.setRequestProperty("Accept", "application/json"); + con.setDoOutput(true); + } catch (ProtocolException pe) { + LOG.error("The HTTP protocol was not setup correctly. \n" + pe.toString()); + return null; + } catch (IOException ioe) { + LOG.error("Unable to open HTTP connection. \n" + ioe.toString()); + return null; + } + String jsonStr = gson.toJson(scheduleRequest); + LOG.debug("request to plugin: " + jsonStr); + try (OutputStream os = con.getOutputStream()) { + byte[] input = jsonStr.getBytes(StandardCharsets.UTF_8); + os.write(input, 0, input.length); + } catch (UnsupportedEncodingException uee) { + LOG.error("ScheduleRequest json is not valid.\n" + uee.toString()); + return null; + } catch (IOException ioe) { + LOG.error("Unable to send scheduleRequest to MagicMatch .\n" + ioe.toString()); + return null; + } + + // read response + StringBuilder response = new StringBuilder(); + try { + String responseLine = IOUtils.toString(con.getInputStream(), StandardCharsets.UTF_8); + response.append(responseLine.trim()); + } catch (UnsupportedEncodingException uee) { + LOG.error("MagicMatch response is not valid.\n" + uee.toString()); + return null; + } catch (IOException ioe) { + LOG.error("Unable to read the response from MagicMatch.\n" + ioe.toString()); + return null; + } + ScheduleResponse scheduleResponse = gson.fromJson(response.toString(), ScheduleResponse.class); + LOG.debug("plugin response: " + response.toString()); + + // process the scheduleResult + if (scheduleResponse.error.equals("") && scheduleResponse.hosts != null) { + for (String host : scheduleResponse.hosts) { + HostOffer offer = offerMap.get(host); + if (offer == null) { + LOG.error("Cannot find this host " + host + " in " + offerMap.toString()); + } else { + orderedOffers.add(offer); + } + } + return orderedOffers; + } + LOG.error("Unable to get sorted offers from MagicMatch due to " + scheduleResponse.error); + return null; + } + + // Host represents for each host offer. + static class Host { + String name; + Resource offer; + + @Override + public String toString() { + return "Host{" + "name='" + name + '\'' + ", offer=" + offer + '}'; + } + } + + // Resource is used between Aurora and MagicMatch. + static class Resource { + double cpu; + double memory; + double disk; + + Resource(double cpu, double memory, double disk) { + this.cpu = cpu; + this.memory = memory; + this.disk = disk; + } + + @Override + public String toString() { + return "Resource{" + "cpu=" + cpu + ", memory=" + memory + ", disk=" + disk + '}'; + } + } + + // ScheduleRequest is the request sent to MagicMatch. + static class ScheduleRequest { + Resource request; + Host[] hosts; + + ScheduleRequest(Resource request, Host... hosts) { + this.request = request; + this.hosts = hosts; + } + + @Override + public String toString() { + return "ScheduleRequest{" + "request=" + request + ", hosts=" + Arrays.toString(hosts) + '}'; + } + } + + // ScheduleResponse is the scheduling result responded by MagicMatch + static class ScheduleResponse { + String error; + String[] hosts; + + @Override + public String toString() { + return "ScheduleResponse{" + "error='" + error + '\'' + ", hosts=" + + Arrays.toString(hosts) + '}'; + } + } +} diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/MagicMatchOfferSetModule.java b/src/main/java/com/paypal/aurora/scheduler/offers/MagicMatchOfferSetModule.java new file mode 100644 index 000000000..66b28fca1 --- /dev/null +++ b/src/main/java/com/paypal/aurora/scheduler/offers/MagicMatchOfferSetModule.java @@ -0,0 +1,51 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.paypal.aurora.scheduler.offers; + +import javax.inject.Singleton; + +import com.google.common.collect.Ordering; +import com.google.inject.AbstractModule; +import com.google.inject.PrivateModule; +import com.google.inject.TypeLiteral; + +import org.apache.aurora.scheduler.config.CliOptions; +import org.apache.aurora.scheduler.offers.HostOffer; +import org.apache.aurora.scheduler.offers.OfferOrderBuilder; +import org.apache.aurora.scheduler.offers.OfferSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MagicMatchOfferSetModule extends AbstractModule { + private final CliOptions options; + private static final Logger LOG = LoggerFactory.getLogger(MagicMatchOfferSetModule.class); + + public MagicMatchOfferSetModule(CliOptions options) { + this.options = options; + } + + @Override + protected void configure() { + LOG.info("MagicMatch OfferSet Module Enabled."); + install(new PrivateModule() { + @Override + protected void configure() { + bind(new TypeLiteral>() { + }).toInstance(OfferOrderBuilder.create(options.offer.offerOrder)); + bind(OfferSet.class).to(MagicMatchOfferSetImpl.class).in(Singleton.class); + expose(OfferSet.class); + } + }); + } +} diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetImpl.java b/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetImpl.java deleted file mode 100644 index 1a0efd9d4..000000000 --- a/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetImpl.java +++ /dev/null @@ -1,190 +0,0 @@ -package com.paypal.aurora.scheduler.offers; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.UnsupportedEncodingException; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.ProtocolException; -import java.net.URL; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentSkipListSet; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterables; -import com.google.common.collect.Ordering; -import com.google.inject.Inject; -import com.google.gson.Gson; - -import com.paypal.aurora.scheduler.model.Host; -import com.paypal.aurora.scheduler.model.Resource; -import com.paypal.aurora.scheduler.model.ScheduleRequest; -import com.paypal.aurora.scheduler.model.ScheduleResponse; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; -import org.apache.aurora.scheduler.offers.HostOffer; -import org.apache.aurora.scheduler.offers.OfferSet; - -import org.apache.aurora.scheduler.resources.ResourceType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Default implementation for OfferSet. Backed by a ConcurrentSkipListSet. - */ -@VisibleForTesting -public class OfferSetImpl implements OfferSet { - - private final Set offers; - private static final Logger LOG = LoggerFactory.getLogger(OfferSetImpl.class); - - @Inject - public OfferSetImpl(Ordering ordering) { - offers = new ConcurrentSkipListSet<>(ordering); - } - - @Override - public void add(HostOffer offer) { - offers.add(offer); - } - - @Override - public void remove(HostOffer removed) { - offers.remove(removed); - } - - @Override - public int size() { - // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more - // expensive. - // Could track this separately if it turns out to pose problems. - return offers.size(); - } - - @Override - public void clear() { - offers.clear(); - } - - @Override - public Iterable values() { - return offers; - } - - private Timestamp prevTimestamp = new Timestamp(System.currentTimeMillis()); - private long ageInMilliseconds = 60*1000; - @Override - public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) { - List orderedOffers = getOffersFromPlugin(resourceRequest, offers); - if (orderedOffers != null){ - return orderedOffers; - } - // fallback to default scheduler. - LOG.error("MagicMatch failed to schedule the task. Aurora uses the default scheduler."); - return offers; - } - PluginConfig plugin = null; - // getOffersFromPlugin gets the offers from MagicMatch. - private List getOffersFromPlugin(ResourceRequest resourceRequest, Iterable offers){ - if (plugin==null){ - plugin = new PluginConfig(); - } - Gson gson = new Gson(); - List orderedOffers = new ArrayList(); - Map offerMap = new HashMap(); - for(HostOffer offer: offers) { - offerMap.put(offer.getAttributes().getHost(), offer); - } - // send the Rest API request to the scheduler plugin - URL url = null; - try { - url = new URL(plugin.getEndpoint() + "/v1/offerset"); - } catch (MalformedURLException e) { - LOG.error(e.toString()); - return null; - } - // create json request - ScheduleRequest scheduleRequest = new ScheduleRequest(); - scheduleRequest.request.cpu = resourceRequest.getResourceBag().valueOf(ResourceType.CPUS); - scheduleRequest.request.memory = resourceRequest.getResourceBag().valueOf(ResourceType.RAM_MB); - scheduleRequest.request.disk = resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB); - scheduleRequest.hosts = new Host[Iterables.size(offers)]; - int i =0; - for (HostOffer offer:offers) { - scheduleRequest.hosts[i] = new Host(); - scheduleRequest.hosts[i].name = offer.getAttributes().getHost(); - double cpu = offer.getResourceBag(true).valueOf(ResourceType.CPUS) + - offer.getResourceBag(false).valueOf(ResourceType.CPUS); - double memory = offer.getResourceBag(true).valueOf(ResourceType.RAM_MB) + - offer.getResourceBag(false).valueOf(ResourceType.RAM_MB) ; - double disk = offer.getResourceBag(true).valueOf(ResourceType.DISK_MB) + - offer.getResourceBag(false).valueOf(ResourceType.DISK_MB); - scheduleRequest.hosts[i].offer = new Resource(cpu, memory, disk); - i++; - } - - // create connection - HttpURLConnection con = null; - try { - con = (HttpURLConnection)url.openConnection(); - con.setRequestMethod("POST"); - con.setRequestProperty("Content-Type", "application/json; utf-8"); - con.setRequestProperty("Accept", "application/json"); - con.setDoOutput(true); - } catch (ProtocolException pe) { - LOG.error(pe.toString()); - return null; - } catch (IOException ioe) { - LOG.error(ioe.toString()); - return null; - } - String jsonStr = gson.toJson(scheduleRequest); - LOG.debug("request to plugin: "+jsonStr); - try(OutputStream os = con.getOutputStream()) { - byte[] input = jsonStr.getBytes("utf-8"); - os.write(input, 0, input.length); - } catch (UnsupportedEncodingException uee) { - LOG.error(uee.toString()); - return null; - } catch (IOException ioe){ - LOG.error(ioe.toString()); - return null; - } - // read response - StringBuilder response = new StringBuilder(); - try{ - BufferedReader br = new BufferedReader(new InputStreamReader(con.getInputStream(), "utf-8")); - String responseLine = null; - while ((responseLine = br.readLine()) != null) { - response.append(responseLine.trim()); - } - } catch (UnsupportedEncodingException uee){ - LOG.error(uee.toString()); - return null; - } catch (IOException ioe) { - LOG.error(ioe.toString()); - return null; - } - ScheduleResponse scheduleResponse = gson.fromJson(response.toString(), ScheduleResponse.class); - LOG.debug("plugin response: "+response.toString()); - // process the scheduleResult - if (scheduleResponse.hosts!=null){ - for(String host: scheduleResponse.hosts) { - HostOffer offer = offerMap.get(host); - if (offer!=null) { - orderedOffers.add(offer); - } else { - LOG.error("Cannot find this host "+host+" in "+offerMap.size()+" offers"); - } - } - } - return orderedOffers; - } -} \ No newline at end of file diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetModule.java b/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetModule.java deleted file mode 100644 index bda2a0f1f..000000000 --- a/src/main/java/com/paypal/aurora/scheduler/offers/OfferSetModule.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.paypal.aurora.scheduler.offers; - -import javax.inject.Singleton; - -import com.google.common.collect.Ordering; -import com.google.inject.AbstractModule; -import com.google.inject.PrivateModule; - -import com.google.inject.TypeLiteral; - -import org.apache.aurora.scheduler.config.CliOptions; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.aurora.scheduler.offers.HostOffer; -import org.apache.aurora.scheduler.offers.OfferOrderBuilder; -import org.apache.aurora.scheduler.offers.OfferSet; - -public class OfferSetModule extends AbstractModule { - private final CliOptions options; - private static final Logger LOG = LoggerFactory.getLogger(OfferSetModule.class); - - public OfferSetModule(CliOptions options) { - this.options = options; - } - - @Override - protected void configure() { - LOG.info("PayPal Offer Set module Enabled."); - install(new PrivateModule() { - @Override - protected void configure() { - bind(new TypeLiteral>() { }) - .toInstance(OfferOrderBuilder.create(options.offer.offerOrder)); - bind(OfferSetImpl.class).in(Singleton.class); - bind(OfferSet.class).to(OfferSetImpl.class); - expose(OfferSet.class); - } - }); - } -} diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/PluginConfig.java b/src/main/java/com/paypal/aurora/scheduler/offers/PluginConfig.java index db750acf2..b3aae0874 100644 --- a/src/main/java/com/paypal/aurora/scheduler/offers/PluginConfig.java +++ b/src/main/java/com/paypal/aurora/scheduler/offers/PluginConfig.java @@ -1,20 +1,31 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.paypal.aurora.scheduler.offers; -import com.paypal.aurora.scheduler.model.Config; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + import com.google.gson.Gson; -import org.apache.commons.io.IOUtils; + +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; - public class PluginConfig { private static final Logger LOG = LoggerFactory.getLogger(PluginConfig.class); private String endpoint = "http://localhost:9090"; - private String configFile = "/etc/aurora-plugin.json"; /* aurora-plugin.json file: { @@ -23,32 +34,33 @@ public class PluginConfig { } */ public PluginConfig() { + final String configFile = "/etc/magicmatch/aurora-plugin.json"; // load file - FileInputStream targetFile = null; + String jsonStr = null; try { - targetFile = new FileInputStream(new File(this.configFile)); - } catch (FileNotFoundException e) { - LOG.error("Cannot load "+this.configFile); - LOG.error(e.toString()); + jsonStr = FileUtils.readFileToString(new File(configFile), StandardCharsets.UTF_8); + } catch (IOException io) { + LOG.error("Cannot load " + configFile + "\n " + io.toString()); } - String jsonStr = ""; - try { - jsonStr = IOUtils.toString(targetFile, "UTF-8"); - } catch (IOException io){ - LOG.error("Cannot load "+this.configFile); - LOG.error(io.toString()); - } - if (!jsonStr.equals("")){ + if (jsonStr == null || "".equals(jsonStr)) { + LOG.error(configFile + " is empty"); + } else { Config config = new Gson().fromJson(jsonStr, Config.class); - if (config!=null) { - this.endpoint = "http://"+config.host+":"+config.port; + if (config == null) { + LOG.error(configFile + " is invalid."); + } else { + this.endpoint = "http://" + config.host + ":" + config.port; } - } else { - LOG.error(this.configFile+" is empty"); } } - + public String getEndpoint() { return this.endpoint; } + + // for parsing json config file. + static class Config { + String host; + int port; + } } From e0395a10162c5e4b0ea255d0984c7d32ac5ef467 Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Thu, 22 Oct 2020 14:52:10 -0700 Subject: [PATCH 04/24] resolves comments from reviewers --- .../scheduler/offers/HttpOfferSetImpl.java} | 102 +++++++++++++----- .../scheduler/offers/HttpOfferSetModule.java} | 10 +- .../scheduler/offers/HttpPluginConfig.java} | 36 +++++-- 3 files changed, 107 insertions(+), 41 deletions(-) rename src/main/java/{com/paypal/aurora/scheduler/offers/MagicMatchOfferSetImpl.java => io/github/aurora/scheduler/offers/HttpOfferSetImpl.java} (77%) rename src/main/java/{com/paypal/aurora/scheduler/offers/MagicMatchOfferSetModule.java => io/github/aurora/scheduler/offers/HttpOfferSetModule.java} (81%) rename src/main/java/{com/paypal/aurora/scheduler/offers/PluginConfig.java => io/github/aurora/scheduler/offers/HttpPluginConfig.java} (63%) diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/MagicMatchOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java similarity index 77% rename from src/main/java/com/paypal/aurora/scheduler/offers/MagicMatchOfferSetImpl.java rename to src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 749ae3d14..472d9e24d 100644 --- a/src/main/java/com/paypal/aurora/scheduler/offers/MagicMatchOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.paypal.aurora.scheduler.offers; +package io.github.aurora.scheduler.offers; import java.io.IOException; import java.io.OutputStream; @@ -43,17 +43,23 @@ * Implementation for OfferSet. */ @VisibleForTesting -public class MagicMatchOfferSetImpl implements OfferSet { +public class HttpOfferSetImpl implements OfferSet { private final Set offers; - private static final Logger LOG = LoggerFactory.getLogger(MagicMatchOfferSetImpl.class); - private PluginConfig plugin = null; + private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetImpl.class); + private HttpPluginConfig plugin = null; private final Gson gson; + private long numOfTasks; + private long totalSchedTime; + private long currTotalSchedTime; + private long worstSchedTime; + private long currWorstSchedTime; @Inject - public MagicMatchOfferSetImpl(Ordering ordering) { + public HttpOfferSetImpl(Ordering ordering) { offers = new ConcurrentSkipListSet<>(ordering); gson = new Gson(); + plugin = new HttpPluginConfig(); } @Override @@ -84,22 +90,67 @@ public Iterable values() { return offers; } + // monitor prints the scheduling time statistics + private void monitor(long startTime) { + numOfTasks++; + long timeElapsed = System.nanoTime() - startTime; + totalSchedTime += timeElapsed; + if (worstSchedTime < timeElapsed) { + worstSchedTime = timeElapsed; + } + if (numOfTasks == plugin.getLogStepInTaskNum()) { + //numOfTasks,currTotalSchedTime,currWorstSchedTime,totalSchedTime,worstSchedTime + String msg = numOfTasks + "," + currTotalSchedTime + "," + currWorstSchedTime + "," + + totalSchedTime + "," + worstSchedTime; + LOG.info(msg); + numOfTasks = 0; + currTotalSchedTime = 0; + currWorstSchedTime = 0; + } + } + @Override public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) { + if (plugin == null) { + plugin = new HttpPluginConfig(); + } + long current = System.nanoTime(); List orderedOffers = getOffersFromPlugin(resourceRequest); + if (plugin.isDebug()) { + this.monitor(current); + } if (orderedOffers != null) { return orderedOffers; } // fall back to default scheduler. - LOG.warn("MagicMatch failed to schedule the task. Falling back on default ordering."); + LOG.warn("Failed to schedule the task. Falling back on default ordering."); return offers; } + //createScheduleRequest creates the ScheduleRequest to be sent out to the plugin. + private ScheduleRequest createScheduleRequest(ResourceRequest resourceRequest) { + Resource req = new Resource(resourceRequest.getResourceBag().valueOf(ResourceType.CPUS), + resourceRequest.getResourceBag().valueOf(ResourceType.RAM_MB), + resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB)); + Host[] hosts = new Host[Iterables.size(offers)]; + int i = 0; + for (HostOffer offer : offers) { + hosts[i] = new Host(); + hosts[i].name = offer.getAttributes().getHost(); + double cpu = offer.getResourceBag(true).valueOf(ResourceType.CPUS) + + offer.getResourceBag(false).valueOf(ResourceType.CPUS); + double memory = offer.getResourceBag(true).valueOf(ResourceType.RAM_MB) + + offer.getResourceBag(false).valueOf(ResourceType.RAM_MB); + double disk = offer.getResourceBag(true).valueOf(ResourceType.DISK_MB) + + offer.getResourceBag(false).valueOf(ResourceType.DISK_MB); + hosts[i].offer = new Resource(cpu, memory, disk); + i++; + } + return new ScheduleRequest(req, hosts); + } + // getOffersFromPlugin gets the offers from MagicMatch. private List getOffersFromPlugin(ResourceRequest resourceRequest) { - if (plugin == null) { - plugin = new PluginConfig(); - } List orderedOffers = new ArrayList<>(); Map offerMap = new HashMap<>(); for (HostOffer offer : offers) { @@ -115,24 +166,7 @@ private List getOffersFromPlugin(ResourceRequest resourceRequest) { } // create json request - Resource req = new Resource(resourceRequest.getResourceBag().valueOf(ResourceType.CPUS), - resourceRequest.getResourceBag().valueOf(ResourceType.RAM_MB), - resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB)); - Host[] hosts = new Host[Iterables.size(offers)]; - int i = 0; - for (HostOffer offer : offers) { - hosts[i] = new Host(); - hosts[i].name = offer.getAttributes().getHost(); - double cpu = offer.getResourceBag(true).valueOf(ResourceType.CPUS) - + offer.getResourceBag(false).valueOf(ResourceType.CPUS); - double memory = offer.getResourceBag(true).valueOf(ResourceType.RAM_MB) - + offer.getResourceBag(false).valueOf(ResourceType.RAM_MB); - double disk = offer.getResourceBag(true).valueOf(ResourceType.DISK_MB) - + offer.getResourceBag(false).valueOf(ResourceType.DISK_MB); - hosts[i].offer = new Resource(cpu, memory, disk); - i++; - } - ScheduleRequest scheduleRequest = new ScheduleRequest(req, hosts); + ScheduleRequest scheduleRequest = createScheduleRequest(resourceRequest); LOG.debug(scheduleRequest.toString()); // create connection @@ -178,8 +212,10 @@ private List getOffersFromPlugin(ResourceRequest resourceRequest) { ScheduleResponse scheduleResponse = gson.fromJson(response.toString(), ScheduleResponse.class); LOG.debug("plugin response: " + response.toString()); - // process the scheduleResult + // process the scheduleResponse if (scheduleResponse.error.equals("") && scheduleResponse.hosts != null) { + StringBuffer offersStr = new StringBuffer(); + int c = 0; for (String host : scheduleResponse.hosts) { HostOffer offer = offerMap.get(host); if (offer == null) { @@ -187,10 +223,18 @@ private List getOffersFromPlugin(ResourceRequest resourceRequest) { } else { orderedOffers.add(offer); } + if (c < 5) { + offersStr.append(host + ","); + c++; + } + } + if (scheduleResponse.hosts.length > 0) { + offersStr.append("..."); + LOG.info("Sorted offers: " + offersStr.toString()); } return orderedOffers; } - LOG.error("Unable to get sorted offers from MagicMatch due to " + scheduleResponse.error); + LOG.error("Unable to get sorted offers due to " + scheduleResponse.error); return null; } diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/MagicMatchOfferSetModule.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java similarity index 81% rename from src/main/java/com/paypal/aurora/scheduler/offers/MagicMatchOfferSetModule.java rename to src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java index 66b28fca1..403b25f0b 100644 --- a/src/main/java/com/paypal/aurora/scheduler/offers/MagicMatchOfferSetModule.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.paypal.aurora.scheduler.offers; +package io.github.aurora.scheduler.offers; import javax.inject.Singleton; @@ -27,11 +27,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MagicMatchOfferSetModule extends AbstractModule { +public class HttpOfferSetModule extends AbstractModule { private final CliOptions options; - private static final Logger LOG = LoggerFactory.getLogger(MagicMatchOfferSetModule.class); + private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetModule.class); - public MagicMatchOfferSetModule(CliOptions options) { + public HttpOfferSetModule(CliOptions options) { this.options = options; } @@ -43,7 +43,7 @@ protected void configure() { protected void configure() { bind(new TypeLiteral>() { }).toInstance(OfferOrderBuilder.create(options.offer.offerOrder)); - bind(OfferSet.class).to(MagicMatchOfferSetImpl.class).in(Singleton.class); + bind(OfferSet.class).to(HttpOfferSetImpl.class).in(Singleton.class); expose(OfferSet.class); } }); diff --git a/src/main/java/com/paypal/aurora/scheduler/offers/PluginConfig.java b/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java similarity index 63% rename from src/main/java/com/paypal/aurora/scheduler/offers/PluginConfig.java rename to src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java index b3aae0874..a96363572 100644 --- a/src/main/java/com/paypal/aurora/scheduler/offers/PluginConfig.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.paypal.aurora.scheduler.offers; +package io.github.aurora.scheduler.offers; import java.io.File; import java.io.IOException; @@ -23,18 +23,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PluginConfig { - private static final Logger LOG = LoggerFactory.getLogger(PluginConfig.class); +public class HttpPluginConfig { + private static final Logger LOG = LoggerFactory.getLogger(HttpPluginConfig.class); private String endpoint = "http://localhost:9090"; + private Config config; + private static final int DEFAULT_LOG_STEP = 1000; /* aurora-plugin.json file: { "host": "localhost", - "port": 9090 + "port": 9090, + "debug": true, + "logStepInTaskNum": 100 } */ - public PluginConfig() { - final String configFile = "/etc/magicmatch/aurora-plugin.json"; + public HttpPluginConfig() { + final String configFile = "/etc/aurora-scheduler/http-endpoint.json"; // load file String jsonStr = null; try { @@ -45,11 +49,13 @@ public PluginConfig() { if (jsonStr == null || "".equals(jsonStr)) { LOG.error(configFile + " is empty"); } else { - Config config = new Gson().fromJson(jsonStr, Config.class); + config = new Gson().fromJson(jsonStr, Config.class); if (config == null) { LOG.error(configFile + " is invalid."); } else { this.endpoint = "http://" + config.host + ":" + config.port; + LOG.info("Aurora-scheduler uses HttpOfferSet for scheduling at " + + this.endpoint); } } } @@ -58,9 +64,25 @@ public String getEndpoint() { return this.endpoint; } + public boolean isDebug() { + if (config != null) { + return this.config.debug; + } + return false; + } + + public int getLogStepInTaskNum() { + if (config != null) { + return this.config.logStepInTaskNum; + } + return DEFAULT_LOG_STEP; + } + // for parsing json config file. static class Config { String host; int port; + boolean debug = false; + int logStepInTaskNum = DEFAULT_LOG_STEP; } } From 3250902f47bdf41604f2282aa6d321fd18675b4f Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Thu, 22 Oct 2020 14:54:23 -0700 Subject: [PATCH 05/24] resolves comments from reviewers --- .../io/github/aurora/scheduler/offers/HttpOfferSetImpl.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 472d9e24d..2beacf806 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -111,9 +111,6 @@ private void monitor(long startTime) { @Override public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) { - if (plugin == null) { - plugin = new HttpPluginConfig(); - } long current = System.nanoTime(); List orderedOffers = getOffersFromPlugin(resourceRequest); if (plugin.isDebug()) { From a67155f812be02eee0a827f6904263c5cff065f1 Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Thu, 22 Oct 2020 15:19:37 -0700 Subject: [PATCH 06/24] resolve compiling error --- .../io/github/aurora/scheduler/offers/HttpOfferSetImpl.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 2beacf806..e8a690865 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -47,7 +47,7 @@ public class HttpOfferSetImpl implements OfferSet { private final Set offers; private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetImpl.class); - private HttpPluginConfig plugin = null; + private final HttpPluginConfig plugin = new HttpPluginConfig(); private final Gson gson; private long numOfTasks; private long totalSchedTime; @@ -59,7 +59,6 @@ public class HttpOfferSetImpl implements OfferSet { public HttpOfferSetImpl(Ordering ordering) { offers = new ConcurrentSkipListSet<>(ordering); gson = new Gson(); - plugin = new HttpPluginConfig(); } @Override From c0d168f21e2319685ff44c054e7e019fd5500da5 Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Mon, 26 Oct 2020 09:57:19 -0700 Subject: [PATCH 07/24] update configuration --- .../scheduler/offers/HttpOfferSetImpl.java | 42 +++++++-------- .../scheduler/offers/HttpPluginConfig.java | 54 ++++++++++--------- 2 files changed, 50 insertions(+), 46 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index e8a690865..35b431d08 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -19,7 +19,6 @@ import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.ProtocolException; -import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentSkipListSet; @@ -45,20 +44,25 @@ @VisibleForTesting public class HttpOfferSetImpl implements OfferSet { - private final Set offers; private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetImpl.class); - private final HttpPluginConfig plugin = new HttpPluginConfig(); + private final Set offers; private final Gson gson; private long numOfTasks; private long totalSchedTime; private long currTotalSchedTime; private long worstSchedTime; private long currWorstSchedTime; + private HttpPluginConfig plugin; @Inject public HttpOfferSetImpl(Ordering ordering) { offers = new ConcurrentSkipListSet<>(ordering); gson = new Gson(); + try { + plugin = new HttpPluginConfig(); + } catch (MalformedURLException e) { + LOG.error("URL of Config Plugin is malformed.\n" + e); + } } @Override @@ -98,7 +102,6 @@ private void monitor(long startTime) { worstSchedTime = timeElapsed; } if (numOfTasks == plugin.getLogStepInTaskNum()) { - //numOfTasks,currTotalSchedTime,currWorstSchedTime,totalSchedTime,worstSchedTime String msg = numOfTasks + "," + currTotalSchedTime + "," + currWorstSchedTime + "," + totalSchedTime + "," + worstSchedTime; LOG.info(msg); @@ -110,13 +113,17 @@ private void monitor(long startTime) { @Override public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) { - long current = System.nanoTime(); - List orderedOffers = getOffersFromPlugin(resourceRequest); - if (plugin.isDebug()) { - this.monitor(current); - } - if (orderedOffers != null) { - return orderedOffers; + if (plugin != null) { + long current = System.nanoTime(); + List orderedOffers = getOffersFromPlugin(resourceRequest); + if (plugin.isDebug()) { + this.monitor(current); + } + if (orderedOffers == null) { + LOG.warn("Unable to get orderedOffers from the external plugin."); + } else { + return orderedOffers; + } } // fall back to default scheduler. LOG.warn("Failed to schedule the task. Falling back on default ordering."); @@ -152,23 +159,14 @@ private List getOffersFromPlugin(ResourceRequest resourceRequest) { for (HostOffer offer : offers) { offerMap.put(offer.getAttributes().getHost(), offer); } - // send the Rest API request to the scheduler plugin - URL url; - try { - url = new URL(plugin.getEndpoint() + "/v1/offerset"); - } catch (MalformedURLException e) { - LOG.error(e.toString()); - return null; - } - - // create json request + // create json request & send the Rest API request to the scheduler plugin ScheduleRequest scheduleRequest = createScheduleRequest(resourceRequest); LOG.debug(scheduleRequest.toString()); // create connection HttpURLConnection con; try { - con = (HttpURLConnection) url.openConnection(); + con = (HttpURLConnection) plugin.getUrl().openConnection(); con.setRequestMethod("POST"); con.setRequestProperty("Content-Type", "application/json; utf-8"); con.setRequestProperty("Accept", "application/json"); diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java b/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java index a96363572..9593a3288 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java @@ -15,6 +15,8 @@ import java.io.File; import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; import java.nio.charset.StandardCharsets; import com.google.gson.Gson; @@ -23,45 +25,50 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * HttpPluginConfig is used to represent the configuration this HttpOfferSetModule. + * It has the host & port of the external scheduling unit that this plugin communicates with. + * It loads the configuration from a file. An file example is as follows. + * http-plugin.json file: + * { + * "url": "http://localhost:9090/v1/offerset", + * "debug": true, + * "logStepInTaskNum": 100 + * } + */ public class HttpPluginConfig { private static final Logger LOG = LoggerFactory.getLogger(HttpPluginConfig.class); - private String endpoint = "http://localhost:9090"; - private Config config; private static final int DEFAULT_LOG_STEP = 1000; - /* - aurora-plugin.json file: - { - "host": "localhost", - "port": 9090, - "debug": true, - "logStepInTaskNum": 100 - } - */ - public HttpPluginConfig() { - final String configFile = "/etc/aurora-scheduler/http-endpoint.json"; + + public static final String CONFIG_FILE = "/etc/aurora-scheduler/http-endpoint.json"; + + private URL url; + private Config config; + + public HttpPluginConfig() throws MalformedURLException { // load file String jsonStr = null; try { - jsonStr = FileUtils.readFileToString(new File(configFile), StandardCharsets.UTF_8); + jsonStr = FileUtils.readFileToString(new File(CONFIG_FILE), StandardCharsets.UTF_8); } catch (IOException io) { - LOG.error("Cannot load " + configFile + "\n " + io.toString()); + LOG.error("Cannot load " + CONFIG_FILE + "\n " + io.toString()); } if (jsonStr == null || "".equals(jsonStr)) { - LOG.error(configFile + " is empty"); + LOG.error(CONFIG_FILE + " is empty"); } else { config = new Gson().fromJson(jsonStr, Config.class); if (config == null) { - LOG.error(configFile + " is invalid."); + LOG.error(CONFIG_FILE + " is invalid."); } else { - this.endpoint = "http://" + config.host + ":" + config.port; - LOG.info("Aurora-scheduler uses HttpOfferSet for scheduling at " - + this.endpoint); + this.url = new URL(config.url); + LOG.info("Aurora-scheduler uses HttpOfferSet for scheduling via uri:" + + this.url); } } } - public String getEndpoint() { - return this.endpoint; + public URL getUrl() { + return this.url; } public boolean isDebug() { @@ -80,8 +87,7 @@ public int getLogStepInTaskNum() { // for parsing json config file. static class Config { - String host; - int port; + String url; boolean debug = false; int logStepInTaskNum = DEFAULT_LOG_STEP; } From 71b0ce2cc58057c5615c70646638b2ff7424365e Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Mon, 26 Oct 2020 14:22:45 -0700 Subject: [PATCH 08/24] change error messages --- .../aurora/scheduler/offers/HttpOfferSetImpl.java | 14 +++++++------- .../scheduler/offers/HttpOfferSetModule.java | 2 +- .../aurora/scheduler/offers/HttpPluginConfig.java | 3 +-- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 35b431d08..ad38e4096 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -61,7 +61,7 @@ public HttpOfferSetImpl(Ordering ordering) { try { plugin = new HttpPluginConfig(); } catch (MalformedURLException e) { - LOG.error("URL of Config Plugin is malformed.\n" + e); + LOG.error("URL of Config Plugin is malformed.", e); } } @@ -172,10 +172,10 @@ private List getOffersFromPlugin(ResourceRequest resourceRequest) { con.setRequestProperty("Accept", "application/json"); con.setDoOutput(true); } catch (ProtocolException pe) { - LOG.error("The HTTP protocol was not setup correctly. \n" + pe.toString()); + LOG.error("The HTTP protocol was not setup correctly.", pe); return null; } catch (IOException ioe) { - LOG.error("Unable to open HTTP connection. \n" + ioe.toString()); + LOG.error("Unable to open HTTP connection.", ioe); return null; } String jsonStr = gson.toJson(scheduleRequest); @@ -184,10 +184,10 @@ private List getOffersFromPlugin(ResourceRequest resourceRequest) { byte[] input = jsonStr.getBytes(StandardCharsets.UTF_8); os.write(input, 0, input.length); } catch (UnsupportedEncodingException uee) { - LOG.error("ScheduleRequest json is not valid.\n" + uee.toString()); + LOG.error("ScheduleRequest json is not valid. " + jsonStr, uee); return null; } catch (IOException ioe) { - LOG.error("Unable to send scheduleRequest to MagicMatch .\n" + ioe.toString()); + LOG.error("Unable to send scheduleRequest to http endpoint " + plugin.getUrl(), ioe); return null; } @@ -197,10 +197,10 @@ private List getOffersFromPlugin(ResourceRequest resourceRequest) { String responseLine = IOUtils.toString(con.getInputStream(), StandardCharsets.UTF_8); response.append(responseLine.trim()); } catch (UnsupportedEncodingException uee) { - LOG.error("MagicMatch response is not valid.\n" + uee.toString()); + LOG.error("Response is not valid.", uee); return null; } catch (IOException ioe) { - LOG.error("Unable to read the response from MagicMatch.\n" + ioe.toString()); + LOG.error("Unable to read the response from the http-plugin.", ioe); return null; } ScheduleResponse scheduleResponse = gson.fromJson(response.toString(), ScheduleResponse.class); diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java index 403b25f0b..2ccb76dd9 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java @@ -37,7 +37,7 @@ public HttpOfferSetModule(CliOptions options) { @Override protected void configure() { - LOG.info("MagicMatch OfferSet Module Enabled."); + LOG.info("HttpOfferSetModule Enabled."); install(new PrivateModule() { @Override protected void configure() { diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java b/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java index 9593a3288..eb1bbb306 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java @@ -61,8 +61,7 @@ public HttpPluginConfig() throws MalformedURLException { LOG.error(CONFIG_FILE + " is invalid."); } else { this.url = new URL(config.url); - LOG.info("Aurora-scheduler uses HttpOfferSet for scheduling via uri:" - + this.url); + LOG.info("Aurora-scheduler uses HttpOfferSetModule " + this.url); } } } From 6bf4ce6fddab3d6451dcd1ed0328a530dcb1d46c Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Mon, 26 Oct 2020 15:34:00 -0700 Subject: [PATCH 09/24] resolve PR comments --- .../scheduler/offers/HttpOfferSetImpl.java | 39 +++++++------------ .../scheduler/offers/HttpPluginConfig.java | 11 +++++- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index ad38e4096..2eefa93e2 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentSkipListSet; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.gson.Gson; import com.google.inject.Inject; @@ -135,19 +134,18 @@ private ScheduleRequest createScheduleRequest(ResourceRequest resourceRequest) { Resource req = new Resource(resourceRequest.getResourceBag().valueOf(ResourceType.CPUS), resourceRequest.getResourceBag().valueOf(ResourceType.RAM_MB), resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB)); - Host[] hosts = new Host[Iterables.size(offers)]; - int i = 0; + List hosts = new LinkedList<>(); for (HostOffer offer : offers) { - hosts[i] = new Host(); - hosts[i].name = offer.getAttributes().getHost(); + Host host = new Host(); + host.name = offer.getAttributes().getHost(); double cpu = offer.getResourceBag(true).valueOf(ResourceType.CPUS) + offer.getResourceBag(false).valueOf(ResourceType.CPUS); double memory = offer.getResourceBag(true).valueOf(ResourceType.RAM_MB) + offer.getResourceBag(false).valueOf(ResourceType.RAM_MB); double disk = offer.getResourceBag(true).valueOf(ResourceType.DISK_MB) + offer.getResourceBag(false).valueOf(ResourceType.DISK_MB); - hosts[i].offer = new Resource(cpu, memory, disk); - i++; + host.offer = new Resource(cpu, memory, disk); + hosts.add(host); } return new ScheduleRequest(req, hosts); } @@ -167,6 +165,8 @@ private List getOffersFromPlugin(ResourceRequest resourceRequest) { HttpURLConnection con; try { con = (HttpURLConnection) plugin.getUrl().openConnection(); + con.setConnectTimeout(plugin.getTimeout()); + con.setReadTimeout(plugin.getTimeout()); con.setRequestMethod("POST"); con.setRequestProperty("Content-Type", "application/json; utf-8"); con.setRequestProperty("Accept", "application/json"); @@ -183,9 +183,6 @@ private List getOffersFromPlugin(ResourceRequest resourceRequest) { try (OutputStream os = con.getOutputStream()) { byte[] input = jsonStr.getBytes(StandardCharsets.UTF_8); os.write(input, 0, input.length); - } catch (UnsupportedEncodingException uee) { - LOG.error("ScheduleRequest json is not valid. " + jsonStr, uee); - return null; } catch (IOException ioe) { LOG.error("Unable to send scheduleRequest to http endpoint " + plugin.getUrl(), ioe); return null; @@ -208,8 +205,6 @@ private List getOffersFromPlugin(ResourceRequest resourceRequest) { // process the scheduleResponse if (scheduleResponse.error.equals("") && scheduleResponse.hosts != null) { - StringBuffer offersStr = new StringBuffer(); - int c = 0; for (String host : scheduleResponse.hosts) { HostOffer offer = offerMap.get(host); if (offer == null) { @@ -217,15 +212,9 @@ private List getOffersFromPlugin(ResourceRequest resourceRequest) { } else { orderedOffers.add(offer); } - if (c < 5) { - offersStr.append(host + ","); - c++; - } - } - if (scheduleResponse.hosts.length > 0) { - offersStr.append("..."); - LOG.info("Sorted offers: " + offersStr.toString()); } + LOG.info("Sorted offers: " + String.join(",", + scheduleResponse.hosts.subList(0, Math.min(5, scheduleResponse.hosts.size())) + "...")); return orderedOffers; } LOG.error("Unable to get sorted offers due to " + scheduleResponse.error); @@ -264,28 +253,28 @@ public String toString() { // ScheduleRequest is the request sent to MagicMatch. static class ScheduleRequest { Resource request; - Host[] hosts; + List hosts; - ScheduleRequest(Resource request, Host... hosts) { + ScheduleRequest(Resource request, List hosts) { this.request = request; this.hosts = hosts; } @Override public String toString() { - return "ScheduleRequest{" + "request=" + request + ", hosts=" + Arrays.toString(hosts) + '}'; + return "ScheduleRequest{" + "request=" + request + ", hosts=" + hosts + '}'; } } // ScheduleResponse is the scheduling result responded by MagicMatch static class ScheduleResponse { String error; - String[] hosts; + List hosts; @Override public String toString() { return "ScheduleResponse{" + "error='" + error + '\'' + ", hosts=" - + Arrays.toString(hosts) + '}'; + + hosts + '}'; } } } diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java b/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java index eb1bbb306..23dbc4571 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java @@ -44,6 +44,7 @@ public class HttpPluginConfig { private URL url; private Config config; + private int timeoutSeconds; public HttpPluginConfig() throws MalformedURLException { // load file @@ -61,7 +62,9 @@ public HttpPluginConfig() throws MalformedURLException { LOG.error(CONFIG_FILE + " is invalid."); } else { this.url = new URL(config.url); - LOG.info("Aurora-scheduler uses HttpOfferSetModule " + this.url); + this.timeoutSeconds = config.timeoutSeconds; + LOG.info("HttpOfferSetModule url: " + this.url + + ", timeout (seconds): " + this.timeoutSeconds + "\n"); } } } @@ -70,6 +73,11 @@ public URL getUrl() { return this.url; } + // getTimeout returns timeout in milliseconds. + public int getTimeout() { + return this.timeoutSeconds * 1000; + } + public boolean isDebug() { if (config != null) { return this.config.debug; @@ -89,5 +97,6 @@ static class Config { String url; boolean debug = false; int logStepInTaskNum = DEFAULT_LOG_STEP; + int timeoutSeconds; } } From 06a162dbaec3c08f328af8218d66d8f52ea12d94 Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Fri, 30 Oct 2020 08:55:47 -0700 Subject: [PATCH 10/24] change log messages --- .../scheduler/offers/HttpOfferSetImpl.java | 13 ++++++++----- .../scheduler/offers/HttpPluginConfig.java | 16 +++++++++++----- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 2eefa93e2..1897397a9 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -97,14 +97,17 @@ private void monitor(long startTime) { numOfTasks++; long timeElapsed = System.nanoTime() - startTime; totalSchedTime += timeElapsed; + currTotalSchedTime += timeElapsed; if (worstSchedTime < timeElapsed) { worstSchedTime = timeElapsed; } - if (numOfTasks == plugin.getLogStepInTaskNum()) { + if (currWorstSchedTime getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) { - if (plugin != null) { + if (plugin != null && plugin.isReady()) { long current = System.nanoTime(); List orderedOffers = getOffersFromPlugin(resourceRequest); if (plugin.isDebug()) { @@ -208,12 +211,12 @@ private List getOffersFromPlugin(ResourceRequest resourceRequest) { for (String host : scheduleResponse.hosts) { HostOffer offer = offerMap.get(host); if (offer == null) { - LOG.error("Cannot find this host " + host + " in " + offerMap.toString()); + LOG.warn("Cannot find host " + host + " in the response"); } else { orderedOffers.add(offer); } } - LOG.info("Sorted offers: " + String.join(",", + LOG.debug("Sorted offers: " + String.join(",", scheduleResponse.hosts.subList(0, Math.min(5, scheduleResponse.hosts.size())) + "...")); return orderedOffers; } diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java b/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java index 23dbc4571..34ab45393 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java @@ -44,7 +44,8 @@ public class HttpPluginConfig { private URL url; private Config config; - private int timeoutSeconds; + private int timeoutMillisec; + private boolean ready; public HttpPluginConfig() throws MalformedURLException { // load file @@ -62,9 +63,10 @@ public HttpPluginConfig() throws MalformedURLException { LOG.error(CONFIG_FILE + " is invalid."); } else { this.url = new URL(config.url); - this.timeoutSeconds = config.timeoutSeconds; + this.timeoutMillisec = config.timeoutMillisec; + this.ready = true; LOG.info("HttpOfferSetModule url: " + this.url - + ", timeout (seconds): " + this.timeoutSeconds + "\n"); + + ", timeout (seconds): " + this.timeoutMillisec + "\n"); } } } @@ -73,9 +75,13 @@ public URL getUrl() { return this.url; } + public boolean isReady() { + return this.ready; + } + // getTimeout returns timeout in milliseconds. public int getTimeout() { - return this.timeoutSeconds * 1000; + return this.timeoutMillisec; } public boolean isDebug() { @@ -97,6 +103,6 @@ static class Config { String url; boolean debug = false; int logStepInTaskNum = DEFAULT_LOG_STEP; - int timeoutSeconds; + int timeoutMillisec; } } From 691dd746c62ef26559842d1db596127040837258 Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Fri, 30 Oct 2020 09:21:41 -0700 Subject: [PATCH 11/24] fix compiling errors --- .../io/github/aurora/scheduler/offers/HttpOfferSetImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 1897397a9..86ee4843b 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -101,10 +101,10 @@ private void monitor(long startTime) { if (worstSchedTime < timeElapsed) { worstSchedTime = timeElapsed; } - if (currWorstSchedTime Date: Wed, 18 Nov 2020 15:16:52 -0800 Subject: [PATCH 12/24] update configuration and add export stats to /vars --- .../scheduler/offers/HttpOfferSetImpl.java | 213 +++++++++++------- .../scheduler/offers/HttpOfferSetModule.java | 115 +++++++++- .../scheduler/offers/HttpOfferSetUtil.java | 77 +++++++ .../scheduler/offers/HttpPluginConfig.java | 108 --------- .../scheduler/offers/StatCalculator.java | 88 ++++++++ .../offers/HttpOfferSetImplTest.java | 116 ++++++++++ 6 files changed, 520 insertions(+), 197 deletions(-) create mode 100644 src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetUtil.java delete mode 100644 src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java create mode 100644 src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java create mode 100644 src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 86ee4843b..25505a30b 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -16,13 +16,18 @@ import java.io.IOException; import java.io.OutputStream; import java.io.UnsupportedEncodingException; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.ProtocolException; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentSkipListSet; +import javax.inject.Qualifier; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Ordering; import com.google.gson.Gson; @@ -33,10 +38,16 @@ import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.offers.OfferSet; import org.apache.aurora.scheduler.resources.ResourceType; +import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + /** * Implementation for OfferSet. */ @@ -46,22 +57,46 @@ public class HttpOfferSetImpl implements OfferSet { private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetImpl.class); private final Set offers; private final Gson gson; - private long numOfTasks; - private long totalSchedTime; - private long currTotalSchedTime; - private long worstSchedTime; - private long currWorstSchedTime; - private HttpPluginConfig plugin; + private Integer timeoutMs; + private URL endpoint; + private boolean enabled; + + public HttpOfferSetImpl() { + offers = new HashSet<>(); + gson = new Gson(); + } + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface Endpoint { } + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface TimeoutMs { } @Inject - public HttpOfferSetImpl(Ordering ordering) { + public HttpOfferSetImpl(Ordering ordering, + @TimeoutMs Integer timeoutMs, + @Endpoint String url) { offers = new ConcurrentSkipListSet<>(ordering); gson = new Gson(); + enabled = true; try { - plugin = new HttpPluginConfig(); + endpoint = new URL(url); } catch (MalformedURLException e) { - LOG.error("URL of Config Plugin is malformed.", e); + LOG.error("http_offer_set_endpoint is malformed. ", e); + enabled = false; } + if (enabled) { + LOG.info("HttpOfferSetModule Enabled."); + } else { + LOG.info("HttpOfferSetModule Disabled."); + } + this.timeoutMs = timeoutMs; + LOG.info("HttpOfferSet's endpoint: " + this.endpoint); + LOG.info("HttpOfferSet's timeout: " + this.timeoutMs + " (ms)"); } @Override @@ -92,48 +127,39 @@ public Iterable values() { return offers; } - // monitor prints the scheduling time statistics - private void monitor(long startTime) { - numOfTasks++; - long timeElapsed = System.nanoTime() - startTime; - totalSchedTime += timeElapsed; - currTotalSchedTime += timeElapsed; - if (worstSchedTime < timeElapsed) { - worstSchedTime = timeElapsed; - } - if (currWorstSchedTime < timeElapsed) { - currWorstSchedTime = timeElapsed; - } - if (numOfTasks % plugin.getLogStepInTaskNum() == 0) { - String msg = numOfTasks + "," + currTotalSchedTime + "," + currWorstSchedTime + "," - + totalSchedTime + "," + worstSchedTime; - LOG.info(msg); - currTotalSchedTime = 0; - currWorstSchedTime = 0; - } - } - @Override public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) { - if (plugin != null && plugin.isReady()) { - long current = System.nanoTime(); - List orderedOffers = getOffersFromPlugin(resourceRequest); - if (plugin.isDebug()) { - this.monitor(current); - } - if (orderedOffers == null) { - LOG.warn("Unable to get orderedOffers from the external plugin."); - } else { - return orderedOffers; - } + // if there are no available offers, do nothing. + if (offers.isEmpty() || !this.enabled) { + return offers; + } + + List orderedOffers = null; + try { + long startTime = System.nanoTime(); + // create json request & send the Rest API request to the scheduler plugin + ScheduleRequest scheduleRequest = this.createRequest(resourceRequest, startTime); + LOG.info("Sending request " + scheduleRequest.jobKey); + String responseStr = this.sendRequest(scheduleRequest); + orderedOffers = processResponse(responseStr); + LOG.info("received response for " + scheduleRequest.jobKey); + HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); + } catch (IOException e) { + LOG.error("Failed to schedule the task of " + + resourceRequest.getTask().getJob().toString() + + " using HttpOfferSet. ", e); + HttpOfferSetModule.incFailureCount(); + } + if (orderedOffers != null) { + return orderedOffers; } // fall back to default scheduler. - LOG.warn("Failed to schedule the task. Falling back on default ordering."); + LOG.warn("Falling back on default ordering."); return offers; } //createScheduleRequest creates the ScheduleRequest to be sent out to the plugin. - private ScheduleRequest createScheduleRequest(ResourceRequest resourceRequest) { + private ScheduleRequest createRequest(ResourceRequest resourceRequest, long startTime) { Resource req = new Resource(resourceRequest.getResourceBag().valueOf(ResourceType.CPUS), resourceRequest.getResourceBag().valueOf(ResourceType.RAM_MB), resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB)); @@ -150,36 +176,31 @@ private ScheduleRequest createScheduleRequest(ResourceRequest resourceRequest) { host.offer = new Resource(cpu, memory, disk); hosts.add(host); } - return new ScheduleRequest(req, hosts); + IJobKey jobKey = resourceRequest.getTask().getJob(); + String jobKeyStr = jobKey.getRole() + "-" + jobKey.getEnvironment() + "-" + jobKey.getName() + + "@" + startTime; + return new ScheduleRequest(req, hosts, jobKeyStr); } - // getOffersFromPlugin gets the offers from MagicMatch. - private List getOffersFromPlugin(ResourceRequest resourceRequest) { - List orderedOffers = new ArrayList<>(); - Map offerMap = new HashMap<>(); - for (HostOffer offer : offers) { - offerMap.put(offer.getAttributes().getHost(), offer); - } - // create json request & send the Rest API request to the scheduler plugin - ScheduleRequest scheduleRequest = createScheduleRequest(resourceRequest); - LOG.debug(scheduleRequest.toString()); - + // sendRequest sends resorceRequest to the external plugin endpoint and gets json response. + private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { + LOG.debug("Sending request for " + scheduleRequest.toString()); // create connection HttpURLConnection con; try { - con = (HttpURLConnection) plugin.getUrl().openConnection(); - con.setConnectTimeout(plugin.getTimeout()); - con.setReadTimeout(plugin.getTimeout()); + con = (HttpURLConnection) this.endpoint.openConnection(); + con.setConnectTimeout(this.timeoutMs); + con.setReadTimeout(this.timeoutMs); con.setRequestMethod("POST"); con.setRequestProperty("Content-Type", "application/json; utf-8"); con.setRequestProperty("Accept", "application/json"); con.setDoOutput(true); } catch (ProtocolException pe) { - LOG.error("The HTTP protocol was not setup correctly.", pe); - return null; + LOG.error("The HTTP protocol was not setup correctly."); + throw pe; } catch (IOException ioe) { - LOG.error("Unable to open HTTP connection.", ioe); - return null; + LOG.error("Unable to open HTTP connection."); + throw ioe; } String jsonStr = gson.toJson(scheduleRequest); LOG.debug("request to plugin: " + jsonStr); @@ -187,8 +208,9 @@ private List getOffersFromPlugin(ResourceRequest resourceRequest) { byte[] input = jsonStr.getBytes(StandardCharsets.UTF_8); os.write(input, 0, input.length); } catch (IOException ioe) { - LOG.error("Unable to send scheduleRequest to http endpoint " + plugin.getUrl(), ioe); - return null; + LOG.error("Unable to send scheduleRequest to http endpoint " + + this.endpoint, ioe); + throw ioe; } // read response @@ -198,30 +220,49 @@ private List getOffersFromPlugin(ResourceRequest resourceRequest) { response.append(responseLine.trim()); } catch (UnsupportedEncodingException uee) { LOG.error("Response is not valid.", uee); - return null; + throw uee; } catch (IOException ioe) { LOG.error("Unable to read the response from the http-plugin.", ioe); - return null; + throw ioe; } - ScheduleResponse scheduleResponse = gson.fromJson(response.toString(), ScheduleResponse.class); - LOG.debug("plugin response: " + response.toString()); - - // process the scheduleResponse - if (scheduleResponse.error.equals("") && scheduleResponse.hosts != null) { - for (String host : scheduleResponse.hosts) { - HostOffer offer = offerMap.get(host); - if (offer == null) { - LOG.warn("Cannot find host " + host + " in the response"); - } else { - orderedOffers.add(offer); + return response.toString(); + } + + List processResponse(String responseStr) throws IOException { + // process the response + ScheduleResponse response = gson.fromJson(responseStr, ScheduleResponse.class); + LOG.debug("Response: " + responseStr); + Map offerMap = new HashMap<>(); + for (HostOffer offer : offers) { + offerMap.put(offer.getAttributes().getHost(), offer); + } + List orderedOffers = new ArrayList<>(); + if (response.error.trim().isEmpty()) { + if (response.hosts == null) { + LOG.error("Get no offers from the HttpOfferSet endpoint."); + throw new IOException(); + } else { + for (String host : response.hosts) { + HostOffer offer = offerMap.get(host); + if (offer == null) { + LOG.warn("Cannot find host " + host + " in the response"); + } else { + orderedOffers.add(offer); + } } + LOG.debug("Sorted offers: " + String.join(",", + response.hosts.subList(0, Math.min(5, response.hosts.size())) + "...")); + } + if (orderedOffers.isEmpty()) { + LOG.warn("Cannot find any offers for this task. " + + "Please check the condition of these hosts: " + + HttpOfferSetUtil.getHostnames(offers)); } - LOG.debug("Sorted offers: " + String.join(",", - scheduleResponse.hosts.subList(0, Math.min(5, scheduleResponse.hosts.size())) + "...")); return orderedOffers; + } else { + LOG.error("Unable to get sorted offers due to " + response.error); + throw new IOException(response.error); } - LOG.error("Unable to get sorted offers due to " + scheduleResponse.error); - return null; } // Host represents for each host offer. @@ -255,17 +296,20 @@ public String toString() { // ScheduleRequest is the request sent to MagicMatch. static class ScheduleRequest { + String jobKey; Resource request; List hosts; - ScheduleRequest(Resource request, List hosts) { + ScheduleRequest(Resource request, List hosts, String jobKey) { this.request = request; this.hosts = hosts; + this.jobKey = jobKey; } @Override public String toString() { - return "ScheduleRequest{" + "request=" + request + ", hosts=" + hosts + '}'; + return "ScheduleRequest{" + "jobKey=" + jobKey + "request=" + request + + ", hosts=" + hosts + '}'; } } @@ -276,8 +320,7 @@ static class ScheduleResponse { @Override public String toString() { - return "ScheduleResponse{" + "error='" + error + '\'' + ", hosts=" - + hosts + '}'; + return "ScheduleResponse{" + "error='" + error + '\'' + ", hosts=" + hosts + '}'; } } } diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java index 2ccb76dd9..382c6d69c 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java @@ -13,39 +13,146 @@ */ package io.github.aurora.scheduler.offers; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.inject.Inject; +import javax.inject.Qualifier; import javax.inject.Singleton; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Ordering; +import com.google.common.util.concurrent.AbstractIdleService; import com.google.inject.AbstractModule; import com.google.inject.PrivateModule; import com.google.inject.TypeLiteral; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.config.CliOptions; +import org.apache.aurora.scheduler.config.CommandLine; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.offers.OfferOrderBuilder; import org.apache.aurora.scheduler.offers.OfferSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import static java.util.Objects.requireNonNull; + public class HttpOfferSetModule extends AbstractModule { - private final CliOptions options; + private final CliOptions cliOptions; + private final Options options; private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetModule.class); + static List latencyMsList = Collections.synchronizedList(new LinkedList<>()); + private static long failureCount = 0; + + public static synchronized void incFailureCount() { + HttpOfferSetModule.failureCount++; + } + + public static synchronized long getFailureCount() { + return HttpOfferSetModule.failureCount; + } + + public static synchronized void resetFailureCount() { + HttpOfferSetModule.failureCount = 0; + } + + @Parameters(separators = "=") + public static class Options { + @Parameter(names = "-http_offer_set_endpoint") + String httpOfferSetEndpoint = "http://localhost:9092/v1/offerset"; + + @Parameter(names = "-http_offer_set_timeout_ms") + int httpOfferSetTimeoutMs = 100; + } + + static { + // Statically register custom options for CLI parsing. + CommandLine.registerCustomOptions(new Options()); + } public HttpOfferSetModule(CliOptions options) { - this.options = options; + this.cliOptions = options; + this.options = options.getCustom(Options.class); } @Override protected void configure() { - LOG.info("HttpOfferSetModule Enabled."); + install(new PrivateModule() { @Override protected void configure() { bind(new TypeLiteral>() { - }).toInstance(OfferOrderBuilder.create(options.offer.offerOrder)); + }).toInstance(OfferOrderBuilder.create(cliOptions.offer.offerOrder)); + bind(Integer.class) + .annotatedWith(HttpOfferSetImpl.TimeoutMs.class) + .toInstance(options.httpOfferSetTimeoutMs); + bind(String.class) + .annotatedWith(HttpOfferSetImpl.Endpoint.class) + .toInstance(options.httpOfferSetEndpoint); bind(OfferSet.class).to(HttpOfferSetImpl.class).in(Singleton.class); expose(OfferSet.class); } }); + + bind(StatCalculator.class).in(com.google.inject.Singleton.class); + bind(ScheduledExecutorService.class) + .annotatedWith(Executor.class) + .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("HttpOfferSet-%d", LOG)); + bind(Integer.class) + .annotatedWith(RefreshRateMs.class) + .toInstance(cliOptions.sla.slaRefreshInterval.as(Time.MILLISECONDS).intValue()); + bind(StatUpdater.class).in(com.google.inject.Singleton.class); + SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(StatUpdater.class); + } + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface Executor { } + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface RefreshRateMs { } + + static class StatUpdater extends AbstractIdleService { + private final ScheduledExecutorService executor; + private final StatCalculator calculator; + private final Integer refreshRateMs; + + @Inject + StatUpdater( + @Executor ScheduledExecutorService executor, + StatCalculator calculator, + @RefreshRateMs Integer refreshRateMs) { + this.executor = requireNonNull(executor); + this.calculator = requireNonNull(calculator); + this.refreshRateMs = refreshRateMs; + } + + @Override + protected void startUp() { + long interval = this.refreshRateMs; + executor.scheduleAtFixedRate(calculator, 0, interval, TimeUnit.MILLISECONDS); + } + + @Override + protected void shutDown() { + // Ignored. VM shutdown is required to stop computing. + } } } diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetUtil.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetUtil.java new file mode 100644 index 000000000..682fbda56 --- /dev/null +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetUtil.java @@ -0,0 +1,77 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.aurora.scheduler.offers; + +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import com.google.common.math.Quantiles; + +import org.apache.aurora.scheduler.offers.HostOffer; + +final class HttpOfferSetUtil { + + private HttpOfferSetUtil() { + // Utility class. + } + + static Number percentile(List list, double percentile) { + if (list.isEmpty()) { + return 0.0; + } + + // index should be a full integer. use quantile scale to allow reporting of percentile values + // such as p99.9. + double percentileCopy = percentile; + int quantileScale = 100; + while ((percentileCopy - Math.floor(percentileCopy)) > 0) { + quantileScale *= 10; + percentileCopy *= 10; + } + + return Quantiles.scale(quantileScale).index((int) Math.floor(quantileScale - percentileCopy)) + .compute(list); + } + + static long max(List list) { + long max = 0; + for (long e : list) { + if (e > max) { + max = e; + } + } + return max; + } + + static long avg(List list) { + if (list.isEmpty()) { + return 0; + } + + long avg = 0; + for (long e : list) { + avg += e; + } + return avg / list.size(); + } + + static List getHostnames(Set offers) { + List hostnames = new LinkedList<>(); + for (HostOffer offer: offers) { + hostnames.add(offer.getOffer().getHostname()); + } + return hostnames; + } +} diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java b/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java deleted file mode 100644 index 34ab45393..000000000 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpPluginConfig.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.github.aurora.scheduler.offers; - -import java.io.File; -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.nio.charset.StandardCharsets; - -import com.google.gson.Gson; - -import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * HttpPluginConfig is used to represent the configuration this HttpOfferSetModule. - * It has the host & port of the external scheduling unit that this plugin communicates with. - * It loads the configuration from a file. An file example is as follows. - * http-plugin.json file: - * { - * "url": "http://localhost:9090/v1/offerset", - * "debug": true, - * "logStepInTaskNum": 100 - * } - */ -public class HttpPluginConfig { - private static final Logger LOG = LoggerFactory.getLogger(HttpPluginConfig.class); - private static final int DEFAULT_LOG_STEP = 1000; - - public static final String CONFIG_FILE = "/etc/aurora-scheduler/http-endpoint.json"; - - private URL url; - private Config config; - private int timeoutMillisec; - private boolean ready; - - public HttpPluginConfig() throws MalformedURLException { - // load file - String jsonStr = null; - try { - jsonStr = FileUtils.readFileToString(new File(CONFIG_FILE), StandardCharsets.UTF_8); - } catch (IOException io) { - LOG.error("Cannot load " + CONFIG_FILE + "\n " + io.toString()); - } - if (jsonStr == null || "".equals(jsonStr)) { - LOG.error(CONFIG_FILE + " is empty"); - } else { - config = new Gson().fromJson(jsonStr, Config.class); - if (config == null) { - LOG.error(CONFIG_FILE + " is invalid."); - } else { - this.url = new URL(config.url); - this.timeoutMillisec = config.timeoutMillisec; - this.ready = true; - LOG.info("HttpOfferSetModule url: " + this.url - + ", timeout (seconds): " + this.timeoutMillisec + "\n"); - } - } - } - - public URL getUrl() { - return this.url; - } - - public boolean isReady() { - return this.ready; - } - - // getTimeout returns timeout in milliseconds. - public int getTimeout() { - return this.timeoutMillisec; - } - - public boolean isDebug() { - if (config != null) { - return this.config.debug; - } - return false; - } - - public int getLogStepInTaskNum() { - if (config != null) { - return this.config.logStepInTaskNum; - } - return DEFAULT_LOG_STEP; - } - - // for parsing json config file. - static class Config { - String url; - boolean debug = false; - int logStepInTaskNum = DEFAULT_LOG_STEP; - int timeoutMillisec; - } -} diff --git a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java new file mode 100644 index 000000000..aaa337e52 --- /dev/null +++ b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java @@ -0,0 +1,88 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.aurora.scheduler.offers; + +import java.util.concurrent.atomic.AtomicReference; + +import javax.inject.Inject; + +import com.google.common.base.Supplier; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; + +import org.apache.aurora.common.stats.StatsProvider; + +import static java.util.Objects.requireNonNull; + +public class StatCalculator implements Runnable { + private final LoadingCache metricCache; + + private static class Counter implements Supplier { + private final AtomicReference value = new AtomicReference<>((Number) 0); + private final StatsProvider statsProvider; + private boolean exported; + + Counter(StatsProvider statsProvider) { + this.statsProvider = statsProvider; + } + + @Override + public Number get() { + return value.get(); + } + + private void set(String name, Number newValue) { + if (!exported) { + statsProvider.makeGauge(name, this); + exported = true; + } + value.set(newValue); + } + } + + @Inject + StatCalculator(final StatsProvider statsProvider) { + requireNonNull(statsProvider); + this.metricCache = CacheBuilder.newBuilder().build( + new CacheLoader() { + public StatCalculator.Counter load(String key) { + return new StatCalculator.Counter(statsProvider.untracked()); + } + }); + } + + @Override + public void run() { + float medianLatency = + HttpOfferSetUtil.percentile(HttpOfferSetModule.latencyMsList, 50.0) + .floatValue() / 1000000; + float avgLatency = + (float) HttpOfferSetUtil.avg(HttpOfferSetModule.latencyMsList) / 1000000; + float worstLatency = + (float) HttpOfferSetUtil.max(HttpOfferSetModule.latencyMsList) / 1000000; + String medianLatencyName = "http_offer_set_median_latency_ms"; + metricCache.getUnchecked(medianLatencyName).set(medianLatencyName, medianLatency); + String worstLatencyName = "http_offer_set_worst_latency_ms"; + metricCache.getUnchecked(worstLatencyName).set(worstLatencyName, worstLatency); + String avgLatencyName = "http_offer_set_avg_latency_ms"; + metricCache.getUnchecked(avgLatencyName).set(avgLatencyName, avgLatency); + String failureCountName = "http_offer_set_failure_count"; + metricCache.getUnchecked(failureCountName).set(failureCountName, + HttpOfferSetModule.getFailureCount()); + // reset the stats. +// HttpOfferSetModule.latencyMsList.clear(); +// HttpOfferSetModule.resetFailureCount(); + } +} diff --git a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java new file mode 100644 index 000000000..1e8ecd5d4 --- /dev/null +++ b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java @@ -0,0 +1,116 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.aurora.scheduler.offers; + +import java.io.IOException; +import java.util.List; + +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.scheduler.offers.HostOffer; +import org.apache.aurora.scheduler.offers.Offers; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.MaintenanceMode.NONE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class HttpOfferSetImplTest extends EasyMockTest { + + private static final String HOST_A = "HOST_A"; + private static final IHostAttributes HOST_ATTRIBUTES_A = + IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_A)); + private static final HostOffer OFFER_A = new HostOffer( + Offers.makeOffer("OFFER_A", HOST_A), + HOST_ATTRIBUTES_A); + private static final String HOST_B = "HOST_B"; + private static final IHostAttributes HOST_ATTRIBUTES_B = + IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_B)); + private static final HostOffer OFFER_B = new HostOffer( + Offers.makeOffer("OFFER_B", HOST_B), + HOST_ATTRIBUTES_B); + private static final String HOST_C = "HOST_C"; + private static final HostOffer OFFER_C = new HostOffer( + Offers.makeOffer("OFFER_C", HOST_C), + IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_C))); + private static final String HOST_D = "HOST_D"; + + private HttpOfferSetImpl httpOfferSet; + + @Before + public void setUp() throws IOException { + httpOfferSet = new HttpOfferSetImpl(); + httpOfferSet.add(OFFER_A); + httpOfferSet.add(OFFER_B); + httpOfferSet.add(OFFER_C); + } + + @Test + public void testProcessResponse() throws IOException { + control.replay(); + String responseStr = "{\"error\": \"\", \"hosts\": [\"" + + HOST_A + "\",\"" + + HOST_B + "\",\"" + + HOST_C + "\"]}"; + List sortedOffers = httpOfferSet.processResponse(responseStr); + assertEquals(sortedOffers.size(), 3); + assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); + assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); + assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C); + + // plugin returns less offers than Aurora has. + responseStr = "{\"error\": \"\", \"hosts\": [\"" + + HOST_A + "\",\"" + + HOST_C + "\"]}"; + sortedOffers = httpOfferSet.processResponse(responseStr); + assertEquals(sortedOffers.size(), 2); + assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); + assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C); + + // plugin returns more offers than Aurora has. + responseStr = "{\"error\": \"\", \"hosts\": [\"" + + HOST_A + "\",\"" + + HOST_B + "\",\"" + + HOST_D + "\",\"" + + HOST_C + "\"]}"; + sortedOffers = httpOfferSet.processResponse(responseStr); + assertEquals(sortedOffers.size(), 3); + assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); + assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); + assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C); + + responseStr = "{\"error\": \"Error\", \"hosts\": [\"" + + HOST_A + "\",\"" + + HOST_B + "\",\"" + + HOST_C + "\"]}"; + boolean isException = false; + try { + httpOfferSet.processResponse(responseStr); + } catch (IOException ioe) { + isException = true; + } + assertTrue(isException); + + responseStr = "{\"error\": \"\"}"; + isException = false; + try { + httpOfferSet.processResponse(responseStr); + } catch (IOException ioe) { + isException = true; + } + assertTrue(isException); + } +} From 373b8af36606d228b78865ef1195b01ea8b9739e Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Mon, 23 Nov 2020 12:03:54 -0800 Subject: [PATCH 13/24] add max retries --- .../scheduler/offers/HttpOfferSetImpl.java | 28 ++++++++++++++----- .../scheduler/offers/HttpOfferSetModule.java | 17 ++++++++++- .../scheduler/offers/StatCalculator.java | 12 ++++---- .../{HttpOfferSetUtil.java => Util.java} | 13 ++++----- 4 files changed, 50 insertions(+), 20 deletions(-) rename src/main/java/io/github/aurora/scheduler/offers/{HttpOfferSetUtil.java => Util.java} (85%) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 25505a30b..a756e819b 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -59,7 +59,7 @@ public class HttpOfferSetImpl implements OfferSet { private final Gson gson; private Integer timeoutMs; private URL endpoint; - private boolean enabled; + private Integer maxRetries; public HttpOfferSetImpl() { offers = new HashSet<>(); @@ -71,6 +71,11 @@ public HttpOfferSetImpl() { @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) @interface Endpoint { } + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface MaxRetries { } + @VisibleForTesting @Qualifier @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) @@ -79,24 +84,26 @@ public HttpOfferSetImpl() { @Inject public HttpOfferSetImpl(Ordering ordering, @TimeoutMs Integer timeoutMs, - @Endpoint String url) { + @Endpoint String url, + @MaxRetries Integer maxRetries) { offers = new ConcurrentSkipListSet<>(ordering); gson = new Gson(); - enabled = true; try { endpoint = new URL(url); } catch (MalformedURLException e) { LOG.error("http_offer_set_endpoint is malformed. ", e); - enabled = false; + HttpOfferSetModule.enable(false); } - if (enabled) { + if (HttpOfferSetModule.isEnabled()) { LOG.info("HttpOfferSetModule Enabled."); } else { LOG.info("HttpOfferSetModule Disabled."); } this.timeoutMs = timeoutMs; + this.maxRetries = maxRetries; LOG.info("HttpOfferSet's endpoint: " + this.endpoint); LOG.info("HttpOfferSet's timeout: " + this.timeoutMs + " (ms)"); + LOG.info("HttpOfferSet's max retries: " + this.maxRetries); } @Override @@ -130,7 +137,7 @@ public Iterable values() { @Override public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) { // if there are no available offers, do nothing. - if (offers.isEmpty() || !this.enabled) { + if (offers.isEmpty() || !HttpOfferSetModule.isEnabled()) { return offers; } @@ -149,10 +156,17 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res + resourceRequest.getTask().getJob().toString() + " using HttpOfferSet. ", e); HttpOfferSetModule.incFailureCount(); + } finally { + // shutdown HttpOfferSet if failure is consistent. + if (HttpOfferSetModule.getFailureCount() >= maxRetries) { + LOG.error("Reaches " + maxRetries + ". HttpOfferSet Disabled."); + HttpOfferSetModule.enable(false); + } } if (orderedOffers != null) { return orderedOffers; } + // fall back to default scheduler. LOG.warn("Falling back on default ordering."); return offers; @@ -256,7 +270,7 @@ List processResponse(String responseStr) throws IOException { if (orderedOffers.isEmpty()) { LOG.warn("Cannot find any offers for this task. " + "Please check the condition of these hosts: " - + HttpOfferSetUtil.getHostnames(offers)); + + Util.getHostnames(offers)); } return orderedOffers; } else { diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java index 382c6d69c..d31440613 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java @@ -57,6 +57,7 @@ public class HttpOfferSetModule extends AbstractModule { private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetModule.class); static List latencyMsList = Collections.synchronizedList(new LinkedList<>()); private static long failureCount = 0; + private static boolean enabled = false; public static synchronized void incFailureCount() { HttpOfferSetModule.failureCount++; @@ -70,13 +71,24 @@ public static synchronized void resetFailureCount() { HttpOfferSetModule.failureCount = 0; } + public static synchronized void enable(boolean mEnabled) { + HttpOfferSetModule.enabled = mEnabled; + } + + public static synchronized boolean isEnabled() { + return HttpOfferSetModule.enabled; + } + @Parameters(separators = "=") public static class Options { @Parameter(names = "-http_offer_set_endpoint") - String httpOfferSetEndpoint = "http://localhost:9092/v1/offerset"; + String httpOfferSetEndpoint = "http://localhost:9090/v1/offerset"; @Parameter(names = "-http_offer_set_timeout_ms") int httpOfferSetTimeoutMs = 100; + + @Parameter(names = "-http_offer_set_max_retries") + int httpOfferSetMaxRetries = 10; } static { @@ -103,6 +115,9 @@ protected void configure() { bind(String.class) .annotatedWith(HttpOfferSetImpl.Endpoint.class) .toInstance(options.httpOfferSetEndpoint); + bind(Integer.class) + .annotatedWith(HttpOfferSetImpl.MaxRetries.class) + .toInstance(options.httpOfferSetMaxRetries); bind(OfferSet.class).to(HttpOfferSetImpl.class).in(Singleton.class); expose(OfferSet.class); } diff --git a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java index aaa337e52..4ef1c056b 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java +++ b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java @@ -66,12 +66,13 @@ public StatCalculator.Counter load(String key) { @Override public void run() { float medianLatency = - HttpOfferSetUtil.percentile(HttpOfferSetModule.latencyMsList, 50.0) + Util.percentile(HttpOfferSetModule.latencyMsList, 50.0) .floatValue() / 1000000; float avgLatency = - (float) HttpOfferSetUtil.avg(HttpOfferSetModule.latencyMsList) / 1000000; + (float) Util.avg(HttpOfferSetModule.latencyMsList) / 1000000; float worstLatency = - (float) HttpOfferSetUtil.max(HttpOfferSetModule.latencyMsList) / 1000000; + (float) Util.max(HttpOfferSetModule.latencyMsList) / 1000000; + String medianLatencyName = "http_offer_set_median_latency_ms"; metricCache.getUnchecked(medianLatencyName).set(medianLatencyName, medianLatency); String worstLatencyName = "http_offer_set_worst_latency_ms"; @@ -81,8 +82,9 @@ public void run() { String failureCountName = "http_offer_set_failure_count"; metricCache.getUnchecked(failureCountName).set(failureCountName, HttpOfferSetModule.getFailureCount()); + // reset the stats. -// HttpOfferSetModule.latencyMsList.clear(); -// HttpOfferSetModule.resetFailureCount(); + HttpOfferSetModule.latencyMsList.clear(); + HttpOfferSetModule.resetFailureCount(); } } diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetUtil.java b/src/main/java/io/github/aurora/scheduler/offers/Util.java similarity index 85% rename from src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetUtil.java rename to src/main/java/io/github/aurora/scheduler/offers/Util.java index 682fbda56..43da3b970 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetUtil.java +++ b/src/main/java/io/github/aurora/scheduler/offers/Util.java @@ -15,19 +15,18 @@ import java.util.LinkedList; import java.util.List; -import java.util.Set; import com.google.common.math.Quantiles; import org.apache.aurora.scheduler.offers.HostOffer; -final class HttpOfferSetUtil { +public final class Util { - private HttpOfferSetUtil() { + private Util() { // Utility class. } - static Number percentile(List list, double percentile) { + public static Number percentile(List list, double percentile) { if (list.isEmpty()) { return 0.0; } @@ -45,7 +44,7 @@ static Number percentile(List list, double percentile) { .compute(list); } - static long max(List list) { + public static long max(List list) { long max = 0; for (long e : list) { if (e > max) { @@ -55,7 +54,7 @@ static long max(List list) { return max; } - static long avg(List list) { + public static long avg(List list) { if (list.isEmpty()) { return 0; } @@ -67,7 +66,7 @@ static long avg(List list) { return avg / list.size(); } - static List getHostnames(Set offers) { + public static List getHostnames(Iterable offers) { List hostnames = new LinkedList<>(); for (HostOffer offer: offers) { hostnames.add(offer.getOffer().getHostname()); From 1e7d1ec4d120d7113f528d4ea373104f4c6b0704 Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Tue, 1 Dec 2020 14:51:28 -0800 Subject: [PATCH 14/24] disable HttpOfferSet after maxRetries --- .../io/github/aurora/scheduler/offers/HttpOfferSetImpl.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index a756e819b..6a5b9f7d6 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -90,13 +90,11 @@ public HttpOfferSetImpl(Ordering ordering, gson = new Gson(); try { endpoint = new URL(url); + HttpOfferSetModule.enable(true); + LOG.info("HttpOfferSetModule Enabled."); } catch (MalformedURLException e) { LOG.error("http_offer_set_endpoint is malformed. ", e); HttpOfferSetModule.enable(false); - } - if (HttpOfferSetModule.isEnabled()) { - LOG.info("HttpOfferSetModule Enabled."); - } else { LOG.info("HttpOfferSetModule Disabled."); } this.timeoutMs = timeoutMs; From 70a5d783839163afd5774d1ac505b92cd92b9f43 Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Tue, 1 Dec 2020 15:09:35 -0800 Subject: [PATCH 15/24] add docs for HttpOfferSetImpl --- .../io/github/aurora/scheduler/offers/HttpOfferSetImpl.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 6a5b9f7d6..585bf2ee8 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -50,6 +50,9 @@ /** * Implementation for OfferSet. + * HttpOfferSetImpl sorts offers using an external endpoint. + * It sends the request (request + offers) to the external endpoint + * and receives the response (sorted offers). */ @VisibleForTesting public class HttpOfferSetImpl implements OfferSet { From e7a3cde963d49e7e1f2492ca3c56549e44bd5403 Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Tue, 1 Dec 2020 15:29:04 -0800 Subject: [PATCH 16/24] resolve minor PR comments --- .../scheduler/offers/HttpOfferSetImpl.java | 36 ++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 585bf2ee8..e73adf598 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -180,15 +180,13 @@ private ScheduleRequest createRequest(ResourceRequest resourceRequest, long star resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB)); List hosts = new LinkedList<>(); for (HostOffer offer : offers) { - Host host = new Host(); - host.name = offer.getAttributes().getHost(); double cpu = offer.getResourceBag(true).valueOf(ResourceType.CPUS) + offer.getResourceBag(false).valueOf(ResourceType.CPUS); double memory = offer.getResourceBag(true).valueOf(ResourceType.RAM_MB) + offer.getResourceBag(false).valueOf(ResourceType.RAM_MB); double disk = offer.getResourceBag(true).valueOf(ResourceType.DISK_MB) + offer.getResourceBag(false).valueOf(ResourceType.DISK_MB); - host.offer = new Resource(cpu, memory, disk); + Host host = new Host(offer.getAttributes().getHost(), new Resource(cpu, memory, disk)); hosts.add(host); } IJobKey jobKey = resourceRequest.getTask().getJob(); @@ -246,28 +244,27 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { List processResponse(String responseStr) throws IOException { // process the response ScheduleResponse response = gson.fromJson(responseStr, ScheduleResponse.class); - LOG.debug("Response: " + responseStr); + if (response.error == null || response.hosts == null) { + LOG.info("Response: " + responseStr); + throw new IOException("response is malformed"); + } + Map offerMap = new HashMap<>(); for (HostOffer offer : offers) { offerMap.put(offer.getAttributes().getHost(), offer); } List orderedOffers = new ArrayList<>(); if (response.error.trim().isEmpty()) { - if (response.hosts == null) { - LOG.error("Get no offers from the HttpOfferSet endpoint."); - throw new IOException(); - } else { - for (String host : response.hosts) { - HostOffer offer = offerMap.get(host); - if (offer == null) { - LOG.warn("Cannot find host " + host + " in the response"); - } else { - orderedOffers.add(offer); - } + for (String host : response.hosts) { + HostOffer offer = offerMap.get(host); + if (offer == null) { + LOG.warn("Cannot find host " + host + " in the response"); + } else { + orderedOffers.add(offer); } - LOG.debug("Sorted offers: " + String.join(",", - response.hosts.subList(0, Math.min(5, response.hosts.size())) + "...")); } + LOG.debug("Sorted offers: " + String.join(",", + response.hosts.subList(0, Math.min(5, response.hosts.size())) + "...")); if (orderedOffers.isEmpty()) { LOG.warn("Cannot find any offers for this task. " + "Please check the condition of these hosts: " @@ -285,6 +282,11 @@ static class Host { String name; Resource offer; + Host(String mName, Resource mOffer) { + name = mName; + offer = mOffer; + } + @Override public String toString() { return "Host{" + "name='" + name + '\'' + ", offer=" + offer + '}'; From bafeb3e3cc2b9415d44e79b44dd29b7e08627292 Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Tue, 15 Dec 2020 21:11:53 -0800 Subject: [PATCH 17/24] Address PR reviews --- .../scheduler/offers/HttpOfferSetImpl.java | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index e73adf598..f04072e1d 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -25,6 +25,7 @@ import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.stream.Collectors; import javax.inject.Qualifier; @@ -178,17 +179,14 @@ private ScheduleRequest createRequest(ResourceRequest resourceRequest, long star Resource req = new Resource(resourceRequest.getResourceBag().valueOf(ResourceType.CPUS), resourceRequest.getResourceBag().valueOf(ResourceType.RAM_MB), resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB)); - List hosts = new LinkedList<>(); - for (HostOffer offer : offers) { - double cpu = offer.getResourceBag(true).valueOf(ResourceType.CPUS) - + offer.getResourceBag(false).valueOf(ResourceType.CPUS); - double memory = offer.getResourceBag(true).valueOf(ResourceType.RAM_MB) - + offer.getResourceBag(false).valueOf(ResourceType.RAM_MB); - double disk = offer.getResourceBag(true).valueOf(ResourceType.DISK_MB) - + offer.getResourceBag(false).valueOf(ResourceType.DISK_MB); - Host host = new Host(offer.getAttributes().getHost(), new Resource(cpu, memory, disk)); - hosts.add(host); - } + List hosts = offers.stream().map(offer -> new Host(offer.getAttributes().getHost(), + new Resource(offer.getResourceBag(true).valueOf(ResourceType.CPUS) + + offer.getResourceBag(false).valueOf(ResourceType.CPUS), + offer.getResourceBag(true).valueOf(ResourceType.RAM_MB) + + offer.getResourceBag(false).valueOf(ResourceType.RAM_MB), + offer.getResourceBag(true).valueOf(ResourceType.DISK_MB) + + offer.getResourceBag(false).valueOf(ResourceType.DISK_MB)))) + .collect(Collectors.toList()); IJobKey jobKey = resourceRequest.getTask().getJob(); String jobKeyStr = jobKey.getRole() + "-" + jobKey.getEnvironment() + "-" + jobKey.getName() + "@" + startTime; @@ -249,10 +247,8 @@ List processResponse(String responseStr) throws IOException { throw new IOException("response is malformed"); } - Map offerMap = new HashMap<>(); - for (HostOffer offer : offers) { - offerMap.put(offer.getAttributes().getHost(), offer); - } + Map offerMap = offers.stream() + .collect(Collectors.toMap(offer -> offer.getAttributes().getHost(), offer -> offer)); List orderedOffers = new ArrayList<>(); if (response.error.trim().isEmpty()) { for (String host : response.hosts) { From fdc476ae7a1351a444426ddf6cc1f6a791d59480 Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Tue, 22 Dec 2020 17:46:21 -0800 Subject: [PATCH 18/24] optimize code using httpclient --- .../scheduler/offers/HttpOfferSetImpl.java | 79 ++++++++----------- 1 file changed, 34 insertions(+), 45 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index f04072e1d..effe5703b 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -14,15 +14,10 @@ package io.github.aurora.scheduler.offers; import java.io.IOException; -import java.io.OutputStream; -import java.io.UnsupportedEncodingException; import java.lang.annotation.Retention; import java.lang.annotation.Target; -import java.net.HttpURLConnection; import java.net.MalformedURLException; -import java.net.ProtocolException; import java.net.URL; -import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentSkipListSet; import java.util.stream.Collectors; @@ -40,7 +35,14 @@ import org.apache.aurora.scheduler.offers.OfferSet; import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.storage.entities.IJobKey; -import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -196,47 +198,34 @@ private ScheduleRequest createRequest(ResourceRequest resourceRequest, long star // sendRequest sends resorceRequest to the external plugin endpoint and gets json response. private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { LOG.debug("Sending request for " + scheduleRequest.toString()); - // create connection - HttpURLConnection con; + CloseableHttpClient httpClient = HttpClients.createDefault(); try { - con = (HttpURLConnection) this.endpoint.openConnection(); - con.setConnectTimeout(this.timeoutMs); - con.setReadTimeout(this.timeoutMs); - con.setRequestMethod("POST"); - con.setRequestProperty("Content-Type", "application/json; utf-8"); - con.setRequestProperty("Accept", "application/json"); - con.setDoOutput(true); - } catch (ProtocolException pe) { - LOG.error("The HTTP protocol was not setup correctly."); - throw pe; - } catch (IOException ioe) { - LOG.error("Unable to open HTTP connection."); - throw ioe; - } - String jsonStr = gson.toJson(scheduleRequest); - LOG.debug("request to plugin: " + jsonStr); - try (OutputStream os = con.getOutputStream()) { - byte[] input = jsonStr.getBytes(StandardCharsets.UTF_8); - os.write(input, 0, input.length); - } catch (IOException ioe) { - LOG.error("Unable to send scheduleRequest to http endpoint " - + this.endpoint, ioe); - throw ioe; - } - - // read response - StringBuilder response = new StringBuilder(); - try { - String responseLine = IOUtils.toString(con.getInputStream(), StandardCharsets.UTF_8); - response.append(responseLine.trim()); - } catch (UnsupportedEncodingException uee) { - LOG.error("Response is not valid.", uee); - throw uee; - } catch (IOException ioe) { - LOG.error("Unable to read the response from the http-plugin.", ioe); - throw ioe; + HttpPost request = new HttpPost(this.endpoint.toString()); + RequestConfig requestConfig = RequestConfig.custom() + .setConnectionRequestTimeout(this.timeoutMs) + .setConnectTimeout(this.timeoutMs) + .setSocketTimeout(this.timeoutMs) + .build(); + request.setConfig(requestConfig); + request.addHeader("Content-Type", "application/json; utf-8"); + request.addHeader("Accept", "application/json"); + request.setEntity(new StringEntity(gson.toJson(scheduleRequest))); + CloseableHttpResponse response = httpClient.execute(request); + try { + HttpEntity entity = response.getEntity(); + if (entity == null) { + throw new IOException("Empty response from the external http endpoint."); + } else { + String responseStr = EntityUtils.toString(entity); + LOG.info("response: " + responseStr); + return responseStr; + } + } finally { + response.close(); + } + } finally { + httpClient.close(); } - return response.toString(); } List processResponse(String responseStr) throws IOException { From 10ff13e53870f74ce6f047d736ac3d96b0c0219d Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Tue, 22 Dec 2020 17:49:49 -0800 Subject: [PATCH 19/24] optimize code using httpclient --- .../io/github/aurora/scheduler/offers/HttpOfferSetImpl.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index effe5703b..5edb46ef8 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -216,9 +216,7 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { if (entity == null) { throw new IOException("Empty response from the external http endpoint."); } else { - String responseStr = EntityUtils.toString(entity); - LOG.info("response: " + responseStr); - return responseStr; + return EntityUtils.toString(entity); } } finally { response.close(); From 118d4bd974c147fb0cdc1f4ae17550bb3903b07d Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Thu, 24 Dec 2020 20:47:22 -0800 Subject: [PATCH 20/24] address PR reviews --- .../github/aurora/scheduler/offers/HttpOfferSetImpl.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 5edb46ef8..936848964 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -68,7 +68,7 @@ public class HttpOfferSetImpl implements OfferSet { private Integer maxRetries; public HttpOfferSetImpl() { - offers = new HashSet<>(); + offers = new ConcurrentSkipListSet<>(); gson = new Gson(); } @@ -150,7 +150,7 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res long startTime = System.nanoTime(); // create json request & send the Rest API request to the scheduler plugin ScheduleRequest scheduleRequest = this.createRequest(resourceRequest, startTime); - LOG.info("Sending request " + scheduleRequest.jobKey); + LOG.debug("Sending request " + scheduleRequest.jobKey); String responseStr = this.sendRequest(scheduleRequest); orderedOffers = processResponse(responseStr); LOG.info("received response for " + scheduleRequest.jobKey); @@ -163,7 +163,7 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res } finally { // shutdown HttpOfferSet if failure is consistent. if (HttpOfferSetModule.getFailureCount() >= maxRetries) { - LOG.error("Reaches " + maxRetries + ". HttpOfferSet Disabled."); + LOG.error("Reaches " + maxRetries + ". HttpOfferSetModule Disabled."); HttpOfferSetModule.enable(false); } } @@ -230,7 +230,7 @@ List processResponse(String responseStr) throws IOException { // process the response ScheduleResponse response = gson.fromJson(responseStr, ScheduleResponse.class); if (response.error == null || response.hosts == null) { - LOG.info("Response: " + responseStr); + LOG.error("Response: " + responseStr); throw new IOException("response is malformed"); } From a93778e542f3b6cfbf2ca4c78f750c7511d839e0 Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Thu, 24 Dec 2020 20:52:21 -0800 Subject: [PATCH 21/24] address PR reviews --- .../io/github/aurora/scheduler/offers/HttpOfferSetModule.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java index d31440613..d0347a647 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java @@ -167,7 +167,7 @@ protected void startUp() { @Override protected void shutDown() { - // Ignored. VM shutdown is required to stop computing. + // Ignored. } } } From 99e7ee031c1269789e9d903ec716a5d9ade1a4bf Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Fri, 25 Dec 2020 19:26:45 -0800 Subject: [PATCH 22/24] address PR review --- .../io/github/aurora/scheduler/offers/HttpOfferSetImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 936848964..a35012c97 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -153,7 +153,7 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res LOG.debug("Sending request " + scheduleRequest.jobKey); String responseStr = this.sendRequest(scheduleRequest); orderedOffers = processResponse(responseStr); - LOG.info("received response for " + scheduleRequest.jobKey); + LOG.debug("received response for " + scheduleRequest.jobKey); HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); } catch (IOException e) { LOG.error("Failed to schedule the task of " From 7aa6071862f2447d11e32fc6da5219acf9d7f1bd Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Fri, 25 Dec 2020 20:15:42 -0800 Subject: [PATCH 23/24] fix CI build error --- .../github/aurora/scheduler/offers/HttpOfferSetImpl.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index a35012c97..6af34971b 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -62,14 +62,13 @@ public class HttpOfferSetImpl implements OfferSet { private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetImpl.class); private final Set offers; - private final Gson gson; + private final Gson gson = new Gson(); private Integer timeoutMs; private URL endpoint; private Integer maxRetries; - public HttpOfferSetImpl() { - offers = new ConcurrentSkipListSet<>(); - gson = new Gson(); + public HttpOfferSetImpl(Set offers) { + this.offers = offers; } @VisibleForTesting @@ -93,7 +92,6 @@ public HttpOfferSetImpl(Ordering ordering, @Endpoint String url, @MaxRetries Integer maxRetries) { offers = new ConcurrentSkipListSet<>(ordering); - gson = new Gson(); try { endpoint = new URL(url); HttpOfferSetModule.enable(true); From 0e2171389ac98be830c340e11400ebd5809409a7 Mon Sep 17 00:00:00 2001 From: Nhat Tan Le Date: Fri, 25 Dec 2020 20:24:54 -0800 Subject: [PATCH 24/24] fix CI build error --- .../github/aurora/scheduler/offers/HttpOfferSetImplTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java index 1e8ecd5d4..475a5bc78 100644 --- a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java +++ b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java @@ -14,6 +14,7 @@ package io.github.aurora.scheduler.offers; import java.io.IOException; +import java.util.HashSet; import java.util.List; import org.apache.aurora.common.testing.easymock.EasyMockTest; @@ -52,7 +53,7 @@ public class HttpOfferSetImplTest extends EasyMockTest { @Before public void setUp() throws IOException { - httpOfferSet = new HttpOfferSetImpl(); + httpOfferSet = new HttpOfferSetImpl(new HashSet<>()); httpOfferSet.add(OFFER_A); httpOfferSet.add(OFFER_B); httpOfferSet.add(OFFER_C);