Skip to content
This repository has been archived by the owner on Apr 2, 2023. It is now read-only.

Commit

Permalink
[httpofferset] put the bad offers to the bottom of the list (#336)
Browse files Browse the repository at this point in the history
avoid scheduling on some bad nodes: more than N number of starting tasks.

Co-authored-by: Bipra De <[email protected]>
  • Loading branch information
lenhattan86 and biprade authored Oct 25, 2021
1 parent e7a186f commit b2ac222
Show file tree
Hide file tree
Showing 6 changed files with 428 additions and 127 deletions.
20 changes: 16 additions & 4 deletions docs/features/custom-plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,22 @@ We added HTTP OfferSet `io.github.aurora.scheduler.offers.HttpOfferSetModule` th

How to configure HTTP OfferSet?
- offer_set_module=io.github.aurora.scheduler.offers.HttpOfferSetModule
- http_offer_set_timeout_ms is http timeout in milliseconds.
- http_offer_set_max_retries is the number of retries if the module fails to connects to the external REST API server.
If it exceeds the number of retries, Aurora uses the native offerset implementation.
- http_offer_set_endpoint is the REST API endpoint, e.g. http://127.0.0.1:9090/v1/offerset.
- `http_offer_set_timeout` is http timeout value. `100ms` is the default value.
- `http_offer_set_max_retries` is the number of retries if the module fails to connect to the external REST API server.
`10` is the default value.
If the number of retries is exceeded, HTTP OfferSet will no longer reach out to the external endpoint.
- `http_offer_set_endpoint` is the REST API endpoint, e.g. http://127.0.0.1:9090/v1/offerset.
- `http_offer_set_filter_enabled` is used to filter bad offers out. By default, it is disabled.
- `http_offer_set_max_starting_tasks_per_slave` is the maximum starting tasks per slave.
If a slave has more than this number of starting tasks, it will be put at the end of the offer list (`http_offer_set_filter_enabled=false`)
or filtered out entirely (`http_offer_set_filter_enabled=true`).
This feature is useful when a node is vulnerable to a certain number of tasks starting at the same time.
A task often demands a lot of resources during STARTING (e.g., pulling docker images and frequent health checks).
Too many tasks starting at the same time may overload the node.
It is disabled by default.
If you want to use this feature, please set this parameter to a positive integer.
- `http_offer_set_task_fetch_interval` determines how often HTTP OfferSet fetches the starting tasks from the `task_store`.
By default, it is `1secs`.

How to implement the external REST API server?
The REST API needs to handle the request in the following format:
Expand Down
202 changes: 154 additions & 48 deletions src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
import java.lang.annotation.Target;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;
Expand All @@ -33,13 +36,17 @@
import com.google.common.collect.Ordering;
import com.google.inject.Inject;

import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.offers.HostOffer;
import org.apache.aurora.scheduler.offers.OfferSet;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
Expand All @@ -66,16 +73,34 @@
public class HttpOfferSetImpl implements OfferSet {
private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetImpl.class);

static List<Long> latencyMsList = Collections.synchronizedList(new LinkedList<>());
static List<Long> offerSetDiffList = Collections.synchronizedList(new LinkedList<>());
private static Iterable<IScheduledTask> startingTasks = new LinkedList<>();
private static long failureCount = 0;
private static boolean useEndpoint = false;

private final Set<HostOffer> offers;
private final ObjectMapper jsonMapper = new ObjectMapper();
private final CloseableHttpClient httpClient = HttpClients.createDefault();
private final int timeoutMs;
private final int maxRetries;
private final int maxStartingTasksPerSlave;
private final boolean filterEnabled;

private Integer timeoutMs;
private URL endpoint;
private Integer maxRetries;

public HttpOfferSetImpl(Set<HostOffer> mOffers) {
/**
 * Creates an HttpOfferSetImpl backed by an existing offer set, primarily for testing.
 *
 * @param mOffers backing set of host offers.
 * @param mTimeoutMs HTTP request timeout in milliseconds.
 * @param mEndpoint external REST API endpoint ordering the offers.
 * @param mMaxRetries consecutive failures tolerated before the endpoint is abandoned.
 * @param mMaxStartingTasksPerSlave threshold of STARTING tasks marking a slave as "bad";
 *     non-positive disables the feature.
 * @param mFilterEnabled if true, bad offers are dropped; otherwise they are moved to the
 *     bottom of the list.
 */
public HttpOfferSetImpl(Set<HostOffer> mOffers,
                        int mTimeoutMs,
                        URL mEndpoint,
                        int mMaxRetries,
                        int mMaxStartingTasksPerSlave,
                        boolean mFilterEnabled) {
  this.offers = mOffers;
  this.timeoutMs = mTimeoutMs;
  this.endpoint = mEndpoint;
  this.maxRetries = mMaxRetries;
  this.maxStartingTasksPerSlave = mMaxStartingTasksPerSlave;
  this.filterEnabled = mFilterEnabled;
}

@VisibleForTesting
Expand All @@ -93,26 +118,67 @@ public HttpOfferSetImpl(Set<HostOffer> mOffers) {
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
@interface TimeoutMs { }

@VisibleForTesting
@Qualifier
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
@interface MaxStartingTaskPerSlave { }

@VisibleForTesting
@Qualifier
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
@interface FilterEnabled { }

@Inject
public HttpOfferSetImpl(Ordering<HostOffer> ordering,
@TimeoutMs Integer mTimeoutMs,
@Endpoint String url,
@MaxRetries Integer mMaxRetries) {
@MaxRetries Integer mMaxRetries,
@MaxStartingTaskPerSlave Integer mMaxStartingTasksPerSlave,
@FilterEnabled Boolean mFilterEnabled) {
offers = new ConcurrentSkipListSet<>(ordering);
try {
endpoint = new URL(url);
HttpOfferSetModule.enable(true);
LOG.info("HttpOfferSetModule Enabled.");
endpoint = new URL(Objects.requireNonNull(url));
HttpOfferSetImpl.setUseEndpoint(true);
LOG.info("HttpOfferSetImpl Enabled.");
} catch (MalformedURLException e) {
LOG.error("http_offer_set_endpoint is malformed. ", e);
HttpOfferSetModule.enable(false);
LOG.info("HttpOfferSetModule Disabled.");
}
timeoutMs = mTimeoutMs;
maxRetries = mMaxRetries;
LOG.info("HttpOfferSet's endpoint: " + endpoint);
LOG.info("HttpOfferSet's timeout: " + timeoutMs + " (ms)");
LOG.info("HttpOfferSet's max retries: " + maxRetries);
HttpOfferSetImpl.setUseEndpoint(false);
LOG.info("HttpOfferSetImpl Disabled.");
}
timeoutMs = Objects.requireNonNull(mTimeoutMs);
maxRetries = Objects.requireNonNull(mMaxRetries);
maxStartingTasksPerSlave = Objects.requireNonNull(mMaxStartingTasksPerSlave);
filterEnabled = Objects.requireNonNull(mFilterEnabled);
LOG.info("HttpOfferSet's endpoint: {}", endpoint);
LOG.info("HttpOfferSet's timeout: {} (ms)", timeoutMs);
LOG.info("HttpOfferSet's max retries: {}", maxRetries);
LOG.info("HttpOfferSet's filter enabled: {}", filterEnabled);
LOG.info("HttpOfferSet's max number of starting tasks per slave: {}", maxStartingTasksPerSlave);
}

/** Records one more consecutive failure to reach the external endpoint. */
public static synchronized void incrementFailureCount() {
  failureCount += 1;
}

/** Returns the number of consecutive endpoint failures recorded so far. */
public static synchronized long getFailureCount() {
  return failureCount;
}

/** Clears the failure counter, e.g. after a successful call to the endpoint. */
public static synchronized void resetFailureCount() {
  failureCount = 0L;
}

/** Enables or disables calls to the external ordering endpoint. */
public static synchronized void setUseEndpoint(boolean mEnabled) {
  useEndpoint = mEnabled;
}

/** Reports whether the external ordering endpoint is currently in use. */
public static synchronized boolean isUseEndpoint() {
  return useEndpoint;
}

/**
 * Refreshes the cached snapshot of tasks currently in the STARTING state from the task store.
 * Invoked periodically (per {@code http_offer_set_task_fetch_interval}) so that
 * {@code getOrdered} can count starting tasks per slave without querying storage on every call.
 *
 * NOTE(review): this writer holds the class monitor ({@code static synchronized}) and replaces
 * the {@code startingTasks} reference, while the reader in {@code getOrdered} synchronizes on
 * the {@code startingTasks} object itself — a reader could lock a stale list while the
 * reference is swapped. Iterable assignment is atomic, so iteration stays safe, but the two
 * lock disciplines are inconsistent; confirm intent and consider a single lock or a volatile
 * immutable snapshot.
 */
public static synchronized void fetchStartingTasks(Storage storage) {
  startingTasks = Storage.Util.fetchTasks(storage,
      Query.unscoped().byStatus(ScheduleStatus.STARTING));
}

@Override
Expand Down Expand Up @@ -144,48 +210,88 @@ public Iterable<HostOffer> values() {

@Override
public Iterable<HostOffer> getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) {
long startTime = System.nanoTime();
// if there are no available offers, do nothing.
if (offers.isEmpty() || !HttpOfferSetModule.isEnabled()) {
if (offers.isEmpty()) {
return offers;
}

// count the number of starting tasks per slave
Map<String, Integer> hostTaskCountMap = new HashMap<>();
synchronized (startingTasks) {
for (IScheduledTask task : startingTasks) {
String slaveId = task.getAssignedTask().getSlaveId();
hostTaskCountMap.put(slaveId, hostTaskCountMap.getOrDefault(slaveId, 0) + 1);
}
}

// find the bad offers and put them at the bottom of the list
List<HostOffer> badOffers = new LinkedList<>();
List<HostOffer> goodOffers = new LinkedList<>();
if (maxStartingTasksPerSlave > 0) {
if (!filterEnabled) {
badOffers = offers.stream()
.filter(offer ->
hostTaskCountMap.getOrDefault(offer.getOffer().getAgentId().getValue(), 0)
>= maxStartingTasksPerSlave)
.collect(Collectors.toList());
}
goodOffers = offers.stream()
.filter(offer ->
hostTaskCountMap.getOrDefault(offer.getOffer().getAgentId().getValue(), 0)
< maxStartingTasksPerSlave)
.collect(Collectors.toList());

if (!badOffers.isEmpty()) {
LOG.info("the number of bad offers: {}", badOffers.size());
}
} else {
goodOffers = offers.stream().collect(Collectors.toList());
}

// if the external http endpoint was not reachable or we have nothing to send out
if (!HttpOfferSetImpl.isUseEndpoint() || goodOffers.isEmpty()) {
goodOffers.addAll(badOffers);
HttpOfferSetImpl.latencyMsList.add(System.nanoTime() - startTime);
return goodOffers;
}

List<HostOffer> orderedOffers = null;
try {
long startTime = System.nanoTime();
// create json request & send the Rest API request to the scheduler plugin
ScheduleRequest scheduleRequest = createRequest(resourceRequest, startTime);
LOG.info("Sending request " + scheduleRequest.jobKey);
ScheduleRequest scheduleRequest = createRequest(goodOffers, resourceRequest, startTime);
LOG.info("Sending request {}", scheduleRequest.jobKey);
String responseStr = sendRequest(scheduleRequest);
orderedOffers = processResponse(responseStr, HttpOfferSetModule.offerSetDiffList);
HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime);
orderedOffers = processResponse(goodOffers, responseStr);
} catch (IOException e) {
LOG.error("Failed to schedule the task of "
+ resourceRequest.getTask().getJob().toString()
+ " using HttpOfferSet. ", e);
HttpOfferSetModule.incFailureCount();
LOG.error("Failed to schedule the task of {} using {} ",
resourceRequest.getTask().getJob().toString(), endpoint, e);
HttpOfferSetImpl.incrementFailureCount();
} finally {
// shutdown HttpOfferSet if failure is consistent.
if (HttpOfferSetModule.getFailureCount() >= maxRetries) {
LOG.error("Reaches " + maxRetries + ". HttpOfferSetModule Disabled.");
HttpOfferSetModule.enable(false);
// stop reaching the endpoint if failure is consistent.
if (HttpOfferSetImpl.getFailureCount() >= maxRetries) {
LOG.error("Reaches {} retries. {} is disabled", maxRetries, endpoint);
HttpOfferSetImpl.setUseEndpoint(false);
}
}
if (orderedOffers != null) {
return orderedOffers;
goodOffers = orderedOffers;
}

// fall back to default scheduler.
LOG.warn("Falling back on default ordering.");
return offers;
goodOffers.addAll(badOffers);
HttpOfferSetImpl.latencyMsList.add(System.nanoTime() - startTime);
return goodOffers;
}

//createScheduleRequest creates the ScheduleRequest to be sent out to the plugin.
private ScheduleRequest createRequest(ResourceRequest resourceRequest, long startTime) {
private ScheduleRequest createRequest(List<HostOffer> mOffers,
ResourceRequest resourceRequest,
long startTime) {
Resource req = new Resource(resourceRequest.getResourceBag().valueOf(ResourceType.CPUS),
resourceRequest.getResourceBag().valueOf(ResourceType.RAM_MB),
resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB));
List<Host> hosts =
offers.stream()
mOffers.stream()
.map(offer -> new Host(offer.getAttributes().getHost(), new Resource(offer)))
.collect(Collectors.toList());
IJobKey jobKey = resourceRequest.getTask().getJob();
Expand All @@ -194,9 +300,9 @@ private ScheduleRequest createRequest(ResourceRequest resourceRequest, long star
return new ScheduleRequest(req, hosts, jobKeyStr);
}

// sendRequest sends resorceRequest to the external plugin endpoint and gets json response.
// sendRequest sends ScheduleRequest to the external endpoint and gets ordered offers in json
private String sendRequest(ScheduleRequest scheduleRequest) throws IOException {
LOG.debug("Sending request for " + scheduleRequest.toString());
LOG.debug("Sending request for {}", scheduleRequest);
HttpPost request = new HttpPost(endpoint.toString());
RequestConfig requestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(timeoutMs)
Expand All @@ -219,15 +325,15 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException {
}
}

List<HostOffer> processResponse(String responseStr, List<Long> offerSetDiffList)
List<HostOffer> processResponse(List<HostOffer> mOffers, String responseStr)
throws IOException {
ScheduleResponse response = jsonMapper.readValue(responseStr, ScheduleResponse.class);
LOG.info("Received " + response.hosts.size() + " offers");
LOG.info("Received {} offers", response.hosts.size());

Map<String, HostOffer> offerMap = offers.stream()
Map<String, HostOffer> offerMap = mOffers.stream()
.collect(Collectors.toMap(offer -> offer.getAttributes().getHost(), offer -> offer));
if (!response.error.trim().isEmpty()) {
LOG.error("Unable to receive offers from " + endpoint + " due to " + response.error);
LOG.error("Unable to receive offers from {} due to {}", endpoint, response.error);
throw new IOException(response.error);
}

Expand All @@ -240,24 +346,24 @@ List<HostOffer> processResponse(String responseStr, List<Long> offerSetDiffList)
.collect(Collectors.toList());

//offSetDiff is the total number of missing offers and the extra offers
long offSetDiff = offers.size() - (response.hosts.size() - extraOffers.size())
long offSetDiff = mOffers.size() - (response.hosts.size() - extraOffers.size())
+ extraOffers.size();
offerSetDiffList.add(offSetDiff);
if (offSetDiff > 0) {
LOG.warn("The number of different offers between the original and received offer sets is "
+ offSetDiff);
LOG.warn("The number of different offers between the original and received offer sets is {}",
offSetDiff);
if (LOG.isDebugEnabled()) {
List<String> missedOffers = offers.stream()
List<String> missedOffers = mOffers.stream()
.map(offer -> offer.getAttributes().getHost())
.filter(host -> !response.hosts.contains(host))
.collect(Collectors.toList());
LOG.debug("missed offers: " + missedOffers);
LOG.debug("extra offers: " + extraOffers);
LOG.debug("missed offers: {}", missedOffers);
LOG.debug("extra offers: {}", extraOffers);
}
}

if (!extraOffers.isEmpty()) {
LOG.error("Cannot find offers " + extraOffers + " in the original offer set");
LOG.error("Cannot find offers {} in the original offer set", extraOffers);
}

return orderedOffers;
Expand Down
Loading

0 comments on commit b2ac222

Please sign in to comment.