diff --git a/docs/features/custom-plugins.md b/docs/features/custom-plugins.md index 84524e41a..b8a092eb7 100644 --- a/docs/features/custom-plugins.md +++ b/docs/features/custom-plugins.md @@ -17,10 +17,22 @@ We added HTTP OfferSet `io.github.aurora.scheduler.offers.HttpOfferSetModule` th How to configure HTTP OfferSet? - offer_set_module=io.github.aurora.scheduler.offers.HttpOfferSetModule -- http_offer_set_timeout_ms is http timeout in milliseconds. -- http_offer_set_max_retries is the number of retries if the module fails to connects to the external REST API server. -If it exceeds the number of retries, Aurora uses the native offerset implementation. -- http_offer_set_endpoint is the REST API endpoint, e.g. http://127.0.0.1:9090/v1/offerset. +- `http_offer_set_timeout` is the HTTP timeout value. `100ms` is the default value. +- `http_offer_set_max_retries` is the number of retries if the module fails to connect to the external REST API server. +`10` is the default value. +If it exceeds the number of retries, HTTP OfferSet will not reach the external endpoint anymore. +- `http_offer_set_endpoint` is the REST API endpoint, e.g. http://127.0.0.1:9090/v1/offerset. +- `http_offer_set_filter_enabled` is used to filter bad offers out. By default, it is disabled. +- `http_offer_set_max_starting_tasks_per_slave` is the maximum number of starting tasks per slave. +If a slave has at least this number of starting tasks, it will be put at the end of the offer list (`http_offer_set_filter_enabled=false`) +or taken out (`http_offer_set_filter_enabled=true`). +This feature is useful when a node is vulnerable to a certain number of tasks starting at the same time. +A task often demands a lot of resources during STARTING (e.g., pulling docker images and frequent health checks). +Too many of them starting at the same time may overload the nodes. +It is disabled by default. +If you want to use this feature, please set this parameter to a positive integer. 
+- `http_offer_set_task_fetch_interval` determines how often HTTP OfferSet fetches the starting tasks from the `task_store`. +By default, it is `1secs`. How to implement the external REST API server? The REST API needs to handle the request in the following format: diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index 38ac5aaa1..d8f741a18 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -18,9 +18,12 @@ import java.lang.annotation.Target; import java.net.MalformedURLException; import java.net.URL; +import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import java.util.stream.Collectors; @@ -33,13 +36,17 @@ import com.google.common.collect.Ordering; import com.google.inject.Inject; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.offers.OfferSet; import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.resources.ResourceType; +import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.http.HttpEntity; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; @@ -66,16 +73,34 @@ public class HttpOfferSetImpl implements OfferSet { private static final Logger LOG = 
LoggerFactory.getLogger(HttpOfferSetImpl.class); + static List latencyMsList = Collections.synchronizedList(new LinkedList<>()); + static List offerSetDiffList = Collections.synchronizedList(new LinkedList<>()); + private static Iterable startingTasks = new LinkedList<>(); + private static long failureCount = 0; + private static boolean useEndpoint = false; + private final Set offers; private final ObjectMapper jsonMapper = new ObjectMapper(); private final CloseableHttpClient httpClient = HttpClients.createDefault(); + private final int timeoutMs; + private final int maxRetries; + private final int maxStartingTasksPerSlave; + private final boolean filterEnabled; - private Integer timeoutMs; private URL endpoint; - private Integer maxRetries; - public HttpOfferSetImpl(Set mOffers) { + public HttpOfferSetImpl(Set mOffers, + int mTimeoutMs, + URL mEndpoint, + int mMaxRetries, + int mMaxStartingTasksPerSlave, + boolean mFilterEnabled) { offers = mOffers; + timeoutMs = mTimeoutMs; + endpoint = mEndpoint; + maxRetries = mMaxRetries; + maxStartingTasksPerSlave = mMaxStartingTasksPerSlave; + filterEnabled = mFilterEnabled; } @VisibleForTesting @@ -93,26 +118,67 @@ public HttpOfferSetImpl(Set mOffers) { @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) @interface TimeoutMs { } + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface MaxStartingTaskPerSlave { } + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface FilterEnabled { } + @Inject public HttpOfferSetImpl(Ordering ordering, @TimeoutMs Integer mTimeoutMs, @Endpoint String url, - @MaxRetries Integer mMaxRetries) { + @MaxRetries Integer mMaxRetries, + @MaxStartingTaskPerSlave Integer mMaxStartingTasksPerSlave, + @FilterEnabled Boolean mFilterEnabled) { offers = new ConcurrentSkipListSet<>(ordering); try { - endpoint = new URL(url); - HttpOfferSetModule.enable(true); - LOG.info("HttpOfferSetModule 
Enabled."); + endpoint = new URL(Objects.requireNonNull(url)); + HttpOfferSetImpl.setUseEndpoint(true); + LOG.info("HttpOfferSetImpl Enabled."); } catch (MalformedURLException e) { LOG.error("http_offer_set_endpoint is malformed. ", e); - HttpOfferSetModule.enable(false); - LOG.info("HttpOfferSetModule Disabled."); - } - timeoutMs = mTimeoutMs; - maxRetries = mMaxRetries; - LOG.info("HttpOfferSet's endpoint: " + endpoint); - LOG.info("HttpOfferSet's timeout: " + timeoutMs + " (ms)"); - LOG.info("HttpOfferSet's max retries: " + maxRetries); + HttpOfferSetImpl.setUseEndpoint(false); + LOG.info("HttpOfferSetImpl Disabled."); + } + timeoutMs = Objects.requireNonNull(mTimeoutMs); + maxRetries = Objects.requireNonNull(mMaxRetries); + maxStartingTasksPerSlave = Objects.requireNonNull(mMaxStartingTasksPerSlave); + filterEnabled = Objects.requireNonNull(mFilterEnabled); + LOG.info("HttpOfferSet's endpoint: {}", endpoint); + LOG.info("HttpOfferSet's timeout: {} (ms)", timeoutMs); + LOG.info("HttpOfferSet's max retries: {}", maxRetries); + LOG.info("HttpOfferSet's filter enabled: {}", filterEnabled); + LOG.info("HttpOfferSet's max number of starting tasks per slave: {}", maxStartingTasksPerSlave); + } + + public static synchronized void incrementFailureCount() { + HttpOfferSetImpl.failureCount++; + } + + public static synchronized long getFailureCount() { + return HttpOfferSetImpl.failureCount; + } + + public static synchronized void resetFailureCount() { + HttpOfferSetImpl.failureCount = 0; + } + + public static synchronized void setUseEndpoint(boolean mEnabled) { + HttpOfferSetImpl.useEndpoint = mEnabled; + } + + public static synchronized boolean isUseEndpoint() { + return HttpOfferSetImpl.useEndpoint; + } + + public static synchronized void fetchStartingTasks(Storage storage) { + startingTasks = Storage.Util.fetchTasks(storage, + Query.unscoped().byStatus(ScheduleStatus.STARTING)); } @Override @@ -144,48 +210,88 @@ public Iterable values() { @Override public Iterable 
getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) { + long startTime = System.nanoTime(); // if there are no available offers, do nothing. - if (offers.isEmpty() || !HttpOfferSetModule.isEnabled()) { + if (offers.isEmpty()) { return offers; } + // count the number of starting tasks per slave + Map hostTaskCountMap = new HashMap<>(); + synchronized (startingTasks) { + for (IScheduledTask task : startingTasks) { + String slaveId = task.getAssignedTask().getSlaveId(); + hostTaskCountMap.put(slaveId, hostTaskCountMap.getOrDefault(slaveId, 0) + 1); + } + } + + // find the bad offers and put them at the bottom of the list + List badOffers = new LinkedList<>(); + List goodOffers = new LinkedList<>(); + if (maxStartingTasksPerSlave > 0) { + if (!filterEnabled) { + badOffers = offers.stream() + .filter(offer -> + hostTaskCountMap.getOrDefault(offer.getOffer().getAgentId().getValue(), 0) + >= maxStartingTasksPerSlave) + .collect(Collectors.toList()); + } + goodOffers = offers.stream() + .filter(offer -> + hostTaskCountMap.getOrDefault(offer.getOffer().getAgentId().getValue(), 0) + < maxStartingTasksPerSlave) + .collect(Collectors.toList()); + + if (!badOffers.isEmpty()) { + LOG.info("the number of bad offers: {}", badOffers.size()); + } + } else { + goodOffers = offers.stream().collect(Collectors.toList()); + } + + // if the external http endpoint was not reachable or we have nothing to send out + if (!HttpOfferSetImpl.isUseEndpoint() || goodOffers.isEmpty()) { + goodOffers.addAll(badOffers); + HttpOfferSetImpl.latencyMsList.add(System.nanoTime() - startTime); + return goodOffers; + } + List orderedOffers = null; try { - long startTime = System.nanoTime(); // create json request & send the Rest API request to the scheduler plugin - ScheduleRequest scheduleRequest = createRequest(resourceRequest, startTime); - LOG.info("Sending request " + scheduleRequest.jobKey); + ScheduleRequest scheduleRequest = createRequest(goodOffers, resourceRequest, startTime); + 
LOG.info("Sending request {}", scheduleRequest.jobKey); String responseStr = sendRequest(scheduleRequest); - orderedOffers = processResponse(responseStr, HttpOfferSetModule.offerSetDiffList); - HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); + orderedOffers = processResponse(goodOffers, responseStr); } catch (IOException e) { - LOG.error("Failed to schedule the task of " - + resourceRequest.getTask().getJob().toString() - + " using HttpOfferSet. ", e); - HttpOfferSetModule.incFailureCount(); + LOG.error("Failed to schedule the task of {} using {} ", + resourceRequest.getTask().getJob().toString(), endpoint, e); + HttpOfferSetImpl.incrementFailureCount(); } finally { - // shutdown HttpOfferSet if failure is consistent. - if (HttpOfferSetModule.getFailureCount() >= maxRetries) { - LOG.error("Reaches " + maxRetries + ". HttpOfferSetModule Disabled."); - HttpOfferSetModule.enable(false); + // stop reaching the endpoint if failure is consistent. + if (HttpOfferSetImpl.getFailureCount() >= maxRetries) { + LOG.error("Reaches {} retries. {} is disabled", maxRetries, endpoint); + HttpOfferSetImpl.setUseEndpoint(false); } } if (orderedOffers != null) { - return orderedOffers; + goodOffers = orderedOffers; } - // fall back to default scheduler. - LOG.warn("Falling back on default ordering."); - return offers; + goodOffers.addAll(badOffers); + HttpOfferSetImpl.latencyMsList.add(System.nanoTime() - startTime); + return goodOffers; } //createScheduleRequest creates the ScheduleRequest to be sent out to the plugin. 
- private ScheduleRequest createRequest(ResourceRequest resourceRequest, long startTime) { + private ScheduleRequest createRequest(List mOffers, + ResourceRequest resourceRequest, + long startTime) { Resource req = new Resource(resourceRequest.getResourceBag().valueOf(ResourceType.CPUS), resourceRequest.getResourceBag().valueOf(ResourceType.RAM_MB), resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB)); List hosts = - offers.stream() + mOffers.stream() .map(offer -> new Host(offer.getAttributes().getHost(), new Resource(offer))) .collect(Collectors.toList()); IJobKey jobKey = resourceRequest.getTask().getJob(); @@ -194,9 +300,9 @@ private ScheduleRequest createRequest(ResourceRequest resourceRequest, long star return new ScheduleRequest(req, hosts, jobKeyStr); } - // sendRequest sends resorceRequest to the external plugin endpoint and gets json response. + // sendRequest sends ScheduleRequest to the external endpoint and gets ordered offers in json private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { - LOG.debug("Sending request for " + scheduleRequest.toString()); + LOG.debug("Sending request for {}", scheduleRequest); HttpPost request = new HttpPost(endpoint.toString()); RequestConfig requestConfig = RequestConfig.custom() .setConnectionRequestTimeout(timeoutMs) @@ -219,15 +325,15 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { } } - List processResponse(String responseStr, List offerSetDiffList) + List processResponse(List mOffers, String responseStr) throws IOException { ScheduleResponse response = jsonMapper.readValue(responseStr, ScheduleResponse.class); - LOG.info("Received " + response.hosts.size() + " offers"); + LOG.info("Received {} offers", response.hosts.size()); - Map offerMap = offers.stream() + Map offerMap = mOffers.stream() .collect(Collectors.toMap(offer -> offer.getAttributes().getHost(), offer -> offer)); if (!response.error.trim().isEmpty()) { - LOG.error("Unable to 
receive offers from " + endpoint + " due to " + response.error); + LOG.error("Unable to receive offers from {} due to {}", endpoint, response.error); throw new IOException(response.error); } @@ -240,24 +346,24 @@ List processResponse(String responseStr, List offerSetDiffList) .collect(Collectors.toList()); //offSetDiff is the total number of missing offers and the extra offers - long offSetDiff = offers.size() - (response.hosts.size() - extraOffers.size()) + long offSetDiff = mOffers.size() - (response.hosts.size() - extraOffers.size()) + extraOffers.size(); offerSetDiffList.add(offSetDiff); if (offSetDiff > 0) { - LOG.warn("The number of different offers between the original and received offer sets is " - + offSetDiff); + LOG.warn("The number of different offers between the original and received offer sets is {}", + offSetDiff); if (LOG.isDebugEnabled()) { - List missedOffers = offers.stream() + List missedOffers = mOffers.stream() .map(offer -> offer.getAttributes().getHost()) .filter(host -> !response.hosts.contains(host)) .collect(Collectors.toList()); - LOG.debug("missed offers: " + missedOffers); - LOG.debug("extra offers: " + extraOffers); + LOG.debug("missed offers: {}", missedOffers); + LOG.debug("extra offers: {}", extraOffers); } } if (!extraOffers.isEmpty()) { - LOG.error("Cannot find offers " + extraOffers + " in the original offer set"); + LOG.error("Cannot find offers {} in the original offer set", extraOffers); } return orderedOffers; diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java index 3195c271e..aee24f6e6 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java @@ -15,9 +15,6 @@ import java.lang.annotation.Retention; import java.lang.annotation.Target; -import java.util.Collections; -import java.util.LinkedList; -import 
java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -39,6 +36,7 @@ import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.config.CliOptions; import org.apache.aurora.scheduler.config.CommandLine; +import org.apache.aurora.scheduler.config.types.TimeAmount; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.offers.OfferOrderBuilder; import org.apache.aurora.scheduler.offers.OfferSet; @@ -55,41 +53,36 @@ public class HttpOfferSetModule extends AbstractModule { private final CliOptions cliOptions; private final Options options; private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetModule.class); - static List latencyMsList = Collections.synchronizedList(new LinkedList<>()); - static List offerSetDiffList = Collections.synchronizedList(new LinkedList<>()); - private static long failureCount = 0; - private static boolean enabled = false; - - public static synchronized void incFailureCount() { - HttpOfferSetModule.failureCount++; - } - - public static synchronized long getFailureCount() { - return HttpOfferSetModule.failureCount; - } - - public static synchronized void resetFailureCount() { - HttpOfferSetModule.failureCount = 0; - } - - public static synchronized void enable(boolean mEnabled) { - HttpOfferSetModule.enabled = mEnabled; - } - - public static synchronized boolean isEnabled() { - return HttpOfferSetModule.enabled; - } @Parameters(separators = "=") public static class Options { - @Parameter(names = "-http_offer_set_endpoint") + @Parameter(names = "-http_offer_set_endpoint", + description = "http_offer_set endpoint") String httpOfferSetEndpoint = "http://localhost:9090/v1/offerset"; - @Parameter(names = "-http_offer_set_timeout_ms") - int httpOfferSetTimeoutMs = 100; + @Parameter(names = "-http_offer_set_timeout", + description = "http_offer_set timeout") + TimeAmount httpOfferSetTimeout = new TimeAmount(100, 
Time.MILLISECONDS); - @Parameter(names = "-http_offer_set_max_retries") + @Parameter(names = "-http_offer_set_max_retries", + description = "Maximum number of tries to reach the http_offer_set_endpoint") int httpOfferSetMaxRetries = 10; + + // the slaves have more than or equal to the httpOfferSetMaxStartingTasksPerSlave + // are put in the bottom of the offerset. If you want to disable this feature, set + // httpOfferSetMaxStartingTasksPerSlave less than or equal to zero + @Parameter(names = "-http_offer_set_max_starting_tasks_per_slave", + description = "Maximum number of starting tasks per slave are allowed") + int httpOfferSetMaxStartingTasksPerSlave = 0; + + @Parameter(names = "-http_offer_set_filter_enabled", + description = "Allow to filter out the bad offers", + arity = 1) + boolean httpOfferSetFilterEnabled = false; + + @Parameter(names = "-http_offer_set_task_fetch_interval", + description = "Interval of fetching starting tasks from task_store") + TimeAmount httpOfferSetTaskFetchInterval = new TimeAmount(1, Time.SECONDS); } static { @@ -104,33 +97,46 @@ public HttpOfferSetModule(CliOptions mOptions) { @Override protected void configure() { - install(new PrivateModule() { @Override protected void configure() { bind(new TypeLiteral>() { }).toInstance(OfferOrderBuilder.create(cliOptions.offer.offerOrder)); bind(Integer.class) - .annotatedWith(HttpOfferSetImpl.TimeoutMs.class) - .toInstance(options.httpOfferSetTimeoutMs); + .annotatedWith(HttpOfferSetImpl.TimeoutMs.class) + .toInstance(options.httpOfferSetTimeout.getValue().intValue()); bind(String.class) - .annotatedWith(HttpOfferSetImpl.Endpoint.class) - .toInstance(options.httpOfferSetEndpoint); + .annotatedWith(HttpOfferSetImpl.Endpoint.class) + .toInstance(options.httpOfferSetEndpoint); bind(Integer.class) - .annotatedWith(HttpOfferSetImpl.MaxRetries.class) - .toInstance(options.httpOfferSetMaxRetries); + .annotatedWith(HttpOfferSetImpl.MaxRetries.class) + .toInstance(options.httpOfferSetMaxRetries); + 
bind(Integer.class) + .annotatedWith(HttpOfferSetImpl.MaxStartingTaskPerSlave.class) + .toInstance(options.httpOfferSetMaxStartingTasksPerSlave); + bind(Boolean.class) + .annotatedWith(HttpOfferSetImpl.FilterEnabled.class) + .toInstance(options.httpOfferSetFilterEnabled); bind(OfferSet.class).to(HttpOfferSetImpl.class).in(Singleton.class); expose(OfferSet.class); } }); bind(StatCalculator.class).in(com.google.inject.Singleton.class); + bind(TaskFetcher.class).in(com.google.inject.Singleton.class); + bind(ScheduledExecutorService.class) - .annotatedWith(Executor.class) - .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("HttpOfferSet-%d", LOG)); + .annotatedWith(Executor.class) + .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("HttpOfferSet-%d", LOG)); + bind(Long.class) + .annotatedWith(RefreshRateMs.class) + .toInstance(cliOptions.sla.slaRefreshInterval.as(Time.MILLISECONDS)); + bind(Long.class) + .annotatedWith(TaskFetcherRateSec.class) + .toInstance(options.httpOfferSetTaskFetchInterval.as(Time.MILLISECONDS)); bind(Integer.class) - .annotatedWith(RefreshRateMs.class) - .toInstance(cliOptions.sla.slaRefreshInterval.as(Time.MILLISECONDS).intValue()); + .annotatedWith(MaxStartingTaskPerSlave.class) + .toInstance(options.httpOfferSetMaxStartingTasksPerSlave); bind(StatUpdater.class).in(com.google.inject.Singleton.class); SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(StatUpdater.class); } @@ -145,24 +151,46 @@ protected void configure() { @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) @interface RefreshRateMs { } + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface TaskFetcherRateSec { } + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface MaxStartingTaskPerSlave { } + static class StatUpdater extends AbstractIdleService { private final ScheduledExecutorService executor; private final StatCalculator 
calculator; - private final Integer refreshRateMs; + private final TaskFetcher taskFetcher; + private final Long refreshRateMs; + private final Long taskFetcherRateMs; + private final Integer maxStartingTasksPerSlave; @Inject StatUpdater( @Executor ScheduledExecutorService mExecutor, StatCalculator mCalculator, - @RefreshRateMs Integer mRefreshRateMs) { + TaskFetcher mTaskFetcher, + @RefreshRateMs Long mRefreshRateMs, + @TaskFetcherRateSec Long mTaskFetcherRateMs, + @MaxStartingTaskPerSlave Integer mMaxStartingTasksPerSlave) { executor = requireNonNull(mExecutor); calculator = requireNonNull(mCalculator); - refreshRateMs = mRefreshRateMs; + taskFetcher = requireNonNull(mTaskFetcher); + refreshRateMs = requireNonNull(mRefreshRateMs); + taskFetcherRateMs = requireNonNull(mTaskFetcherRateMs); + maxStartingTasksPerSlave = requireNonNull(mMaxStartingTasksPerSlave); } @Override protected void startUp() { executor.scheduleAtFixedRate(calculator, 0, refreshRateMs, TimeUnit.MILLISECONDS); + if (maxStartingTasksPerSlave > 0) { + executor.scheduleAtFixedRate(taskFetcher, 0, taskFetcherRateMs, TimeUnit.MILLISECONDS); + } } @Override diff --git a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java index 64e2674a1..311f4bf34 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java +++ b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java @@ -13,7 +13,9 @@ */ package io.github.aurora.scheduler.offers; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import javax.inject.Inject; @@ -65,13 +67,29 @@ public StatCalculator.Counter load(String key) { @Override public void run() { + // make copies of stat data as synchronizedList is not thread-safe iterating + List latencyList; + synchronized (HttpOfferSetImpl.latencyMsList) { + latencyList = 
HttpOfferSetImpl.latencyMsList.stream().collect(Collectors.toList()); + } + List offerSetDiffList; + synchronized (HttpOfferSetImpl.offerSetDiffList) { + offerSetDiffList = HttpOfferSetImpl.offerSetDiffList.stream().collect(Collectors.toList()); + } + long failureCount = HttpOfferSetImpl.getFailureCount(); + + // reset the stats. + HttpOfferSetImpl.latencyMsList.clear(); + HttpOfferSetImpl.resetFailureCount(); + HttpOfferSetImpl.offerSetDiffList.clear(); + float medianLatency = - Util.percentile(HttpOfferSetModule.latencyMsList, 50.0) + Util.percentile(latencyList, 50.0) .floatValue() / 1000000; float avgLatency = - (float) Util.avg(HttpOfferSetModule.latencyMsList) / 1000000; + (float) Util.avg(latencyList) / 1000000; float worstLatency = - (float) Util.max(HttpOfferSetModule.latencyMsList) / 1000000; + (float) Util.max(latencyList) / 1000000; String medianLatencyName = "http_offer_set_median_latency_ms"; metricCache.getUnchecked(medianLatencyName).set(medianLatencyName, medianLatency); @@ -80,17 +98,10 @@ public void run() { String avgLatencyName = "http_offer_set_avg_latency_ms"; metricCache.getUnchecked(avgLatencyName).set(avgLatencyName, avgLatency); String failureCountName = "http_offer_set_failure_count"; - metricCache.getUnchecked(failureCountName).set(failureCountName, - HttpOfferSetModule.getFailureCount()); + metricCache.getUnchecked(failureCountName).set(failureCountName, failureCount); - long maxOfferSetDiff = Util.max(HttpOfferSetModule.offerSetDiffList); + long maxOfferSetDiff = Util.max(offerSetDiffList); String maxOffSetDiffName = "http_offer_set_max_diff"; - metricCache.getUnchecked(maxOffSetDiffName).set(maxOffSetDiffName, - maxOfferSetDiff); - - // reset the stats. 
- HttpOfferSetModule.latencyMsList.clear(); - HttpOfferSetModule.resetFailureCount(); - HttpOfferSetModule.offerSetDiffList.clear(); + metricCache.getUnchecked(maxOffSetDiffName).set(maxOffSetDiffName, maxOfferSetDiff); } } diff --git a/src/main/java/io/github/aurora/scheduler/offers/TaskFetcher.java b/src/main/java/io/github/aurora/scheduler/offers/TaskFetcher.java new file mode 100644 index 000000000..a91f50174 --- /dev/null +++ b/src/main/java/io/github/aurora/scheduler/offers/TaskFetcher.java @@ -0,0 +1,32 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.github.aurora.scheduler.offers; + +import javax.inject.Inject; + +import org.apache.aurora.scheduler.storage.Storage; + +public class TaskFetcher implements Runnable { + private final Storage storage; + + @Inject + TaskFetcher(Storage mStorage) { + storage = mStorage; + } + + @Override + public void run() { + HttpOfferSetImpl.fetchStartingTasks(storage); + } +} diff --git a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java index f390355f5..3007924e5 100644 --- a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java +++ b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java @@ -14,19 +14,35 @@ package io.github.aurora.scheduler.offers; import java.io.IOException; +import java.net.URL; +import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.offers.Offers; +import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.junit.Before; import org.junit.Test; import static org.apache.aurora.gen.MaintenanceMode.NONE; +import static 
org.apache.aurora.scheduler.base.TaskTestUtil.JOB; +import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -50,14 +66,52 @@ public class HttpOfferSetImplTest extends EasyMockTest { IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_C))); private static final String HOST_D = "HOST_D"; + private final Storage storage = MemStorageModule.newEmptyStorage(); + private HttpOfferSetImpl httpOfferSet; + private Set offers; @Before public void setUp() throws IOException { - httpOfferSet = new HttpOfferSetImpl(new HashSet<>()); - httpOfferSet.add(OFFER_A); - httpOfferSet.add(OFFER_B); - httpOfferSet.add(OFFER_C); + storage.write((Storage.MutateWork.NoResult.Quiet) sp -> { + ScheduledTask t0 = makeTask("t0", JOB) + .newBuilder() + .setStatus(ScheduleStatus.PENDING); + ScheduledTask t1 = makeTask("t1", JOB) + .newBuilder() + .setStatus(ScheduleStatus.STARTING); + t1.setAssignedTask(new AssignedTask("t1", + OFFER_B.getOffer().getAgentId().getValue(), + OFFER_B.getOffer().getHostname(), + t1.getAssignedTask().getTask(), + new HashMap<>(), + 0)); + + ScheduledTask t2 = makeTask("t2", JOB) + .newBuilder() + .setStatus(ScheduleStatus.RUNNING); + t2.setAssignedTask(new AssignedTask("t2", + OFFER_C.getOffer().getAgentId().getValue(), + OFFER_C.getOffer().getHostname(), + t1.getAssignedTask().getTask(), + new HashMap<>(), + 0)); + + sp.getUnsafeTaskStore().saveTasks( + IScheduledTask.setFromBuilders(ImmutableList.of(t0, t1, t2))); + }); + + offers = new HashSet<>(); + offers.add(OFFER_A); + offers.add(OFFER_B); + offers.add(OFFER_C); + + httpOfferSet = new HttpOfferSetImpl(offers, + 0, + new URL("http://localhost:9090/v1/offerset"), + 0, + 0, + false); } @Test @@ -67,24 +121,25 @@ public void testProcessResponse() throws IOException { + HOST_A + "\",\"" + HOST_B + "\",\"" + HOST_C + "\"]}"; - List offerSetDiffList = new LinkedList<>(); - List sortedOffers = 
httpOfferSet.processResponse(responseStr, offerSetDiffList); + List mOffers = ImmutableList.copyOf(httpOfferSet.values()); + + List sortedOffers = httpOfferSet.processResponse(mOffers, responseStr); assertEquals(sortedOffers.size(), 3); assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C); - assertEquals((long) offerSetDiffList.get(0), 0); + assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(0), 0); // plugin returns less offers than Aurora has. responseStr = "{\"error\": \"\", \"hosts\": [\"" + HOST_A + "\",\"" + HOST_C + "\"]}"; - sortedOffers = httpOfferSet.processResponse(responseStr, offerSetDiffList); + sortedOffers = httpOfferSet.processResponse(mOffers, responseStr); assertEquals(sortedOffers.size(), 2); assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C); - assertEquals((long) offerSetDiffList.get(1), 1); + assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(1), 1); // plugin returns more offers than Aurora has. 
responseStr = "{\"error\": \"\", \"hosts\": [\"" @@ -92,23 +147,23 @@ public void testProcessResponse() throws IOException { + HOST_B + "\",\"" + HOST_D + "\",\"" + HOST_C + "\"]}"; - sortedOffers = httpOfferSet.processResponse(responseStr, offerSetDiffList); + sortedOffers = httpOfferSet.processResponse(mOffers, responseStr); assertEquals(sortedOffers.size(), 3); assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C); - assertEquals((long) offerSetDiffList.get(2), 1); + assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(2), 1); // plugin omits 1 offer & returns 1 extra offer responseStr = "{\"error\": \"\", \"hosts\": [\"" + HOST_A + "\",\"" + HOST_D + "\",\"" + HOST_C + "\"]}"; - sortedOffers = httpOfferSet.processResponse(responseStr, offerSetDiffList); + sortedOffers = httpOfferSet.processResponse(mOffers, responseStr); assertEquals(sortedOffers.size(), 2); assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C); - assertEquals((long) offerSetDiffList.get(3), 2); + assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(3), 2); responseStr = "{\"error\": \"Error\", \"hosts\": [\"" + HOST_A + "\",\"" @@ -116,7 +171,7 @@ public void testProcessResponse() throws IOException { + HOST_C + "\"]}"; boolean isException = false; try { - httpOfferSet.processResponse(responseStr, offerSetDiffList); + httpOfferSet.processResponse(mOffers, responseStr); } catch (IOException ioe) { isException = true; } @@ -125,7 +180,7 @@ public void testProcessResponse() throws IOException { responseStr = "{\"error\": \"error\"}"; isException = false; try { - httpOfferSet.processResponse(responseStr, new LinkedList<>()); + httpOfferSet.processResponse(mOffers, responseStr); } catch (IOException ioe) { isException = true; } @@ -134,10 +189,67 @@ public 
void testProcessResponse() throws IOException { responseStr = "{\"weird\": \"cannot decode this json string\"}"; isException = false; try { - httpOfferSet.processResponse(responseStr, new LinkedList<>()); + httpOfferSet.processResponse(mOffers, responseStr); } catch (IOException ioe) { isException = true; } assertTrue(isException); } + + @Test + public void testGetOrdered() throws IOException { + control.replay(); + IScheduledTask task = makeTask("id", JOB); + TaskGroupKey groupKey = TaskGroupKey.from(task.getAssignedTask().getTask()); + SchedulingFilter.ResourceRequest resourceRequest = + TaskTestUtil.toResourceRequest(task.getAssignedTask().getTask()); + HttpOfferSetImpl.fetchStartingTasks(storage); + + // return the same set of offers + Iterable sortedOffers = httpOfferSet.getOrdered(groupKey, resourceRequest); + assertEquals(offers.size(), Iterables.size(sortedOffers)); + + httpOfferSet = new HttpOfferSetImpl(offers, + 0, + new URL("http://localhost:9090/v1/offerset"), + 0, + -1, + false); + sortedOffers = httpOfferSet.getOrdered(groupKey, resourceRequest); + assertEquals(offers.size(), Iterables.size(sortedOffers)); + + httpOfferSet = new HttpOfferSetImpl(offers, + 0, + new URL("http://localhost:9090/v1/offerset"), + 0, + 2, + false); + sortedOffers = httpOfferSet.getOrdered(groupKey, resourceRequest); + assertEquals(offers.size(), Iterables.size(sortedOffers)); + + // OFFER_B is put in the bottom of list as it has 1 starting task. 
+ httpOfferSet = new HttpOfferSetImpl(offers, + 0, + new URL("http://localhost:9090/v1/offerset"), + 0, + 1, + false); + sortedOffers = httpOfferSet.getOrdered(groupKey, resourceRequest); + assertEquals(offers.size(), Iterables.size(sortedOffers)); + HostOffer lastOffer = null; + for (HostOffer o: sortedOffers) { + lastOffer = o; + } + assertEquals(OFFER_B, lastOffer); + + // filter OFFER_B out + httpOfferSet = new HttpOfferSetImpl(offers, + 0, + new URL("http://localhost:9090/v1/offerset"), + 0, + 1, + true); + sortedOffers = httpOfferSet.getOrdered(groupKey, resourceRequest); + assertEquals(offers.size() - 1, Iterables.size(sortedOffers)); + } }