This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

Adding authorization capabilities to Dr. Kafka to restrict privileged operations #143

Merged: 1 commit, May 21, 2019
23 changes: 22 additions & 1 deletion docs/APIs.md
@@ -20,4 +20,25 @@ DELETE will remove the cluster from maintenance mode.
curl -XGET http://localhost:8080/api/cluster/<clustername>/admin/maintenance
curl -XPUT http://localhost:8080/api/cluster/<clustername>/admin/maintenance
curl -XDELETE http://localhost:8080/api/cluster/<clustername>/admin/maintenance
```

**API Security**

Dr. Kafka supports pluggable API request authorization and follows the Role-Based Access Control (RBAC) model. Authorization is performed by populating the role mapping in [DrKafkaSecurityContext](https://github.com/pinterest/doctorkafka/tree/master/drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaSecurityContext.java) from an implementation of `DrKafkaAuthorizationFilter`, e.g. [SampleAuthorizationFilter](https://github.com/pinterest/doctorkafka/tree/master/drkafka/src/main/java/com/pinterest/doctorkafka/security/SampleAuthorizationFilter.java).
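
To enable authorization, point Dr. Kafka at your filter implementation in its configuration. A minimal sketch, assuming a properties-style configuration and the key names described below (the group names are placeholders):

```
# Hypothetical doctorkafka.properties excerpt
doctorkafka.authorization.filter.class=com.pinterest.doctorkafka.security.SampleAuthorizationFilter
doctorkafka.admin.groups=kafka-admins,infra-sre
```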

Here's the flow sequence:
1. `DoctorKafkaMain` checks whether an authorization filter has been specified via the `doctorkafka.authorization.filter.class` configuration and, if so, creates an instance of the configured `DrKafkaAuthorizationFilter` (see `checkAndInitializeAuthorizationFilter` in the `DoctorKafkaMain` diff below).
2. The instance is then configured (by invoking `configure(DoctorKafkaConfig config)`) and registered with Jersey.

All authorization filters must implement [DrKafkaAuthorizationFilter](https://github.com/pinterest/doctorkafka/tree/master/drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaAuthorizationFilter.java), which has two methods that need to be implemented:

- `configure(DoctorKafkaConfig config)`
- `filter(ContainerRequestContext requestContext)`

`configure(DoctorKafkaConfig config)` supplies the `DoctorKafkaConfig` so the authorizer can configure itself; in particular, `DoctorKafkaConfig.getDrKafkaAdminGroups()` returns the list of groups that should be mapped to the `drkafka_admin` role.

`filter(ContainerRequestContext requestContext)` should implement the logic to extract and populate the PRINCIPAL & ROLE information needed to create a new instance of [DrKafkaSecurityContext](https://github.com/pinterest/doctorkafka/tree/master/drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaSecurityContext.java). Jersey then uses this information to restrict access to methods for users who are not in the `drkafka_admin` role (see `SampleAuthorizationFilter` below for a header-based example). Here's the flow:

(Authentication) -> (Populate user & group info headers) -> (YourDrKafkaAuthorizationFilter) -> (Extract user and group info) -> (Map groups to roles) -> (Create SecurityContext) -> (Inject SecurityContext back into the request)
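
For example, with `SampleAuthorizationFilter` (shown below), which reads `USER` and `GROUPS` headers populated by an upstream authentication layer, a privileged request might look like this (the user and group names are placeholders):

```
curl -XPUT -H "USER: alice" -H "GROUPS: kafka-admins" \
  http://localhost:8080/api/cluster/<clustername>/admin/maintenance
```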

Note: We currently don't ship authentication mechanisms with Dr. Kafka since authentication requirements are environment/company specific. For pluggable authentication, please refer to https://www.dropwizard.io/1.3.8/docs/manual/auth.html. You may also use an authentication proxy.
drkafka/src/main/java/com/pinterest/doctorkafka/DoctorKafkaMain.java
@@ -6,6 +6,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.glassfish.jersey.server.filter.RolesAllowedDynamicFeature;

import com.google.common.collect.ImmutableList;

@@ -16,6 +17,7 @@
import com.pinterest.doctorkafka.config.DoctorKafkaAppConfig;
import com.pinterest.doctorkafka.config.DoctorKafkaConfig;
import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager;
import com.pinterest.doctorkafka.security.DrKafkaAuthorizationFilter;
import com.pinterest.doctorkafka.servlet.ClusterInfoServlet;
import com.pinterest.doctorkafka.servlet.DoctorKafkaActionsServlet;
import com.pinterest.doctorkafka.servlet.DoctorKafkaBrokerStatsServlet;
@@ -68,7 +70,7 @@ public void run(DoctorKafkaAppConfig configuration, Environment environment) thr

doctorKafka = new DoctorKafka(replicaStatsManager);

registerAPIs(environment, doctorKafka);
registerAPIs(environment, doctorKafka, replicaStatsManager.getConfig());
registerServlets(environment);

Executors.newCachedThreadPool().submit(() -> {
@@ -115,14 +117,31 @@ private void configureServerRuntime(DoctorKafkaAppConfig configuration, DoctorKa
defaultServerFactory.setApplicationConnectors(Collections.singletonList(application));
}

private void registerAPIs(Environment environment, DoctorKafka doctorKafka) {
private void registerAPIs(Environment environment, DoctorKafka doctorKafka, DoctorKafkaConfig doctorKafkaConfig) {
environment.jersey().setUrlPattern("/api/*");
checkAndInitializeAuthorizationFilter(environment, doctorKafkaConfig);
environment.jersey().register(new BrokersApi(doctorKafka));
environment.jersey().register(new ClustersApi(doctorKafka));
environment.jersey().register(new ClustersMaintenanceApi(doctorKafka));
environment.jersey().register(new BrokersDecommissionApi(doctorKafka));
}

private void checkAndInitializeAuthorizationFilter(Environment environment, DoctorKafkaConfig doctorKafkaConfig) {
LOG.info("Checking authorization filter");
try {
Class<? extends DrKafkaAuthorizationFilter> authorizationFilterClass = doctorKafkaConfig.getAuthorizationFilterClass();
if (authorizationFilterClass != null) {
DrKafkaAuthorizationFilter filter = authorizationFilterClass.newInstance();
filter.configure(doctorKafkaConfig);
LOG.info("Using authorization filer:" + filter.getClass().getName());
environment.jersey().register(filter);
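// RolesAllowedDynamicFeature enables enforcement of @RolesAllowed annotations on resource methods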
environment.jersey().register(RolesAllowedDynamicFeature.class);
}
} catch (Exception e) {
LOG.error("Failed to get and initialize DrKafkaAuthorizationFilter", e);
}
}

private void startMetricsService() {
int ostrichPort = replicaStatsManager.getConfig().getOstrichPort();
String tsdHostPort = replicaStatsManager.getConfig().getTsdHostPort();
@@ -1,7 +1,6 @@
package com.pinterest.doctorkafka.api;

import java.util.List;
import java.util.stream.Collectors;

import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
Expand All @@ -11,7 +10,6 @@
import javax.ws.rs.core.MediaType;

import com.pinterest.doctorkafka.DoctorKafka;
import com.pinterest.doctorkafka.DoctorKafkaMain;
import com.pinterest.doctorkafka.KafkaBroker;
import com.pinterest.doctorkafka.KafkaClusterManager;

drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersDecommissionApi.java
@@ -1,11 +1,13 @@
package com.pinterest.doctorkafka.api;

import com.pinterest.doctorkafka.DoctorKafka;
import com.pinterest.doctorkafka.config.DoctorKafkaConfig;
import com.pinterest.doctorkafka.util.ApiUtils;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import javax.annotation.security.RolesAllowed;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
@@ -34,6 +36,7 @@ public boolean isBrokerDecommissioned(@PathParam("clusterName") String clusterNa
}

@PUT
@RolesAllowed({ DoctorKafkaConfig.DRKAFKA_ADMIN_ROLE })
public void decommissionBroker(@Context HttpServletRequest ctx,
@PathParam("clusterName") String clusterName,
@PathParam("brokerId") String brokerIdStr) {
@@ -42,6 +45,7 @@ public void decommissionBroker(@Context HttpServletRequest ctx,
}

@DELETE
@RolesAllowed({ DoctorKafkaConfig.DRKAFKA_ADMIN_ROLE })
public void cancelDecommissionBroker(@Context HttpServletRequest ctx,
@PathParam("clusterName") String clusterName,
@PathParam("brokerId") String brokerIdStr) {
drkafka/src/main/java/com/pinterest/doctorkafka/api/ClustersMaintenanceApi.java
@@ -1,5 +1,6 @@
package com.pinterest.doctorkafka.api;

import javax.annotation.security.RolesAllowed;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
@@ -16,6 +17,7 @@

import com.pinterest.doctorkafka.DoctorKafka;
import com.pinterest.doctorkafka.KafkaClusterManager;
import com.pinterest.doctorkafka.config.DoctorKafkaConfig;
import com.pinterest.doctorkafka.util.ApiUtils;

@Path("/clusters/{clusterName}/admin/maintenance")
@@ -36,6 +38,7 @@ public boolean checkMaintenance(@PathParam("clusterName") String clusterName) {
}

@PUT
@RolesAllowed({ DoctorKafkaConfig.DRKAFKA_ADMIN_ROLE })
public void enableMaintenance(@Context HttpServletRequest ctx,
@PathParam("clusterName") String clusterName) {
KafkaClusterManager clusterManager = checkAndGetClusterManager(clusterName);
@@ -44,6 +47,7 @@ public void enableMaintenance(@Context HttpServletRequest ctx,
}

@DELETE
@RolesAllowed({ DoctorKafkaConfig.DRKAFKA_ADMIN_ROLE })
public void disableMaintenance(@Context HttpServletRequest ctx,
@PathParam("clusterName") String clusterName) {
KafkaClusterManager clusterManager = checkAndGetClusterManager(clusterName);
@@ -39,6 +39,4 @@ protected KafkaBroker checkAndGetBroker(String clusterName, String brokerId) {
return broker;
}



}
}
drkafka/src/main/java/com/pinterest/doctorkafka/config/DoctorKafkaConfig.java
@@ -8,10 +8,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.pinterest.doctorkafka.security.DrKafkaAuthorizationFilter;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -43,6 +47,9 @@ public class DoctorKafkaConfig {
private static final String NOTIFICATION_EMAILS = "emails.notification";
private static final String ALERT_EMAILS = "emails.alert";
private static final String WEB_BIND_HOST = "web.bindhost";
public static final String DRKAFKA_ADMIN_ROLE = "drkafka_admin";
private static final String DRKAFKA_ADMIN_GROUPS = "admin.groups";
private static final String AUTHORIZATION_FILTER_CLASS = "authorization.filter.class";

private PropertiesConfiguration configuration = null;
private AbstractConfiguration drkafkaConfiguration = null;
@@ -218,4 +225,32 @@ public String[] getAlertEmails() {
public boolean getRestartDisabled(){
return drkafkaConfiguration.getBoolean(RESTART_DISABLE, false);
}

/**
* Return authorization filter class (if any)
* @return authorization filter class
* @throws ClassNotFoundException if the configured class cannot be loaded
*/
@SuppressWarnings("unchecked")
public Class<? extends DrKafkaAuthorizationFilter> getAuthorizationFilterClass() throws ClassNotFoundException {
if (drkafkaConfiguration.containsKey(AUTHORIZATION_FILTER_CLASS)) {
String classFqcn = drkafkaConfiguration.getString(AUTHORIZATION_FILTER_CLASS);
return (Class<? extends DrKafkaAuthorizationFilter>) Class.forName(classFqcn);
} else {
return null;
}
}

/**
* Groups from a directory service (like LDAP) that are granted Dr. Kafka admin
* permissions to run privileged commands.
* @return list of groups, or null if none are configured
*/
public List<String> getDrKafkaAdminGroups() {
if (drkafkaConfiguration.containsKey(DRKAFKA_ADMIN_GROUPS)) {
return Arrays.asList(drkafkaConfiguration.getStringArray(DRKAFKA_ADMIN_GROUPS));
} else {
return null;
}
}
}
drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaAuthorizationFilter.java
@@ -0,0 +1,17 @@
package com.pinterest.doctorkafka.security;

import javax.ws.rs.container.ContainerRequestFilter;

import com.pinterest.doctorkafka.config.DoctorKafkaConfig;

/**
* This extends the JAX-RS container request filter for authorization.
*
* Please refer to https://docs.oracle.com/javaee/7/api/javax/ws/rs/container/ContainerRequestFilter.html
* for more details on how {@link ContainerRequestFilter} works
*/
public interface DrKafkaAuthorizationFilter extends ContainerRequestFilter {

/**
 * Invoked once at startup with the Dr. Kafka configuration so the filter
 * can load its role mappings (e.g. from DoctorKafkaConfig.getDrKafkaAdminGroups()).
 */
public void configure(DoctorKafkaConfig config) throws Exception;

}
drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaSecurityContext.java
@@ -0,0 +1,44 @@
package com.pinterest.doctorkafka.security;

import java.security.Principal;
import java.util.Set;

import javax.ws.rs.core.SecurityContext;

public class DrKafkaSecurityContext implements SecurityContext {

private static final String DR_KAFKA_AUTH = "drkauth";
private UserPrincipal principal;
private Set<String> roles;

public DrKafkaSecurityContext(UserPrincipal principal, Set<String> roles) {
this.principal = principal;
this.roles = roles;
}

@Override
public Principal getUserPrincipal() {
return principal;
}

@Override
public boolean isUserInRole(String role) {
return roles.contains(role);
}

@Override
public boolean isSecure() {
// Note: unconditionally reports a secure channel; actual transport security (e.g. TLS) is assumed to be handled by the deployment
return true;
}

@Override
public String getAuthenticationScheme() {
return DR_KAFKA_AUTH;
}

@Override
public String toString() {
return "DrKafkaSecurityContext [principal=" + principal + ", roles=" + roles + "]";
}

}
drkafka/src/main/java/com/pinterest/doctorkafka/security/SampleAuthorizationFilter.java
@@ -0,0 +1,66 @@
package com.pinterest.doctorkafka.security;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.annotation.Priority;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.ext.Provider;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.pinterest.doctorkafka.config.DoctorKafkaConfig;

import jersey.repackaged.com.google.common.collect.Sets;
import jersey.repackaged.com.google.common.collect.Sets.SetView;

/**
* This is a sample implementation of {@link DrKafkaAuthorizationFilter}
*/
@Provider
@Priority(1000)
public class SampleAuthorizationFilter implements DrKafkaAuthorizationFilter {

private static final Logger LOG = LogManager.getLogger(SampleAuthorizationFilter.class);
private static final String GROUPS_HEADER = "GROUPS";
private static final String USER_HEADER = "USER";
private Set<String> allowedAdminGroups = new HashSet<>();
private static final Set<String> ADMIN_ROLE_SET = new HashSet<>(
Arrays.asList(DoctorKafkaConfig.DRKAFKA_ADMIN_ROLE));
private static final Set<String> EMPTY_ROLE_SET = new HashSet<>();

@Override
public void configure(DoctorKafkaConfig config) throws Exception {
List<String> drKafkaAdminGroups = config.getDrKafkaAdminGroups();
if (drKafkaAdminGroups != null) {
allowedAdminGroups.addAll(drKafkaAdminGroups);
LOG.info("Following groups will be allowed admin access:" + allowedAdminGroups);
}
}

@Override
public void filter(ContainerRequestContext requestContext) throws IOException {
String userHeader = requestContext.getHeaderString(USER_HEADER);
String groupsHeader = requestContext.getHeaderString(GROUPS_HEADER);
DrKafkaSecurityContext ctx = null;
if (userHeader != null && groupsHeader != null) {
Set<String> userGroups = new HashSet<>(Arrays.asList(groupsHeader.split(",")));
SetView<String> intersection = Sets.intersection(allowedAdminGroups, userGroups);
if (intersection.size() > 0) {
ctx = new DrKafkaSecurityContext(new UserPrincipal(userHeader), ADMIN_ROLE_SET);
requestContext.setSecurityContext(ctx);
LOG.info("Received authenticated request, created context:" + ctx);
return;
}
}

// No admin-group match (or headers missing, in which case userHeader may be null); proceed with an empty role set
ctx = new DrKafkaSecurityContext(new UserPrincipal(userHeader), EMPTY_ROLE_SET);
requestContext.setSecurityContext(ctx);
LOG.info("Received anonymous request, bypassing authorizer");
}

}
drkafka/src/main/java/com/pinterest/doctorkafka/security/UserPrincipal.java
@@ -0,0 +1,23 @@
package com.pinterest.doctorkafka.security;

import java.security.Principal;

public class UserPrincipal implements Principal {

private String username;

public UserPrincipal(String username) {
this.username = username;
}

@Override
public String getName() {
return username;
}

@Override
public String toString() {
return "UserPrincipal [username=" + username + "]";
}

}