From af9e532e772e0ecdd8d0b5d540b55d1bfb10ac99 Mon Sep 17 00:00:00 2001 From: Ambud Sharma Date: Fri, 17 May 2019 15:29:28 -0700 Subject: [PATCH] Adding authorization capabilities to Dr. Kafka to restrict execution of privileged operations --- docs/APIs.md | 23 ++++++- .../doctorkafka/DoctorKafkaMain.java | 23 ++++++- .../pinterest/doctorkafka/api/BrokersApi.java | 2 - .../api/BrokersDecommissionApi.java | 4 ++ .../api/ClustersMaintenanceApi.java | 4 ++ .../doctorkafka/api/DoctorKafkaApi.java | 4 +- .../doctorkafka/config/DoctorKafkaConfig.java | 35 ++++++++++ .../security/DrKafkaAuthorizationFilter.java | 17 +++++ .../security/DrKafkaSecurityContext.java | 44 +++++++++++++ .../security/SampleAuthorizationFilter.java | 66 +++++++++++++++++++ .../doctorkafka/security/UserPrincipal.java | 23 +++++++ 11 files changed, 237 insertions(+), 8 deletions(-) create mode 100644 drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaAuthorizationFilter.java create mode 100644 drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaSecurityContext.java create mode 100644 drkafka/src/main/java/com/pinterest/doctorkafka/security/SampleAuthorizationFilter.java create mode 100644 drkafka/src/main/java/com/pinterest/doctorkafka/security/UserPrincipal.java diff --git a/docs/APIs.md b/docs/APIs.md index b1506fed..ba92d82e 100644 --- a/docs/APIs.md +++ b/docs/APIs.md @@ -20,4 +20,25 @@ DELETE will remove the cluster from maintenance mode. curl -XGET http://localhost:8080/api/cluster//admin/maintenance curl -XPUT http://localhost:8080/api/cluster//admin/maintenance curl -XDELETE http://localhost:8080/api/cluster//admin/maintenance -``` \ No newline at end of file +``` + +**API Security** + +Dr. Kafka allows plugable API request authorization and follows the Role Based Access Control (RBAC) model. 
Authorization is performed by populating role-mapping in [DrKafkaSecurityContext](https://github.com/pinterest/doctorkafka/tree/master/drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaSecurityContext.java) by creating an implementation of AuthorizationFilter e.g. [SampleAuthorizationFilter](https://github.com/pinterest/doctorkafka/tree/master/drkafka/src/main/java/com/pinterest/doctorkafka/security/SampleAuthorizationFilter.java) + +Here's the flow sequence: +1. DoctorKafkaMain checks if an authorization filter has been specified via `doctorkafka.authorization.filter.class` configuration and creates an instance of `DrKafkaAuthorizationFilter` +2. This instance is then configured (invoke `configure(DoctorKafkaConfig config)`) and registered with Jersey + +All authorization filters must implement [DrKafkaAuthorizationFilter](https://github.com/pinterest/doctorkafka/tree/master/drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaAuthorizationFilter.java) which has two methods that need to be implemented: + +- `configure(DoctorKafkaConfig config)` +- `filter(ContainerRequestContext requestContext)` + +`configure(DoctorKafkaConfig config)` provides DoctorKafkaConfig to allow authorizer to configure, `DoctorKafkaConfig.getDrKafkaAdminGroups()` returns the list of groups that need to be mapped to `drkafka_admin` role + +`filter(ContainerRequestContext requestContext)` should implement the logic to extract and populate PRINCIPAL & ROLE information which is needed to create a new instance of [DrKafkaSecurityContext](https://github.com/pinterest/doctorkafka/tree/master/drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaSecurityContext.java). Jersey then uses this information to restrict access to methods for users who are not in the `drkafka_admin` role. 
Here's the flow: + +(Authentication) -> (Populates user & group info headers) -> (YourDrKafkaAuthoriziationFilter) -> (extract User and Group info) -> (Map groups to roles) -> (Create SecurityContext) -> (Inject SecurityContext back in session) + +Note: We currently don't ship authentication mechanisms with Dr.Kafka since authentication requirements are environment/company specific. For plugable authentication, please refer to https://www.dropwizard.io/1.3.8/docs/manual/auth.html You may also use an authentication proxy. \ No newline at end of file diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/DoctorKafkaMain.java b/drkafka/src/main/java/com/pinterest/doctorkafka/DoctorKafkaMain.java index 526190ef..9572af0b 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/DoctorKafkaMain.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/DoctorKafkaMain.java @@ -6,6 +6,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.glassfish.jersey.server.filter.RolesAllowedDynamicFeature; import com.google.common.collect.ImmutableList; @@ -16,6 +17,7 @@ import com.pinterest.doctorkafka.config.DoctorKafkaAppConfig; import com.pinterest.doctorkafka.config.DoctorKafkaConfig; import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager; +import com.pinterest.doctorkafka.security.DrKafkaAuthorizationFilter; import com.pinterest.doctorkafka.servlet.ClusterInfoServlet; import com.pinterest.doctorkafka.servlet.DoctorKafkaActionsServlet; import com.pinterest.doctorkafka.servlet.DoctorKafkaBrokerStatsServlet; @@ -68,7 +70,7 @@ public void run(DoctorKafkaAppConfig configuration, Environment environment) thr doctorKafka = new DoctorKafka(replicaStatsManager); - registerAPIs(environment, doctorKafka); + registerAPIs(environment, doctorKafka, replicaStatsManager.getConfig()); registerServlets(environment); Executors.newCachedThreadPool().submit(() -> { @@ -115,14 +117,31 @@ private void 
configureServerRuntime(DoctorKafkaAppConfig configuration, DoctorKa defaultServerFactory.setApplicationConnectors(Collections.singletonList(application)); } - private void registerAPIs(Environment environment, DoctorKafka doctorKafka) { + private void registerAPIs(Environment environment, DoctorKafka doctorKafka, DoctorKafkaConfig doctorKafkaConfig) { environment.jersey().setUrlPattern("/api/*"); + checkAndInitializeAuthorizationFilter(environment, doctorKafkaConfig); environment.jersey().register(new BrokersApi(doctorKafka)); environment.jersey().register(new ClustersApi(doctorKafka)); environment.jersey().register(new ClustersMaintenanceApi(doctorKafka)); environment.jersey().register(new BrokersDecommissionApi(doctorKafka)); } + private void checkAndInitializeAuthorizationFilter(Environment environment, DoctorKafkaConfig doctorKafkaConfig) { + LOG.info("Checking authorization filter"); + try { + Class authorizationFilterClass = doctorKafkaConfig.getAuthorizationFilterClass(); + if (authorizationFilterClass != null) { + DrKafkaAuthorizationFilter filter = authorizationFilterClass.newInstance(); + filter.configure(doctorKafkaConfig); + LOG.info("Using authorization filer:" + filter.getClass().getName()); + environment.jersey().register(filter); + environment.jersey().register(RolesAllowedDynamicFeature.class); + } + } catch (Exception e) { + LOG.error("Failed to get and initialize DrKafkaAuthorizationFilter", e); + } + } + private void startMetricsService() { int ostrichPort = replicaStatsManager.getConfig().getOstrichPort(); String tsdHostPort = replicaStatsManager.getConfig().getTsdHostPort(); diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersApi.java b/drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersApi.java index 1c2e1a0f..130bb764 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersApi.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersApi.java @@ -1,7 +1,6 @@ package 
com.pinterest.doctorkafka.api; import java.util.List; -import java.util.stream.Collectors; import javax.ws.rs.Consumes; import javax.ws.rs.GET; @@ -11,7 +10,6 @@ import javax.ws.rs.core.MediaType; import com.pinterest.doctorkafka.DoctorKafka; -import com.pinterest.doctorkafka.DoctorKafkaMain; import com.pinterest.doctorkafka.KafkaBroker; import com.pinterest.doctorkafka.KafkaClusterManager; diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersDecommissionApi.java b/drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersDecommissionApi.java index 0f762f53..a3dcc447 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersDecommissionApi.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersDecommissionApi.java @@ -1,11 +1,13 @@ package com.pinterest.doctorkafka.api; import com.pinterest.doctorkafka.DoctorKafka; +import com.pinterest.doctorkafka.config.DoctorKafkaConfig; import com.pinterest.doctorkafka.util.ApiUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import javax.annotation.security.RolesAllowed; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; @@ -34,6 +36,7 @@ public boolean isBrokerDecommissioned(@PathParam("clusterName") String clusterNa } @PUT + @RolesAllowed({ DoctorKafkaConfig.DRKAFKA_ADMIN_ROLE }) public void decommissionBroker(@Context HttpServletRequest ctx, @PathParam("clusterName") String clusterName, @PathParam("brokerId") String brokerIdStr) { @@ -42,6 +45,7 @@ public void decommissionBroker(@Context HttpServletRequest ctx, } @DELETE + @RolesAllowed({ DoctorKafkaConfig.DRKAFKA_ADMIN_ROLE }) public void cancelDecommissionBroker(@Context HttpServletRequest ctx, @PathParam("clusterName") String clusterName, @PathParam("brokerId") String brokerIdStr) { diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/api/ClustersMaintenanceApi.java 
b/drkafka/src/main/java/com/pinterest/doctorkafka/api/ClustersMaintenanceApi.java index 7150906a..1fa4ae6d 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/api/ClustersMaintenanceApi.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/api/ClustersMaintenanceApi.java @@ -1,5 +1,6 @@ package com.pinterest.doctorkafka.api; +import javax.annotation.security.RolesAllowed; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; @@ -16,6 +17,7 @@ import com.pinterest.doctorkafka.DoctorKafka; import com.pinterest.doctorkafka.KafkaClusterManager; +import com.pinterest.doctorkafka.config.DoctorKafkaConfig; import com.pinterest.doctorkafka.util.ApiUtils; @Path("/clusters/{clusterName}/admin/maintenance") @@ -36,6 +38,7 @@ public boolean checkMaintenance(@PathParam("clusterName") String clusterName) { } @PUT + @RolesAllowed({ DoctorKafkaConfig.DRKAFKA_ADMIN_ROLE }) public void enableMaintenance(@Context HttpServletRequest ctx, @PathParam("clusterName") String clusterName) { KafkaClusterManager clusterManager = checkAndGetClusterManager(clusterName); @@ -44,6 +47,7 @@ public void enableMaintenance(@Context HttpServletRequest ctx, } @DELETE + @RolesAllowed({ DoctorKafkaConfig.DRKAFKA_ADMIN_ROLE }) public void disableMaintenance(@Context HttpServletRequest ctx, @PathParam("clusterName") String clusterName) { KafkaClusterManager clusterManager = checkAndGetClusterManager(clusterName); diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/api/DoctorKafkaApi.java b/drkafka/src/main/java/com/pinterest/doctorkafka/api/DoctorKafkaApi.java index 37738e21..64e56a8c 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/api/DoctorKafkaApi.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/api/DoctorKafkaApi.java @@ -39,6 +39,4 @@ protected KafkaBroker checkAndGetBroker(String clusterName, String brokerId) { return broker; } - - -} +} \ No newline at end of file diff --git 
a/drkafka/src/main/java/com/pinterest/doctorkafka/config/DoctorKafkaConfig.java b/drkafka/src/main/java/com/pinterest/doctorkafka/config/DoctorKafkaConfig.java index a753e9b1..fd970479 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/config/DoctorKafkaConfig.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/config/DoctorKafkaConfig.java @@ -8,10 +8,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.pinterest.doctorkafka.security.DrKafkaAuthorizationFilter; + import java.io.File; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -43,6 +47,9 @@ public class DoctorKafkaConfig { private static final String NOTIFICATION_EMAILS = "emails.notification"; private static final String ALERT_EMAILS = "emails.alert"; private static final String WEB_BIND_HOST = "web.bindhost"; + public static final String DRKAFKA_ADMIN_ROLE = "drkafka_admin"; + private static final String DRKAFKA_ADMIN_GROUPS = "admin.groups"; + private static final String AUTHORIZATION_FILTER_CLASS = "authorization.filter.class"; private PropertiesConfiguration configuration = null; private AbstractConfiguration drkafkaConfiguration = null; @@ -218,4 +225,32 @@ public String[] getAlertEmails() { public boolean getRestartDisabled(){ return drkafkaConfiguration.getBoolean(RESTART_DISABLE, false); } + + /** + * Return authorization filter class (if any) + * @return authorization filter class + * @throws ClassNotFoundException + */ + @SuppressWarnings("unchecked") + public Class getAuthorizationFilterClass() throws ClassNotFoundException { + if (drkafkaConfiguration.containsKey(AUTHORIZATION_FILTER_CLASS)) { + String classFqcn = drkafkaConfiguration.getString(AUTHORIZATION_FILTER_CLASS); + return (Class) Class.forName(classFqcn); + } else { + return null; + } + } + + /** + * Groups 
from directory service (like LDAP) that are granted Dr.Kafka Admin + * permissions to run privileged commands. + * @return list of groups + */ + public List getDrKafkaAdminGroups() { + if (drkafkaConfiguration.containsKey(DRKAFKA_ADMIN_GROUPS)) { + return Arrays.asList(drkafkaConfiguration.getStringArray(DRKAFKA_ADMIN_GROUPS)); + } else { + return null; + } + } } \ No newline at end of file diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaAuthorizationFilter.java b/drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaAuthorizationFilter.java new file mode 100644 index 00000000..6532e8e6 --- /dev/null +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaAuthorizationFilter.java @@ -0,0 +1,17 @@ +package com.pinterest.doctorkafka.security; + +import javax.ws.rs.container.ContainerRequestFilter; + +import com.pinterest.doctorkafka.config.DoctorKafkaConfig; + +/** + * This extends JAX-RS containter request filter for authorization. + * + * Please refer to https://docs.oracle.com/javaee/7/api/javax/ws/rs/container/ContainerRequestFilter.html + * for more details on how {@link ContainerRequestFilter} works + */ +public interface DrKafkaAuthorizationFilter extends ContainerRequestFilter { + + public void configure(DoctorKafkaConfig config) throws Exception; + +} \ No newline at end of file diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaSecurityContext.java b/drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaSecurityContext.java new file mode 100644 index 00000000..3a87d7b3 --- /dev/null +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaSecurityContext.java @@ -0,0 +1,44 @@ +package com.pinterest.doctorkafka.security; + +import java.security.Principal; +import java.util.Set; + +import javax.ws.rs.core.SecurityContext; + +public class DrKafkaSecurityContext implements SecurityContext { + + private static final String DR_KAFKA_AUTH = "drkauth"; + private 
UserPrincipal principal; + private Set roles; + + public DrKafkaSecurityContext(UserPrincipal principal, Set roles) { + this.principal = principal; + this.roles = roles; + } + + @Override + public Principal getUserPrincipal() { + return principal; + } + + @Override + public boolean isUserInRole(String role) { + return roles.contains(role); + } + + @Override + public boolean isSecure() { + return true; + } + + @Override + public String getAuthenticationScheme() { + return DR_KAFKA_AUTH; + } + + @Override + public String toString() { + return "DrKafkaSecurityContext [principal=" + principal + ", roles=" + roles + "]"; + } + +} \ No newline at end of file diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/security/SampleAuthorizationFilter.java b/drkafka/src/main/java/com/pinterest/doctorkafka/security/SampleAuthorizationFilter.java new file mode 100644 index 00000000..372aa17e --- /dev/null +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/security/SampleAuthorizationFilter.java @@ -0,0 +1,66 @@ +package com.pinterest.doctorkafka.security; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import javax.annotation.Priority; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.ext.Provider; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.pinterest.doctorkafka.config.DoctorKafkaConfig; + +import jersey.repackaged.com.google.common.collect.Sets; +import jersey.repackaged.com.google.common.collect.Sets.SetView; + +/** + * This is a sample implementation of {@link DrKafkaAuthorizationFilter} + */ +@Provider +@Priority(1000) +public class SampleAuthorizationFilter implements DrKafkaAuthorizationFilter { + + private static final Logger LOG = LogManager.getLogger(SampleAuthorizationFilter.class); + private static final String GROUPS_HEADER = "GROUPS"; + private static final String USER_HEADER = "USER"; + 
private Set allowedAdminGroups = new HashSet<>(); + private static final Set ADMIN_ROLE_SET = new HashSet<>( + Arrays.asList(DoctorKafkaConfig.DRKAFKA_ADMIN_ROLE)); + private static final Set EMPTY_ROLE_SET = new HashSet<>(); + + @Override + public void configure(DoctorKafkaConfig config) throws Exception { + List drKafkaAdminGroups = config.getDrKafkaAdminGroups(); + if (drKafkaAdminGroups != null) { + allowedAdminGroups.addAll(drKafkaAdminGroups); + LOG.info("Following groups will be allowed admin access:" + allowedAdminGroups); + } + } + + @Override + public void filter(ContainerRequestContext requestContext) throws IOException { + String userHeader = requestContext.getHeaderString(USER_HEADER); + String groupsHeader = requestContext.getHeaderString(GROUPS_HEADER); + DrKafkaSecurityContext ctx = null; + if (userHeader != null && groupsHeader != null) { + Set userGroups = new HashSet<>(Arrays.asList(groupsHeader.split(","))); + SetView intersection = Sets.intersection(allowedAdminGroups, userGroups); + if (intersection.size() > 0) { + ctx = new DrKafkaSecurityContext(new UserPrincipal(userHeader), ADMIN_ROLE_SET); + requestContext.setSecurityContext(ctx); + LOG.info("Received authenticated request, created context:" + ctx); + return; + } + } + + ctx = new DrKafkaSecurityContext(new UserPrincipal(userHeader), EMPTY_ROLE_SET); + requestContext.setSecurityContext(ctx); + LOG.info("Received annonymous request, bypassing authorizer"); + } + +} \ No newline at end of file diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/security/UserPrincipal.java b/drkafka/src/main/java/com/pinterest/doctorkafka/security/UserPrincipal.java new file mode 100644 index 00000000..873876f2 --- /dev/null +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/security/UserPrincipal.java @@ -0,0 +1,23 @@ +package com.pinterest.doctorkafka.security; + +import java.security.Principal; + +public class UserPrincipal implements Principal { + + private String username; + + public 
UserPrincipal(String username) { + this.username = username; + } + + @Override + public String getName() { + return username; + } + + @Override + public String toString() { + return "UserPrincipal [username=" + username + "]"; + } + +} \ No newline at end of file