Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Db2 source: discover only permitted tables #9875

Merged
merged 8 commits into from
Feb 1, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,25 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.Db2JdbcStreamingQueryConfiguration;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import java.io.IOException;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -78,6 +85,26 @@ public Set<String> getExcludedInternalNameSpaces() {
"SYSPROC", "SYSPUBLIC", "SYSSTAT", "SYSTOOLS");
}

@Override
public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException {
return database
.query(getPrivileges(), sourceOperations::rowToJson)
.map(this::getPrivilegeDto)
.collect(Collectors.toSet());
}

private CheckedFunction<Connection, PreparedStatement, SQLException> getPrivileges() {
return connection -> connection.prepareStatement(
"SELECT DISTINCT OBJECTNAME, OBJECTSCHEMA FROM SYSIBMADM.PRIVILEGES WHERE OBJECTTYPE = 'TABLE' AND PRIVILEGE = 'SELECT' AND AUTHID = SESSION_USER");
}

private JdbcPrivilegeDto getPrivilegeDto(JsonNode jsonNode) {
return JdbcPrivilegeDto.builder()
.schemaName(jsonNode.get("OBJECTSCHEMA").asText().trim())
.tableName(jsonNode.get("OBJECTNAME").asText())
.build();
}

/* Helpers */

private List<String> obtainConnectionOptions(final JsonNode encryption) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -14,6 +16,8 @@
import io.airbyte.integrations.source.db2.Db2Source;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand All @@ -22,14 +26,22 @@
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Db2Container;

public class Db2SourceAcceptanceTest extends SourceAcceptanceTest {

private static final String SCHEMA_NAME = "SOURCE_INTEGRATION_TEST";
private static final String LESS_PERMITTED_USER = "db2inst2";
private static final String USER_WITH_OUT_PERMISSIONS = "db2inst3";
private static final String STREAM_NAME1 = "ID_AND_NAME1";
private static final String STREAM_NAME2 = "ID_AND_NAME2";
private static final String STREAM_NAME3 = "ID_AND_NAME3";
public static final String PASSWORD = "password";

private Db2Container db;
private JsonNode config;
Expand All @@ -50,6 +62,19 @@ protected JsonNode getConfig() {
return config;
}

private JsonNode getConfig(String userName, String password) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("port", db.getFirstMappedPort())
.put("db", db.getDatabaseName())
.put("username", userName)
.put("password", password)
.put("encryption", Jsons.jsonNode(ImmutableMap.builder()
.put("encryption_method", "unencrypted")
.build()))
.build());
}

@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
Expand Down Expand Up @@ -84,16 +109,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
db = new Db2Container("ibmcom/db2:11.5.5.0").acceptLicense();
db.start();

config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("port", db.getFirstMappedPort())
.put("db", db.getDatabaseName())
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("encryption", Jsons.jsonNode(ImmutableMap.builder()
.put("encryption_method", "unencrypted")
.build()))
.build());
config = getConfig(db.getUsername(), db.getPassword());

database = Databases.createJdbcDatabase(
config.get("username").asText(),
Expand All @@ -109,18 +125,29 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME1);
final String createTableQuery2 = String
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME2);
final String createTableQuery3 = String
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME3);
final String insertIntoTableQuery1 = String
.format("INSERT INTO %s.%s (ID, NAME) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash')",
SCHEMA_NAME, STREAM_NAME1);
final String insertIntoTableQuery2 = String
.format("INSERT INTO %s.%s (ID, NAME) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash')",
SCHEMA_NAME, STREAM_NAME2);
final String grantSelect1 = String
.format("GRANT SELECT ON TABLE %s.%s TO %s",
SCHEMA_NAME, STREAM_NAME1, LESS_PERMITTED_USER);
final String grantSelect2 = String
.format("GRANT SELECT ON TABLE %s.%s TO %s",
SCHEMA_NAME, STREAM_NAME3, LESS_PERMITTED_USER);

database.execute(createSchemaQuery);
database.execute(createTableQuery1);
database.execute(createTableQuery2);
database.execute(createTableQuery3);
database.execute(insertIntoTableQuery1);
database.execute(insertIntoTableQuery2);
database.execute(grantSelect1);
database.execute(grantSelect2);

database.close();
}
Expand All @@ -130,4 +157,40 @@ protected void tearDown(final TestDestinationEnv testEnv) {
db.close();
}

@Test
public void testCheckPrivilegesForUserWithLessPerm() throws Exception {
createUser(LESS_PERMITTED_USER);
JsonNode config = getConfig(LESS_PERMITTED_USER, PASSWORD);

final List<String> actualNamesWithPermission = getActualNamesWithPermission(config);
List<String> expected = List.of(STREAM_NAME3, STREAM_NAME1);
assertEquals(expected.size(), actualNamesWithPermission.size());
assertEquals(expected, actualNamesWithPermission);
}

@Test
public void testCheckPrivilegesForUserWithoutPerm() throws Exception {
createUser(USER_WITH_OUT_PERMISSIONS);

JsonNode config = getConfig(USER_WITH_OUT_PERMISSIONS, PASSWORD);

final List<String> actualNamesWithPermission = getActualNamesWithPermission(config);
List<String> expected = Collections.emptyList();
assertEquals(0, actualNamesWithPermission.size());
assertEquals(expected, actualNamesWithPermission);
}

private void createUser(String lessPermittedUser) throws IOException, InterruptedException {
String encryptedPassword = db.execInContainer("openssl", "passwd", PASSWORD).getStdout().replaceAll("\n", "");
db.execInContainer("useradd", lessPermittedUser, "-p", encryptedPassword);
}

private List<String> getActualNamesWithPermission(JsonNode config) throws Exception{
AirbyteCatalog airbyteCatalog = new Db2Source().discover(config);
return airbyteCatalog
.getStreams()
.stream()
.map(AirbyteStream::getName)
.toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private Predicate<JsonNode> excludeNotAccessibleTables(final Set<String> interna
final Set<JdbcPrivilegeDto> tablesWithSelectGrantPrivilege) {
return jsonNode -> {
if (tablesWithSelectGrantPrivilege.isEmpty()) {
return !internalSchemas.contains(jsonNode.get(INTERNAL_SCHEMA_NAME).asText());
return false;
}
return tablesWithSelectGrantPrivilege.stream()
.anyMatch(e -> e.getSchemaName().equals(jsonNode.get(INTERNAL_SCHEMA_NAME).asText()))
Expand Down