diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java new file mode 100644 index 000000000..6af34971b --- /dev/null +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -0,0 +1,324 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.aurora.scheduler.offers; + +import java.io.IOException; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.*; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.stream.Collectors; + +import javax.inject.Qualifier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Ordering; +import com.google.gson.Gson; +import com.google.inject.Inject; + +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import org.apache.aurora.scheduler.offers.HostOffer; +import org.apache.aurora.scheduler.offers.OfferSet; +import org.apache.aurora.scheduler.resources.ResourceType; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.http.HttpEntity; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * Implementation for OfferSet. + * HttpOfferSetImpl sorts offers using an external endpoint. + * It sends the request (request + offers) to the external endpoint + * and receives the response (sorted offers). + */ +@VisibleForTesting +public class HttpOfferSetImpl implements OfferSet { + + private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetImpl.class); + private final Set offers; + private final Gson gson = new Gson(); + private Integer timeoutMs; + private URL endpoint; + private Integer maxRetries; + + public HttpOfferSetImpl(Set offers) { + this.offers = offers; + } + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface Endpoint { } + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface MaxRetries { } + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface TimeoutMs { } + + @Inject + public HttpOfferSetImpl(Ordering ordering, + @TimeoutMs Integer timeoutMs, + @Endpoint String url, + @MaxRetries Integer maxRetries) { + offers = new ConcurrentSkipListSet<>(ordering); + try { + endpoint = new URL(url); + HttpOfferSetModule.enable(true); + LOG.info("HttpOfferSetModule Enabled."); + } catch (MalformedURLException e) { + LOG.error("http_offer_set_endpoint is malformed. ", e); + HttpOfferSetModule.enable(false); + LOG.info("HttpOfferSetModule Disabled."); + } + this.timeoutMs = timeoutMs; + this.maxRetries = maxRetries; + LOG.info("HttpOfferSet's endpoint: " + this.endpoint); + LOG.info("HttpOfferSet's timeout: " + this.timeoutMs + " (ms)"); + LOG.info("HttpOfferSet's max retries: " + this.maxRetries); + } + + @Override + public void add(HostOffer offer) { + offers.add(offer); + } + + @Override + public void remove(HostOffer removed) { + offers.remove(removed); + } + + @Override + public int size() { + // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more + // expensive. + // Could track this separately if it turns out to pose problems. + return offers.size(); + } + + @Override + public void clear() { + offers.clear(); + } + + @Override + public Iterable values() { + return offers; + } + + @Override + public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) { + // if there are no available offers, do nothing. + if (offers.isEmpty() || !HttpOfferSetModule.isEnabled()) { + return offers; + } + + List orderedOffers = null; + try { + long startTime = System.nanoTime(); + // create json request & send the Rest API request to the scheduler plugin + ScheduleRequest scheduleRequest = this.createRequest(resourceRequest, startTime); + LOG.debug("Sending request " + scheduleRequest.jobKey); + String responseStr = this.sendRequest(scheduleRequest); + orderedOffers = processResponse(responseStr); + LOG.debug("received response for " + scheduleRequest.jobKey); + HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime); + } catch (IOException e) { + LOG.error("Failed to schedule the task of " + + resourceRequest.getTask().getJob().toString() + + " using HttpOfferSet. ", e); + HttpOfferSetModule.incFailureCount(); + } finally { + // shutdown HttpOfferSet if failure is consistent. + if (HttpOfferSetModule.getFailureCount() >= maxRetries) { + LOG.error("Reaches " + maxRetries + ". HttpOfferSetModule Disabled."); + HttpOfferSetModule.enable(false); + } + } + if (orderedOffers != null) { + return orderedOffers; + } + + // fall back to default scheduler. + LOG.warn("Falling back on default ordering."); + return offers; + } + + //createScheduleRequest creates the ScheduleRequest to be sent out to the plugin. + private ScheduleRequest createRequest(ResourceRequest resourceRequest, long startTime) { + Resource req = new Resource(resourceRequest.getResourceBag().valueOf(ResourceType.CPUS), + resourceRequest.getResourceBag().valueOf(ResourceType.RAM_MB), + resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB)); + List hosts = offers.stream().map(offer -> new Host(offer.getAttributes().getHost(), + new Resource(offer.getResourceBag(true).valueOf(ResourceType.CPUS) + + offer.getResourceBag(false).valueOf(ResourceType.CPUS), + offer.getResourceBag(true).valueOf(ResourceType.RAM_MB) + + offer.getResourceBag(false).valueOf(ResourceType.RAM_MB), + offer.getResourceBag(true).valueOf(ResourceType.DISK_MB) + + offer.getResourceBag(false).valueOf(ResourceType.DISK_MB)))) + .collect(Collectors.toList()); + IJobKey jobKey = resourceRequest.getTask().getJob(); + String jobKeyStr = jobKey.getRole() + "-" + jobKey.getEnvironment() + "-" + jobKey.getName() + + "@" + startTime; + return new ScheduleRequest(req, hosts, jobKeyStr); + } + + // sendRequest sends resorceRequest to the external plugin endpoint and gets json response. + private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { + LOG.debug("Sending request for " + scheduleRequest.toString()); + CloseableHttpClient httpClient = HttpClients.createDefault(); + try { + HttpPost request = new HttpPost(this.endpoint.toString()); + RequestConfig requestConfig = RequestConfig.custom() + .setConnectionRequestTimeout(this.timeoutMs) + .setConnectTimeout(this.timeoutMs) + .setSocketTimeout(this.timeoutMs) + .build(); + request.setConfig(requestConfig); + request.addHeader("Content-Type", "application/json; utf-8"); + request.addHeader("Accept", "application/json"); + request.setEntity(new StringEntity(gson.toJson(scheduleRequest))); + CloseableHttpResponse response = httpClient.execute(request); + try { + HttpEntity entity = response.getEntity(); + if (entity == null) { + throw new IOException("Empty response from the external http endpoint."); + } else { + return EntityUtils.toString(entity); + } + } finally { + response.close(); + } + } finally { + httpClient.close(); + } + } + + List processResponse(String responseStr) throws IOException { + // process the response + ScheduleResponse response = gson.fromJson(responseStr, ScheduleResponse.class); + if (response.error == null || response.hosts == null) { + LOG.error("Response: " + responseStr); + throw new IOException("response is malformed"); + } + + Map offerMap = offers.stream() + .collect(Collectors.toMap(offer -> offer.getAttributes().getHost(), offer -> offer)); + List orderedOffers = new ArrayList<>(); + if (response.error.trim().isEmpty()) { + for (String host : response.hosts) { + HostOffer offer = offerMap.get(host); + if (offer == null) { + LOG.warn("Cannot find host " + host + " in the response"); + } else { + orderedOffers.add(offer); + } + } + LOG.debug("Sorted offers: " + String.join(",", + response.hosts.subList(0, Math.min(5, response.hosts.size())) + "...")); + if (orderedOffers.isEmpty()) { + LOG.warn("Cannot find any offers for this task. " + + "Please check the condition of these hosts: " + + Util.getHostnames(offers)); + } + return orderedOffers; + } else { + LOG.error("Unable to get sorted offers due to " + response.error); + throw new IOException(response.error); + } + } + + // Host represents for each host offer. + static class Host { + String name; + Resource offer; + + Host(String mName, Resource mOffer) { + name = mName; + offer = mOffer; + } + + @Override + public String toString() { + return "Host{" + "name='" + name + '\'' + ", offer=" + offer + '}'; + } + } + + // Resource is used between Aurora and MagicMatch. + static class Resource { + double cpu; + double memory; + double disk; + + Resource(double cpu, double memory, double disk) { + this.cpu = cpu; + this.memory = memory; + this.disk = disk; + } + + @Override + public String toString() { + return "Resource{" + "cpu=" + cpu + ", memory=" + memory + ", disk=" + disk + '}'; + } + } + + // ScheduleRequest is the request sent to MagicMatch. + static class ScheduleRequest { + String jobKey; + Resource request; + List hosts; + + ScheduleRequest(Resource request, List hosts, String jobKey) { + this.request = request; + this.hosts = hosts; + this.jobKey = jobKey; + } + + @Override + public String toString() { + return "ScheduleRequest{" + "jobKey=" + jobKey + "request=" + request + + ", hosts=" + hosts + '}'; + } + } + + // ScheduleResponse is the scheduling result responded by MagicMatch + static class ScheduleResponse { + String error; + List hosts; + + @Override + public String toString() { + return "ScheduleResponse{" + "error='" + error + '\'' + ", hosts=" + hosts + '}'; + } + } +} diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java new file mode 100644 index 000000000..d0347a647 --- /dev/null +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetModule.java @@ -0,0 +1,173 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.aurora.scheduler.offers; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.inject.Inject; +import javax.inject.Qualifier; +import javax.inject.Singleton; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Ordering; +import com.google.common.util.concurrent.AbstractIdleService; +import com.google.inject.AbstractModule; +import com.google.inject.PrivateModule; +import com.google.inject.TypeLiteral; + +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.base.AsyncUtil; +import org.apache.aurora.scheduler.config.CliOptions; +import org.apache.aurora.scheduler.config.CommandLine; +import org.apache.aurora.scheduler.offers.HostOffer; +import org.apache.aurora.scheduler.offers.OfferOrderBuilder; +import org.apache.aurora.scheduler.offers.OfferSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import static java.util.Objects.requireNonNull; + +public class HttpOfferSetModule extends AbstractModule { + private final CliOptions cliOptions; + private final Options options; + private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetModule.class); + static List latencyMsList = Collections.synchronizedList(new LinkedList<>()); + private static long failureCount = 0; + private static boolean enabled = false; + + public static synchronized void incFailureCount() { + HttpOfferSetModule.failureCount++; + } + + public static synchronized long getFailureCount() { + return HttpOfferSetModule.failureCount; + } + + public static synchronized void resetFailureCount() { + HttpOfferSetModule.failureCount = 0; + } + + public static synchronized void enable(boolean mEnabled) { + HttpOfferSetModule.enabled = mEnabled; + } + + public static synchronized boolean isEnabled() { + return HttpOfferSetModule.enabled; + } + + @Parameters(separators = "=") + public static class Options { + @Parameter(names = "-http_offer_set_endpoint") + String httpOfferSetEndpoint = "http://localhost:9090/v1/offerset"; + + @Parameter(names = "-http_offer_set_timeout_ms") + int httpOfferSetTimeoutMs = 100; + + @Parameter(names = "-http_offer_set_max_retries") + int httpOfferSetMaxRetries = 10; + } + + static { + // Statically register custom options for CLI parsing. + CommandLine.registerCustomOptions(new Options()); + } + + public HttpOfferSetModule(CliOptions options) { + this.cliOptions = options; + this.options = options.getCustom(Options.class); + } + + @Override + protected void configure() { + + install(new PrivateModule() { + @Override + protected void configure() { + bind(new TypeLiteral>() { + }).toInstance(OfferOrderBuilder.create(cliOptions.offer.offerOrder)); + bind(Integer.class) + .annotatedWith(HttpOfferSetImpl.TimeoutMs.class) + .toInstance(options.httpOfferSetTimeoutMs); + bind(String.class) + .annotatedWith(HttpOfferSetImpl.Endpoint.class) + .toInstance(options.httpOfferSetEndpoint); + bind(Integer.class) + .annotatedWith(HttpOfferSetImpl.MaxRetries.class) + .toInstance(options.httpOfferSetMaxRetries); + bind(OfferSet.class).to(HttpOfferSetImpl.class).in(Singleton.class); + expose(OfferSet.class); + } + }); + + bind(StatCalculator.class).in(com.google.inject.Singleton.class); + bind(ScheduledExecutorService.class) + .annotatedWith(Executor.class) + .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("HttpOfferSet-%d", LOG)); + bind(Integer.class) + .annotatedWith(RefreshRateMs.class) + .toInstance(cliOptions.sla.slaRefreshInterval.as(Time.MILLISECONDS).intValue()); + bind(StatUpdater.class).in(com.google.inject.Singleton.class); + SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(StatUpdater.class); + } + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface Executor { } + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface RefreshRateMs { } + + static class StatUpdater extends AbstractIdleService { + private final ScheduledExecutorService executor; + private final StatCalculator calculator; + private final Integer refreshRateMs; + + @Inject + StatUpdater( + @Executor ScheduledExecutorService executor, + StatCalculator calculator, + @RefreshRateMs Integer refreshRateMs) { + this.executor = requireNonNull(executor); + this.calculator = requireNonNull(calculator); + this.refreshRateMs = refreshRateMs; + } + + @Override + protected void startUp() { + long interval = this.refreshRateMs; + executor.scheduleAtFixedRate(calculator, 0, interval, TimeUnit.MILLISECONDS); + } + + @Override + protected void shutDown() { + // Ignored. + } + } +} diff --git a/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java new file mode 100644 index 000000000..4ef1c056b --- /dev/null +++ b/src/main/java/io/github/aurora/scheduler/offers/StatCalculator.java @@ -0,0 +1,90 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.aurora.scheduler.offers; + +import java.util.concurrent.atomic.AtomicReference; + +import javax.inject.Inject; + +import com.google.common.base.Supplier; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; + +import org.apache.aurora.common.stats.StatsProvider; + +import static java.util.Objects.requireNonNull; + +public class StatCalculator implements Runnable { + private final LoadingCache metricCache; + + private static class Counter implements Supplier { + private final AtomicReference value = new AtomicReference<>((Number) 0); + private final StatsProvider statsProvider; + private boolean exported; + + Counter(StatsProvider statsProvider) { + this.statsProvider = statsProvider; + } + + @Override + public Number get() { + return value.get(); + } + + private void set(String name, Number newValue) { + if (!exported) { + statsProvider.makeGauge(name, this); + exported = true; + } + value.set(newValue); + } + } + + @Inject + StatCalculator(final StatsProvider statsProvider) { + requireNonNull(statsProvider); + this.metricCache = CacheBuilder.newBuilder().build( + new CacheLoader() { + public StatCalculator.Counter load(String key) { + return new StatCalculator.Counter(statsProvider.untracked()); + } + }); + } + + @Override + public void run() { + float medianLatency = + Util.percentile(HttpOfferSetModule.latencyMsList, 50.0) + .floatValue() / 1000000; + float avgLatency = + (float) Util.avg(HttpOfferSetModule.latencyMsList) / 1000000; + float worstLatency = + (float) Util.max(HttpOfferSetModule.latencyMsList) / 1000000; + + String medianLatencyName = "http_offer_set_median_latency_ms"; + metricCache.getUnchecked(medianLatencyName).set(medianLatencyName, medianLatency); + String worstLatencyName = "http_offer_set_worst_latency_ms"; + metricCache.getUnchecked(worstLatencyName).set(worstLatencyName, worstLatency); + String avgLatencyName = "http_offer_set_avg_latency_ms"; + metricCache.getUnchecked(avgLatencyName).set(avgLatencyName, avgLatency); + String failureCountName = "http_offer_set_failure_count"; + metricCache.getUnchecked(failureCountName).set(failureCountName, + HttpOfferSetModule.getFailureCount()); + + // reset the stats. + HttpOfferSetModule.latencyMsList.clear(); + HttpOfferSetModule.resetFailureCount(); + } +} diff --git a/src/main/java/io/github/aurora/scheduler/offers/Util.java b/src/main/java/io/github/aurora/scheduler/offers/Util.java new file mode 100644 index 000000000..43da3b970 --- /dev/null +++ b/src/main/java/io/github/aurora/scheduler/offers/Util.java @@ -0,0 +1,76 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.aurora.scheduler.offers; + +import java.util.LinkedList; +import java.util.List; + +import com.google.common.math.Quantiles; + +import org.apache.aurora.scheduler.offers.HostOffer; + +public final class Util { + + private Util() { + // Utility class. + } + + public static Number percentile(List list, double percentile) { + if (list.isEmpty()) { + return 0.0; + } + + // index should be a full integer. use quantile scale to allow reporting of percentile values + // such as p99.9. + double percentileCopy = percentile; + int quantileScale = 100; + while ((percentileCopy - Math.floor(percentileCopy)) > 0) { + quantileScale *= 10; + percentileCopy *= 10; + } + + return Quantiles.scale(quantileScale).index((int) Math.floor(quantileScale - percentileCopy)) + .compute(list); + } + + public static long max(List list) { + long max = 0; + for (long e : list) { + if (e > max) { + max = e; + } + } + return max; + } + + public static long avg(List list) { + if (list.isEmpty()) { + return 0; + } + + long avg = 0; + for (long e : list) { + avg += e; + } + return avg / list.size(); + } + + public static List getHostnames(Iterable offers) { + List hostnames = new LinkedList<>(); + for (HostOffer offer: offers) { + hostnames.add(offer.getOffer().getHostname()); + } + return hostnames; + } +} diff --git a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java new file mode 100644 index 000000000..475a5bc78 --- /dev/null +++ b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java @@ -0,0 +1,117 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.aurora.scheduler.offers; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; + +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.scheduler.offers.HostOffer; +import org.apache.aurora.scheduler.offers.Offers; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.MaintenanceMode.NONE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class HttpOfferSetImplTest extends EasyMockTest { + + private static final String HOST_A = "HOST_A"; + private static final IHostAttributes HOST_ATTRIBUTES_A = + IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_A)); + private static final HostOffer OFFER_A = new HostOffer( + Offers.makeOffer("OFFER_A", HOST_A), + HOST_ATTRIBUTES_A); + private static final String HOST_B = "HOST_B"; + private static final IHostAttributes HOST_ATTRIBUTES_B = + IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_B)); + private static final HostOffer OFFER_B = new HostOffer( + Offers.makeOffer("OFFER_B", HOST_B), + HOST_ATTRIBUTES_B); + private static final String HOST_C = "HOST_C"; + private static final HostOffer OFFER_C = new HostOffer( + Offers.makeOffer("OFFER_C", HOST_C), + IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_C))); + private static final String HOST_D = "HOST_D"; + + private HttpOfferSetImpl httpOfferSet; + + @Before + public void setUp() throws IOException { + httpOfferSet = new HttpOfferSetImpl(new HashSet<>()); + httpOfferSet.add(OFFER_A); + httpOfferSet.add(OFFER_B); + httpOfferSet.add(OFFER_C); + } + + @Test + public void testProcessResponse() throws IOException { + control.replay(); + String responseStr = "{\"error\": \"\", \"hosts\": [\"" + + HOST_A + "\",\"" + + HOST_B + "\",\"" + + HOST_C + "\"]}"; + List sortedOffers = httpOfferSet.processResponse(responseStr); + assertEquals(sortedOffers.size(), 3); + assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); + assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); + assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C); + + // plugin returns less offers than Aurora has. + responseStr = "{\"error\": \"\", \"hosts\": [\"" + + HOST_A + "\",\"" + + HOST_C + "\"]}"; + sortedOffers = httpOfferSet.processResponse(responseStr); + assertEquals(sortedOffers.size(), 2); + assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); + assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C); + + // plugin returns more offers than Aurora has. + responseStr = "{\"error\": \"\", \"hosts\": [\"" + + HOST_A + "\",\"" + + HOST_B + "\",\"" + + HOST_D + "\",\"" + + HOST_C + "\"]}"; + sortedOffers = httpOfferSet.processResponse(responseStr); + assertEquals(sortedOffers.size(), 3); + assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); + assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); + assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C); + + responseStr = "{\"error\": \"Error\", \"hosts\": [\"" + + HOST_A + "\",\"" + + HOST_B + "\",\"" + + HOST_C + "\"]}"; + boolean isException = false; + try { + httpOfferSet.processResponse(responseStr); + } catch (IOException ioe) { + isException = true; + } + assertTrue(isException); + + responseStr = "{\"error\": \"\"}"; + isException = false; + try { + httpOfferSet.processResponse(responseStr); + } catch (IOException ioe) { + isException = true; + } + assertTrue(isException); + } +}