diff --git a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java index d8f741a18..b012ee183 100644 --- a/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java +++ b/src/main/java/io/github/aurora/scheduler/offers/HttpOfferSetImpl.java @@ -262,8 +262,8 @@ public Iterable getOrdered(TaskGroupKey groupKey, ResourceRequest res ScheduleRequest scheduleRequest = createRequest(goodOffers, resourceRequest, startTime); LOG.info("Sending request {}", scheduleRequest.jobKey); String responseStr = sendRequest(scheduleRequest); - orderedOffers = processResponse(goodOffers, responseStr); - } catch (IOException e) { + orderedOffers = processResponse(goodOffers, responseStr, badOffers.size()); + } catch (Exception e) { LOG.error("Failed to schedule the task of {} using {} ", resourceRequest.getTask().getJob().toString(), endpoint, e); HttpOfferSetImpl.incrementFailureCount(); @@ -325,36 +325,33 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException { } } - List processResponse(List mOffers, String responseStr) + List processResponse(List mOffers, String responseStr, int badOfferSize) throws IOException { ScheduleResponse response = jsonMapper.readValue(responseStr, ScheduleResponse.class); LOG.info("Received {} offers", response.hosts.size()); - Map offerMap = mOffers.stream() - .collect(Collectors.toMap(offer -> offer.getAttributes().getHost(), offer -> offer)); if (!response.error.trim().isEmpty()) { LOG.error("Unable to receive offers from {} due to {}", endpoint, response.error); throw new IOException(response.error); } - - List orderedOffers = response.hosts.stream() - .map(host -> offerMap.get(host)) - .filter(offer -> offer != null) - .collect(Collectors.toList()); - List extraOffers = response.hosts.stream() - .filter(host -> offerMap.get(host) == null) - .collect(Collectors.toList()); - - //offSetDiff is the total number of missing offers and the extra offers - long offSetDiff = mOffers.size() - (response.hosts.size() - extraOffers.size()) - + extraOffers.size(); + // Use Map> to fix offers with duplicate host name issue + Map> offerMap = mOffers.stream(). + collect(Collectors.groupingBy(offer -> offer.getOffer().getHostname(), + Collectors.mapping(offer -> offer, Collectors.toList()))); + List orderedOffers = response.hosts.stream().map(host -> offerMap.get(host)) + .filter(offerList -> offerList != null && !offerList.isEmpty()). + flatMap(e -> e.stream()).collect(Collectors.toList()); + List extraOffers = response.hosts.stream().filter(host -> offerMap.get(host) == null). + collect(Collectors.toList()); + + //offSetDiff is the value of the difference between Aurora offers and response offers + long offSetDiff = mOffers.size() + badOfferSize - orderedOffers.size() + extraOffers.size(); offerSetDiffList.add(offSetDiff); if (offSetDiff > 0) { LOG.warn("The number of different offers between the original and received offer sets is {}", offSetDiff); if (LOG.isDebugEnabled()) { - List missedOffers = mOffers.stream() - .map(offer -> offer.getAttributes().getHost()) + List missedOffers = offerMap.keySet().stream() .filter(host -> !response.hosts.contains(host)) .collect(Collectors.toList()); LOG.debug("missed offers: {}", missedOffers); diff --git a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java index 3007924e5..6900e9b57 100644 --- a/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java +++ b/src/test/java/io/github/aurora/scheduler/offers/HttpOfferSetImplTest.java @@ -64,12 +64,16 @@ public class HttpOfferSetImplTest extends EasyMockTest { private static final HostOffer OFFER_C = new HostOffer( Offers.makeOffer("OFFER_C", HOST_C), IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_C))); + private static final HostOffer OFFER_C1 = new HostOffer( + Offers.makeOffer("OFFER_C1", HOST_C), + IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_C))); private static final String HOST_D = "HOST_D"; private final Storage storage = MemStorageModule.newEmptyStorage(); private HttpOfferSetImpl httpOfferSet; private Set offers; + private HttpOfferSetImpl duplicateHostsHttpOfferSet; @Before public void setUp() throws IOException { @@ -112,6 +116,20 @@ public void setUp() throws IOException { 0, 0, false); + + // duplicate host offers + Set duplicateHostOffers = new HashSet<>(); + duplicateHostOffers.add(OFFER_A); + duplicateHostOffers.add(OFFER_B); + duplicateHostOffers.add(OFFER_C); + duplicateHostOffers.add(OFFER_C1); + + duplicateHostsHttpOfferSet = new HttpOfferSetImpl(duplicateHostOffers, + 0, + new URL("http://localhost:9090/v1/offerset"), + 0, + 0, + false); } @Test @@ -124,7 +142,7 @@ public void testProcessResponse() throws IOException { List mOffers = ImmutableList.copyOf(httpOfferSet.values()); - List sortedOffers = httpOfferSet.processResponse(mOffers, responseStr); + List sortedOffers = httpOfferSet.processResponse(mOffers, responseStr, 0); assertEquals(sortedOffers.size(), 3); assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); @@ -135,7 +153,7 @@ public void testProcessResponse() throws IOException { responseStr = "{\"error\": \"\", \"hosts\": [\"" + HOST_A + "\",\"" + HOST_C + "\"]}"; - sortedOffers = httpOfferSet.processResponse(mOffers, responseStr); + sortedOffers = httpOfferSet.processResponse(mOffers, responseStr, 0); assertEquals(sortedOffers.size(), 2); assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C); @@ -147,7 +165,7 @@ public void testProcessResponse() throws IOException { + HOST_B + "\",\"" + HOST_D + "\",\"" + HOST_C + "\"]}"; - sortedOffers = httpOfferSet.processResponse(mOffers, responseStr); + sortedOffers = httpOfferSet.processResponse(mOffers, responseStr, 0); assertEquals(sortedOffers.size(), 3); assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); @@ -159,19 +177,26 @@ public void testProcessResponse() throws IOException { + HOST_A + "\",\"" + HOST_D + "\",\"" + HOST_C + "\"]}"; - sortedOffers = httpOfferSet.processResponse(mOffers, responseStr); + sortedOffers = httpOfferSet.processResponse(mOffers, responseStr, 0); assertEquals(sortedOffers.size(), 2); assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C); assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(3), 2); + // Test with 1 bad offer + sortedOffers = httpOfferSet.processResponse(mOffers, responseStr, 1); + assertEquals(sortedOffers.size(), 2); + assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); + assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C); + assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(4), 3); + responseStr = "{\"error\": \"Error\", \"hosts\": [\"" + HOST_A + "\",\"" + HOST_B + "\",\"" + HOST_C + "\"]}"; boolean isException = false; try { - httpOfferSet.processResponse(mOffers, responseStr); + httpOfferSet.processResponse(mOffers, responseStr, 0); } catch (IOException ioe) { isException = true; } @@ -180,7 +205,7 @@ public void testProcessResponse() throws IOException { responseStr = "{\"error\": \"error\"}"; isException = false; try { - httpOfferSet.processResponse(mOffers, responseStr); + httpOfferSet.processResponse(mOffers, responseStr, 0); } catch (IOException ioe) { isException = true; } @@ -189,7 +214,79 @@ public void testProcessResponse() throws IOException { responseStr = "{\"weird\": \"cannot decode this json string\"}"; isException = false; try { - httpOfferSet.processResponse(mOffers, responseStr); + httpOfferSet.processResponse(mOffers, responseStr, 0); + } catch (IOException ioe) { + isException = true; + } + assertTrue(isException); + + // Duplicate host test + responseStr = "{\"error\": \"\", \"hosts\": [\"" + + HOST_A + "\",\"" + + HOST_B + "\",\"" + + HOST_C + "\"]}"; + + List mDuplicateHostOffers = ImmutableList. + copyOf(duplicateHostsHttpOfferSet.values()); + assertEquals(mDuplicateHostOffers.size(), 4); + + sortedOffers = duplicateHostsHttpOfferSet.processResponse(mDuplicateHostOffers, + responseStr, 0); + assertEquals(sortedOffers.size(), 4); + assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); + assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); + assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C); + assertEquals(sortedOffers.get(3).getAttributes().getHost(), HOST_C); + assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(5), 0); + + // plugin returns less offers than Aurora has. + responseStr = "{\"error\": \"\", \"hosts\": [\"" + + HOST_A + "\",\"" + + HOST_C + "\"]}"; + sortedOffers = duplicateHostsHttpOfferSet.processResponse(mDuplicateHostOffers, responseStr, 0); + assertEquals(sortedOffers.size(), 3); + assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); + assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C); + assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(6), 1); + + // plugin returns more offers than Aurora has. + responseStr = "{\"error\": \"\", \"hosts\": [\"" + + HOST_A + "\",\"" + + HOST_B + "\",\"" + + HOST_D + "\",\"" + + HOST_C + "\"]}"; + sortedOffers = duplicateHostsHttpOfferSet.processResponse(mDuplicateHostOffers, responseStr, 0); + assertEquals(sortedOffers.size(), 4); + assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); + assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); + assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C); + assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(7), 1); + + // plugin omits 1 offer & returns 1 extra offer + responseStr = "{\"error\": \"\", \"hosts\": [\"" + + HOST_A + "\",\"" + + HOST_D + "\",\"" + + HOST_B + "\"]}"; + sortedOffers = duplicateHostsHttpOfferSet.processResponse(mDuplicateHostOffers, responseStr, 0); + assertEquals(sortedOffers.size(), 2); + assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); + assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); + assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(8), 3); + + // Test with 1 bad offer + sortedOffers = duplicateHostsHttpOfferSet.processResponse(mDuplicateHostOffers, responseStr, 1); + assertEquals(sortedOffers.size(), 2); + assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A); + assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B); + assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(9), 4); + + responseStr = "{\"error\": \"Error\", \"hosts\": [\"" + + HOST_A + "\",\"" + + HOST_B + "\",\"" + + HOST_C + "\"]}"; + isException = false; + try { + duplicateHostsHttpOfferSet.processResponse(mOffers, responseStr, 0); } catch (IOException ioe) { isException = true; }