This repository has been archived by the owner on Apr 2, 2023. It is now read-only.

[httpofferset] put the bad offers to the bottom of the list #336

Merged: 43 commits, Oct 25, 2021
Changes from 37 commits
d8f6f1f
performance & monitoring improvement
lenhattan86 May 14, 2021
c4e650b
use the same CloseableHttpClient for multiple requests
lenhattan86 May 14, 2021
d3d77a2
fix compiling error
lenhattan86 May 15, 2021
42bf2f7
fix CI error
lenhattan86 May 18, 2021
2a35900
fix CI error
lenhattan86 May 18, 2021
1196da5
resolve review comments
lenhattan86 May 19, 2021
81b887c
resolve PR comments
lenhattan86 May 19, 2021
6d836ea
reflect comments
lenhattan86 May 20, 2021
113a0bc
polish code
lenhattan86 May 20, 2021
dc594d5
polish code
lenhattan86 May 20, 2021
9d6a7f6
remove this.
lenhattan86 May 20, 2021
7bd29c3
update aurora version 0.24.0
lenhattan86 Jun 18, 2021
bd36a72
fix release-candidate error
lenhattan86 Jun 18, 2021
ff11201
Updating CHANGELOG for 0.24.2 release.
lenhattan86 Jun 18, 2021
e600867
Incrementing snapshot version to 0.25.0-SNAPSHOT.
lenhattan86 Jun 18, 2021
c4a6fc3
fixed release candidate
lenhattan86 Jun 18, 2021
48e75b3
fixed release candidate script
lenhattan86 Jun 18, 2021
22bc79e
aurora version 0.24.2
lenhattan86 Jun 18, 2021
6aff891
Updating CHANGELOG.md for 0.24.2 release.
lenhattan86 Jun 18, 2021
daffeee
Incrementing snapshot version to 0.25.0-SNAPSHOT.
lenhattan86 Jun 18, 2021
9f94253
Merge pull request #1 from lenhattan86/origin/http_offer_set_perf_imp…
lenhattan86 Jun 18, 2021
b9c9ce6
Merge branch 'master' into master
lenhattan86 Sep 7, 2021
259eb54
Merge pull request #2 from aurora-scheduler/master
lenhattan86 Sep 7, 2021
cc35f9a
Merge branch 'aurora-scheduler:master' into master
lenhattan86 Oct 5, 2021
1092fee
Merge branch 'aurora-scheduler:master' into master
lenhattan86 Oct 6, 2021
4f437b8
Merge branch 'aurora-scheduler:master' into master
lenhattan86 Oct 11, 2021
8fae700
put the bad offers with too many starting tasks to the bottom
lenhattan86 Oct 15, 2021
e4fccf2
put the bad offers with too many starting tasks to the bottom
lenhattan86 Oct 15, 2021
e04afb9
fix unit test
lenhattan86 Oct 15, 2021
93d85f9
fix unit test
lenhattan86 Oct 15, 2021
a1f8c84
fix unit test
lenhattan86 Oct 15, 2021
d8d7019
Merge branch 'aurora-scheduler:master' into docker_bug
lenhattan86 Oct 18, 2021
4c15ab0
update on unit tests
lenhattan86 Oct 19, 2021
7a60c49
Merge branch 'docker_bug' of https://github.com/lenhattan86/scheduler…
lenhattan86 Oct 19, 2021
5d48911
when no offers, skip calculating stats
lenhattan86 Oct 19, 2021
7b30d1b
fix java.util.ConcurrentModificationException
lenhattan86 Oct 19, 2021
574bb62
periodically fetching the starting tasks
lenhattan86 Oct 20, 2021
7026337
Update docs/features/custom-plugins.md
lenhattan86 Oct 21, 2021
0aaf21b
Update src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetIm…
lenhattan86 Oct 21, 2021
c116586
address comment
lenhattan86 Oct 21, 2021
5664d89
Apply suggestions from code review
lenhattan86 Oct 21, 2021
4af9021
addressed comments
lenhattan86 Oct 21, 2021
1621f86
add parameter
lenhattan86 Oct 25, 2021
11 changes: 9 additions & 2 deletions docs/features/custom-plugins.md
@@ -17,10 +17,17 @@ We added HTTP OfferSet `io.github.aurora.scheduler.offers.HttpOfferSetModule` th

How to configure HTTP OfferSet?
- offer_set_module=io.github.aurora.scheduler.offers.HttpOfferSetModule
- http_offer_set_timeout_ms is http timeout in milliseconds.
- http_offer_set_timeout is the HTTP timeout value. `100ms` is the default value.
- http_offer_set_max_retries is the number of retries if the module fails to connect to the external REST API server.
If it exceeds the number of retries, Aurora uses the native offerset implementation.
`10` is the default value.
If the number of failures reaches this limit, HTTP OfferSet stops calling the external endpoint.
- http_offer_set_endpoint is the REST API endpoint, e.g. http://127.0.0.1:9090/v1/offerset.
- http_offer_set_max_starting_tasks_per_slave is the maximum number of starting tasks per slave.
If a slave has this many starting tasks or more, its offers are put at the end of the offer list.
To use this feature, set this parameter to a positive integer.
It is disabled by default.
- http_offer_set_task_fetch_interval determines how often HTTP OfferSet fetches the starting tasks from the `task_store`.
By default, it is `1secs`.
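
The retry limit described for `http_offer_set_max_retries` could be sketched as follows. This is a minimal sketch, not the PR's implementation: the names `EndpointBreaker`, `recordFailure`, and `shouldCallEndpoint` are hypothetical, and it assumes the endpoint is switched off once the failure count reaches the limit, which matches the `>=` comparison in `HttpOfferSetImpl.getOrdered`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the retry limit; not a class from this PR. */
final class EndpointBreaker {
  private final int maxRetries;
  private final AtomicInteger failures = new AtomicInteger();
  // volatile so every thread sees the switch as soon as it flips
  private volatile boolean useEndpoint = true;

  EndpointBreaker(int maxRetries) {
    this.maxRetries = maxRetries;
  }

  /** Records one failed call; stops using the endpoint once the limit is reached. */
  void recordFailure() {
    if (failures.incrementAndGet() >= maxRetries) {
      useEndpoint = false;
    }
  }

  /** Returns true while the external endpoint should still be called. */
  boolean shouldCallEndpoint() {
    return useEndpoint;
  }

  int failureCount() {
    return failures.get();
  }
}
```

With `new EndpointBreaker(3)`, the third call to `recordFailure()` makes `shouldCallEndpoint()` return `false`; callers would then keep the offers in their existing order.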

How to implement the external REST API server?
The REST API needs to handle the request in the following format:
189 changes: 141 additions & 48 deletions src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java
@@ -18,9 +18,12 @@
import java.lang.annotation.Target;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;
@@ -33,13 +36,17 @@
import com.google.common.collect.Ordering;
import com.google.inject.Inject;

import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.offers.HostOffer;
import org.apache.aurora.scheduler.offers.OfferSet;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -66,16 +73,31 @@
public class HttpOfferSetImpl implements OfferSet {
private static final Logger LOG = LoggerFactory.getLogger(HttpOfferSetImpl.class);

static List<Long> latencyMsList = Collections.synchronizedList(new LinkedList<>());
static List<Long> offerSetDiffList = Collections.synchronizedList(new LinkedList<>());
private static Iterable<IScheduledTask> startingTasks = new LinkedList<>();
private static long failureCount = 0;
private static boolean useEndpoint = false;

private final Set<HostOffer> offers;
private final ObjectMapper jsonMapper = new ObjectMapper();
private final CloseableHttpClient httpClient = HttpClients.createDefault();
private final int timeoutMs;
private final int maxRetries;
private final int maxStartingTasksPerSlave;

private Integer timeoutMs;
private URL endpoint;
private Integer maxRetries;

public HttpOfferSetImpl(Set<HostOffer> mOffers) {
public HttpOfferSetImpl(Set<HostOffer> mOffers,
int mTimeoutMs,
URL mEndpoint,
int mMaxRetries,
int mMaxStartingTasksPerSlave) {
offers = mOffers;
timeoutMs = mTimeoutMs;
endpoint = mEndpoint;
maxRetries = mMaxRetries;
maxStartingTasksPerSlave = mMaxStartingTasksPerSlave;
}

@VisibleForTesting
@@ -93,26 +115,59 @@ public HttpOfferSetImpl(Set<HostOffer> mOffers) {
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
@interface TimeoutMs { }

@VisibleForTesting
@Qualifier
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
@interface MaxStartingTaskPerSlave { }

@Inject
public HttpOfferSetImpl(Ordering<HostOffer> ordering,
@TimeoutMs Integer mTimeoutMs,
@Endpoint String url,
@MaxRetries Integer mMaxRetries) {
@MaxRetries Integer mMaxRetries,
@MaxStartingTaskPerSlave Integer mMaxStartingTasksPerSlave) {
offers = new ConcurrentSkipListSet<>(ordering);
try {
endpoint = new URL(url);
HttpOfferSetModule.enable(true);
LOG.info("HttpOfferSetModule Enabled.");
endpoint = new URL(Objects.requireNonNull(url));
HttpOfferSetImpl.setUseEndpoint(true);
LOG.info("HttpOfferSetImpl Enabled.");
} catch (MalformedURLException e) {
LOG.error("http_offer_set_endpoint is malformed. ", e);
HttpOfferSetModule.enable(false);
LOG.info("HttpOfferSetModule Disabled.");
}
timeoutMs = mTimeoutMs;
maxRetries = mMaxRetries;
LOG.info("HttpOfferSet's endpoint: " + endpoint);
LOG.info("HttpOfferSet's timeout: " + timeoutMs + " (ms)");
LOG.info("HttpOfferSet's max retries: " + maxRetries);
HttpOfferSetImpl.setUseEndpoint(false);
LOG.info("HttpOfferSetImpl Disabled.");
}
timeoutMs = Objects.requireNonNull(mTimeoutMs);
maxRetries = Objects.requireNonNull(mMaxRetries);
maxStartingTasksPerSlave = Objects.requireNonNull(mMaxStartingTasksPerSlave);
LOG.info("HttpOfferSet's endpoint: {}", endpoint);
LOG.info("HttpOfferSet's timeout: {} (ms)", timeoutMs);
LOG.info("HttpOfferSet's max retries: {}", maxRetries);
LOG.info("HttpOfferSet's max number of starting tasks per slave: {}", maxStartingTasksPerSlave);
}

public static synchronized void incFailureCount() {
HttpOfferSetImpl.failureCount++;
}

public static synchronized long getFailureCount() {
return HttpOfferSetImpl.failureCount;
}

public static synchronized void resetFailureCount() {
HttpOfferSetImpl.failureCount = 0;
}

public static synchronized void setUseEndpoint(boolean mEnabled) {
HttpOfferSetImpl.useEndpoint = mEnabled;
}

public static synchronized boolean isUseEndpoint() {
return HttpOfferSetImpl.useEndpoint;
}

public static synchronized void fetchStartingTasks(Storage storage) {
startingTasks = Storage.Util.fetchTasks(storage,
Query.unscoped().byStatus(ScheduleStatus.STARTING));
}
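
In the code above, `fetchStartingTasks` synchronizes on the class while `getOrdered` synchronizes on whatever object `startingTasks` currently refers to, and that reference is reassigned on every fetch, so the two blocks do not share a lock. One possible alternative, sketched here under assumptions, is to publish an immutable snapshot through a `volatile` field so that readers need no lock at all. The class `StartingTaskSnapshot` and its methods are hypothetical, and plain agent-id strings stand in for `IScheduledTask`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: lock-free publication of the latest starting-task list. */
final class StartingTaskSnapshot {
  // volatile so readers always see the most recently published snapshot
  private static volatile List<String> agentIds = Collections.emptyList();

  private StartingTaskSnapshot() { }

  /** Copies the caller's list and publishes it as an unmodifiable snapshot. */
  static void publish(List<String> latest) {
    agentIds = Collections.unmodifiableList(new ArrayList<>(latest));
  }

  /** Returns the current snapshot; it never changes after publication. */
  static List<String> current() {
    return agentIds;
  }
}
```

A reader that calls `current()` once and iterates over the result is unaffected by a concurrent `publish`, because each snapshot is a private copy.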

@Override
@@ -144,48 +199,86 @@ public Iterable<HostOffer> values() {

@Override
public Iterable<HostOffer> getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) {
long startTime = System.nanoTime();
// if there are no available offers, do nothing.
if (offers.isEmpty() || !HttpOfferSetModule.isEnabled()) {
if (offers.isEmpty()) {
return offers;
}

// count the number of starting tasks per slave
Map<String, Integer> hostTaskCountMap = new HashMap<>();
synchronized (startingTasks) {
for (IScheduledTask task : startingTasks) {
String slaveId = task.getAssignedTask().getSlaveId();
hostTaskCountMap.put(slaveId, hostTaskCountMap.getOrDefault(slaveId, 0) + 1);
}
}

// find the bad offers and put them at the bottom of the list
List<HostOffer> badOffers = new LinkedList<>();
List<HostOffer> goodOffers = new LinkedList<>();
if (maxStartingTasksPerSlave > 0) {
badOffers = offers.stream()
.filter(offer ->
hostTaskCountMap.getOrDefault(offer.getOffer().getAgentId().getValue(), 0)
>= maxStartingTasksPerSlave)
.collect(Collectors.toList());
goodOffers = offers.stream()
.filter(offer ->
hostTaskCountMap.getOrDefault(offer.getOffer().getAgentId().getValue(), 0)
< maxStartingTasksPerSlave)
.collect(Collectors.toList());

if (!badOffers.isEmpty()) {
LOG.info("the number of bad offers: {}", badOffers.size());
}
} else {
goodOffers = offers.stream().collect(Collectors.toList());
}

// if the external http endpoint was not reachable or we have nothing to send out
if (!HttpOfferSetImpl.isUseEndpoint() || goodOffers.isEmpty()) {
goodOffers.addAll(badOffers);
HttpOfferSetImpl.latencyMsList.add(System.nanoTime() - startTime);
return goodOffers;
}

List<HostOffer> orderedOffers = null;
try {
long startTime = System.nanoTime();
// create json request & send the Rest API request to the scheduler plugin
ScheduleRequest scheduleRequest = createRequest(resourceRequest, startTime);
LOG.info("Sending request " + scheduleRequest.jobKey);
ScheduleRequest scheduleRequest = createRequest(goodOffers, resourceRequest, startTime);
LOG.info("Sending request {}", scheduleRequest.jobKey);
String responseStr = sendRequest(scheduleRequest);
orderedOffers = processResponse(responseStr, HttpOfferSetModule.offerSetDiffList);
HttpOfferSetModule.latencyMsList.add(System.nanoTime() - startTime);
orderedOffers = processResponse(goodOffers, responseStr);
} catch (IOException e) {
LOG.error("Failed to schedule the task of "
+ resourceRequest.getTask().getJob().toString()
+ " using HttpOfferSet. ", e);
HttpOfferSetModule.incFailureCount();
LOG.error("Failed to schedule the task of {} using {} ",
resourceRequest.getTask().getJob().toString(), endpoint, e);
HttpOfferSetImpl.incFailureCount();
} finally {
// shutdown HttpOfferSet if failure is consistent.
if (HttpOfferSetModule.getFailureCount() >= maxRetries) {
LOG.error("Reaches " + maxRetries + ". HttpOfferSetModule Disabled.");
HttpOfferSetModule.enable(false);
// stop reaching the endpoint if failure is consistent.
if (HttpOfferSetImpl.getFailureCount() >= maxRetries) {
LOG.error("Reaches {} retries. {} is disabled", maxRetries, endpoint);
HttpOfferSetImpl.setUseEndpoint(false);
}
}
if (orderedOffers != null) {
return orderedOffers;
goodOffers = orderedOffers;
}

// fall back to default scheduler.
LOG.warn("Falling back on default ordering.");
return offers;
goodOffers.addAll(badOffers);
HttpOfferSetImpl.latencyMsList.add(System.nanoTime() - startTime);
return goodOffers;
}
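
The good/bad split in `getOrdered` filters the offer set twice. As an illustration only, the same ordering can be produced in one pass with `Collectors.partitioningBy`. This is a sketch under assumptions: `OfferPartitionSketch`, `countPerAgent`, and `goodThenBad` are hypothetical names, and agent-id strings stand in for `HostOffer` and `IScheduledTask`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical sketch of moving overloaded agents to the end of the offer list. */
final class OfferPartitionSketch {
  private OfferPartitionSketch() { }

  /** Counts how many starting tasks each agent currently has. */
  static Map<String, Integer> countPerAgent(List<String> startingTaskAgentIds) {
    Map<String, Integer> counts = new HashMap<>();
    for (String agentId : startingTaskAgentIds) {
      counts.merge(agentId, 1, Integer::sum);
    }
    return counts;
  }

  /** Keeps input order, except that agents at or over the limit go last. */
  static List<String> goodThenBad(List<String> offerAgentIds,
                                  Map<String, Integer> counts,
                                  int maxStartingTasks) {
    // A non-positive limit means the feature is disabled.
    if (maxStartingTasks <= 0) {
      return new ArrayList<>(offerAgentIds);
    }
    // true -> bad offers (too many starting tasks), false -> good offers
    Map<Boolean, List<String>> parts = offerAgentIds.stream()
        .collect(Collectors.partitioningBy(
            id -> counts.getOrDefault(id, 0) >= maxStartingTasks));
    List<String> ordered = new ArrayList<>(parts.get(false));
    ordered.addAll(parts.get(true));
    return ordered;
  }
}
```

`partitioningBy` always returns entries for both `true` and `false`, so neither lookup can yield `null` even when one side is empty.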

// createRequest creates the ScheduleRequest to be sent out to the plugin.
private ScheduleRequest createRequest(ResourceRequest resourceRequest, long startTime) {
private ScheduleRequest createRequest(List<HostOffer> mOffers,
ResourceRequest resourceRequest,
long startTime) {
Resource req = new Resource(resourceRequest.getResourceBag().valueOf(ResourceType.CPUS),
resourceRequest.getResourceBag().valueOf(ResourceType.RAM_MB),
resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB));
List<Host> hosts =
offers.stream()
mOffers.stream()
.map(offer -> new Host(offer.getAttributes().getHost(), new Resource(offer)))
.collect(Collectors.toList());
IJobKey jobKey = resourceRequest.getTask().getJob();
@@ -194,9 +287,9 @@ private ScheduleRequest createRequest(ResourceRequest resourceRequest, long star
return new ScheduleRequest(req, hosts, jobKeyStr);
}

// sendRequest sends resorceRequest to the external plugin endpoint and gets json response.
// sendRequest sends ScheduleRequest to the external endpoint and gets ordered offers in json
private String sendRequest(ScheduleRequest scheduleRequest) throws IOException {
LOG.debug("Sending request for " + scheduleRequest.toString());
LOG.debug("Sending request for {}", scheduleRequest);
HttpPost request = new HttpPost(endpoint.toString());
RequestConfig requestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(timeoutMs)
@@ -219,15 +312,15 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException {
}
}

List<HostOffer> processResponse(String responseStr, List<Long> offerSetDiffList)
List<HostOffer> processResponse(List<HostOffer> mOffers, String responseStr)
throws IOException {
ScheduleResponse response = jsonMapper.readValue(responseStr, ScheduleResponse.class);
LOG.info("Received " + response.hosts.size() + " offers");
LOG.info("Received {} offers", response.hosts.size());

Map<String, HostOffer> offerMap = offers.stream()
Map<String, HostOffer> offerMap = mOffers.stream()
.collect(Collectors.toMap(offer -> offer.getAttributes().getHost(), offer -> offer));
if (!response.error.trim().isEmpty()) {
LOG.error("Unable to receive offers from " + endpoint + " due to " + response.error);
LOG.error("Unable to receive offers from {} due to {}", endpoint, response.error);
throw new IOException(response.error);
}

@@ -240,24 +333,24 @@ List<HostOffer> processResponse(String responseStr, List<Long> offerSetDiffList)
.collect(Collectors.toList());

//offSetDiff is the total number of missing offers and the extra offers
long offSetDiff = offers.size() - (response.hosts.size() - extraOffers.size())
long offSetDiff = mOffers.size() - (response.hosts.size() - extraOffers.size())
+ extraOffers.size();
offerSetDiffList.add(offSetDiff);
if (offSetDiff > 0) {
LOG.warn("The number of different offers between the original and received offer sets is "
+ offSetDiff);
LOG.warn("The number of different offers between the original and received offer sets is {}",
offSetDiff);
if (LOG.isDebugEnabled()) {
List<String> missedOffers = offers.stream()
List<String> missedOffers = mOffers.stream()
.map(offer -> offer.getAttributes().getHost())
.filter(host -> !response.hosts.contains(host))
.collect(Collectors.toList());
LOG.debug("missed offers: " + missedOffers);
LOG.debug("extra offers: " + extraOffers);
LOG.debug("missed offers: {}", missedOffers);
LOG.debug("extra offers: {}", extraOffers);
}
}

if (!extraOffers.isEmpty()) {
LOG.error("Cannot find offers " + extraOffers + " in the original offer set");
LOG.error("Cannot find offers {} in the original offer set", extraOffers);
}

return orderedOffers;
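
The `offSetDiff` arithmetic in `processResponse` adds the offers missing from the response to the hosts the response contains but Aurora never offered. The following sketch works through that arithmetic on plain host names. It is illustrative only: `ResponseOrderingSketch`, `ordered`, and `diff` are hypothetical names, host names are assumed to be unique, and part of the real method is not shown in this diff.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of reordering offers by a response and measuring the mismatch. */
final class ResponseOrderingSketch {
  private ResponseOrderingSketch() { }

  /** Returns the responded hosts that were actually offered, in response order. */
  static List<String> ordered(List<String> offered, List<String> responded) {
    Set<String> known = new HashSet<>(offered);
    List<String> result = new ArrayList<>();
    for (String host : responded) {
      if (known.contains(host)) {
        result.add(host);
      }
    }
    return result;
  }

  /** Missing offers plus extra offers, using the same formula as offSetDiff. */
  static long diff(List<String> offered, List<String> responded) {
    Set<String> known = new HashSet<>(offered);
    long extra = responded.stream().filter(host -> !known.contains(host)).count();
    // offered - (responded - extra) is the number of missing offers
    return offered.size() - (responded.size() - extra) + extra;
  }
}
```

For offered hosts `h1, h2, h3` and a response of `h3, h1, hX`, the ordered result is `h3, h1` and the difference is 2: one missing offer (`h2`) plus one extra host (`hX`).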