Skip to content

Commit

Permalink
test PR
Browse files Browse the repository at this point in the history
test commit: github token setup

create JobStatusRequest data model class to hold https request params for API request handler

rollback dummy PR changes

creating draft PR for refactoring JobStatus request. unfinished
  • Loading branch information
Whitney Deng committed Oct 17, 2024
1 parent 450613a commit c05a1c7
Show file tree
Hide file tree
Showing 8 changed files with 424 additions and 48 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.HttpConstants.*;
import static com.linkedin.venice.HttpConstants.HTTP_GET;
import static com.linkedin.venice.VeniceConstants.*;
import static com.linkedin.venice.VeniceConstants.CONTROLLER_SSL_CERTIFICATE_ATTRIBUTE_NAME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.*;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.NAME;

import com.linkedin.venice.acl.AclException;
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.exceptions.VeniceException;
import java.security.cert.X509Certificate;
import java.util.Optional;
import javax.servlet.http.HttpServletRequest;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import spark.Request;


public class VeniceControllerAccessControlService {
private static final Logger LOGGER = LogManager.getLogger(VeniceControllerAccessControlService.class);

private static final String USER_UNKNOWN = "USER_UNKNOWN";
private static final String STORE_UNKNOWN = "STORE_UNKNOWN";

// A singleton of acl check function against store resource
private static final com.linkedin.venice.controller.server.AbstractRoute.ResourceAclCheck GET_ACCESS_TO_STORE =
(cert, resourceName, aclClient) -> aclClient.hasAccess(cert, resourceName, HTTP_GET);
// A singleton of acl check function against topic resource
private static final com.linkedin.venice.controller.server.AbstractRoute.ResourceAclCheck WRITE_ACCESS_TO_TOPIC =
(cert, resourceName, aclClient) -> aclClient.hasAccessToTopic(cert, resourceName, "Write");

private static final com.linkedin.venice.controller.server.AbstractRoute.ResourceAclCheck READ_ACCESS_TO_TOPIC =
(cert, resourceName, aclClient) -> aclClient.hasAccessToTopic(cert, resourceName, "Read");

private final boolean sslEnabled;
private final Optional<DynamicAccessController> accessController;

/**
* Default constructor for different controller request routes.
*
* TODO: once Venice Admin allowlist proposal is approved, we can transfer the allowlist to all routes
* through this constructor; make sure Nuage is also in the allowlist so that they can create stores
* @param accessController the access client that check whether a certificate can access a resource
*/
public VeniceControllerAccessControlService(boolean sslEnabled, Optional<DynamicAccessController> accessController) {
this.sslEnabled = sslEnabled;
this.accessController = accessController;
}

/**
* Check whether the user certificate in request has access to the store specified in
* the request.
*/
private boolean hasAccess(
Request request,
com.linkedin.venice.controller.server.AbstractRoute.ResourceAclCheck aclCheckFunction) {
if (!isAclEnabled()) {
/**
* Grant access if it's not required to check ACL.
*/
return true;
}
X509Certificate certificate = getCertificate(request);

String storeName = request.queryParams(NAME);
/**
* Currently Nuage only supports adding GET/POST methods for a store resource
* TODO: Feature request for Nuage to support other method like PUT or customized methods
* like WRITE, UPDATE, ADMIN etc.
*/
try {
if (!aclCheckFunction.apply(certificate, storeName, accessController.get())) {
// log the abused users
LOGGER.warn(
"Client {} [host:{} IP:{}] doesn't have access to store {}",
certificate.getSubjectX500Principal().toString(),
request.host(),
request.ip(),
storeName);
return false;
}
} catch (AclException e) {
LOGGER.error(
"Error while parsing certificate from client {} [host:{} IP:{}]",
certificate.getSubjectX500Principal().toString(),
request.host(),
request.ip(),
e);
return false;
}
return true;
}

/**
* Check whether the user has "Write" method access to the related version topics.
*/
protected boolean hasWriteAccessToTopic(Request request) {
return hasAccess(request, WRITE_ACCESS_TO_TOPIC);
}

/**
* Check whether the user has "Read" method access to the related version topics.
*/
protected boolean hasReadAccessToTopic(Request request) {
return hasAccess(request, READ_ACCESS_TO_TOPIC);
}

/**
* Get principal Id from request.
*/
protected String getPrincipalId(Request request) {
if (!isSslEnabled()) {
LOGGER.warn("SSL is not enabled. No certificate could be extracted from request.");
return USER_UNKNOWN;
}
X509Certificate certificate = getCertificate(request);
if (isAclEnabled()) {
try {
return accessController.get().getPrincipalId(certificate);
} catch (Exception e) {
LOGGER.error("Error when retrieving principal Id from request", e);
return USER_UNKNOWN;
}
} else {
return certificate.getSubjectX500Principal().getName();
}
}

/**
* Check whether the user has "GET" method access to the related store resource.
*
* Notice: currently we don't have any controller request that necessarily requires "GET" ACL to store;
* ACL is not checked for requests that want to get metadata of a store/job.
*/
protected boolean hasAccessToStore(Request request) {
return hasAccess(request, GET_ACCESS_TO_STORE);
}

/**
* Check whether the user is within the admin users allowlist.
*/
protected boolean isAllowListUser(Request request) {
if (!isAclEnabled()) {
/**
* Grant access if it's not required to check ACL.
* {@link accessController} will be empty if ACL is not enabled.
*/
return true;
}
X509Certificate certificate = getCertificate(request);

String storeName = request.queryParamOrDefault(NAME, STORE_UNKNOWN);
return accessController.get().isAllowlistUsers(certificate, storeName, HTTP_GET);
}

/**
* @return whether SSL is enabled
*/
protected boolean isSslEnabled() {
return sslEnabled;
}

/**
* @return whether ACL check is enabled.
*/
protected boolean isAclEnabled() {
/**
* {@link accessController} will be empty if ACL is not enabled.
*/
return accessController.isPresent();
}

/**
* Helper function to get certificate out of Spark request
*/
protected static X509Certificate getCertificate(Request request) {
HttpServletRequest rawRequest = request.raw();
Object certificateObject = rawRequest.getAttribute(CONTROLLER_SSL_CERTIFICATE_ATTRIBUTE_NAME);
if (certificateObject == null) {
throw new VeniceException("Client request doesn't contain certificate for store: " + request.queryParams(NAME));
}
return ((X509Certificate[]) certificateObject)[0];
}

/**
* A function that would check whether a principal has access to a resource.
*/
@FunctionalInterface
interface ResourceAclCheck {
boolean apply(X509Certificate clientCert, String resource, DynamicAccessController accessController)
throws AclException;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package com.linkedin.venice.controller;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.SSLConfig;
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.controller.server.endpoint.JobStatusRequest;
import com.linkedin.venice.controller.spark.VeniceSparkServerFactory;
import com.linkedin.venice.controller.stats.SparkServerStats;
import com.linkedin.venice.controllerapi.ControllerRoute;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.metrics.MetricsRepository;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import spark.embeddedserver.EmbeddedServers;


public class VeniceControllerApiHandler {
private final int port;
private final Admin admin;
private final boolean enforceSSL;
private final boolean sslEnabled;
private final boolean checkReadMethodForKafka;
private final Optional<SSLConfig> sslConfig;
private final Optional<DynamicAccessController> accessController;

protected static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();
final private Map<String, SparkServerStats> statsMap;
final private SparkServerStats nonclusterSpecificStats;

private static String REQUEST_START_TIME = "startTime";
private static String REQUEST_SUCCEED = "succeed";

private final List<ControllerRoute> disabledRoutes;

private final boolean disableParentRequestTopicForStreamPushes;
private final PubSubTopicRepository pubSubTopicRepository;

// Use this for access controls and other security related checks
VeniceControllerAccessControlService veniceControllerAccessControlService;

public VeniceControllerApiHandler(
int port,
Admin admin,
MetricsRepository metricsRepository,
Set<String> clusters,
boolean enforceSSL,
Optional<SSLConfig> sslConfig,
boolean checkReadMethodForKafka,
Optional<DynamicAccessController> accessController,
List<ControllerRoute> disabledRoutes,
VeniceProperties jettyConfigOverrides,
boolean disableParentRequestTopicForStreamPushes,
PubSubTopicRepository pubSubTopicRepository) {
this.port = port;
this.enforceSSL = enforceSSL;
this.sslEnabled = sslConfig.isPresent();
this.sslConfig = sslConfig;
this.checkReadMethodForKafka = checkReadMethodForKafka;
this.accessController = accessController;
// Note: admin is passed in as a reference. The expectation is the source of the admin will
// close it so we don't close it in stopInner()
this.admin = admin;
statsMap = new HashMap<>(clusters.size());
String statsPrefix = sslEnabled ? "secure_" : "";
for (String cluster: clusters) {
statsMap.put(
cluster,
new SparkServerStats(metricsRepository, cluster + "." + statsPrefix + "controller_spark_server"));
}
nonclusterSpecificStats = new SparkServerStats(metricsRepository, "." + statsPrefix + "controller_spark_server");
EmbeddedServers.add(EmbeddedServers.Identifiers.JETTY, new VeniceSparkServerFactory(jettyConfigOverrides));

this.disabledRoutes = disabledRoutes;
this.disableParentRequestTopicForStreamPushes = disableParentRequestTopicForStreamPushes;
this.pubSubTopicRepository = pubSubTopicRepository;
this.veniceControllerAccessControlService = new VeniceControllerAccessControlService(sslEnabled, accessController);
}

public JobStatusQueryResponse populateJobStatus(JobStatusRequest jobStatusRequest) {
JobStatusQueryResponse responseObject = new JobStatusQueryResponse();

String store = jobStatusRequest.getStore();
int versionNumber = jobStatusRequest.getVersionNumber();
String cluster = jobStatusRequest.getCluster();
String incrementalPushVersion = jobStatusRequest.getIncrementalPushVersion();
String region = jobStatusRequest.getRegion();
String targetedRegions = jobStatusRequest.getTargetedRegions();

String kafkaTopicName = Version.composeKafkaTopic(store, versionNumber);
Admin.OfflinePushStatusInfo offlineJobStatus = admin.getOffLinePushStatus(
cluster,
kafkaTopicName,
Optional.ofNullable(incrementalPushVersion),
region,
targetedRegions);
responseObject.setStatus(offlineJobStatus.getExecutionStatus().toString());
responseObject.setStatusUpdateTimestamp(offlineJobStatus.getStatusUpdateTimestamp());
responseObject.setStatusDetails(offlineJobStatus.getStatusDetails());
responseObject.setExtraInfo(offlineJobStatus.getExtraInfo());
responseObject.setExtraInfoUpdateTimestamp(offlineJobStatus.getExtraInfoUpdateTimestamp());
responseObject.setExtraDetails(offlineJobStatus.getExtraDetails());
responseObject.setUncompletedPartitions(offlineJobStatus.getUncompletedPartitions());

responseObject.setCluster(cluster);
responseObject.setName(store);
responseObject.setVersion(versionNumber);
return responseObject;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ protected static X509Certificate getCertificate(Request request) {
* A function that would check whether a principal has access to a resource.
*/
@FunctionalInterface
interface ResourceAclCheck {
public interface ResourceAclCheck {
boolean apply(X509Certificate clientCert, String resource, DynamicAccessController accessController)
throws AclException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controller.AuditInfo;
import com.linkedin.venice.controller.VeniceControllerApiHandler;
import com.linkedin.venice.controller.spark.VeniceSparkServerFactory;
import com.linkedin.venice.controller.stats.SparkServerStats;
import com.linkedin.venice.controllerapi.ControllerRoute;
Expand Down Expand Up @@ -169,6 +170,7 @@ public class AdminSparkServer extends AbstractVeniceService {

private final boolean disableParentRequestTopicForStreamPushes;
private final PubSubTopicRepository pubSubTopicRepository;
private final VeniceControllerApiHandler veniceControllerApiHandler;

public AdminSparkServer(
int port,
Expand Down Expand Up @@ -206,6 +208,19 @@ public AdminSparkServer(
this.disabledRoutes = disabledRoutes;
this.disableParentRequestTopicForStreamPushes = disableParentRequestTopicForStreamPushes;
this.pubSubTopicRepository = pubSubTopicRepository;
this.veniceControllerApiHandler = new VeniceControllerApiHandler(
port,
admin,
metricsRepository,
clusters,
enforceSSL,
sslConfig,
checkReadMethodForKafka,
accessController,
disabledRoutes,
jettyConfigOverrides,
disableParentRequestTopicForStreamPushes,
pubSubTopicRepository);
}

@Override
Expand Down Expand Up @@ -280,7 +295,7 @@ public boolean startInner() throws Exception {
// Build all different routes
ControllerRoutes controllerRoutes = new ControllerRoutes(sslEnabled, accessController, pubSubTopicRepository);
StoresRoutes storesRoutes = new StoresRoutes(sslEnabled, accessController, pubSubTopicRepository);
JobRoutes jobRoutes = new JobRoutes(sslEnabled, accessController);
JobRoutes jobRoutes = new JobRoutes(sslEnabled, accessController, veniceControllerApiHandler);
SkipAdminRoute skipAdminRoute = new SkipAdminRoute(sslEnabled, accessController);
CreateVersion createVersion = new CreateVersion(
sslEnabled,
Expand Down
Loading

0 comments on commit c05a1c7

Please sign in to comment.