Skip to content
This repository has been archived by the owner on Apr 2, 2023. It is now read-only.

Http Offerset Plugin #80

Merged
merged 24 commits into from
Dec 27, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/main/java/com/paypal/aurora/scheduler/model/Config.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.paypal.aurora.scheduler.model;
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved

// for aurora plugin configuration
public class Config {
public String host;
public int port;
}
8 changes: 8 additions & 0 deletions src/main/java/com/paypal/aurora/scheduler/model/Host.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.paypal.aurora.scheduler.model;

import com.paypal.aurora.scheduler.model.Resource;

public class Host {
public String name;
public Resource offer;
}
12 changes: 12 additions & 0 deletions src/main/java/com/paypal/aurora/scheduler/model/Resource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.paypal.aurora.scheduler.model;

public class Resource {
public double cpu;
public double memory;
public double disk;
public Resource(double cpu, double memory, double disk){
this.cpu = cpu;
this.memory = memory;
this.disk = disk;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.paypal.aurora.scheduler.model;

public class ScheduleRequest {
public Resource request;
public Host[]hosts;
public ScheduleRequest(){
request = new Resource(0.0,0.0,0.0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.paypal.aurora.scheduler.model;

import org.apache.aurora.scheduler.offers.HostOffer;

public class ScheduleResponse {
public String error;
public String []hosts;
}
190 changes: 190 additions & 0 deletions src/main/java/com/paypal/aurora/scheduler/offers/OfferSetImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package com.paypal.aurora.scheduler.offers;
mauri marked this conversation as resolved.
Show resolved Hide resolved

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.ProtocolException;
import java.net.URL;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.google.gson.Gson;

import com.paypal.aurora.scheduler.model.Host;
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
import com.paypal.aurora.scheduler.model.Resource;
import com.paypal.aurora.scheduler.model.ScheduleRequest;
import com.paypal.aurora.scheduler.model.ScheduleResponse;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.offers.HostOffer;
import org.apache.aurora.scheduler.offers.OfferSet;

import org.apache.aurora.scheduler.resources.ResourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Default implementation for OfferSet. Backed by a ConcurrentSkipListSet.
*/
@VisibleForTesting
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
public class OfferSetImpl implements OfferSet {

private final Set<HostOffer> offers;
private static final Logger LOG = LoggerFactory.getLogger(OfferSetImpl.class);

@Inject
public OfferSetImpl(Ordering<HostOffer> ordering) {
offers = new ConcurrentSkipListSet<>(ordering);
}

@Override
public void add(HostOffer offer) {
offers.add(offer);
}

@Override
public void remove(HostOffer removed) {
offers.remove(removed);
}

@Override
public int size() {
// Potential gotcha - since this is a ConcurrentSkipListSet, size() is more
// expensive.
// Could track this separately if it turns out to pose problems.
return offers.size();
}

@Override
public void clear() {
offers.clear();
}

@Override
public Iterable<HostOffer> values() {
return offers;
}

private Timestamp prevTimestamp = new Timestamp(System.currentTimeMillis());
private long ageInMilliseconds = 60*1000;
@Override
public Iterable<HostOffer> getOrdered(TaskGroupKey groupKey, ResourceRequest resourceRequest) {
List<HostOffer> orderedOffers = getOffersFromPlugin(resourceRequest, offers);
if (orderedOffers != null){
return orderedOffers;
}
// fallback to default scheduler.
LOG.error("MagicMatch failed to schedule the task. Aurora uses the default scheduler.");
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
return offers;
}
PluginConfig plugin = null;
// getOffersFromPlugin gets the offers from MagicMatch.
private List<HostOffer> getOffersFromPlugin(ResourceRequest resourceRequest, Iterable<HostOffer> offers){
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
if (plugin==null){
plugin = new PluginConfig();
}
Gson gson = new Gson();
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
List<HostOffer> orderedOffers = new ArrayList<HostOffer>();
Map<String,HostOffer> offerMap = new HashMap<String, HostOffer>();
for(HostOffer offer: offers) {
offerMap.put(offer.getAttributes().getHost(), offer);
}
// send the Rest API request to the scheduler plugin
URL url = null;
try {
url = new URL(plugin.getEndpoint() + "/v1/offerset");
} catch (MalformedURLException e) {
LOG.error(e.toString());
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
return null;
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
}
// create json request
ScheduleRequest scheduleRequest = new ScheduleRequest();
scheduleRequest.request.cpu = resourceRequest.getResourceBag().valueOf(ResourceType.CPUS);
scheduleRequest.request.memory = resourceRequest.getResourceBag().valueOf(ResourceType.RAM_MB);
scheduleRequest.request.disk = resourceRequest.getResourceBag().valueOf(ResourceType.DISK_MB);
scheduleRequest.hosts = new Host[Iterables.size(offers)];
int i =0;
for (HostOffer offer:offers) {
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
scheduleRequest.hosts[i] = new Host();
scheduleRequest.hosts[i].name = offer.getAttributes().getHost();
double cpu = offer.getResourceBag(true).valueOf(ResourceType.CPUS) +
offer.getResourceBag(false).valueOf(ResourceType.CPUS);
double memory = offer.getResourceBag(true).valueOf(ResourceType.RAM_MB) +
offer.getResourceBag(false).valueOf(ResourceType.RAM_MB) ;
double disk = offer.getResourceBag(true).valueOf(ResourceType.DISK_MB) +
offer.getResourceBag(false).valueOf(ResourceType.DISK_MB);
scheduleRequest.hosts[i].offer = new Resource(cpu, memory, disk);
i++;
}

// create connection
HttpURLConnection con = null;
try {
con = (HttpURLConnection)url.openConnection();
con.setRequestMethod("POST");
con.setRequestProperty("Content-Type", "application/json; utf-8");
con.setRequestProperty("Accept", "application/json");
con.setDoOutput(true);
} catch (ProtocolException pe) {
LOG.error(pe.toString());
return null;
} catch (IOException ioe) {
LOG.error(ioe.toString());
return null;
}
String jsonStr = gson.toJson(scheduleRequest);
LOG.debug("request to plugin: "+jsonStr);
try(OutputStream os = con.getOutputStream()) {
byte[] input = jsonStr.getBytes("utf-8");
os.write(input, 0, input.length);
} catch (UnsupportedEncodingException uee) {
LOG.error(uee.toString());
return null;
} catch (IOException ioe){
LOG.error(ioe.toString());
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
return null;
}
// read response
StringBuilder response = new StringBuilder();
try{
BufferedReader br = new BufferedReader(new InputStreamReader(con.getInputStream(), "utf-8"));
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
String responseLine = null;
while ((responseLine = br.readLine()) != null) {
response.append(responseLine.trim());
}
} catch (UnsupportedEncodingException uee){
LOG.error(uee.toString());
return null;
} catch (IOException ioe) {
LOG.error(ioe.toString());
return null;
}
ScheduleResponse scheduleResponse = gson.fromJson(response.toString(), ScheduleResponse.class);
LOG.debug("plugin response: "+response.toString());
// process the scheduleResult
if (scheduleResponse.hosts!=null){
for(String host: scheduleResponse.hosts) {
HostOffer offer = offerMap.get(host);
if (offer!=null) {
orderedOffers.add(offer);
} else {
LOG.error("Cannot find this host "+host+" in "+offerMap.size()+" offers");
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
return orderedOffers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.paypal.aurora.scheduler.offers;

import javax.inject.Singleton;

import com.google.common.collect.Ordering;
import com.google.inject.AbstractModule;
import com.google.inject.PrivateModule;

import com.google.inject.TypeLiteral;

import org.apache.aurora.scheduler.config.CliOptions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.aurora.scheduler.offers.HostOffer;
import org.apache.aurora.scheduler.offers.OfferOrderBuilder;
import org.apache.aurora.scheduler.offers.OfferSet;

public class OfferSetModule extends AbstractModule {
private final CliOptions options;
private static final Logger LOG = LoggerFactory.getLogger(OfferSetModule.class);

public OfferSetModule(CliOptions options) {
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
this.options = options;
}

@Override
protected void configure() {
LOG.info("PayPal Offer Set module Enabled.");
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
install(new PrivateModule() {
@Override
protected void configure() {
bind(new TypeLiteral<Ordering<HostOffer>>() { })
.toInstance(OfferOrderBuilder.create(options.offer.offerOrder));
bind(OfferSetImpl.class).in(Singleton.class);
bind(OfferSet.class).to(OfferSetImpl.class);
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
expose(OfferSet.class);
}
});
}
}
54 changes: 54 additions & 0 deletions src/main/java/com/paypal/aurora/scheduler/offers/PluginConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.paypal.aurora.scheduler.offers;

import com.paypal.aurora.scheduler.model.Config;
import com.google.gson.Gson;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;

public class PluginConfig {
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
private static final Logger LOG = LoggerFactory.getLogger(PluginConfig.class);
private String endpoint = "http://localhost:9090";
private String configFile = "/etc/aurora-plugin.json";
/*
aurora-plugin.json file:
{
"host": "localhost",
"port": 9090
}
*/
public PluginConfig() {
// load file
FileInputStream targetFile = null;
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
try {
targetFile = new FileInputStream(new File(this.configFile));
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
} catch (FileNotFoundException e) {
LOG.error("Cannot load "+this.configFile);
LOG.error(e.toString());
}
String jsonStr = "";
try {
jsonStr = IOUtils.toString(targetFile, "UTF-8");
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
} catch (IOException io){
LOG.error("Cannot load "+this.configFile);
LOG.error(io.toString());
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
}
if (!jsonStr.equals("")){
Config config = new Gson().fromJson(jsonStr, Config.class);
if (config!=null) {
this.endpoint = "http://"+config.host+":"+config.port;
}
} else {
LOG.error(this.configFile+" is empty");
}
}

public String getEndpoint() {
return this.endpoint;
}
}