From d8f6f1f67af50df54c16c05e01178428d2aa84ff Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Fri, 14 May 2021 14:42:35 -0700 Subject: [PATCH 01/35] performance & monitoring improvement --- .../scheduler/offers/HttpOfferSetImpl.java | 109 +++++++++++++++++- .../scheduler/offers/HttpOfferSetModule.java | 1 + .../scheduler/offers/StatCalculator.java | 6 + 3 files changed, 110 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 52b0bdd6d..02f9d069f 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -19,6 +19,7 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -27,9 +28,9 @@ import javax.inject.Qualifier; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Ordering; -import com.google.gson.Gson; import com.google.inject.Inject; import org.apache.aurora.scheduler.base.TaskGroupKey; @@ -65,7 +66,8 @@ public class HttpOfferSetImpl implements OfferSet { private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetImpl.class); private final Set offers; - private final Gson gson = new Gson(); + private final ObjectMapper jsonMapper = new ObjectMapper(); + private Integer timeoutMs; private URL endpoint; private Integer maxRetries; @@ -151,7 +153,8 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res long startTime = System.nanoTime(); // create json request & send the Rest API request to the scheduler plugin ScheduleRequest scheduleRequest = this.createRequest(resourceRequest, startTime); - LOG.debug("Sending request " + scheduleRequest.jobKey); + LOG.info("Sending request " + scheduleRequest.jobKey + " with " + this.offers.size() + + " offers"); String responseStr = this.sendRequest(scheduleRequest); orderedOffers = processResponse(responseStr); LOG.debug("received response for " + scheduleRequest.jobKey); @@ -210,7 +213,7 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { request.setConfig(requestConfig); request.addHeader("Content-Type", "application/json; utf-8"); request.addHeader("Accept", "application/json"); - request.setEntity(new StringEntity(gson.toJson(scheduleRequest))); + request.setEntity(new StringEntity(jsonMapper.writeValueAsString(scheduleRequest))); CloseableHttpResponse response = httpClient.execute(request); try { HttpEntity entity = response.getEntity(); @@ -228,12 +231,14 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { } List processResponse(String responseStr) throws IOException { + List extraOffers = new LinkedList<>(); // process the response - ScheduleResponse response = gson.fromJson(responseStr, ScheduleResponse.class); + ScheduleResponse response = jsonMapper.readValue(responseStr, ScheduleResponse.class); if (response.error == null || response.hosts == null) { LOG.error("Response: " + responseStr); throw new IOException("response is malformed"); } + LOG.info("Received " + response.hosts.size() + " offers"); Map offerMap = offers.stream() .collect(Collectors.toMap(offer -> offer.getAttributes().getHost(), offer -> offer)); @@ -242,7 +247,7 @@ List processResponse(String responseStr) throws IOException { for (String host : response.hosts) { HostOffer offer = offerMap.get(host); if (offer == null) { - LOG.warn("Cannot find host " + host + " in the response"); + extraOffers.add(host); } else { orderedOffers.add(offer); } @@ -254,6 +259,18 @@ List processResponse(String responseStr) throws IOException { + "Please check the condition of these hosts: " + Util.getHostnames(offers)); } + + long offSetDiff = offers.size() - (response.hosts.size() - extraOffers.size()) + + extraOffers.size(); + HttpOfferSetModule.offerSetDiff.add(offSetDiff); + if (offSetDiff > 0) { + LOG.warn("The number of different offers between the original and received offer sets is " + + offSetDiff); + if (!extraOffers.isEmpty()) { + LOG.error("Cannot find offers " + extraOffers + " in the original offer set"); + } + } + return orderedOffers; } else { LOG.error("Unable to get sorted offers due to " + response.error); @@ -275,6 +292,22 @@ static class Host { public String toString() { return "Host{" + "name='" + name + '\'' + ", offer=" + offer + '}'; } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Resource getOffer() { + return offer; + } + + public void setOffer(Resource offer) { + this.offer = offer; + } } // Resource is used between Aurora and MagicMatch. @@ -293,6 +326,30 @@ static class Resource { public String toString() { return "Resource{" + "cpu=" + cpu + ", memory=" + memory + ", disk=" + disk + '}'; } + + public double getCpu() { + return cpu; + } + + public void setCpu(double cpu) { + this.cpu = cpu; + } + + public double getMemory() { + return memory; + } + + public void setMemory(double memory) { + this.memory = memory; + } + + public double getDisk() { + return disk; + } + + public void setDisk(double disk) { + this.disk = disk; + } } // ScheduleRequest is the request sent to MagicMatch. @@ -312,6 +369,30 @@ public String toString() { return "ScheduleRequest{" + "jobKey=" + jobKey + "request=" + request + ", hosts=" + hosts + '}'; } + + public String getJobKey() { + return jobKey; + } + + public void setJobKey(String jobKey) { + this.jobKey = jobKey; + } + + public Resource getRequest() { + return request; + } + + public void setRequest(Resource request) { + this.request = request; + } + + public List getHosts() { + return hosts; + } + + public void setHosts(List hosts) { + this.hosts = hosts; + } } // ScheduleResponse is the scheduling result responded by MagicMatch @@ -323,5 +404,21 @@ static class ScheduleResponse { public String toString() { return "ScheduleResponse{" + "error='" + error + '\'' + ", hosts=" + hosts + '}'; } + + public String getError() { + return error; + } + + public void setError(String error) { + this.error = error; + } + + public List getHosts() { + return hosts; + } + + public void setHosts(List hosts) { + this.hosts = hosts; + } } } diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java index d0347a647..00435caf3 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java @@ -56,6 +56,7 @@ public class HttpOfferSetModule extends AbstractModule { private final Options options; private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetModule.class); static List latencyMsList = Collections.synchronizedList(new LinkedList<>()); + static List offerSetDiff = Collections.synchronizedList(new LinkedList<>()); private static long failureCount = 0; private static boolean enabled = false; diff --git a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java index 4ef1c056b..a1baf5629 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java +++ b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java @@ -83,8 +83,14 @@ public void run() { metricCache.getUnchecked(failureCountName).set(failureCountName, HttpOfferSetModule.getFailureCount()); + long maxOfferSetDiff = Util.max(HttpOfferSetModule.offerSetDiff); + String maxOffSetDiffName = "http_offer_set_max_diff"; + metricCache.getUnchecked(maxOffSetDiffName).set(maxOffSetDiffName, + maxOfferSetDiff); + // reset the stats. HttpOfferSetModule.latencyMsList.clear(); HttpOfferSetModule.resetFailureCount(); +// HttpOfferSetModule.offerSetDiff.clear(); } } From c4e650b027987cd7eed610657a76e0381a16fff0 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Fri, 14 May 2021 16:54:25 -0700 Subject: [PATCH 02/35] use the same CloseableHttpClient for multiple requests --- .../scheduler/offers/HttpOfferSetImpl.java | 41 +++++++++---------- .../scheduler/offers/StatCalculator.java | 2 +- 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 02f9d069f..6b2622c94 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -67,6 +67,8 @@ public class HttpOfferSetImpl implements OfferSet { private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetImpl.class); private final Set offers; private final ObjectMapper jsonMapper = new ObjectMapper(); + // we can reuse CloseableHttpClient for multiple requests + private static final CloseableHttpClient HTTP_CLIENT = HttpClients.createDefault(); private Integer timeoutMs; private URL endpoint; @@ -202,31 +204,26 @@ private ScheduleRequest createRequest(ResourceRequest resourceRequest, long star // sendRequest sends resorceRequest to the external plugin endpoint and gets json response. private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { LOG.debug("Sending request for " + scheduleRequest.toString()); - CloseableHttpClient httpClient = HttpClients.createDefault(); + HttpPost request = new HttpPost(this.endpoint.toString()); + RequestConfig requestConfig = RequestConfig.custom() + .setConnectionRequestTimeout(this.timeoutMs) + .setConnectTimeout(this.timeoutMs) + .setSocketTimeout(this.timeoutMs) + .build(); + request.setConfig(requestConfig); + request.addHeader("Content-Type", "application/json; utf-8"); + request.addHeader("Accept", "application/json"); + request.setEntity(new StringEntity(jsonMapper.writeValueAsString(scheduleRequest))); + CloseableHttpResponse response = HTTP_CLIENT.execute(request); try { - HttpPost request = new HttpPost(this.endpoint.toString()); - RequestConfig requestConfig = RequestConfig.custom() - .setConnectionRequestTimeout(this.timeoutMs) - .setConnectTimeout(this.timeoutMs) - .setSocketTimeout(this.timeoutMs) - .build(); - request.setConfig(requestConfig); - request.addHeader("Content-Type", "application/json; utf-8"); - request.addHeader("Accept", "application/json"); - request.setEntity(new StringEntity(jsonMapper.writeValueAsString(scheduleRequest))); - CloseableHttpResponse response = httpClient.execute(request); - try { - HttpEntity entity = response.getEntity(); - if (entity == null) { - throw new IOException("Empty response from the external http endpoint."); - } else { - return EntityUtils.toString(entity); - } - } finally { - response.close(); + HttpEntity entity = response.getEntity(); + if (entity == null) { + throw new IOException("Empty response from the external http endpoint."); + } else { + return EntityUtils.toString(entity); } } finally { - httpClient.close(); + response.close(); } } diff --git a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java index a1baf5629..791299f06 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java +++ b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java @@ -91,6 +91,6 @@ public void run() { // reset the stats. HttpOfferSetModule.latencyMsList.clear(); HttpOfferSetModule.resetFailureCount(); -// HttpOfferSetModule.offerSetDiff.clear(); + HttpOfferSetModule.offerSetDiff.clear(); } } From d3d77a235bcc6457ea2df7d7ea3b99431c6f513d Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Fri, 14 May 2021 20:08:53 -0700 Subject: [PATCH 03/35] fix compiling error --- .../io/github/aurora/scheduler/offers/HttpOfferSetImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 6b2622c94..b990bcc4a 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -67,8 +67,8 @@ public class HttpOfferSetImpl implements OfferSet { private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetImpl.class); private final Set offers; private final ObjectMapper jsonMapper = new ObjectMapper(); - // we can reuse CloseableHttpClient for multiple requests - private static final CloseableHttpClient HTTP_CLIENT = HttpClients.createDefault(); + // Using CloseableHttpClient for multiple requests + private final CloseableHttpClient httpClient = HttpClients.createDefault(); private Integer timeoutMs; private URL endpoint; @@ -214,7 +214,7 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { request.addHeader("Content-Type", "application/json; utf-8"); request.addHeader("Accept", "application/json"); request.setEntity(new StringEntity(jsonMapper.writeValueAsString(scheduleRequest))); - CloseableHttpResponse response = HTTP_CLIENT.execute(request); + CloseableHttpResponse response = httpClient.execute(request); try { HttpEntity entity = response.getEntity(); if (entity == null) { From 42bf2f7789d0a59ad7b0b91ff182c0ddd040e394 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Tue, 18 May 2021 16:31:47 -0700 Subject: [PATCH 04/35] fix CI error --- .../scheduler/offers/HttpOfferSetImpl.java | 40 ++++++------------- .../scheduler/offers/HttpOfferSetModule.java | 2 +- .../scheduler/offers/StatCalculator.java | 4 +- .../offers/HttpOfferSetImplTest.java | 27 ++++++++++--- 4 files changed, 37 insertions(+), 36 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index b990bcc4a..fcdadecf4 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -18,8 +18,6 @@ import java.lang.annotation.Target; import java.net.MalformedURLException; import java.net.URL; -import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -63,11 +61,10 @@ */ @VisibleForTesting public class HttpOfferSetImpl implements OfferSet { - private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetImpl.class); + private final Set offers; private final ObjectMapper jsonMapper = new ObjectMapper(); - // Using CloseableHttpClient for multiple requests private final CloseableHttpClient httpClient = HttpClients.createDefault(); private Integer timeoutMs; @@ -158,8 +155,7 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res LOG.info("Sending request " + scheduleRequest.jobKey + " with " + this.offers.size() + " offers"); String responseStr = this.sendRequest(scheduleRequest); - orderedOffers = processResponse(responseStr); - LOG.debug("received response for " + scheduleRequest.jobKey); + orderedOffers = processResponse(responseStr, HttpOfferSetModule.offerSetDiffList); HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); } catch (IOException e) { LOG.error("Failed to schedule the task of " @@ -213,7 +209,7 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { request.setConfig(requestConfig); request.addHeader("Content-Type", "application/json; utf-8"); request.addHeader("Accept", "application/json"); - request.setEntity(new StringEntity(jsonMapper.writeValueAsString(scheduleRequest))); + request.setEntity(new StringEntity(new ObjectMapper().writeValueAsString(scheduleRequest))); CloseableHttpResponse response = httpClient.execute(request); try { HttpEntity entity = response.getEntity(); @@ -227,8 +223,8 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { } } - List processResponse(String responseStr) throws IOException { - List extraOffers = new LinkedList<>(); + List processResponse(String responseStr, List offerSetDiffList) + throws IOException { // process the response ScheduleResponse response = jsonMapper.readValue(responseStr, ScheduleResponse.class); if (response.error == null || response.hosts == null) { @@ -239,27 +235,15 @@ List processResponse(String responseStr) throws IOException { Map offerMap = offers.stream() .collect(Collectors.toMap(offer -> offer.getAttributes().getHost(), offer -> offer)); - List orderedOffers = new ArrayList<>(); if (response.error.trim().isEmpty()) { - for (String host : response.hosts) { - HostOffer offer = offerMap.get(host); - if (offer == null) { - extraOffers.add(host); - } else { - orderedOffers.add(offer); - } - } - LOG.debug("Sorted offers: " + String.join(",", - response.hosts.subList(0, Math.min(5, response.hosts.size())) + "...")); - if (orderedOffers.isEmpty()) { - LOG.warn("Cannot find any offers for this task. " - + "Please check the condition of these hosts: " - + Util.getHostnames(offers)); - } + List orderedOffers = response.hosts.stream().map(host -> offerMap.get(host)). + filter(offer -> offer != null).collect(Collectors.toList()); + List extraOffers = response.hosts.stream().filter(host -> offerMap.get(host) == null) + .collect(Collectors.toList()); long offSetDiff = offers.size() - (response.hosts.size() - extraOffers.size()) + extraOffers.size(); - HttpOfferSetModule.offerSetDiff.add(offSetDiff); + offerSetDiffList.add(offSetDiff); if (offSetDiff > 0) { LOG.warn("The number of different offers between the original and received offer sets is " + offSetDiff); @@ -270,13 +254,13 @@ List processResponse(String responseStr) throws IOException { return orderedOffers; } else { - LOG.error("Unable to get sorted offers due to " + response.error); + LOG.error("Unable to receive offers from " + this.endpoint + " due to " + response.error); throw new IOException(response.error); } } // Host represents for each host offer. - static class Host { + public static class Host { String name; Resource offer; diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java index 00435caf3..e2c33333d 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java @@ -56,7 +56,7 @@ public class HttpOfferSetModule extends AbstractModule { private final Options options; private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetModule.class); static List latencyMsList = Collections.synchronizedList(new LinkedList<>()); - static List offerSetDiff = Collections.synchronizedList(new LinkedList<>()); + static List offerSetDiffList = Collections.synchronizedList(new LinkedList<>()); private static long failureCount = 0; private static boolean enabled = false; diff --git a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java index 791299f06..ed21d9100 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java +++ b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java @@ -83,7 +83,7 @@ public void run() { metricCache.getUnchecked(failureCountName).set(failureCountName, HttpOfferSetModule.getFailureCount()); - long maxOfferSetDiff = Util.max(HttpOfferSetModule.offerSetDiff); + long maxOfferSetDiff = Util.max(HttpOfferSetModule.offerSetDiffList); String maxOffSetDiffName = "http_offer_set_max_diff"; metricCache.getUnchecked(maxOffSetDiffName).set(maxOffSetDiffName, maxOfferSetDiff); @@ -91,6 +91,6 @@ public void run() { // reset the stats. HttpOfferSetModule.latencyMsList.clear(); HttpOfferSetModule.resetFailureCount(); - HttpOfferSetModule.offerSetDiff.clear(); + HttpOfferSetModule.offerSetDiffList.clear(); } } diff --git a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java index 475a5bc78..36cb1fc5e 100644 --- a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java +++ b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import org.apache.aurora.common.testing.easymock.EasyMockTest; @@ -66,20 +67,24 @@ public void testProcessResponse() throws IOException { + HOST_A + "\",\"" + HOST_B + "\",\"" + HOST_C + "\"]}"; - List sortedOffers = httpOfferSet.processResponse(responseStr); + List offerSetDiffList = new LinkedList<>(); + + List sortedOffers = httpOfferSet.processResponse(responseStr, offerSetDiffList); assertEquals(sortedOffers.size(), 3); assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C); + assertEquals((long) offerSetDiffList.get(0), 0); // plugin returns less offers than Aurora has. responseStr = "{\"error\": \"\", \"hosts\": [\"" + HOST_A + "\",\"" + HOST_C + "\"]}"; - sortedOffers = httpOfferSet.processResponse(responseStr); + sortedOffers = httpOfferSet.processResponse(responseStr, offerSetDiffList); assertEquals(sortedOffers.size(), 2); assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C); + assertEquals((long) offerSetDiffList.get(1), 1); // plugin returns more offers than Aurora has. responseStr = "{\"error\": \"\", \"hosts\": [\"" @@ -87,11 +92,23 @@ public void testProcessResponse() throws IOException { + HOST_B + "\",\"" + HOST_D + "\",\"" + HOST_C + "\"]}"; - sortedOffers = httpOfferSet.processResponse(responseStr); + sortedOffers = httpOfferSet.processResponse(responseStr, offerSetDiffList); assertEquals(sortedOffers.size(), 3); assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C); + assertEquals((long) offerSetDiffList.get(2), 1); + + // plugin omits 1 offer & returns 1 extra offer + responseStr = "{\"error\": \"\", \"hosts\": [\"" + + HOST_A + "\",\"" + + HOST_D + "\",\"" + + HOST_C + "\"]}"; + sortedOffers = httpOfferSet.processResponse(responseStr, offerSetDiffList); + assertEquals(sortedOffers.size(), 2); + assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); + assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C); + assertEquals((long) offerSetDiffList.get(3), 2); responseStr = "{\"error\": \"Error\", \"hosts\": [\"" + HOST_A + "\",\"" @@ -99,7 +116,7 @@ public void testProcessResponse() throws IOException { + HOST_C + "\"]}"; boolean isException = false; try { - httpOfferSet.processResponse(responseStr); + httpOfferSet.processResponse(responseStr, offerSetDiffList); } catch (IOException ioe) { isException = true; } @@ -108,7 +125,7 @@ public void testProcessResponse() throws IOException { responseStr = "{\"error\": \"\"}"; isException = false; try { - httpOfferSet.processResponse(responseStr); + httpOfferSet.processResponse(responseStr, new LinkedList<>()); } catch (IOException ioe) { isException = true; } From 2a35900cb84404bf0f59b56ac29744a136d4dbfb Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Tue, 18 May 2021 16:46:41 -0700 Subject: [PATCH 05/35] fix CI error --- .../io/github/aurora/scheduler/offers/HttpOfferSetImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index fcdadecf4..30484a49d 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -209,7 +209,7 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { request.setConfig(requestConfig); request.addHeader("Content-Type", "application/json; utf-8"); request.addHeader("Accept", "application/json"); - request.setEntity(new StringEntity(new ObjectMapper().writeValueAsString(scheduleRequest))); + request.setEntity(new StringEntity(jsonMapper.writeValueAsString(scheduleRequest))); CloseableHttpResponse response = httpClient.execute(request); try { HttpEntity entity = response.getEntity(); From 1196da517ecede3d991f7e6eb3bd5b6ce9e35ddc Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Tue, 18 May 2021 21:44:03 -0700 Subject: [PATCH 06/35] resolve review comments --- docs/features/custom-plugins.md | 10 ++++ .../scheduler/offers/HttpOfferSetImpl.java | 48 ++++++++++--------- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/docs/features/custom-plugins.md b/docs/features/custom-plugins.md index ade0dd42b..e9f63bd35 100644 --- a/docs/features/custom-plugins.md +++ b/docs/features/custom-plugins.md @@ -58,3 +58,13 @@ The server returns the response in the following format. } ``` In the above example, the external REST API sorts the offers based on the number of available vcpus. + +How to monitor HTTP OfferSet? +We can monitor this plugin by looking at the endpoint `/vars`. The following metrics are available when HTTP OfferSet is enabled: +- `http_offer_set_avg_latency_ms`: The average latency per scheduling cycle in milliseconds. +- `http_offer_set_median_latency_ms`: The median latency per scheduling cycle in milliseconds. +- `http_offer_set_worst_latency_ms`: The worst latency per scheduling cycle in milliseconds. +- `http_offer_set_failure_count`: The number of scheduling failures. +- `http_offer_set_max_diff`: The number of different offers between the original `OfferSet` and the received one. + +HTTP OfferSet resets the above metrics every `sla_stat_refresh_interval`. diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 30484a49d..403c0d2ac 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -183,14 +183,16 @@ private ScheduleRequest createRequest(ResourceRequest resourceRequest, long star Resource req = new Resource(resourceRequest.getResourceBag().valueOf(ResourceType.CPUS), resourceRequest.getResourceBag().valueOf(ResourceType.RAM_MB), resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB)); - List hosts = offers.stream().map(offer -> new Host(offer.getAttributes().getHost(), - new Resource(offer.getResourceBag(true).valueOf(ResourceType.CPUS) - + offer.getResourceBag(false).valueOf(ResourceType.CPUS), - offer.getResourceBag(true).valueOf(ResourceType.RAM_MB) - + offer.getResourceBag(false).valueOf(ResourceType.RAM_MB), - offer.getResourceBag(true).valueOf(ResourceType.DISK_MB) - + offer.getResourceBag(false).valueOf(ResourceType.DISK_MB)))) - .collect(Collectors.toList()); + List hosts = + offers.stream() + .map(offer -> new Host(offer.getAttributes().getHost(), + new Resource(offer.getResourceBag(true).valueOf(ResourceType.CPUS) + + offer.getResourceBag(false).valueOf(ResourceType.CPUS), + offer.getResourceBag(true).valueOf(ResourceType.RAM_MB) + + offer.getResourceBag(false).valueOf(ResourceType.RAM_MB), + offer.getResourceBag(true).valueOf(ResourceType.DISK_MB) + + offer.getResourceBag(false).valueOf(ResourceType.DISK_MB)))) + .collect(Collectors.toList()); IJobKey jobKey = resourceRequest.getTask().getJob(); String jobKeyStr = jobKey.getRole() + "-" + jobKey.getEnvironment() + "-" + jobKey.getName() + "@" + startTime; @@ -215,9 +217,8 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { HttpEntity entity = response.getEntity(); if (entity == null) { throw new IOException("Empty response from the external http endpoint."); - } else { - return EntityUtils.toString(entity); } + return EntityUtils.toString(entity); } finally { response.close(); } @@ -227,7 +228,7 @@ List processResponse(String responseStr, List offerSetDiffList) throws IOException { // process the response ScheduleResponse response = jsonMapper.readValue(responseStr, ScheduleResponse.class); - if (response.error == null || response.hosts == null) { + if (response == null || response.error == null || response.hosts == null) { LOG.error("Response: " + responseStr); throw new IOException("response is malformed"); } @@ -236,10 +237,13 @@ List processResponse(String responseStr, List offerSetDiffList) Map offerMap = offers.stream() .collect(Collectors.toMap(offer -> offer.getAttributes().getHost(), offer -> offer)); if (response.error.trim().isEmpty()) { - List orderedOffers = response.hosts.stream().map(host -> offerMap.get(host)). - filter(offer -> offer != null).collect(Collectors.toList()); - List extraOffers = response.hosts.stream().filter(host -> offerMap.get(host) == null) - .collect(Collectors.toList()); + List orderedOffers = response.hosts.stream() + .map(host -> offerMap.get(host)) + .filter(offer -> offer != null) + .collect(Collectors.toList()); + List extraOffers = response.hosts.stream() + .filter(host -> offerMap.get(host) == null) + .collect(Collectors.toList()); long offSetDiff = offers.size() - (response.hosts.size() - extraOffers.size()) + extraOffers.size(); @@ -247,20 +251,20 @@ List processResponse(String responseStr, List offerSetDiffList) if (offSetDiff > 0) { LOG.warn("The number of different offers between the original and received offer sets is " + offSetDiff); - if (!extraOffers.isEmpty()) { - LOG.error("Cannot find offers " + extraOffers + " in the original offer set"); - } + } + if (!extraOffers.isEmpty()) { + LOG.error("Cannot find offers " + extraOffers + " in the original offer set"); } return orderedOffers; - } else { - LOG.error("Unable to receive offers from " + this.endpoint + " due to " + response.error); - throw new IOException(response.error); } + + LOG.error("Unable to receive offers from " + this.endpoint + " due to " + response.error); + throw new IOException(response.error); } // Host represents for each host offer. - public static class Host { + static class Host { String name; Resource offer; From 81b887c12864f36c821052d972ef3d69b2f359d7 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Wed, 19 May 2021 14:01:10 -0700 Subject: [PATCH 07/35] resolve PR comments --- .../scheduler/offers/HttpOfferSetImpl.java | 34 ++++++++++++------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 403c0d2ac..bc9e7b5a4 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import javax.inject.Qualifier; import com.fasterxml.jackson.databind.ObjectMapper; @@ -213,25 +214,17 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { request.addHeader("Accept", "application/json"); request.setEntity(new StringEntity(jsonMapper.writeValueAsString(scheduleRequest))); CloseableHttpResponse response = httpClient.execute(request); - try { - HttpEntity entity = response.getEntity(); - if (entity == null) { - throw new IOException("Empty response from the external http endpoint."); - } - return EntityUtils.toString(entity); - } finally { - response.close(); + HttpEntity entity = response.getEntity(); + if (entity == null) { + throw new IOException("Empty response from the external http endpoint."); } + return EntityUtils.toString(entity); } List processResponse(String responseStr, List offerSetDiffList) throws IOException { // process the response ScheduleResponse response = jsonMapper.readValue(responseStr, ScheduleResponse.class); - if (response == null || response.error == null || response.hosts == null) { - LOG.error("Response: " + responseStr); - throw new IOException("response is malformed"); - } LOG.info("Received " + response.hosts.size() + " offers"); Map offerMap = offers.stream() @@ -246,7 +239,7 @@ List processResponse(String responseStr, List offerSetDiffList) .collect(Collectors.toList()); long offSetDiff = offers.size() - (response.hosts.size() - extraOffers.size()) - + extraOffers.size(); + + extraOffers.size(); offerSetDiffList.add(offSetDiff); if (offSetDiff > 0) { LOG.warn("The number of different offers between the original and received offer sets is " @@ -264,8 +257,11 @@ List processResponse(String responseStr, List offerSetDiffList) } // Host represents for each host offer. + @Nonnull static class Host { + @Nonnull String name; + @Nonnull Resource offer; Host(String mName, Resource mOffer) { @@ -296,6 +292,7 @@ public void setOffer(Resource offer) { } // Resource is used between Aurora and MagicMatch. + @Nonnull static class Resource { double cpu; double memory; @@ -339,8 +336,11 @@ public void setDisk(double disk) { // ScheduleRequest is the request sent to MagicMatch. static class ScheduleRequest { + @Nonnull String jobKey; + @Nonnull Resource request; + @Nonnull List hosts; ScheduleRequest(Resource request, List hosts, String jobKey) { @@ -381,10 +381,18 @@ public void setHosts(List hosts) { } // ScheduleResponse is the scheduling result responded by MagicMatch + @Nonnull static class ScheduleResponse { + @Nonnull String error; + @Nonnull List hosts; + ScheduleResponse(String error, List hosts) { + this.error = error; + this.hosts = hosts; + } + @Override public String toString() { return "ScheduleResponse{" + "error='" + error + '\'' + ", hosts=" + hosts + '}'; From 6d836eabad9fe06d3749fab9672593c2073bdd1f Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Wed, 19 May 2021 22:31:29 -0700 Subject: [PATCH 08/35] reflect comments --- .../scheduler/offers/HttpOfferSetImpl.java | 110 +++++++++--------- .../offers/HttpOfferSetImplTest.java | 11 +- 2 files changed, 68 insertions(+), 53 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index bc9e7b5a4..12d418502 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -18,6 +18,7 @@ import java.lang.annotation.Target; import java.net.MalformedURLException; import java.net.URL; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -125,8 +126,7 @@ public void remove(HostOffer removed) { @Override public int size() { - // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more - // expensive. + // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive. // Could track this separately if it turns out to pose problems. return offers.size(); } @@ -153,8 +153,7 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res long startTime = System.nanoTime(); // create json request & send the Rest API request to the scheduler plugin ScheduleRequest scheduleRequest = this.createRequest(resourceRequest, startTime); - LOG.info("Sending request " + scheduleRequest.jobKey + " with " + this.offers.size() - + " offers"); + LOG.info("Sending request " + scheduleRequest.jobKey); String responseStr = this.sendRequest(scheduleRequest); orderedOffers = processResponse(responseStr, HttpOfferSetModule.offerSetDiffList); HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); @@ -203,27 +202,30 @@ private ScheduleRequest createRequest(ResourceRequest resourceRequest, long star // sendRequest sends resorceRequest to the external plugin endpoint and gets json response. private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { LOG.debug("Sending request for " + scheduleRequest.toString()); - HttpPost request = new HttpPost(this.endpoint.toString()); + HttpPost request = new HttpPost(endpoint.toString()); RequestConfig requestConfig = RequestConfig.custom() - .setConnectionRequestTimeout(this.timeoutMs) - .setConnectTimeout(this.timeoutMs) - .setSocketTimeout(this.timeoutMs) + .setConnectionRequestTimeout(timeoutMs) + .setConnectTimeout(timeoutMs) + .setSocketTimeout(timeoutMs) .build(); request.setConfig(requestConfig); request.addHeader("Content-Type", "application/json; utf-8"); request.addHeader("Accept", "application/json"); request.setEntity(new StringEntity(jsonMapper.writeValueAsString(scheduleRequest))); - CloseableHttpResponse response = httpClient.execute(request); - HttpEntity entity = response.getEntity(); - if (entity == null) { - throw new IOException("Empty response from the external http endpoint."); + try { + CloseableHttpResponse response = httpClient.execute(request); + HttpEntity entity = response.getEntity(); + if (entity == null) { + throw new IOException("Empty response from the external http endpoint."); + } + return EntityUtils.toString(entity); + } catch (IOException ie) { + throw ie; } - return EntityUtils.toString(entity); } List processResponse(String responseStr, List offerSetDiffList) throws IOException { - // process the response ScheduleResponse response = jsonMapper.readValue(responseStr, ScheduleResponse.class); LOG.info("Received " + response.hosts.size() + " offers"); @@ -238,6 +240,7 @@ List processResponse(String responseStr, List offerSetDiffList) .filter(host -> offerMap.get(host) == null) .collect(Collectors.toList()); + //offSetDiff is the total number of missing offers and the extra offers long offSetDiff = offers.size() - (response.hosts.size() - extraOffers.size()) + extraOffers.size(); offerSetDiffList.add(offSetDiff); @@ -245,14 +248,21 @@ List processResponse(String responseStr, List offerSetDiffList) LOG.warn("The number of different offers between the original and received offer sets is " + offSetDiff); } + if (LOG.isDebugEnabled() && offSetDiff > 0) { + List missedOffers = offers.stream() + .map(offer -> offer.getAttributes().getHost()) + .filter(host -> !response.hosts.contains(host)) + .collect(Collectors.toList()); + LOG.debug("missed offers: " + missedOffers); + LOG.debug("extra offers: " + extraOffers); + } if (!extraOffers.isEmpty()) { LOG.error("Cannot find offers " + extraOffers + " in the original offer set"); } return orderedOffers; } - - LOG.error("Unable to receive offers from " + this.endpoint + " due to " + response.error); + LOG.error("Unable to receive offers from " + endpoint + " due to " + response.error); throw new IOException(response.error); } @@ -260,9 +270,9 @@ List processResponse(String responseStr, List offerSetDiffList) @Nonnull static class Host { @Nonnull - String name; + String name = ""; @Nonnull - Resource offer; + Resource offer = new Resource(0, 0, 0); Host(String mName, Resource mOffer) { name = mName; @@ -278,16 +288,16 @@ public String getName() { return name; } - public void setName(String name) { - this.name = name; + public void setName(String mName) { + name = mName; } public Resource getOffer() { return offer; } - public void setOffer(Resource offer) { - this.offer = offer; + public void setOffer(Resource mOffer) { + offer = mOffer; } } @@ -298,10 +308,10 @@ static class Resource { double memory; double disk; - Resource(double cpu, double memory, double disk) { - this.cpu = cpu; - this.memory = memory; - this.disk = disk; + Resource(double mCpu, double mMemory, double mDisk) { + cpu = mCpu; + memory = mMemory; + disk = mDisk; } @Override @@ -313,40 +323,41 @@ public double getCpu() { return cpu; } - public void setCpu(double cpu) { - this.cpu = cpu; + public void setCpu(double mCpu) { + cpu = mCpu; } public double getMemory() { return memory; } - public void setMemory(double memory) { - this.memory = memory; + public void setMemory(double mMemory) { + memory = mMemory; } public double getDisk() { return disk; } - public void setDisk(double disk) { - this.disk = disk; + public void setDisk(double mDisk) { + disk = mDisk; } } // ScheduleRequest is the request sent to MagicMatch. + @Nonnull static class ScheduleRequest { @Nonnull - String jobKey; + String jobKey = ""; @Nonnull - Resource request; + Resource request = new Resource(0, 0, 0); @Nonnull - List hosts; + List hosts = new LinkedList<>();; - ScheduleRequest(Resource request, List hosts, String jobKey) { - this.request = request; - this.hosts = hosts; - this.jobKey = jobKey; + ScheduleRequest(Resource mRequest, List mHosts, String mJobKey) { + request = mRequest; + hosts = mHosts; + jobKey = mJobKey; } @Override @@ -359,24 +370,24 @@ public String getJobKey() { return jobKey; } - public void setJobKey(String jobKey) { - this.jobKey = jobKey; + public void setJobKey(String mJobKey) { + jobKey = mJobKey; } public Resource getRequest() { return request; } - public void setRequest(Resource request) { - this.request = request; + public void setRequest(Resource mRequest) { + this.request = mRequest; } public List getHosts() { return hosts; } - public void setHosts(List hosts) { - this.hosts = hosts; + public void setHosts(List mHosts) { + this.hosts = mHosts; } } @@ -384,14 +395,9 @@ public void setHosts(List hosts) { @Nonnull static class ScheduleResponse { @Nonnull - String error; + String error = ""; @Nonnull - List hosts; - - ScheduleResponse(String error, List hosts) { - this.error = error; - this.hosts = hosts; - } + List hosts = new LinkedList<>(); @Override public String toString() { diff --git a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java index 36cb1fc5e..f390355f5 100644 --- a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java +++ b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java @@ -122,7 +122,16 @@ public void testProcessResponse() throws IOException { } assertTrue(isException); - responseStr = "{\"error\": \"\"}"; + responseStr = "{\"error\": \"error\"}"; + isException = false; + try { + httpOfferSet.processResponse(responseStr, new LinkedList<>()); + } catch (IOException ioe) { + isException = true; + } + assertTrue(isException); + + responseStr = "{\"weird\": \"cannot decode this json string\"}"; isException = false; try { httpOfferSet.processResponse(responseStr, new LinkedList<>()); From 113a0bcb368bdcf4d20738fdea9d19778d4e28eb Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Thu, 20 May 2021 10:28:43 -0700 Subject: [PATCH 09/35] polish code --- .../scheduler/offers/HttpOfferSetImpl.java | 84 +++++++++---------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 12d418502..aeb5deefd 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -37,6 +37,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.offers.OfferSet; +import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.http.HttpEntity; @@ -185,13 +186,7 @@ private ScheduleRequest createRequest(ResourceRequest resourceRequest, long star resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB)); List hosts = offers.stream() - .map(offer -> new Host(offer.getAttributes().getHost(), - new Resource(offer.getResourceBag(true).valueOf(ResourceType.CPUS) - + offer.getResourceBag(false).valueOf(ResourceType.CPUS), - offer.getResourceBag(true).valueOf(ResourceType.RAM_MB) - + offer.getResourceBag(false).valueOf(ResourceType.RAM_MB), - offer.getResourceBag(true).valueOf(ResourceType.DISK_MB) - + offer.getResourceBag(false).valueOf(ResourceType.DISK_MB)))) + .map(offer -> new Host(offer.getAttributes().getHost(), new Resource(offer))) .collect(Collectors.toList()); IJobKey jobKey = resourceRequest.getTask().getJob(); String jobKeyStr = jobKey.getRole() + "-" + jobKey.getEnvironment() + "-" + jobKey.getName() @@ -231,42 +226,42 @@ List processResponse(String responseStr, List offerSetDiffList) Map offerMap = offers.stream() .collect(Collectors.toMap(offer -> offer.getAttributes().getHost(), offer -> offer)); - if (response.error.trim().isEmpty()) { - List orderedOffers = response.hosts.stream() - .map(host -> offerMap.get(host)) - .filter(offer -> offer != null) + if (!response.error.trim().isEmpty()) { + LOG.error("Unable to receive offers from " + endpoint + " due to " + response.error); + throw new IOException(response.error); + } + + List orderedOffers = response.hosts.stream() + .map(host -> offerMap.get(host)) + .filter(offer -> offer != null) + .collect(Collectors.toList()); + List extraOffers = response.hosts.stream() + .filter(host -> offerMap.get(host) == null) + .collect(Collectors.toList()); + + //offSetDiff is the total number of missing offers and the extra offers + long offSetDiff = offers.size() - (response.hosts.size() - extraOffers.size()) + + extraOffers.size(); + offerSetDiffList.add(offSetDiff); + if (offSetDiff > 0) { + LOG.warn("The number of different offers between the original and received offer sets is " + + offSetDiff); + } + if (LOG.isDebugEnabled() && offSetDiff > 0) { + List missedOffers = offers.stream() + .map(offer -> offer.getAttributes().getHost()) + .filter(host -> !response.hosts.contains(host)) .collect(Collectors.toList()); - List extraOffers = response.hosts.stream() - .filter(host -> offerMap.get(host) == null) - .collect(Collectors.toList()); - - //offSetDiff is the total number of missing offers and the extra offers - long offSetDiff = offers.size() - (response.hosts.size() - extraOffers.size()) - + extraOffers.size(); - offerSetDiffList.add(offSetDiff); - if (offSetDiff > 0) { - LOG.warn("The number of different offers between the original and received offer sets is " - + offSetDiff); - } - if (LOG.isDebugEnabled() && offSetDiff > 0) { - List missedOffers = offers.stream() - .map(offer -> offer.getAttributes().getHost()) - .filter(host -> !response.hosts.contains(host)) - .collect(Collectors.toList()); - LOG.debug("missed offers: " + missedOffers); - LOG.debug("extra offers: " + extraOffers); - } - if (!extraOffers.isEmpty()) { - LOG.error("Cannot find offers " + extraOffers + " in the original offer set"); - } - - return orderedOffers; + LOG.debug("missed offers: " + missedOffers); + LOG.debug("extra offers: " + extraOffers); + } + if (!extraOffers.isEmpty()) { + LOG.error("Cannot find offers " + extraOffers + " in the original offer set"); } - LOG.error("Unable to receive offers from " + endpoint + " due to " + response.error); - throw new IOException(response.error); + + return orderedOffers; } - // Host represents for each host offer. @Nonnull static class Host { @Nonnull @@ -301,13 +296,20 @@ public void setOffer(Resource mOffer) { } } - // Resource is used between Aurora and MagicMatch. @Nonnull static class Resource { double cpu; double memory; double disk; + Resource(HostOffer offer) { + ResourceBag revocable = offer.getResourceBag(true); + ResourceBag nonRevocable = offer.getResourceBag(false); + cpu = revocable.valueOf(ResourceType.CPUS) + nonRevocable.valueOf(ResourceType.CPUS); + memory = revocable.valueOf(ResourceType.RAM_MB) + nonRevocable.valueOf(ResourceType.RAM_MB); + disk = revocable.valueOf(ResourceType.DISK_MB) + nonRevocable.valueOf(ResourceType.DISK_MB); + } + Resource(double mCpu, double mMemory, double mDisk) { cpu = mCpu; memory = mMemory; @@ -344,7 +346,6 @@ public void setDisk(double mDisk) { } } - // ScheduleRequest is the request sent to MagicMatch. @Nonnull static class ScheduleRequest { @Nonnull @@ -391,7 +392,6 @@ public void setHosts(List mHosts) { } } - // ScheduleResponse is the scheduling result responded by MagicMatch @Nonnull static class ScheduleResponse { @Nonnull From dc594d54ae832e3d763861b115c1bce1028f48b6 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Thu, 20 May 2021 11:24:09 -0700 Subject: [PATCH 10/35] polish code --- .../aurora/scheduler/offers/HttpOfferSetImpl.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index aeb5deefd..a1b0fc010 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -246,15 +246,16 @@ List processResponse(String responseStr, List offerSetDiffList) if (offSetDiff > 0) { LOG.warn("The number of different offers between the original and received offer sets is " + offSetDiff); - } - if (LOG.isDebugEnabled() && offSetDiff > 0) { - List missedOffers = offers.stream() + if (LOG.isDebugEnabled()) { + List missedOffers = offers.stream() .map(offer -> offer.getAttributes().getHost()) .filter(host -> !response.hosts.contains(host)) .collect(Collectors.toList()); - LOG.debug("missed offers: " + missedOffers); - LOG.debug("extra offers: " + extraOffers); + LOG.debug("missed offers: " + missedOffers); + LOG.debug("extra offers: " + extraOffers); + } } + if (!extraOffers.isEmpty()) { LOG.error("Cannot find offers " + extraOffers + " in the original offer set"); } From 9d6a7f6e4e703c4104f72695bf8498c347c08f48 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Thu, 20 May 2021 11:45:02 -0700 Subject: [PATCH 11/35] remove this. --- .../scheduler/offers/HttpOfferSetImpl.java | 34 +++++++++---------- .../scheduler/offers/HttpOfferSetModule.java | 21 ++++++------ .../scheduler/offers/StatCalculator.java | 6 ++-- 3 files changed, 30 insertions(+), 31 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index a1b0fc010..38ac5aaa1 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -74,8 +74,8 @@ public class HttpOfferSetImpl implements OfferSet { private URL endpoint; private Integer maxRetries; - public HttpOfferSetImpl(Set offers) { - this.offers = offers; + public HttpOfferSetImpl(Set mOffers) { + offers = mOffers; } @VisibleForTesting @@ -95,9 +95,9 @@ public HttpOfferSetImpl(Set offers) { @Inject public HttpOfferSetImpl(Ordering ordering, - @TimeoutMs Integer timeoutMs, + @TimeoutMs Integer mTimeoutMs, @Endpoint String url, - @MaxRetries Integer maxRetries) { + @MaxRetries Integer mMaxRetries) { offers = new ConcurrentSkipListSet<>(ordering); try { endpoint = new URL(url); @@ -108,11 +108,11 @@ public HttpOfferSetImpl(Ordering ordering, HttpOfferSetModule.enable(false); LOG.info("HttpOfferSetModule Disabled."); } - this.timeoutMs = timeoutMs; - this.maxRetries = maxRetries; - LOG.info("HttpOfferSet's endpoint: " + this.endpoint); - LOG.info("HttpOfferSet's timeout: " + this.timeoutMs + " (ms)"); - LOG.info("HttpOfferSet's max retries: " + this.maxRetries); + timeoutMs = mTimeoutMs; + maxRetries = mMaxRetries; + LOG.info("HttpOfferSet's endpoint: " + endpoint); + LOG.info("HttpOfferSet's timeout: " + timeoutMs + " (ms)"); + LOG.info("HttpOfferSet's max retries: " + maxRetries); } @Override @@ -153,9 +153,9 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res try { long startTime = System.nanoTime(); // create json request & send the Rest API request to the scheduler plugin - ScheduleRequest scheduleRequest = this.createRequest(resourceRequest, startTime); + ScheduleRequest scheduleRequest = createRequest(resourceRequest, startTime); LOG.info("Sending request " + scheduleRequest.jobKey); - String responseStr = this.sendRequest(scheduleRequest); + String responseStr = sendRequest(scheduleRequest); orderedOffers = processResponse(responseStr, HttpOfferSetModule.offerSetDiffList); HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); } catch (IOException e) { @@ -381,7 +381,7 @@ public Resource getRequest() { } public void setRequest(Resource mRequest) { - this.request = mRequest; + request = mRequest; } public List getHosts() { @@ -389,7 +389,7 @@ public List getHosts() { } public void setHosts(List mHosts) { - this.hosts = mHosts; + hosts = mHosts; } } @@ -409,16 +409,16 @@ public String getError() { return error; } - public void setError(String error) { - this.error = error; + public void setError(String mError) { + error = mError; } public List getHosts() { return hosts; } - public void setHosts(List hosts) { - this.hosts = hosts; + public void setHosts(List mHosts) { + hosts = mHosts; } } } diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java index e2c33333d..3195c271e 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java @@ -97,9 +97,9 @@ public static class Options { CommandLine.registerCustomOptions(new Options()); } - public HttpOfferSetModule(CliOptions options) { - this.cliOptions = options; - this.options = options.getCustom(Options.class); + public HttpOfferSetModule(CliOptions mOptions) { + cliOptions = mOptions; + options = mOptions.getCustom(Options.class); } @Override @@ -152,18 +152,17 @@ static class StatUpdater extends AbstractIdleService { @Inject StatUpdater( - @Executor ScheduledExecutorService executor, - StatCalculator calculator, - @RefreshRateMs Integer refreshRateMs) { - this.executor = requireNonNull(executor); - this.calculator = requireNonNull(calculator); - this.refreshRateMs = refreshRateMs; + @Executor ScheduledExecutorService mExecutor, + StatCalculator mCalculator, + @RefreshRateMs Integer mRefreshRateMs) { + executor = requireNonNull(mExecutor); + calculator = requireNonNull(mCalculator); + refreshRateMs = mRefreshRateMs; } @Override protected void startUp() { - long interval = this.refreshRateMs; - executor.scheduleAtFixedRate(calculator, 0, interval, TimeUnit.MILLISECONDS); + executor.scheduleAtFixedRate(calculator, 0, refreshRateMs, TimeUnit.MILLISECONDS); } @Override diff --git a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java index ed21d9100..64e2674a1 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java +++ b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java @@ -34,8 +34,8 @@ private static class Counter implements Supplier { private final StatsProvider statsProvider; private boolean exported; - Counter(StatsProvider statsProvider) { - this.statsProvider = statsProvider; + Counter(StatsProvider mStatsProvider) { + statsProvider = mStatsProvider; } @Override @@ -55,7 +55,7 @@ private void set(String name, Number newValue) { @Inject StatCalculator(final StatsProvider statsProvider) { requireNonNull(statsProvider); - this.metricCache = CacheBuilder.newBuilder().build( + metricCache = CacheBuilder.newBuilder().build( new CacheLoader() { public StatCalculator.Counter load(String key) { return new StatCalculator.Counter(statsProvider.untracked()); From 7bd29c3ca4ca8086f02d9bbe56ef8d45ddcdc658 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Thu, 17 Jun 2021 18:10:50 -0700 Subject: [PATCH 12/35] update aurora version 0.24.0 --- .auroraversion | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.auroraversion b/.auroraversion index 86159ebb2..8769e66c8 100644 --- a/.auroraversion +++ b/.auroraversion @@ -1 +1 @@ -0.25.0-SNAPSHOT +0.24.2-SNAPSHOT From bd36a7210078e2c6d0dc7fe587e1cfd176ae3d59 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Thu, 17 Jun 2021 18:23:23 -0700 Subject: [PATCH 13/35] fix release-candidate error --- build-support/release/release-candidate | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build-support/release/release-candidate b/build-support/release/release-candidate index c3c69c3f4..32bc06888 100755 --- a/build-support/release/release-candidate +++ b/build-support/release/release-candidate @@ -228,7 +228,8 @@ gh api graphql \ -F name='aurora' \ -f query="$graphql_query" | python3 ./build-support/release/changelog.py $current_version > CHANGELOG.tmp -cat CHANGELOG >> CHANGELOG.tmp && mv CHANGELOG.tmp CHANGELOG +#cat CHANGELOG >> CHANGELOG.tmp && mv CHANGELOG.tmp CHANGELOG +mv CHANGELOG.tmp CHANGELOG git add CHANGELOG git commit -m "Updating CHANGELOG for ${current_version} release." From ff112010909c88910a1f7b7430fb32aee40543d0 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Thu, 17 Jun 2021 18:23:39 -0700 Subject: [PATCH 14/35] Updating CHANGELOG for 0.24.2 release. --- CHANGELOG | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 CHANGELOG diff --git a/CHANGELOG b/CHANGELOG new file mode 100644 index 000000000..d1e75e9ef --- /dev/null +++ b/CHANGELOG @@ -0,0 +1,41 @@ +Aurora Scheduler 0.24.2 +-------------------------------------------------------------------------------- +## Pull Requests +* [#132](https://github.com/aurora-scheduler/scheduler/pull/132) - Bump shiroRev from 1.4.0 to 1.7.0 +* [#135](https://github.com/aurora-scheduler/scheduler/pull/135) - Bump protobuf-java from 3.5.1 to 3.14.0 +* [#136](https://github.com/aurora-scheduler/scheduler/pull/136) - Bump eslint-plugin-react from 7.21.5 to 7.22.0 in /ui +* [#137](https://github.com/aurora-scheduler/scheduler/pull/137) - Update Vagrant box reference +* [#141](https://github.com/aurora-scheduler/scheduler/pull/141) - Bump httpcore from 4.4.13 to 4.4.14 +* [#146](https://github.com/aurora-scheduler/scheduler/pull/146) - Bump objenesis from 2.2 to 3.1 +* [#147](https://github.com/aurora-scheduler/scheduler/pull/147) - Bump jacksonRev from 2.12.0 to 2.12.1 +* [#148](https://github.com/aurora-scheduler/scheduler/pull/148) - Bump shiroRev from 1.7.0 to 1.7.1 +* [#149](https://github.com/aurora-scheduler/scheduler/pull/149) - Pants upgrade to 1.26 +* [#150](https://github.com/aurora-scheduler/scheduler/pull/150) - Bump com.github.node-gradle.node from 2.2.4 to 3.0.1 +* [#151](https://github.com/aurora-scheduler/scheduler/pull/151) - Bump me.champeau.gradle.jmh from 0.5.2 to 0.5.3 +* [#153](https://github.com/aurora-scheduler/scheduler/pull/153) - Bump react from 16.14.0 to 17.0.1 in /ui +* [#156](https://github.com/aurora-scheduler/scheduler/pull/156) - Bump bootstrap from 3.4.1 to 4.6.0 in /ui +* [#158](https://github.com/aurora-scheduler/scheduler/pull/158) - Bump com.github.spotbugs from 4.6.0 to 4.6.2 +* [#159](https://github.com/aurora-scheduler/scheduler/pull/159) - Bump protobuf-java from 3.14.0 to 3.15.2 +* [#160](https://github.com/aurora-scheduler/scheduler/pull/160) - Bump junit from 4.12 to 4.13.2 +* [#164](https://github.com/aurora-scheduler/scheduler/pull/164) - Bump ajv from 6.12.6 to 7.1.1 in /ui +* [#165](https://github.com/aurora-scheduler/scheduler/pull/165) - Bump react-test-renderer from 16.14.0 to 17.0.1 in /ui +* [#167](https://github.com/aurora-scheduler/scheduler/pull/167) - Bump ajv-keywords from 3.5.2 to 4.0.0 in /ui +* [#168](https://github.com/aurora-scheduler/scheduler/pull/168) - Bump react-dom from 16.14.0 to 17.0.1 in /ui +* [#169](https://github.com/aurora-scheduler/scheduler/pull/169) - Bump commons-lang3 from 3.11 to 3.12.0 +* [#171](https://github.com/aurora-scheduler/scheduler/pull/171) - Bump protobuf-java from 3.15.2 to 3.15.3 +* [#177](https://github.com/aurora-scheduler/scheduler/pull/177) - Bump jacksonRev from 2.12.1 to 2.12.2 +* [#181](https://github.com/aurora-scheduler/scheduler/pull/181) - Bump protobuf-java from 3.15.3 to 3.15.6 +* [#187](https://github.com/aurora-scheduler/scheduler/pull/187) - Bump jcommander from 1.78 to 1.81 +* [#189](https://github.com/aurora-scheduler/scheduler/pull/189) - Bump com.github.ben-manes.versions from 0.36.0 to 0.38.0 +* [#191](https://github.com/aurora-scheduler/scheduler/pull/191) - Bump eslint-plugin-react from 7.22.0 to 7.23.1 in /ui +* [#193](https://github.com/aurora-scheduler/scheduler/pull/193) - Bump objenesis from 3.1 to 3.2 +* [#200](https://github.com/aurora-scheduler/scheduler/pull/200) - Bump eslint-plugin-promise from 4.3.1 to 5.1.0 in /ui +* [#211](https://github.com/aurora-scheduler/scheduler/pull/211) - Bump bootstrap from 4.6.0 to 5.0.0 in /ui +* [#213](https://github.com/aurora-scheduler/scheduler/pull/213) - Bump eslint-plugin-chai-friendly from 0.6.0 to 0.7.1 in /ui +* [#214](https://github.com/aurora-scheduler/scheduler/pull/214) - Improve HttpOfferSet performance +* [#216](https://github.com/aurora-scheduler/scheduler/pull/216) - Disable pauses for auto pause enabled updates +* [#221](https://github.com/aurora-scheduler/scheduler/pull/221) - Bump eslint-plugin-react from 7.23.1 to 7.24.0 in /ui +* [#223](https://github.com/aurora-scheduler/scheduler/pull/223) - Bump jest from 26.6.3 to 27.0.4 in /ui +* [#230](https://github.com/aurora-scheduler/scheduler/pull/230) - Bump com.github.node-gradle.node from 3.0.1 to 3.1.0 +* [#231](https://github.com/aurora-scheduler/scheduler/pull/231) - Bump com.github.ben-manes.versions from 0.38.0 to 0.39.0 + From e6008678414f8abc1e67e078f2df06278c12a710 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Thu, 17 Jun 2021 18:23:39 -0700 Subject: [PATCH 15/35] Incrementing snapshot version to 0.25.0-SNAPSHOT. --- .auroraversion | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.auroraversion b/.auroraversion index 8769e66c8..86159ebb2 100644 --- a/.auroraversion +++ b/.auroraversion @@ -1 +1 @@ -0.24.2-SNAPSHOT +0.25.0-SNAPSHOT From c4a6fc307d1b422e7bd314412a48931e86ce5833 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Fri, 18 Jun 2021 06:58:53 -0700 Subject: [PATCH 16/35] fixed release candidate --- build-support/release/release-candidate | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/build-support/release/release-candidate b/build-support/release/release-candidate index 32bc06888..90f840ff0 100755 --- a/build-support/release/release-candidate +++ b/build-support/release/release-candidate @@ -228,8 +228,7 @@ gh api graphql \ -F name='aurora' \ -f query="$graphql_query" | python3 ./build-support/release/changelog.py $current_version > CHANGELOG.tmp -#cat CHANGELOG >> CHANGELOG.tmp && mv CHANGELOG.tmp CHANGELOG -mv CHANGELOG.tmp CHANGELOG +cat CHANGELOG.md >> CHANGELOG.tmp && mv CHANGELOG.tmp CHANGELOG.md git add CHANGELOG git commit -m "Updating CHANGELOG for ${current_version} release." From 48e75b3ce077159bb82579409d283f593a5b8dc2 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Fri, 18 Jun 2021 08:44:15 -0700 Subject: [PATCH 17/35] fixed release candidate script --- CHANGELOG | 41 ------------------------- build-support/release/release-candidate | 6 ++-- 2 files changed, 3 insertions(+), 44 deletions(-) delete mode 100644 CHANGELOG diff --git a/CHANGELOG b/CHANGELOG deleted file mode 100644 index d1e75e9ef..000000000 --- a/CHANGELOG +++ /dev/null @@ -1,41 +0,0 @@ -Aurora Scheduler 0.24.2 --------------------------------------------------------------------------------- -## Pull Requests -* [#132](https://github.com/aurora-scheduler/scheduler/pull/132) - Bump shiroRev from 1.4.0 to 1.7.0 -* [#135](https://github.com/aurora-scheduler/scheduler/pull/135) - Bump protobuf-java from 3.5.1 to 3.14.0 -* [#136](https://github.com/aurora-scheduler/scheduler/pull/136) - Bump eslint-plugin-react from 7.21.5 to 7.22.0 in /ui -* [#137](https://github.com/aurora-scheduler/scheduler/pull/137) - Update Vagrant box reference -* [#141](https://github.com/aurora-scheduler/scheduler/pull/141) - Bump httpcore from 4.4.13 to 4.4.14 -* [#146](https://github.com/aurora-scheduler/scheduler/pull/146) - Bump objenesis from 2.2 to 3.1 -* [#147](https://github.com/aurora-scheduler/scheduler/pull/147) - Bump jacksonRev from 2.12.0 to 2.12.1 -* [#148](https://github.com/aurora-scheduler/scheduler/pull/148) - Bump shiroRev from 1.7.0 to 1.7.1 -* [#149](https://github.com/aurora-scheduler/scheduler/pull/149) - Pants upgrade to 1.26 -* [#150](https://github.com/aurora-scheduler/scheduler/pull/150) - Bump com.github.node-gradle.node from 2.2.4 to 3.0.1 -* [#151](https://github.com/aurora-scheduler/scheduler/pull/151) - Bump me.champeau.gradle.jmh from 0.5.2 to 0.5.3 -* [#153](https://github.com/aurora-scheduler/scheduler/pull/153) - Bump react from 16.14.0 to 17.0.1 in /ui -* [#156](https://github.com/aurora-scheduler/scheduler/pull/156) - Bump bootstrap from 3.4.1 to 4.6.0 in /ui -* [#158](https://github.com/aurora-scheduler/scheduler/pull/158) - Bump com.github.spotbugs from 4.6.0 to 4.6.2 -* [#159](https://github.com/aurora-scheduler/scheduler/pull/159) - Bump protobuf-java from 3.14.0 to 3.15.2 -* [#160](https://github.com/aurora-scheduler/scheduler/pull/160) - Bump junit from 4.12 to 4.13.2 -* [#164](https://github.com/aurora-scheduler/scheduler/pull/164) - Bump ajv from 6.12.6 to 7.1.1 in /ui -* [#165](https://github.com/aurora-scheduler/scheduler/pull/165) - Bump react-test-renderer from 16.14.0 to 17.0.1 in /ui -* [#167](https://github.com/aurora-scheduler/scheduler/pull/167) - Bump ajv-keywords from 3.5.2 to 4.0.0 in /ui -* [#168](https://github.com/aurora-scheduler/scheduler/pull/168) - Bump react-dom from 16.14.0 to 17.0.1 in /ui -* [#169](https://github.com/aurora-scheduler/scheduler/pull/169) - Bump commons-lang3 from 3.11 to 3.12.0 -* [#171](https://github.com/aurora-scheduler/scheduler/pull/171) - Bump protobuf-java from 3.15.2 to 3.15.3 -* [#177](https://github.com/aurora-scheduler/scheduler/pull/177) - Bump jacksonRev from 2.12.1 to 2.12.2 -* [#181](https://github.com/aurora-scheduler/scheduler/pull/181) - Bump protobuf-java from 3.15.3 to 3.15.6 -* [#187](https://github.com/aurora-scheduler/scheduler/pull/187) - Bump jcommander from 1.78 to 1.81 -* [#189](https://github.com/aurora-scheduler/scheduler/pull/189) - Bump com.github.ben-manes.versions from 0.36.0 to 0.38.0 -* [#191](https://github.com/aurora-scheduler/scheduler/pull/191) - Bump eslint-plugin-react from 7.22.0 to 7.23.1 in /ui -* [#193](https://github.com/aurora-scheduler/scheduler/pull/193) - Bump objenesis from 3.1 to 3.2 -* [#200](https://github.com/aurora-scheduler/scheduler/pull/200) - Bump eslint-plugin-promise from 4.3.1 to 5.1.0 in /ui -* [#211](https://github.com/aurora-scheduler/scheduler/pull/211) - Bump bootstrap from 4.6.0 to 5.0.0 in /ui -* [#213](https://github.com/aurora-scheduler/scheduler/pull/213) - Bump eslint-plugin-chai-friendly from 0.6.0 to 0.7.1 in /ui -* [#214](https://github.com/aurora-scheduler/scheduler/pull/214) - Improve HttpOfferSet performance -* [#216](https://github.com/aurora-scheduler/scheduler/pull/216) - Disable pauses for auto pause enabled updates -* [#221](https://github.com/aurora-scheduler/scheduler/pull/221) - Bump eslint-plugin-react from 7.23.1 to 7.24.0 in /ui -* [#223](https://github.com/aurora-scheduler/scheduler/pull/223) - Bump jest from 26.6.3 to 27.0.4 in /ui -* [#230](https://github.com/aurora-scheduler/scheduler/pull/230) - Bump com.github.node-gradle.node from 3.0.1 to 3.1.0 -* [#231](https://github.com/aurora-scheduler/scheduler/pull/231) - Bump com.github.ben-manes.versions from 0.38.0 to 0.39.0 - diff --git a/build-support/release/release-candidate b/build-support/release/release-candidate index 90f840ff0..2dfbc75df 100755 --- a/build-support/release/release-candidate +++ b/build-support/release/release-candidate @@ -230,8 +230,8 @@ gh api graphql \ cat CHANGELOG.md >> CHANGELOG.tmp && mv CHANGELOG.tmp CHANGELOG.md -git add CHANGELOG -git commit -m "Updating CHANGELOG for ${current_version} release." +git add CHANGELOG.md +git commit -m "Updating CHANGELOG.md for ${current_version} release." echo "Committing updated .auroraversion on master" echo $new_snapshot_version > .auroraversion @@ -313,7 +313,7 @@ The RELEASE NOTES for the release are available at: ${aurora_git_web_url}/blob/${rc_version_tag}/RELEASE-NOTES.md The CHANGELOG for the release is available at: -${aurora_git_web_url}/blob/${rc_version_tag}/CHANGELOG +${aurora_git_web_url}/blob/${rc_version_tag}/CHANGELOG.md The tag used to create the release candidate is: ${aurora_git_web_url}/tree/${rc_version_tag} From 22bc79ecad5fac58b9439a03f18ee4b3758b0968 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Fri, 18 Jun 2021 08:44:53 -0700 Subject: [PATCH 18/35] aurora version 0.24.2 --- .auroraversion | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.auroraversion b/.auroraversion index 86159ebb2..8769e66c8 100644 --- a/.auroraversion +++ b/.auroraversion @@ -1 +1 @@ -0.25.0-SNAPSHOT +0.24.2-SNAPSHOT From 6aff891884f5a33c920d1029b807f2ddd09ec3ec Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Fri, 18 Jun 2021 08:45:54 -0700 Subject: [PATCH 19/35] Updating CHANGELOG.md for 0.24.2 release. --- CHANGELOG.md | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e052beaad..38681984e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,44 @@ +Aurora Scheduler 0.24.2 +-------------------------------------------------------------------------------- +## Pull Requests +* [#132](https://github.com/aurora-scheduler/scheduler/pull/132) - Bump shiroRev from 1.4.0 to 1.7.0 +* [#135](https://github.com/aurora-scheduler/scheduler/pull/135) - Bump protobuf-java from 3.5.1 to 3.14.0 +* [#136](https://github.com/aurora-scheduler/scheduler/pull/136) - Bump eslint-plugin-react from 7.21.5 to 7.22.0 in /ui +* [#137](https://github.com/aurora-scheduler/scheduler/pull/137) - Update Vagrant box reference +* [#141](https://github.com/aurora-scheduler/scheduler/pull/141) - Bump httpcore from 4.4.13 to 4.4.14 +* [#146](https://github.com/aurora-scheduler/scheduler/pull/146) - Bump objenesis from 2.2 to 3.1 +* [#147](https://github.com/aurora-scheduler/scheduler/pull/147) - Bump jacksonRev from 2.12.0 to 2.12.1 +* [#148](https://github.com/aurora-scheduler/scheduler/pull/148) - Bump shiroRev from 1.7.0 to 1.7.1 +* [#149](https://github.com/aurora-scheduler/scheduler/pull/149) - Pants upgrade to 1.26 +* [#150](https://github.com/aurora-scheduler/scheduler/pull/150) - Bump com.github.node-gradle.node from 2.2.4 to 3.0.1 +* [#151](https://github.com/aurora-scheduler/scheduler/pull/151) - Bump me.champeau.gradle.jmh from 0.5.2 to 0.5.3 +* [#153](https://github.com/aurora-scheduler/scheduler/pull/153) - Bump react from 16.14.0 to 17.0.1 in /ui +* [#156](https://github.com/aurora-scheduler/scheduler/pull/156) - Bump bootstrap from 3.4.1 to 4.6.0 in /ui +* [#158](https://github.com/aurora-scheduler/scheduler/pull/158) - Bump com.github.spotbugs from 4.6.0 to 4.6.2 +* [#159](https://github.com/aurora-scheduler/scheduler/pull/159) - Bump protobuf-java from 3.14.0 to 3.15.2 +* [#160](https://github.com/aurora-scheduler/scheduler/pull/160) - Bump junit from 4.12 to 4.13.2 +* [#164](https://github.com/aurora-scheduler/scheduler/pull/164) - Bump ajv from 6.12.6 to 7.1.1 in /ui +* [#165](https://github.com/aurora-scheduler/scheduler/pull/165) - Bump react-test-renderer from 16.14.0 to 17.0.1 in /ui +* [#167](https://github.com/aurora-scheduler/scheduler/pull/167) - Bump ajv-keywords from 3.5.2 to 4.0.0 in /ui +* [#168](https://github.com/aurora-scheduler/scheduler/pull/168) - Bump react-dom from 16.14.0 to 17.0.1 in /ui +* [#169](https://github.com/aurora-scheduler/scheduler/pull/169) - Bump commons-lang3 from 3.11 to 3.12.0 +* [#171](https://github.com/aurora-scheduler/scheduler/pull/171) - Bump protobuf-java from 3.15.2 to 3.15.3 +* [#177](https://github.com/aurora-scheduler/scheduler/pull/177) - Bump jacksonRev from 2.12.1 to 2.12.2 +* [#181](https://github.com/aurora-scheduler/scheduler/pull/181) - Bump protobuf-java from 3.15.3 to 3.15.6 +* [#187](https://github.com/aurora-scheduler/scheduler/pull/187) - Bump jcommander from 1.78 to 1.81 +* [#189](https://github.com/aurora-scheduler/scheduler/pull/189) - Bump com.github.ben-manes.versions from 0.36.0 to 0.38.0 +* [#191](https://github.com/aurora-scheduler/scheduler/pull/191) - Bump eslint-plugin-react from 7.22.0 to 7.23.1 in /ui +* [#193](https://github.com/aurora-scheduler/scheduler/pull/193) - Bump objenesis from 3.1 to 3.2 +* [#200](https://github.com/aurora-scheduler/scheduler/pull/200) - Bump eslint-plugin-promise from 4.3.1 to 5.1.0 in /ui +* [#211](https://github.com/aurora-scheduler/scheduler/pull/211) - Bump bootstrap from 4.6.0 to 5.0.0 in /ui +* [#213](https://github.com/aurora-scheduler/scheduler/pull/213) - Bump eslint-plugin-chai-friendly from 0.6.0 to 0.7.1 in /ui +* [#214](https://github.com/aurora-scheduler/scheduler/pull/214) - Improve HttpOfferSet performance +* [#216](https://github.com/aurora-scheduler/scheduler/pull/216) - Disable pauses for auto pause enabled updates +* [#221](https://github.com/aurora-scheduler/scheduler/pull/221) - Bump eslint-plugin-react from 7.23.1 to 7.24.0 in /ui +* [#223](https://github.com/aurora-scheduler/scheduler/pull/223) - Bump jest from 26.6.3 to 27.0.4 in /ui +* [#230](https://github.com/aurora-scheduler/scheduler/pull/230) - Bump com.github.node-gradle.node from 3.0.1 to 3.1.0 +* [#231](https://github.com/aurora-scheduler/scheduler/pull/231) - Bump com.github.ben-manes.versions from 0.38.0 to 0.39.0 + Aurora Scheduler 0.24.0 -------------------------------------------------------------------------------- ## Pull Requests From daffeee3f5aa043075893601f3b4a7c2ef598e41 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Fri, 18 Jun 2021 08:45:54 -0700 Subject: [PATCH 20/35] Incrementing snapshot version to 0.25.0-SNAPSHOT. --- .auroraversion | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.auroraversion b/.auroraversion index 8769e66c8..86159ebb2 100644 --- a/.auroraversion +++ b/.auroraversion @@ -1 +1 @@ -0.24.2-SNAPSHOT +0.25.0-SNAPSHOT From 8fae700eaef63b590f232b5ee3cf16e5e993bf67 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Fri, 15 Oct 2021 12:02:09 -0700 Subject: [PATCH 21/35] put the bad offers with too many starting tasks to the bottom --- .../scheduler/offers/HttpOfferSetImpl.java | 138 +++++++++++++----- .../scheduler/offers/HttpOfferSetModule.java | 9 ++ .../offers/HttpOfferSetImplTest.java | 74 +++++++++- 3 files changed, 187 insertions(+), 34 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 38ac5aaa1..da787aaef 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -18,12 +18,16 @@ import java.lang.annotation.Target; import java.net.MalformedURLException; import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import javax.annotation.Nonnull; import javax.inject.Qualifier; @@ -33,13 +37,17 @@ import com.google.common.collect.Ordering; import com.google.inject.Inject; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.offers.OfferSet; import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.resources.ResourceType; +import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.http.HttpEntity; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; @@ -67,15 +75,27 @@ public class HttpOfferSetImpl implements OfferSet { private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetImpl.class); private final Set offers; + private final Storage storage; private final ObjectMapper jsonMapper = new ObjectMapper(); private final CloseableHttpClient httpClient = HttpClients.createDefault(); + private final int timeoutMs; + private final int maxRetries; + private final int maxStartingTasksPerSlave; - private Integer timeoutMs; private URL endpoint; - private Integer maxRetries; - public HttpOfferSetImpl(Set mOffers) { + public HttpOfferSetImpl(Set mOffers, + Storage mStorage, + int mTimeoutMs, + URL mEndpoint, + int mMaxRetries, + int mMaxStartingTasksPerSlave) { offers = mOffers; + storage = mStorage; + timeoutMs = mTimeoutMs; + endpoint = mEndpoint; + maxRetries = mMaxRetries; + maxStartingTasksPerSlave = mMaxStartingTasksPerSlave; } @VisibleForTesting @@ -93,14 +113,22 @@ public HttpOfferSetImpl(Set mOffers) { @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) @interface TimeoutMs { } + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface MaxStartingTaskPerSlave { } + @Inject public HttpOfferSetImpl(Ordering ordering, + Storage mStorage, @TimeoutMs Integer mTimeoutMs, @Endpoint String url, - @MaxRetries Integer mMaxRetries) { + @MaxRetries Integer mMaxRetries, + @MaxStartingTaskPerSlave Integer mMaxStartingTasksPerSlave) { offers = new ConcurrentSkipListSet<>(ordering); + storage = mStorage; try { - endpoint = new URL(url); + endpoint = new URL(Objects.requireNonNull(url)); HttpOfferSetModule.enable(true); LOG.info("HttpOfferSetModule Enabled."); } catch (MalformedURLException e) { @@ -108,11 +136,13 @@ public HttpOfferSetImpl(Ordering ordering, HttpOfferSetModule.enable(false); LOG.info("HttpOfferSetModule Disabled."); } - timeoutMs = mTimeoutMs; - maxRetries = mMaxRetries; - LOG.info("HttpOfferSet's endpoint: " + endpoint); - LOG.info("HttpOfferSet's timeout: " + timeoutMs + " (ms)"); - LOG.info("HttpOfferSet's max retries: " + maxRetries); + timeoutMs = Objects.requireNonNull(mTimeoutMs); + maxRetries = Objects.requireNonNull(mMaxRetries); + maxStartingTasksPerSlave = Objects.requireNonNull(mMaxStartingTasksPerSlave); + LOG.info("HttpOfferSet's endpoint: {}", endpoint); + LOG.info("HttpOfferSet's timeout: {} (ms)", timeoutMs); + LOG.info("HttpOfferSet's max retries: {}", maxRetries); + LOG.info("HttpOfferSet's max number of starting tasks per slave: {}", maxStartingTasksPerSlave); } @Override @@ -144,48 +174,90 @@ public Iterable values() { @Override public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) { + long startTime = System.nanoTime(); // if there are no available offers, do nothing. - if (offers.isEmpty() || !HttpOfferSetModule.isEnabled()) { + if (offers.isEmpty()) { + HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); return offers; } + // count the number of starting tasks per slave + Map hostTaskCountMap = new HashMap<>(); + Iterable startingTasks = + Storage.Util.fetchTasks(storage, Query.unscoped().byStatus(ScheduleStatus.STARTING)); + for (IScheduledTask task : startingTasks) { + String slaveId = task.getAssignedTask().getSlaveId(); + hostTaskCountMap.put(slaveId, hostTaskCountMap.getOrDefault(slaveId, 0) + 1); + } + + // find the bad offers and put them at the bottom of the list + List badOffers = new ArrayList<>(); + List goodOffers = new ArrayList<>(); + if (maxStartingTasksPerSlave > 0) { + badOffers = offers.stream() + .filter(offer -> hostTaskCountMap.getOrDefault(offer.getOffer().getAgentId().getValue(), 0) + >= maxStartingTasksPerSlave) + .collect(Collectors.toList()); + goodOffers = offers.stream() + .filter(offer -> hostTaskCountMap.getOrDefault(offer.getOffer().getAgentId().getValue(), 0) + < maxStartingTasksPerSlave) + .collect(Collectors.toList()); + + if (!badOffers.isEmpty()) { + LOG.info("the number of bad offers: {}", badOffers.size()); + } + + if (goodOffers.isEmpty()) { + HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); + return badOffers; + } + } + + // if the external http endpoint was not reachable + if (!HttpOfferSetModule.isEnabled()) { + goodOffers.addAll(badOffers); + HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); + return goodOffers; + } + List orderedOffers = null; try { - long startTime = System.nanoTime(); // create json request & send the Rest API request to the scheduler plugin - ScheduleRequest scheduleRequest = createRequest(resourceRequest, startTime); - LOG.info("Sending request " + scheduleRequest.jobKey); + ScheduleRequest scheduleRequest = createRequest(goodOffers, resourceRequest, startTime); + LOG.info("Sending request {}", scheduleRequest.jobKey); String responseStr = sendRequest(scheduleRequest); orderedOffers = processResponse(responseStr, HttpOfferSetModule.offerSetDiffList); - HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); } catch (IOException e) { - LOG.error("Failed to schedule the task of " - + resourceRequest.getTask().getJob().toString() - + " using HttpOfferSet. ", e); + LOG.error("Failed to schedule the task of {} using {} ", + resourceRequest.getTask().getJob().toString(), endpoint, e); HttpOfferSetModule.incFailureCount(); } finally { // shutdown HttpOfferSet if failure is consistent. if (HttpOfferSetModule.getFailureCount() >= maxRetries) { - LOG.error("Reaches " + maxRetries + ". HttpOfferSetModule Disabled."); + LOG.error("Reaches {} retries. {} is disabled", maxRetries, endpoint); HttpOfferSetModule.enable(false); } } if (orderedOffers != null) { + orderedOffers.addAll(badOffers); + HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); return orderedOffers; } - // fall back to default scheduler. - LOG.warn("Falling back on default ordering."); - return offers; + goodOffers.addAll(badOffers); + HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); + return goodOffers; } //createScheduleRequest creates the ScheduleRequest to be sent out to the plugin. - private ScheduleRequest createRequest(ResourceRequest resourceRequest, long startTime) { + private ScheduleRequest createRequest(List mOffers, + ResourceRequest resourceRequest, + long startTime) { Resource req = new Resource(resourceRequest.getResourceBag().valueOf(ResourceType.CPUS), resourceRequest.getResourceBag().valueOf(ResourceType.RAM_MB), resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB)); List hosts = - offers.stream() + mOffers.stream() .map(offer -> new Host(offer.getAttributes().getHost(), new Resource(offer))) .collect(Collectors.toList()); IJobKey jobKey = resourceRequest.getTask().getJob(); @@ -194,9 +266,9 @@ private ScheduleRequest createRequest(ResourceRequest resourceRequest, long star return new ScheduleRequest(req, hosts, jobKeyStr); } - // sendRequest sends resorceRequest to the external plugin endpoint and gets json response. + // sendRequest sends ScheduleRequest to the external endpoint and gets ordered offers in json private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { - LOG.debug("Sending request for " + scheduleRequest.toString()); + LOG.debug("Sending request for {}", scheduleRequest); HttpPost request = new HttpPost(endpoint.toString()); RequestConfig requestConfig = RequestConfig.custom() .setConnectionRequestTimeout(timeoutMs) @@ -222,12 +294,12 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { List processResponse(String responseStr, List offerSetDiffList) throws IOException { ScheduleResponse response = jsonMapper.readValue(responseStr, ScheduleResponse.class); - LOG.info("Received " + response.hosts.size() + " offers"); + LOG.info("Received {} offers", response.hosts.size()); Map offerMap = offers.stream() .collect(Collectors.toMap(offer -> offer.getAttributes().getHost(), offer -> offer)); if (!response.error.trim().isEmpty()) { - LOG.error("Unable to receive offers from " + endpoint + " due to " + response.error); + LOG.error("Unable to receive offers from {} due to {}", endpoint, response.error); throw new IOException(response.error); } @@ -244,20 +316,20 @@ List processResponse(String responseStr, List offerSetDiffList) + extraOffers.size(); offerSetDiffList.add(offSetDiff); if (offSetDiff > 0) { - LOG.warn("The number of different offers between the original and received offer sets is " - + offSetDiff); + LOG.warn("The number of different offers between the original and received offer sets is {}", + offSetDiff); if (LOG.isDebugEnabled()) { List missedOffers = offers.stream() .map(offer -> offer.getAttributes().getHost()) .filter(host -> !response.hosts.contains(host)) .collect(Collectors.toList()); - LOG.debug("missed offers: " + missedOffers); - LOG.debug("extra offers: " + extraOffers); + LOG.debug("missed offers: {}", missedOffers); + LOG.debug("extra offers: {}", extraOffers); } } if (!extraOffers.isEmpty()) { - LOG.error("Cannot find offers " + extraOffers + " in the original offer set"); + LOG.error("Cannot find offers {} in the original offer set", extraOffers); } return orderedOffers; diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java index 3195c271e..801e5e5bc 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java @@ -90,6 +90,12 @@ public static class Options { @Parameter(names = "-http_offer_set_max_retries") int httpOfferSetMaxRetries = 10; + + // the slaves have more than or equal to the httpOfferSetMaxStartingTasksPerSlave + // are put in the bottom of the offerset. If you want to enable this feature, set + // httpOfferSetMaxStartingTasksPerSlave less than or equal to zero + @Parameter(names = "-http_offer_set_max_starting_tasks_per_slave") + int httpOfferSetMaxStartingTasksPerSlave = 0; } static { @@ -119,6 +125,9 @@ protected void configure() { bind(Integer.class) .annotatedWith(HttpOfferSetImpl.MaxRetries.class) .toInstance(options.httpOfferSetMaxRetries); + bind(Integer.class) + .annotatedWith(HttpOfferSetImpl.MaxStartingTaskPerSlave.class) + .toInstance(options.httpOfferSetMaxStartingTasksPerSlave); bind(OfferSet.class).to(HttpOfferSetImpl.class).in(Singleton.class); expose(OfferSet.class); } diff --git a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java index f390355f5..9ad7f5e4b 100644 --- a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java +++ b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java @@ -14,19 +14,35 @@ package io.github.aurora.scheduler.offers; import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.offers.Offers; +import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.junit.Before; import org.junit.Test; import static org.apache.aurora.gen.MaintenanceMode.NONE; +import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; +import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -54,7 +70,42 @@ public class HttpOfferSetImplTest extends EasyMockTest { @Before public void setUp() throws IOException { - httpOfferSet = new HttpOfferSetImpl(new HashSet<>()); + Storage storage = MemStorageModule.newEmptyStorage(); + storage.write((Storage.MutateWork.NoResult.Quiet) sp -> { + ScheduledTask t0 = makeTask("t0", JOB) + .newBuilder() + .setStatus(ScheduleStatus.PENDING); + + ScheduledTask t1 = makeTask("t1", JOB) + .newBuilder() + .setStatus(ScheduleStatus.STARTING); + t1.setAssignedTask(new AssignedTask("t1", + OFFER_B.getOffer().getAgentId().getValue(), + OFFER_B.getOffer().getHostname(), + t1.getAssignedTask().getTask(), + new HashMap<>(), + 0)); + + ScheduledTask t2 = makeTask("t2", JOB) + .newBuilder() + .setStatus(ScheduleStatus.RUNNING); + t2.setAssignedTask(new AssignedTask("t2", + OFFER_C.getOffer().getAgentId().getValue(), + OFFER_C.getOffer().getHostname(), + t1.getAssignedTask().getTask(), + new HashMap<>(), + 0)); + + sp.getUnsafeTaskStore().saveTasks( + IScheduledTask.setFromBuilders(ImmutableList.of(t0, t1, t2))); + }); + + httpOfferSet = new HttpOfferSetImpl(new HashSet<>(), + storage, + 0, + new URL("http://localhost:9090/v1/offerset"), + 0, + 1); httpOfferSet.add(OFFER_A); httpOfferSet.add(OFFER_B); httpOfferSet.add(OFFER_C); @@ -140,4 +191,25 @@ public void testProcessResponse() throws IOException { } assertTrue(isException); } + + @Test + public void testGetOrdered() { + control.replay(); + + // OFFER_B is put in the bottom of list as it has 1 starting task. + IScheduledTask task = makeTask("id", JOB); + Iterable sortedOffers = httpOfferSet.getOrdered( + TaskGroupKey.from(task.getAssignedTask().getTask()), + TaskTestUtil.toResourceRequest(task.getAssignedTask().getTask())); + + List expectedOffers = new ArrayList<>(); + expectedOffers.add(OFFER_A); + expectedOffers.add(OFFER_C); + expectedOffers.add(OFFER_B); + assertEquals(expectedOffers.size(), Iterables.size(sortedOffers)); + int i = 0; + for (HostOffer o: sortedOffers) { + assertEquals(expectedOffers.get(i++), o); + } + } } From e4fccf2fa79a65115dea74632e3be726f6881d82 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Fri, 15 Oct 2021 12:07:57 -0700 Subject: [PATCH 22/35] put the bad offers with too many starting tasks to the bottom --- .../aurora/scheduler/offers/HttpOfferSetImpl.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index da787aaef..d9c7bfe53 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -27,7 +27,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import javax.annotation.Nonnull; import javax.inject.Qualifier; @@ -195,12 +194,14 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res List goodOffers = new ArrayList<>(); if (maxStartingTasksPerSlave > 0) { badOffers = offers.stream() - .filter(offer -> hostTaskCountMap.getOrDefault(offer.getOffer().getAgentId().getValue(), 0) - >= maxStartingTasksPerSlave) + .filter(offer -> + hostTaskCountMap.getOrDefault(offer.getOffer().getAgentId().getValue(), 0) + >= maxStartingTasksPerSlave) .collect(Collectors.toList()); goodOffers = offers.stream() - .filter(offer -> hostTaskCountMap.getOrDefault(offer.getOffer().getAgentId().getValue(), 0) - < maxStartingTasksPerSlave) + .filter(offer -> + hostTaskCountMap.getOrDefault(offer.getOffer().getAgentId().getValue(), 0) + < maxStartingTasksPerSlave) .collect(Collectors.toList()); if (!badOffers.isEmpty()) { From e04afb90eb862935676c4128bc50801b0f5b754e Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Fri, 15 Oct 2021 14:47:07 -0700 Subject: [PATCH 23/35] fix unit test --- .../aurora/scheduler/offers/HttpOfferSetImplTest.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java index 9ad7f5e4b..df087dc0a 100644 --- a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java +++ b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java @@ -202,14 +202,11 @@ public void testGetOrdered() { TaskGroupKey.from(task.getAssignedTask().getTask()), TaskTestUtil.toResourceRequest(task.getAssignedTask().getTask())); - List expectedOffers = new ArrayList<>(); - expectedOffers.add(OFFER_A); - expectedOffers.add(OFFER_C); - expectedOffers.add(OFFER_B); - assertEquals(expectedOffers.size(), Iterables.size(sortedOffers)); - int i = 0; + assertEquals(3, Iterables.size(sortedOffers)); + HostOffer lastOffer = null; for (HostOffer o: sortedOffers) { - assertEquals(expectedOffers.get(i++), o); + lastOffer = o; } + assertEquals(OFFER_B, lastOffer); } } From 93d85f9eb2e7876fce4603db4a0b50165a315b4f Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Fri, 15 Oct 2021 14:52:19 -0700 Subject: [PATCH 24/35] fix unit test --- .../io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java index df087dc0a..dfe5ec8fa 100644 --- a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java +++ b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java @@ -15,7 +15,6 @@ import java.io.IOException; import java.net.URL; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; From a1f8c8407247e3eb8c920d4c3cd2c0ae77a3d582 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Fri, 15 Oct 2021 15:22:00 -0700 Subject: [PATCH 25/35] fix unit test --- .../aurora/scheduler/offers/HttpOfferSetImplTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java index dfe5ec8fa..5e2b0f74b 100644 --- a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java +++ b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java @@ -30,6 +30,7 @@ import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.offers.Offers; import org.apache.aurora.scheduler.storage.Storage; @@ -197,9 +198,10 @@ public void testGetOrdered() { // OFFER_B is put in the bottom of list as it has 1 starting task. IScheduledTask task = makeTask("id", JOB); - Iterable sortedOffers = httpOfferSet.getOrdered( - TaskGroupKey.from(task.getAssignedTask().getTask()), - TaskTestUtil.toResourceRequest(task.getAssignedTask().getTask())); + TaskGroupKey groupKey = TaskGroupKey.from(task.getAssignedTask().getTask()); + SchedulingFilter.ResourceRequest resourceRequest = + TaskTestUtil.toResourceRequest(task.getAssignedTask().getTask()); + Iterable sortedOffers = httpOfferSet.getOrdered(groupKey, resourceRequest); assertEquals(3, Iterables.size(sortedOffers)); HostOffer lastOffer = null; From 4c15ab05c68592c9c69b88a6caa11f806ddd2036 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Mon, 18 Oct 2021 17:04:03 -0700 Subject: [PATCH 26/35] update on unit tests --- .../scheduler/offers/HttpOfferSetImpl.java | 78 +++++++++++------- .../scheduler/offers/HttpOfferSetModule.java | 27 ------- .../scheduler/offers/StatCalculator.java | 16 ++-- .../offers/HttpOfferSetImplTest.java | 79 +++++++++++++------ 4 files changed, 112 insertions(+), 88 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index d9c7bfe53..7d568850d 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -18,7 +18,7 @@ import java.lang.annotation.Target; import java.net.MalformedURLException; import java.net.URL; -import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -73,6 +73,11 @@ public class HttpOfferSetImpl implements OfferSet { private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetImpl.class); + static List latencyMsList = Collections.synchronizedList(new LinkedList<>()); + static List offerSetDiffList = Collections.synchronizedList(new LinkedList<>()); + private static long failureCount = 0; + private static boolean useEndpoint = false; + private final Set offers; private final Storage storage; private final ObjectMapper jsonMapper = new ObjectMapper(); @@ -128,12 +133,12 @@ public HttpOfferSetImpl(Ordering ordering, storage = mStorage; try { endpoint = new URL(Objects.requireNonNull(url)); - HttpOfferSetModule.enable(true); - LOG.info("HttpOfferSetModule Enabled."); + HttpOfferSetImpl.setUseEndpoint(true); + LOG.info("HttpOfferSetImpl Enabled."); } catch (MalformedURLException e) { LOG.error("http_offer_set_endpoint is malformed. ", e); - HttpOfferSetModule.enable(false); - LOG.info("HttpOfferSetModule Disabled."); + HttpOfferSetImpl.setUseEndpoint(false); + LOG.info("HttpOfferSetImpl Disabled."); } timeoutMs = Objects.requireNonNull(mTimeoutMs); maxRetries = Objects.requireNonNull(mMaxRetries); @@ -144,6 +149,26 @@ public HttpOfferSetImpl(Ordering ordering, LOG.info("HttpOfferSet's max number of starting tasks per slave: {}", maxStartingTasksPerSlave); } + public static synchronized void incFailureCount() { + HttpOfferSetImpl.failureCount++; + } + + public static synchronized long getFailureCount() { + return HttpOfferSetImpl.failureCount; + } + + public static synchronized void resetFailureCount() { + HttpOfferSetImpl.failureCount = 0; + } + + public static synchronized void setUseEndpoint(boolean mEnabled) { + HttpOfferSetImpl.useEndpoint = mEnabled; + } + + public static synchronized boolean isUseEndpoint() { + return HttpOfferSetImpl.useEndpoint; + } + @Override public void add(HostOffer offer) { offers.add(offer); @@ -176,7 +201,7 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res long startTime = System.nanoTime(); // if there are no available offers, do nothing. if (offers.isEmpty()) { - HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); + HttpOfferSetImpl.latencyMsList.add(System.nanoTime() - startTime); return offers; } @@ -190,8 +215,8 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res } // find the bad offers and put them at the bottom of the list - List badOffers = new ArrayList<>(); - List goodOffers = new ArrayList<>(); + List badOffers = new LinkedList<>(); + List goodOffers = new LinkedList<>(); if (maxStartingTasksPerSlave > 0) { badOffers = offers.stream() .filter(offer -> @@ -207,17 +232,14 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res if (!badOffers.isEmpty()) { LOG.info("the number of bad offers: {}", badOffers.size()); } - - if (goodOffers.isEmpty()) { - HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); - return badOffers; - } + } else { + goodOffers = offers.stream().collect(Collectors.toList()); } - // if the external http endpoint was not reachable - if (!HttpOfferSetModule.isEnabled()) { + // if the external http endpoint was not reachable or we have nothing to send out + if (!HttpOfferSetImpl.isUseEndpoint() || goodOffers.isEmpty()) { goodOffers.addAll(badOffers); - HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); + HttpOfferSetImpl.latencyMsList.add(System.nanoTime() - startTime); return goodOffers; } @@ -227,26 +249,24 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res ScheduleRequest scheduleRequest = createRequest(goodOffers, resourceRequest, startTime); LOG.info("Sending request {}", scheduleRequest.jobKey); String responseStr = sendRequest(scheduleRequest); - orderedOffers = processResponse(responseStr, HttpOfferSetModule.offerSetDiffList); + orderedOffers = processResponse(goodOffers, responseStr); } catch (IOException e) { LOG.error("Failed to schedule the task of {} using {} ", resourceRequest.getTask().getJob().toString(), endpoint, e); - HttpOfferSetModule.incFailureCount(); + HttpOfferSetImpl.incFailureCount(); } finally { - // shutdown HttpOfferSet if failure is consistent. - if (HttpOfferSetModule.getFailureCount() >= maxRetries) { + // stop reaching the endpoint if failure is consistent. + if (HttpOfferSetImpl.getFailureCount() >= maxRetries) { LOG.error("Reaches {} retries. {} is disabled", maxRetries, endpoint); - HttpOfferSetModule.enable(false); + HttpOfferSetImpl.setUseEndpoint(false); } } if (orderedOffers != null) { - orderedOffers.addAll(badOffers); - HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); - return orderedOffers; + goodOffers = orderedOffers; } goodOffers.addAll(badOffers); - HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); + HttpOfferSetImpl.latencyMsList.add(System.nanoTime() - startTime); return goodOffers; } @@ -292,12 +312,12 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { } } - List processResponse(String responseStr, List offerSetDiffList) + List processResponse(List mOffers, String responseStr) throws IOException { ScheduleResponse response = jsonMapper.readValue(responseStr, ScheduleResponse.class); LOG.info("Received {} offers", response.hosts.size()); - Map offerMap = offers.stream() + Map offerMap = mOffers.stream() .collect(Collectors.toMap(offer -> offer.getAttributes().getHost(), offer -> offer)); if (!response.error.trim().isEmpty()) { LOG.error("Unable to receive offers from {} due to {}", endpoint, response.error); @@ -313,14 +333,14 @@ List processResponse(String responseStr, List offerSetDiffList) .collect(Collectors.toList()); //offSetDiff is the total number of missing offers and the extra offers - long offSetDiff = offers.size() - (response.hosts.size() - extraOffers.size()) + long offSetDiff = mOffers.size() - (response.hosts.size() - extraOffers.size()) + extraOffers.size(); offerSetDiffList.add(offSetDiff); if (offSetDiff > 0) { LOG.warn("The number of different offers between the original and received offer sets is {}", offSetDiff); if (LOG.isDebugEnabled()) { - List missedOffers = offers.stream() + List missedOffers = mOffers.stream() .map(offer -> offer.getAttributes().getHost()) .filter(host -> !response.hosts.contains(host)) .collect(Collectors.toList()); diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java index 801e5e5bc..eeba01a28 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java @@ -15,9 +15,6 @@ import java.lang.annotation.Retention; import java.lang.annotation.Target; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -55,30 +52,6 @@ public class HttpOfferSetModule extends AbstractModule { private final CliOptions cliOptions; private final Options options; private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetModule.class); - static List latencyMsList = Collections.synchronizedList(new LinkedList<>()); - static List offerSetDiffList = Collections.synchronizedList(new LinkedList<>()); - private static long failureCount = 0; - private static boolean enabled = false; - - public static synchronized void incFailureCount() { - HttpOfferSetModule.failureCount++; - } - - public static synchronized long getFailureCount() { - return HttpOfferSetModule.failureCount; - } - - public static synchronized void resetFailureCount() { - HttpOfferSetModule.failureCount = 0; - } - - public static synchronized void enable(boolean mEnabled) { - HttpOfferSetModule.enabled = mEnabled; - } - - public static synchronized boolean isEnabled() { - return HttpOfferSetModule.enabled; - } @Parameters(separators = "=") public static class Options { diff --git a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java index 64e2674a1..229b4675f 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java +++ b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java @@ -66,12 +66,12 @@ public StatCalculator.Counter load(String key) { @Override public void run() { float medianLatency = - Util.percentile(HttpOfferSetModule.latencyMsList, 50.0) + Util.percentile(HttpOfferSetImpl.latencyMsList, 50.0) .floatValue() / 1000000; float avgLatency = - (float) Util.avg(HttpOfferSetModule.latencyMsList) / 1000000; + (float) Util.avg(HttpOfferSetImpl.latencyMsList) / 1000000; float worstLatency = - (float) Util.max(HttpOfferSetModule.latencyMsList) / 1000000; + (float) Util.max(HttpOfferSetImpl.latencyMsList) / 1000000; String medianLatencyName = "http_offer_set_median_latency_ms"; metricCache.getUnchecked(medianLatencyName).set(medianLatencyName, medianLatency); @@ -81,16 +81,16 @@ public void run() { metricCache.getUnchecked(avgLatencyName).set(avgLatencyName, avgLatency); String failureCountName = "http_offer_set_failure_count"; metricCache.getUnchecked(failureCountName).set(failureCountName, - HttpOfferSetModule.getFailureCount()); + HttpOfferSetImpl.getFailureCount()); - long maxOfferSetDiff = Util.max(HttpOfferSetModule.offerSetDiffList); + long maxOfferSetDiff = Util.max(HttpOfferSetImpl.offerSetDiffList); String maxOffSetDiffName = "http_offer_set_max_diff"; metricCache.getUnchecked(maxOffSetDiffName).set(maxOffSetDiffName, maxOfferSetDiff); // reset the stats. - HttpOfferSetModule.latencyMsList.clear(); - HttpOfferSetModule.resetFailureCount(); - HttpOfferSetModule.offerSetDiffList.clear(); + HttpOfferSetImpl.latencyMsList.clear(); + HttpOfferSetImpl.resetFailureCount(); + HttpOfferSetImpl.offerSetDiffList.clear(); } } diff --git a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java index 5e2b0f74b..da9893710 100644 --- a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java +++ b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java @@ -17,8 +17,8 @@ import java.net.URL; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; +import java.util.Set; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -66,16 +66,17 @@ public class HttpOfferSetImplTest extends EasyMockTest { IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_C))); private static final String HOST_D = "HOST_D"; + private final Storage storage = MemStorageModule.newEmptyStorage(); + private HttpOfferSetImpl httpOfferSet; + private Set offers; @Before public void setUp() throws IOException { - Storage storage = MemStorageModule.newEmptyStorage(); storage.write((Storage.MutateWork.NoResult.Quiet) sp -> { ScheduledTask t0 = makeTask("t0", JOB) .newBuilder() .setStatus(ScheduleStatus.PENDING); - ScheduledTask t1 = makeTask("t1", JOB) .newBuilder() .setStatus(ScheduleStatus.STARTING); @@ -100,15 +101,17 @@ public void setUp() throws IOException { IScheduledTask.setFromBuilders(ImmutableList.of(t0, t1, t2))); }); - httpOfferSet = new HttpOfferSetImpl(new HashSet<>(), + offers = new HashSet<>(); + offers.add(OFFER_A); + offers.add(OFFER_B); + offers.add(OFFER_C); + + httpOfferSet = new HttpOfferSetImpl(offers, storage, 0, new URL("http://localhost:9090/v1/offerset"), 0, - 1); - httpOfferSet.add(OFFER_A); - httpOfferSet.add(OFFER_B); - httpOfferSet.add(OFFER_C); + 0); } @Test @@ -118,24 +121,25 @@ public void testProcessResponse() throws IOException { + HOST_A + "\",\"" + HOST_B + "\",\"" + HOST_C + "\"]}"; - List offerSetDiffList = new LinkedList<>(); - List sortedOffers = httpOfferSet.processResponse(responseStr, offerSetDiffList); + List mOffers = ImmutableList.copyOf(httpOfferSet.values()); + + List sortedOffers = httpOfferSet.processResponse(mOffers, responseStr); assertEquals(sortedOffers.size(), 3); assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C); - assertEquals((long) offerSetDiffList.get(0), 0); + assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(0), 0); // plugin returns less offers than Aurora has. responseStr = "{\"error\": \"\", \"hosts\": [\"" + HOST_A + "\",\"" + HOST_C + "\"]}"; - sortedOffers = httpOfferSet.processResponse(responseStr, offerSetDiffList); + sortedOffers = httpOfferSet.processResponse(mOffers, responseStr); assertEquals(sortedOffers.size(), 2); assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C); - assertEquals((long) offerSetDiffList.get(1), 1); + assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(1), 1); // plugin returns more offers than Aurora has. responseStr = "{\"error\": \"\", \"hosts\": [\"" @@ -143,23 +147,23 @@ public void testProcessResponse() throws IOException { + HOST_B + "\",\"" + HOST_D + "\",\"" + HOST_C + "\"]}"; - sortedOffers = httpOfferSet.processResponse(responseStr, offerSetDiffList); + sortedOffers = httpOfferSet.processResponse(mOffers, responseStr); assertEquals(sortedOffers.size(), 3); assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C); - assertEquals((long) offerSetDiffList.get(2), 1); + assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(2), 1); // plugin omits 1 offer & returns 1 extra offer responseStr = "{\"error\": \"\", \"hosts\": [\"" + HOST_A + "\",\"" + HOST_D + "\",\"" + HOST_C + "\"]}"; - sortedOffers = httpOfferSet.processResponse(responseStr, offerSetDiffList); + sortedOffers = httpOfferSet.processResponse(mOffers, responseStr); assertEquals(sortedOffers.size(), 2); assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C); - assertEquals((long) offerSetDiffList.get(3), 2); + assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(3), 2); responseStr = "{\"error\": \"Error\", \"hosts\": [\"" + HOST_A + "\",\"" @@ -167,7 +171,7 @@ public void testProcessResponse() throws IOException { + HOST_C + "\"]}"; boolean isException = false; try { - httpOfferSet.processResponse(responseStr, offerSetDiffList); + httpOfferSet.processResponse(mOffers, responseStr); } catch (IOException ioe) { isException = true; } @@ -176,7 +180,7 @@ public void testProcessResponse() throws IOException { responseStr = "{\"error\": \"error\"}"; isException = false; try { - httpOfferSet.processResponse(responseStr, new LinkedList<>()); + httpOfferSet.processResponse(mOffers, responseStr); } catch (IOException ioe) { isException = true; } @@ -185,7 +189,7 @@ public void testProcessResponse() throws IOException { responseStr = "{\"weird\": \"cannot decode this json string\"}"; isException = false; try { - httpOfferSet.processResponse(responseStr, new LinkedList<>()); + httpOfferSet.processResponse(mOffers, responseStr); } catch (IOException ioe) { isException = true; } @@ -193,17 +197,44 @@ public void testProcessResponse() throws IOException { } @Test - public void testGetOrdered() { + public void testGetOrdered() throws IOException { control.replay(); - - // OFFER_B is put in the bottom of list as it has 1 starting task. IScheduledTask task = makeTask("id", JOB); TaskGroupKey groupKey = TaskGroupKey.from(task.getAssignedTask().getTask()); SchedulingFilter.ResourceRequest resourceRequest = TaskTestUtil.toResourceRequest(task.getAssignedTask().getTask()); + + // return the same set of offers Iterable sortedOffers = httpOfferSet.getOrdered(groupKey, resourceRequest); + assertEquals(offers.size(), Iterables.size(sortedOffers)); + + httpOfferSet = new HttpOfferSetImpl(offers, + storage, + 0, + new URL("http://localhost:9090/v1/offerset"), + 0, + -1); + sortedOffers = httpOfferSet.getOrdered(groupKey, resourceRequest); + assertEquals(offers.size(), Iterables.size(sortedOffers)); + + httpOfferSet = new HttpOfferSetImpl(offers, + storage, + 0, + new URL("http://localhost:9090/v1/offerset"), + 0, + 2); + sortedOffers = httpOfferSet.getOrdered(groupKey, resourceRequest); + assertEquals(offers.size(), Iterables.size(sortedOffers)); - assertEquals(3, Iterables.size(sortedOffers)); + // OFFER_B is put in the bottom of list as it has 1 starting task. + httpOfferSet = new HttpOfferSetImpl(offers, + storage, + 0, + new URL("http://localhost:9090/v1/offerset"), + 0, + 1); + sortedOffers = httpOfferSet.getOrdered(groupKey, resourceRequest); + assertEquals(offers.size(), Iterables.size(sortedOffers)); HostOffer lastOffer = null; for (HostOffer o: sortedOffers) { lastOffer = o; From 5d489115d93661c9fbb5d484e305c4217cf0592c Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Tue, 19 Oct 2021 08:15:41 -0700 Subject: [PATCH 27/35] when no offers, skip calculating stats --- .../java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 7d568850d..4ba042c80 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -201,7 +201,6 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res long startTime = System.nanoTime(); // if there are no available offers, do nothing. if (offers.isEmpty()) { - HttpOfferSetImpl.latencyMsList.add(System.nanoTime() - startTime); return offers; } From 7b30d1b6714d091a4f2236133cf6f8bf5cc1ac82 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Tue, 19 Oct 2021 10:40:04 -0700 Subject: [PATCH 28/35] fix java.util.ConcurrentModificationException --- .../scheduler/offers/StatCalculator.java | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java index 229b4675f..311f4bf34 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java +++ b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java @@ -13,7 +13,9 @@ */ package io.github.aurora.scheduler.offers; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import javax.inject.Inject; @@ -65,13 +67,29 @@ public StatCalculator.Counter load(String key) { @Override public void run() { + // make copies of stat data as synchronizedList is not thread-safe iterating + List latencyList; + synchronized (HttpOfferSetImpl.latencyMsList) { + latencyList = HttpOfferSetImpl.latencyMsList.stream().collect(Collectors.toList()); + } + List offerSetDiffList; + synchronized (HttpOfferSetImpl.offerSetDiffList) { + offerSetDiffList = HttpOfferSetImpl.offerSetDiffList.stream().collect(Collectors.toList()); + } + long failureCount = HttpOfferSetImpl.getFailureCount(); + + // reset the stats. + HttpOfferSetImpl.latencyMsList.clear(); + HttpOfferSetImpl.resetFailureCount(); + HttpOfferSetImpl.offerSetDiffList.clear(); + float medianLatency = - Util.percentile(HttpOfferSetImpl.latencyMsList, 50.0) + Util.percentile(latencyList, 50.0) .floatValue() / 1000000; float avgLatency = - (float) Util.avg(HttpOfferSetImpl.latencyMsList) / 1000000; + (float) Util.avg(latencyList) / 1000000; float worstLatency = - (float) Util.max(HttpOfferSetImpl.latencyMsList) / 1000000; + (float) Util.max(latencyList) / 1000000; String medianLatencyName = "http_offer_set_median_latency_ms"; metricCache.getUnchecked(medianLatencyName).set(medianLatencyName, medianLatency); @@ -80,17 +98,10 @@ public void run() { String avgLatencyName = "http_offer_set_avg_latency_ms"; metricCache.getUnchecked(avgLatencyName).set(avgLatencyName, avgLatency); String failureCountName = "http_offer_set_failure_count"; - metricCache.getUnchecked(failureCountName).set(failureCountName, - HttpOfferSetImpl.getFailureCount()); + metricCache.getUnchecked(failureCountName).set(failureCountName, failureCount); - long maxOfferSetDiff = Util.max(HttpOfferSetImpl.offerSetDiffList); + long maxOfferSetDiff = Util.max(offerSetDiffList); String maxOffSetDiffName = "http_offer_set_max_diff"; - metricCache.getUnchecked(maxOffSetDiffName).set(maxOffSetDiffName, - maxOfferSetDiff); - - // reset the stats. - HttpOfferSetImpl.latencyMsList.clear(); - HttpOfferSetImpl.resetFailureCount(); - HttpOfferSetImpl.offerSetDiffList.clear(); + metricCache.getUnchecked(maxOffSetDiffName).set(maxOffSetDiffName, maxOfferSetDiff); } } From 574bb622b102f5a46881a1acf63a08abbde18b60 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Tue, 19 Oct 2021 17:51:49 -0700 Subject: [PATCH 29/35] periodically fetching the starting tasks --- docs/features/custom-plugins.md | 11 +++- .../scheduler/offers/HttpOfferSetImpl.java | 21 ++++---- .../scheduler/offers/HttpOfferSetModule.java | 50 ++++++++++++++++--- .../aurora/scheduler/offers/TaskFetcher.java | 32 ++++++++++++ .../offers/HttpOfferSetImplTest.java | 5 +- 5 files changed, 95 insertions(+), 24 deletions(-) create mode 100644 src/main/java/io/github/aurora/scheduler/offers/TaskFetcher.java diff --git a/docs/features/custom-plugins.md b/docs/features/custom-plugins.md index 84524e41a..2f82710b6 100644 --- a/docs/features/custom-plugins.md +++ b/docs/features/custom-plugins.md @@ -17,10 +17,17 @@ We added HTTP OfferSet `io.github.aurora.scheduler.offers.HttpOfferSetModule` th How to configure HTTP OfferSet? - offer_set_module=io.github.aurora.scheduler.offers.HttpOfferSetModule -- http_offer_set_timeout_ms is http timeout in milliseconds. +- http_offer_set_timeout is http timeout value. `100ms` is the default value. - http_offer_set_max_retries is the number of retries if the module fails to connects to the external REST API server. -If it exceeds the number of retries, Aurora uses the native offerset implementation. +`10` is the default value. +If it exceeds the number of retries, HTTP OfferSet will not reach the external endpoint anymore. - http_offer_set_endpoint is the REST API endpoint, e.g. http://127.0.0.1:9090/v1/offerset. +- http_offer_set_max_starting_tasks_per_slave is the maximum starting tasks per slave. +If a slave has more this number of starting tasks, it will be put at the end of offer list. +If you want to use this feature, please set this parameter a positive integer number. +It is disabled by default. +- http_offer_set_task_fetch_interval determine how often HTTP OfferSet fetches the starting tasks from the `task_store`. +By default, it is `1secs`. How to implement the external REST API server? The REST API needs to handle the request in the following format: diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 4ba042c80..0eb39d525 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -75,11 +75,11 @@ public class HttpOfferSetImpl implements OfferSet { static List latencyMsList = Collections.synchronizedList(new LinkedList<>()); static List offerSetDiffList = Collections.synchronizedList(new LinkedList<>()); + private static Iterable startingTasks = new LinkedList<>(); private static long failureCount = 0; private static boolean useEndpoint = false; private final Set offers; - private final Storage storage; private final ObjectMapper jsonMapper = new ObjectMapper(); private final CloseableHttpClient httpClient = HttpClients.createDefault(); private final int timeoutMs; @@ -89,13 +89,11 @@ public class HttpOfferSetImpl implements OfferSet { private URL endpoint; public HttpOfferSetImpl(Set mOffers, - Storage mStorage, int mTimeoutMs, URL mEndpoint, int mMaxRetries, int mMaxStartingTasksPerSlave) { offers = mOffers; - storage = mStorage; timeoutMs = mTimeoutMs; endpoint = mEndpoint; maxRetries = mMaxRetries; @@ -124,13 +122,11 @@ public HttpOfferSetImpl(Set mOffers, @Inject public HttpOfferSetImpl(Ordering ordering, - Storage mStorage, @TimeoutMs Integer mTimeoutMs, @Endpoint String url, @MaxRetries Integer mMaxRetries, @MaxStartingTaskPerSlave Integer mMaxStartingTasksPerSlave) { offers = new ConcurrentSkipListSet<>(ordering); - storage = mStorage; try { endpoint = new URL(Objects.requireNonNull(url)); HttpOfferSetImpl.setUseEndpoint(true); @@ -169,6 +165,11 @@ public static synchronized boolean isUseEndpoint() { return HttpOfferSetImpl.useEndpoint; } + public static synchronized void fetchStartingTasks(Storage storage) { + startingTasks = Storage.Util.fetchTasks(storage, + Query.unscoped().byStatus(ScheduleStatus.STARTING)); + } + @Override public void add(HostOffer offer) { offers.add(offer); @@ -206,11 +207,11 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res // count the number of starting tasks per slave Map hostTaskCountMap = new HashMap<>(); - Iterable startingTasks = - Storage.Util.fetchTasks(storage, Query.unscoped().byStatus(ScheduleStatus.STARTING)); - for (IScheduledTask task : startingTasks) { - String slaveId = task.getAssignedTask().getSlaveId(); - hostTaskCountMap.put(slaveId, hostTaskCountMap.getOrDefault(slaveId, 0) + 1); + synchronized (startingTasks) { + for (IScheduledTask task : startingTasks) { + String slaveId = task.getAssignedTask().getSlaveId(); + hostTaskCountMap.put(slaveId, hostTaskCountMap.getOrDefault(slaveId, 0) + 1); + } } // find the bad offers and put them at the bottom of the list diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java index eeba01a28..b02565f24 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java @@ -36,6 +36,7 @@ import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.config.CliOptions; import org.apache.aurora.scheduler.config.CommandLine; +import org.apache.aurora.scheduler.config.types.TimeAmount; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.offers.OfferOrderBuilder; import org.apache.aurora.scheduler.offers.OfferSet; @@ -58,8 +59,8 @@ public static class Options { @Parameter(names = "-http_offer_set_endpoint") String httpOfferSetEndpoint = "http://localhost:9090/v1/offerset"; - @Parameter(names = "-http_offer_set_timeout_ms") - int httpOfferSetTimeoutMs = 100; + @Parameter(names = "-http_offer_set_timeout") + TimeAmount httpOfferSetTimeout = new TimeAmount(100, Time.MILLISECONDS); @Parameter(names = "-http_offer_set_max_retries") int httpOfferSetMaxRetries = 10; @@ -69,6 +70,9 @@ public static class Options { // httpOfferSetMaxStartingTasksPerSlave less than or equal to zero @Parameter(names = "-http_offer_set_max_starting_tasks_per_slave") int httpOfferSetMaxStartingTasksPerSlave = 0; + + @Parameter(names = "-http_offer_set_task_fetch_interval") + TimeAmount httpOfferSetTaskFetchInterval = new TimeAmount(1, Time.SECONDS); } static { @@ -91,7 +95,7 @@ protected void configure() { }).toInstance(OfferOrderBuilder.create(cliOptions.offer.offerOrder)); bind(Integer.class) .annotatedWith(HttpOfferSetImpl.TimeoutMs.class) - .toInstance(options.httpOfferSetTimeoutMs); + .toInstance(options.httpOfferSetTimeout.getValue().intValue()); bind(String.class) .annotatedWith(HttpOfferSetImpl.Endpoint.class) .toInstance(options.httpOfferSetEndpoint); @@ -107,12 +111,20 @@ protected void configure() { }); bind(StatCalculator.class).in(com.google.inject.Singleton.class); + bind(TaskFetcher.class).in(com.google.inject.Singleton.class); + bind(ScheduledExecutorService.class) .annotatedWith(Executor.class) .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("HttpOfferSet-%d", LOG)); - bind(Integer.class) + bind(Long.class) .annotatedWith(RefreshRateMs.class) - .toInstance(cliOptions.sla.slaRefreshInterval.as(Time.MILLISECONDS).intValue()); + .toInstance(cliOptions.sla.slaRefreshInterval.as(Time.MILLISECONDS)); + bind(Long.class) + .annotatedWith(TaskFetcherRateSec.class) + .toInstance(options.httpOfferSetTaskFetchInterval.as(Time.MILLISECONDS)); + bind(Integer.class) + .annotatedWith(MaxStartingTaskPerSlave.class) + .toInstance(options.httpOfferSetMaxStartingTasksPerSlave); bind(StatUpdater.class).in(com.google.inject.Singleton.class); SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(StatUpdater.class); } @@ -127,24 +139,46 @@ protected void configure() { @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) @interface RefreshRateMs { } + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface TaskFetcherRateSec { } + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface MaxStartingTaskPerSlave { } + static class StatUpdater extends AbstractIdleService { private final ScheduledExecutorService executor; private final StatCalculator calculator; - private final Integer refreshRateMs; + private final TaskFetcher taskFetcher; + private final Long refreshRateMs; + private final Long taskFetcherRateMs; + private final Integer maxStartingTasksPerSlave; @Inject StatUpdater( @Executor ScheduledExecutorService mExecutor, StatCalculator mCalculator, - @RefreshRateMs Integer mRefreshRateMs) { + TaskFetcher mTaskFetcher, + @RefreshRateMs Long mRefreshRateMs, + @TaskFetcherRateSec Long mTaskFetcherRateMs, + @MaxStartingTaskPerSlave Integer mMaxStartingTasksPerSlave) { executor = requireNonNull(mExecutor); calculator = requireNonNull(mCalculator); - refreshRateMs = mRefreshRateMs; + taskFetcher = requireNonNull(mTaskFetcher); + refreshRateMs = requireNonNull(mRefreshRateMs); + taskFetcherRateMs = requireNonNull(mTaskFetcherRateMs); + maxStartingTasksPerSlave = requireNonNull(mMaxStartingTasksPerSlave); } @Override protected void startUp() { executor.scheduleAtFixedRate(calculator, 0, refreshRateMs, TimeUnit.MILLISECONDS); + if (maxStartingTasksPerSlave > 0) { + executor.scheduleAtFixedRate(taskFetcher, 0, taskFetcherRateMs, TimeUnit.MILLISECONDS); + } } @Override diff --git a/src/main/java/io/github/aurora/scheduler/offers/TaskFetcher.java b/src/main/java/io/github/aurora/scheduler/offers/TaskFetcher.java new file mode 100644 index 000000000..a91f50174 --- /dev/null +++ b/src/main/java/io/github/aurora/scheduler/offers/TaskFetcher.java @@ -0,0 +1,32 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.aurora.scheduler.offers; + +import javax.inject.Inject; + +import org.apache.aurora.scheduler.storage.Storage; + +public class TaskFetcher implements Runnable { + private final Storage storage; + + @Inject + TaskFetcher(Storage mStorage) { + storage = mStorage; + } + + @Override + public void run() { + HttpOfferSetImpl.fetchStartingTasks(storage); + } +} diff --git a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java index da9893710..ce5cde16b 100644 --- a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java +++ b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java @@ -107,7 +107,6 @@ public void setUp() throws IOException { offers.add(OFFER_C); httpOfferSet = new HttpOfferSetImpl(offers, - storage, 0, new URL("http://localhost:9090/v1/offerset"), 0, @@ -203,13 +202,13 @@ public void testGetOrdered() throws IOException { TaskGroupKey groupKey = TaskGroupKey.from(task.getAssignedTask().getTask()); SchedulingFilter.ResourceRequest resourceRequest = TaskTestUtil.toResourceRequest(task.getAssignedTask().getTask()); + HttpOfferSetImpl.fetchStartingTasks(storage); // return the same set of offers Iterable sortedOffers = httpOfferSet.getOrdered(groupKey, resourceRequest); assertEquals(offers.size(), Iterables.size(sortedOffers)); httpOfferSet = new HttpOfferSetImpl(offers, - storage, 0, new URL("http://localhost:9090/v1/offerset"), 0, @@ -218,7 +217,6 @@ public void testGetOrdered() throws IOException { assertEquals(offers.size(), Iterables.size(sortedOffers)); httpOfferSet = new HttpOfferSetImpl(offers, - storage, 0, new URL("http://localhost:9090/v1/offerset"), 0, @@ -228,7 +226,6 @@ public void testGetOrdered() throws IOException { // OFFER_B is put in the bottom of list as it has 1 starting task. httpOfferSet = new HttpOfferSetImpl(offers, - storage, 0, new URL("http://localhost:9090/v1/offerset"), 0, From 7026337ea55f6960e4632697b3fea9622868e0c1 Mon Sep 17 00:00:00 2001 From: "Tan N. Le" Date: Thu, 21 Oct 2021 14:12:20 -0700 Subject: [PATCH 30/35] Update docs/features/custom-plugins.md Co-authored-by: Bipra De --- docs/features/custom-plugins.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/features/custom-plugins.md b/docs/features/custom-plugins.md index 2f82710b6..3ba2bb5d7 100644 --- a/docs/features/custom-plugins.md +++ b/docs/features/custom-plugins.md @@ -23,7 +23,7 @@ How to configure HTTP OfferSet? If it exceeds the number of retries, HTTP OfferSet will not reach the external endpoint anymore. - http_offer_set_endpoint is the REST API endpoint, e.g. http://127.0.0.1:9090/v1/offerset. - http_offer_set_max_starting_tasks_per_slave is the maximum starting tasks per slave. -If a slave has more this number of starting tasks, it will be put at the end of offer list. +If a slave has more than this number of starting tasks, it will be put at the end of offer list. If you want to use this feature, please set this parameter a positive integer number. It is disabled by default. - http_offer_set_task_fetch_interval determine how often HTTP OfferSet fetches the starting tasks from the `task_store`. From 0aaf21b84436f8fccec6f8bd82bc99e51fa8feb7 Mon Sep 17 00:00:00 2001 From: "Tan N. Le" Date: Thu, 21 Oct 2021 14:20:54 -0700 Subject: [PATCH 31/35] Update src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java Co-authored-by: Bipra De --- .../io/github/aurora/scheduler/offers/HttpOfferSetImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 0eb39d525..856835acc 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -145,7 +145,7 @@ public HttpOfferSetImpl(Ordering ordering, LOG.info("HttpOfferSet's max number of starting tasks per slave: {}", maxStartingTasksPerSlave); } - public static synchronized void incFailureCount() { + public static synchronized void incrementFailureCount() { HttpOfferSetImpl.failureCount++; } From c11658699d36044bb60fc40afd433e2bb0c47a1a Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Thu, 21 Oct 2021 14:48:13 -0700 Subject: [PATCH 32/35] address comment --- .../io/github/aurora/scheduler/offers/HttpOfferSetImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 856835acc..467d69760 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -253,7 +253,7 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res } catch (IOException e) { LOG.error("Failed to schedule the task of {} using {} ", resourceRequest.getTask().getJob().toString(), endpoint, e); - HttpOfferSetImpl.incFailureCount(); + HttpOfferSetImpl.incrementFailureCount(); } finally { // stop reaching the endpoint if failure is consistent. if (HttpOfferSetImpl.getFailureCount() >= maxRetries) { From 5664d8934f040ba9bea954aacd815ff63b861098 Mon Sep 17 00:00:00 2001 From: "Tan N. Le" Date: Thu, 21 Oct 2021 15:38:01 -0700 Subject: [PATCH 33/35] Apply suggestions from code review Co-authored-by: Bipra De --- .../io/github/aurora/scheduler/offers/HttpOfferSetModule.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java index b02565f24..bf3af23e0 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java @@ -66,7 +66,7 @@ public static class Options { int httpOfferSetMaxRetries = 10; // the slaves have more than or equal to the httpOfferSetMaxStartingTasksPerSlave - // are put in the bottom of the offerset. If you want to enable this feature, set + // are put in the bottom of the offerset. If you want to disable this feature, set // httpOfferSetMaxStartingTasksPerSlave less than or equal to zero @Parameter(names = "-http_offer_set_max_starting_tasks_per_slave") int httpOfferSetMaxStartingTasksPerSlave = 0; From 4af9021f2784cfc4f13233d41f6e5ac0d33e5c24 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Thu, 21 Oct 2021 15:50:59 -0700 Subject: [PATCH 34/35] addressed comments --- docs/features/custom-plugins.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/features/custom-plugins.md b/docs/features/custom-plugins.md index 3ba2bb5d7..36fff698d 100644 --- a/docs/features/custom-plugins.md +++ b/docs/features/custom-plugins.md @@ -24,8 +24,12 @@ If it exceeds the number of retries, HTTP OfferSet will not reach the external e - http_offer_set_endpoint is the REST API endpoint, e.g. http://127.0.0.1:9090/v1/offerset. - http_offer_set_max_starting_tasks_per_slave is the maximum starting tasks per slave. If a slave has more than this number of starting tasks, it will be put at the end of offer list. -If you want to use this feature, please set this parameter a positive integer number. +This feature is useful when a node is vulnerable to a certain number of tasks starting at the same time. +A task often demands a lot of resources during STARTING (e.g., pulling docker images and frequent healchecks). +Too many of them starting at the same time may overload the nodes. It is disabled by default. +If you want to use this feature, please set this parameter a positive integer number. + - http_offer_set_task_fetch_interval determine how often HTTP OfferSet fetches the starting tasks from the `task_store`. By default, it is `1secs`. From 1621f863ccd87f8e0da0de13aff3ef1b02c5e624 Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Mon, 25 Oct 2021 09:27:34 -0700 Subject: [PATCH 35/35] add parameter --- docs/features/custom-plugins.md | 15 ++++--- .../scheduler/offers/HttpOfferSetImpl.java | 27 +++++++++--- .../scheduler/offers/HttpOfferSetModule.java | 44 ++++++++++++------- .../offers/HttpOfferSetImplTest.java | 22 ++++++++-- 4 files changed, 74 insertions(+), 34 deletions(-) diff --git a/docs/features/custom-plugins.md b/docs/features/custom-plugins.md index 36fff698d..b8a092eb7 100644 --- a/docs/features/custom-plugins.md +++ b/docs/features/custom-plugins.md @@ -17,20 +17,21 @@ We added HTTP OfferSet `io.github.aurora.scheduler.offers.HttpOfferSetModule` th How to configure HTTP OfferSet? - offer_set_module=io.github.aurora.scheduler.offers.HttpOfferSetModule -- http_offer_set_timeout is http timeout value. `100ms` is the default value. -- http_offer_set_max_retries is the number of retries if the module fails to connects to the external REST API server. +- `http_offer_set_timeout` is http timeout value. `100ms` is the default value. +- `http_offer_set_max_retries` is the number of retries if the module fails to connects to the external REST API server. `10` is the default value. If it exceeds the number of retries, HTTP OfferSet will not reach the external endpoint anymore. -- http_offer_set_endpoint is the REST API endpoint, e.g. http://127.0.0.1:9090/v1/offerset. -- http_offer_set_max_starting_tasks_per_slave is the maximum starting tasks per slave. -If a slave has more than this number of starting tasks, it will be put at the end of offer list. +- `http_offer_set_endpoint` is the REST API endpoint, e.g. http://127.0.0.1:9090/v1/offerset. +- `http_offer_set_filter_enabled` is used to filter bad offers out. By default, it is disabled. +- `http_offer_set_max_starting_tasks_per_slave` is the maximum starting tasks per slave. +If a slave has more than this number of starting tasks, it will be put at the end of offer list (`http_offer_set_filter_enabled=false`) +or taken out (`http_offer_set_filter_enabled=true`). This feature is useful when a node is vulnerable to a certain number of tasks starting at the same time. A task often demands a lot of resources during STARTING (e.g., pulling docker images and frequent healchecks). Too many of them starting at the same time may overload the nodes. It is disabled by default. If you want to use this feature, please set this parameter a positive integer number. - -- http_offer_set_task_fetch_interval determine how often HTTP OfferSet fetches the starting tasks from the `task_store`. +- `http_offer_set_task_fetch_interval` determine how often HTTP OfferSet fetches the starting tasks from the `task_store`. By default, it is `1secs`. How to implement the external REST API server? diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 467d69760..d8f741a18 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -85,6 +85,7 @@ public class HttpOfferSetImpl implements OfferSet { private final int timeoutMs; private final int maxRetries; private final int maxStartingTasksPerSlave; + private final boolean filterEnabled; private URL endpoint; @@ -92,12 +93,14 @@ public HttpOfferSetImpl(Set mOffers, int mTimeoutMs, URL mEndpoint, int mMaxRetries, - int mMaxStartingTasksPerSlave) { + int mMaxStartingTasksPerSlave, + boolean mFilterEnabled) { offers = mOffers; timeoutMs = mTimeoutMs; endpoint = mEndpoint; maxRetries = mMaxRetries; maxStartingTasksPerSlave = mMaxStartingTasksPerSlave; + filterEnabled = mFilterEnabled; } @VisibleForTesting @@ -120,12 +123,18 @@ public HttpOfferSetImpl(Set mOffers, @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) @interface MaxStartingTaskPerSlave { } + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface FilterEnabled { } + @Inject public HttpOfferSetImpl(Ordering ordering, @TimeoutMs Integer mTimeoutMs, @Endpoint String url, @MaxRetries Integer mMaxRetries, - @MaxStartingTaskPerSlave Integer mMaxStartingTasksPerSlave) { + @MaxStartingTaskPerSlave Integer mMaxStartingTasksPerSlave, + @FilterEnabled Boolean mFilterEnabled) { offers = new ConcurrentSkipListSet<>(ordering); try { endpoint = new URL(Objects.requireNonNull(url)); @@ -139,9 +148,11 @@ public HttpOfferSetImpl(Ordering ordering, timeoutMs = Objects.requireNonNull(mTimeoutMs); maxRetries = Objects.requireNonNull(mMaxRetries); maxStartingTasksPerSlave = Objects.requireNonNull(mMaxStartingTasksPerSlave); + filterEnabled = Objects.requireNonNull(mFilterEnabled); LOG.info("HttpOfferSet's endpoint: {}", endpoint); LOG.info("HttpOfferSet's timeout: {} (ms)", timeoutMs); LOG.info("HttpOfferSet's max retries: {}", maxRetries); + LOG.info("HttpOfferSet's filter enabled: {}", filterEnabled); LOG.info("HttpOfferSet's max number of starting tasks per slave: {}", maxStartingTasksPerSlave); } @@ -218,11 +229,13 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res List badOffers = new LinkedList<>(); List goodOffers = new LinkedList<>(); if (maxStartingTasksPerSlave > 0) { - badOffers = offers.stream() - .filter(offer -> - hostTaskCountMap.getOrDefault(offer.getOffer().getAgentId().getValue(), 0) - >= maxStartingTasksPerSlave) - .collect(Collectors.toList()); + if (!filterEnabled) { + badOffers = offers.stream() + .filter(offer -> + hostTaskCountMap.getOrDefault(offer.getOffer().getAgentId().getValue(), 0) + >= maxStartingTasksPerSlave) + .collect(Collectors.toList()); + } goodOffers = offers.stream() .filter(offer -> hostTaskCountMap.getOrDefault(offer.getOffer().getAgentId().getValue(), 0) diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java index bf3af23e0..aee24f6e6 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java @@ -56,22 +56,32 @@ public class HttpOfferSetModule extends AbstractModule { @Parameters(separators = "=") public static class Options { - @Parameter(names = "-http_offer_set_endpoint") + @Parameter(names = "-http_offer_set_endpoint", + description = "http_offer_set endpoint") String httpOfferSetEndpoint = "http://localhost:9090/v1/offerset"; - @Parameter(names = "-http_offer_set_timeout") + @Parameter(names = "-http_offer_set_timeout", + description = "http_offer_set timeout") TimeAmount httpOfferSetTimeout = new TimeAmount(100, Time.MILLISECONDS); - @Parameter(names = "-http_offer_set_max_retries") + @Parameter(names = "-http_offer_set_max_retries", + description = "Maximum number of tries to reach the http_offer_set_endpoint") int httpOfferSetMaxRetries = 10; // the slaves have more than or equal to the httpOfferSetMaxStartingTasksPerSlave // are put in the bottom of the offerset. If you want to disable this feature, set // httpOfferSetMaxStartingTasksPerSlave less than or equal to zero - @Parameter(names = "-http_offer_set_max_starting_tasks_per_slave") + @Parameter(names = "-http_offer_set_max_starting_tasks_per_slave", + description = "Maximum number of starting tasks per slave are allowed") int httpOfferSetMaxStartingTasksPerSlave = 0; - @Parameter(names = "-http_offer_set_task_fetch_interval") + @Parameter(names = "-http_offer_set_filter_enabled", + description = "Allow to filter out the bad offers", + arity = 1) + boolean httpOfferSetFilterEnabled = false; + + @Parameter(names = "-http_offer_set_task_fetch_interval", + description = "Interval of fetching starting tasks from task_store") TimeAmount httpOfferSetTaskFetchInterval = new TimeAmount(1, Time.SECONDS); } @@ -87,24 +97,26 @@ public HttpOfferSetModule(CliOptions mOptions) { @Override protected void configure() { - install(new PrivateModule() { @Override protected void configure() { bind(new TypeLiteral>() { }).toInstance(OfferOrderBuilder.create(cliOptions.offer.offerOrder)); bind(Integer.class) - .annotatedWith(HttpOfferSetImpl.TimeoutMs.class) - .toInstance(options.httpOfferSetTimeout.getValue().intValue()); + .annotatedWith(HttpOfferSetImpl.TimeoutMs.class) + .toInstance(options.httpOfferSetTimeout.getValue().intValue()); bind(String.class) - .annotatedWith(HttpOfferSetImpl.Endpoint.class) - .toInstance(options.httpOfferSetEndpoint); + .annotatedWith(HttpOfferSetImpl.Endpoint.class) + .toInstance(options.httpOfferSetEndpoint); bind(Integer.class) - .annotatedWith(HttpOfferSetImpl.MaxRetries.class) - .toInstance(options.httpOfferSetMaxRetries); + .annotatedWith(HttpOfferSetImpl.MaxRetries.class) + .toInstance(options.httpOfferSetMaxRetries); bind(Integer.class) .annotatedWith(HttpOfferSetImpl.MaxStartingTaskPerSlave.class) .toInstance(options.httpOfferSetMaxStartingTasksPerSlave); + bind(Boolean.class) + .annotatedWith(HttpOfferSetImpl.FilterEnabled.class) + .toInstance(options.httpOfferSetFilterEnabled); bind(OfferSet.class).to(HttpOfferSetImpl.class).in(Singleton.class); expose(OfferSet.class); } @@ -114,11 +126,11 @@ protected void configure() { bind(TaskFetcher.class).in(com.google.inject.Singleton.class); bind(ScheduledExecutorService.class) - .annotatedWith(Executor.class) - .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("HttpOfferSet-%d", LOG)); + .annotatedWith(Executor.class) + .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("HttpOfferSet-%d", LOG)); bind(Long.class) - .annotatedWith(RefreshRateMs.class) - .toInstance(cliOptions.sla.slaRefreshInterval.as(Time.MILLISECONDS)); + .annotatedWith(RefreshRateMs.class) + .toInstance(cliOptions.sla.slaRefreshInterval.as(Time.MILLISECONDS)); bind(Long.class) .annotatedWith(TaskFetcherRateSec.class) .toInstance(options.httpOfferSetTaskFetchInterval.as(Time.MILLISECONDS)); diff --git a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java index ce5cde16b..3007924e5 100644 --- a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java +++ b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java @@ -110,7 +110,8 @@ public void setUp() throws IOException { 0, new URL("http://localhost:9090/v1/offerset"), 0, - 0); + 0, + false); } @Test @@ -212,7 +213,8 @@ public void testGetOrdered() throws IOException { 0, new URL("http://localhost:9090/v1/offerset"), 0, - -1); + -1, + false); sortedOffers = httpOfferSet.getOrdered(groupKey, resourceRequest); assertEquals(offers.size(), Iterables.size(sortedOffers)); @@ -220,7 +222,8 @@ public void testGetOrdered() throws IOException { 0, new URL("http://localhost:9090/v1/offerset"), 0, - 2); + 2, + false); sortedOffers = httpOfferSet.getOrdered(groupKey, resourceRequest); assertEquals(offers.size(), Iterables.size(sortedOffers)); @@ -229,7 +232,8 @@ public void testGetOrdered() throws IOException { 0, new URL("http://localhost:9090/v1/offerset"), 0, - 1); + 1, + false); sortedOffers = httpOfferSet.getOrdered(groupKey, resourceRequest); assertEquals(offers.size(), Iterables.size(sortedOffers)); HostOffer lastOffer = null; @@ -237,5 +241,15 @@ public void testGetOrdered() throws IOException { lastOffer = o; } assertEquals(OFFER_B, lastOffer); + + // filter OFFER_B out + httpOfferSet = new HttpOfferSetImpl(offers, + 0, + new URL("http://localhost:9090/v1/offerset"), + 0, + 1, + true); + sortedOffers = httpOfferSet.getOrdered(groupKey, resourceRequest); + assertEquals(offers.size() - 1, Iterables.size(sortedOffers)); } }