Skip to content
This repository has been archived by the owner on Apr 2, 2023. It is now read-only.

Commit

Permalink
Fix duplicate host issue for aurora scheduler, DTCPIOPS-5221 (#445)
Browse files Browse the repository at this point in the history
* Fixed duplicate host issue for aurora scheduler, DTCPIOPS-5221
* keep track the offers that are filtered out.
Co-authored-by: Lawrence Wong <[email protected]>
  • Loading branch information
lawwong1 authored Jun 22, 2022
1 parent 9a1d540 commit 235e6a5
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ public Iterable<HostOffer> getOrdered(TaskGroupKey groupKey, ResourceRequest res
ScheduleRequest scheduleRequest = createRequest(goodOffers, resourceRequest, startTime);
LOG.info("Sending request {}", scheduleRequest.jobKey);
String responseStr = sendRequest(scheduleRequest);
orderedOffers = processResponse(goodOffers, responseStr);
} catch (IOException e) {
orderedOffers = processResponse(goodOffers, responseStr, badOffers.size());
} catch (Exception e) {
LOG.error("Failed to schedule the task of {} using {} ",
resourceRequest.getTask().getJob().toString(), endpoint, e);
HttpOfferSetImpl.incrementFailureCount();
Expand Down Expand Up @@ -325,36 +325,33 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException {
}
}

List<HostOffer> processResponse(List<HostOffer> mOffers, String responseStr)
List<HostOffer> processResponse(List<HostOffer> mOffers, String responseStr, int badOfferSize)
throws IOException {
ScheduleResponse response = jsonMapper.readValue(responseStr, ScheduleResponse.class);
LOG.info("Received {} offers", response.hosts.size());

Map<String, HostOffer> offerMap = mOffers.stream()
.collect(Collectors.toMap(offer -> offer.getAttributes().getHost(), offer -> offer));
if (!response.error.trim().isEmpty()) {
LOG.error("Unable to receive offers from {} due to {}", endpoint, response.error);
throw new IOException(response.error);
}

List<HostOffer> orderedOffers = response.hosts.stream()
.map(host -> offerMap.get(host))
.filter(offer -> offer != null)
.collect(Collectors.toList());
List<String> extraOffers = response.hosts.stream()
.filter(host -> offerMap.get(host) == null)
.collect(Collectors.toList());

//offSetDiff is the total number of missing offers and the extra offers
long offSetDiff = mOffers.size() - (response.hosts.size() - extraOffers.size())
+ extraOffers.size();
// Use Map<String, List<HostOffer>> to fix offers with duplicate host name issue
Map<String, List<HostOffer>> offerMap = mOffers.stream().
collect(Collectors.groupingBy(offer -> offer.getOffer().getHostname(),
Collectors.mapping(offer -> offer, Collectors.toList())));
List<HostOffer> orderedOffers = response.hosts.stream().map(host -> offerMap.get(host))
.filter(offerList -> offerList != null && !offerList.isEmpty()).
flatMap(e -> e.stream()).collect(Collectors.toList());
List<String> extraOffers = response.hosts.stream().filter(host -> offerMap.get(host) == null).
collect(Collectors.toList());

//offSetDiff is the value of the difference between Aurora offers and response offers
long offSetDiff = mOffers.size() + badOfferSize - orderedOffers.size() + extraOffers.size();
offerSetDiffList.add(offSetDiff);
if (offSetDiff > 0) {
LOG.warn("The number of different offers between the original and received offer sets is {}",
offSetDiff);
if (LOG.isDebugEnabled()) {
List<String> missedOffers = mOffers.stream()
.map(offer -> offer.getAttributes().getHost())
List<String> missedOffers = offerMap.keySet().stream()
.filter(host -> !response.hosts.contains(host))
.collect(Collectors.toList());
LOG.debug("missed offers: {}", missedOffers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,16 @@ public class HttpOfferSetImplTest extends EasyMockTest {
private static final HostOffer OFFER_C = new HostOffer(
Offers.makeOffer("OFFER_C", HOST_C),
IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_C)));
private static final HostOffer OFFER_C1 = new HostOffer(
Offers.makeOffer("OFFER_C1", HOST_C),
IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_C)));
private static final String HOST_D = "HOST_D";

private final Storage storage = MemStorageModule.newEmptyStorage();

private HttpOfferSetImpl httpOfferSet;
private Set<HostOffer> offers;
private HttpOfferSetImpl duplicateHostsHttpOfferSet;

@Before
public void setUp() throws IOException {
Expand Down Expand Up @@ -112,6 +116,20 @@ public void setUp() throws IOException {
0,
0,
false);

// duplicate host offers
Set<HostOffer> duplicateHostOffers = new HashSet<>();
duplicateHostOffers.add(OFFER_A);
duplicateHostOffers.add(OFFER_B);
duplicateHostOffers.add(OFFER_C);
duplicateHostOffers.add(OFFER_C1);

duplicateHostsHttpOfferSet = new HttpOfferSetImpl(duplicateHostOffers,
0,
new URL("http://localhost:9090/v1/offerset"),
0,
0,
false);
}

@Test
Expand All @@ -124,7 +142,7 @@ public void testProcessResponse() throws IOException {

List<HostOffer> mOffers = ImmutableList.copyOf(httpOfferSet.values());

List<HostOffer> sortedOffers = httpOfferSet.processResponse(mOffers, responseStr);
List<HostOffer> sortedOffers = httpOfferSet.processResponse(mOffers, responseStr, 0);
assertEquals(sortedOffers.size(), 3);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B);
Expand All @@ -135,7 +153,7 @@ public void testProcessResponse() throws IOException {
responseStr = "{\"error\": \"\", \"hosts\": [\""
+ HOST_A + "\",\""
+ HOST_C + "\"]}";
sortedOffers = httpOfferSet.processResponse(mOffers, responseStr);
sortedOffers = httpOfferSet.processResponse(mOffers, responseStr, 0);
assertEquals(sortedOffers.size(), 2);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C);
Expand All @@ -147,7 +165,7 @@ public void testProcessResponse() throws IOException {
+ HOST_B + "\",\""
+ HOST_D + "\",\""
+ HOST_C + "\"]}";
sortedOffers = httpOfferSet.processResponse(mOffers, responseStr);
sortedOffers = httpOfferSet.processResponse(mOffers, responseStr, 0);
assertEquals(sortedOffers.size(), 3);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B);
Expand All @@ -159,19 +177,26 @@ public void testProcessResponse() throws IOException {
+ HOST_A + "\",\""
+ HOST_D + "\",\""
+ HOST_C + "\"]}";
sortedOffers = httpOfferSet.processResponse(mOffers, responseStr);
sortedOffers = httpOfferSet.processResponse(mOffers, responseStr, 0);
assertEquals(sortedOffers.size(), 2);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C);
assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(3), 2);

// Test with 1 bad offer
sortedOffers = httpOfferSet.processResponse(mOffers, responseStr, 1);
assertEquals(sortedOffers.size(), 2);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C);
assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(4), 3);

responseStr = "{\"error\": \"Error\", \"hosts\": [\""
+ HOST_A + "\",\""
+ HOST_B + "\",\""
+ HOST_C + "\"]}";
boolean isException = false;
try {
httpOfferSet.processResponse(mOffers, responseStr);
httpOfferSet.processResponse(mOffers, responseStr, 0);
} catch (IOException ioe) {
isException = true;
}
Expand All @@ -180,7 +205,7 @@ public void testProcessResponse() throws IOException {
responseStr = "{\"error\": \"error\"}";
isException = false;
try {
httpOfferSet.processResponse(mOffers, responseStr);
httpOfferSet.processResponse(mOffers, responseStr, 0);
} catch (IOException ioe) {
isException = true;
}
Expand All @@ -189,7 +214,79 @@ public void testProcessResponse() throws IOException {
responseStr = "{\"weird\": \"cannot decode this json string\"}";
isException = false;
try {
httpOfferSet.processResponse(mOffers, responseStr);
httpOfferSet.processResponse(mOffers, responseStr, 0);
} catch (IOException ioe) {
isException = true;
}
assertTrue(isException);

// Duplicate host test
responseStr = "{\"error\": \"\", \"hosts\": [\""
+ HOST_A + "\",\""
+ HOST_B + "\",\""
+ HOST_C + "\"]}";

List<HostOffer> mDuplicateHostOffers = ImmutableList.
copyOf(duplicateHostsHttpOfferSet.values());
assertEquals(mDuplicateHostOffers.size(), 4);

sortedOffers = duplicateHostsHttpOfferSet.processResponse(mDuplicateHostOffers,
responseStr, 0);
assertEquals(sortedOffers.size(), 4);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B);
assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C);
assertEquals(sortedOffers.get(3).getAttributes().getHost(), HOST_C);
assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(5), 0);

// plugin returns less offers than Aurora has.
responseStr = "{\"error\": \"\", \"hosts\": [\""
+ HOST_A + "\",\""
+ HOST_C + "\"]}";
sortedOffers = duplicateHostsHttpOfferSet.processResponse(mDuplicateHostOffers, responseStr, 0);
assertEquals(sortedOffers.size(), 3);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C);
assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(6), 1);

// plugin returns more offers than Aurora has.
responseStr = "{\"error\": \"\", \"hosts\": [\""
+ HOST_A + "\",\""
+ HOST_B + "\",\""
+ HOST_D + "\",\""
+ HOST_C + "\"]}";
sortedOffers = duplicateHostsHttpOfferSet.processResponse(mDuplicateHostOffers, responseStr, 0);
assertEquals(sortedOffers.size(), 4);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B);
assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C);
assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(7), 1);

// plugin omits 1 offer & returns 1 extra offer
responseStr = "{\"error\": \"\", \"hosts\": [\""
+ HOST_A + "\",\""
+ HOST_D + "\",\""
+ HOST_B + "\"]}";
sortedOffers = duplicateHostsHttpOfferSet.processResponse(mDuplicateHostOffers, responseStr, 0);
assertEquals(sortedOffers.size(), 2);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B);
assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(8), 3);

// Test with 1 bad offer
sortedOffers = duplicateHostsHttpOfferSet.processResponse(mDuplicateHostOffers, responseStr, 1);
assertEquals(sortedOffers.size(), 2);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B);
assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(9), 4);

responseStr = "{\"error\": \"Error\", \"hosts\": [\""
+ HOST_A + "\",\""
+ HOST_B + "\",\""
+ HOST_C + "\"]}";
isException = false;
try {
duplicateHostsHttpOfferSet.processResponse(mOffers, responseStr, 0);
} catch (IOException ioe) {
isException = true;
}
Expand Down

0 comments on commit 235e6a5

Please sign in to comment.