diff --git a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java index ffcbb6628072a..5f2296e5b8073 100644 --- a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java +++ b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java @@ -6,18 +6,25 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.Db2JdbcStreamingQueryConfiguration; +import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; +import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto; import java.io.IOException; import java.io.PrintWriter; +import java.sql.Connection; import java.sql.JDBCType; +import java.sql.PreparedStatement; +import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +85,29 @@ public Set getExcludedInternalNameSpaces() { "SYSPROC", "SYSPUBLIC", "SYSSTAT", "SYSTOOLS"); } + @Override + public Set getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException { + return database + .query(getPrivileges(database), sourceOperations::rowToJson) + .map(this::getPrivilegeDto) + .collect(Collectors.toSet()); + } + + private CheckedFunction getPrivileges(JdbcDatabase database) { + return connection -> { + final PreparedStatement ps = connection.prepareStatement( + "SELECT DISTINCT OBJECTNAME, OBJECTSCHEMA FROM SYSIBMADM.PRIVILEGES WHERE OBJECTTYPE = 'TABLE' AND PRIVILEGE = 'SELECT' AND AUTHID = SESSION_USER"); + return ps; + }; + } + + private JdbcPrivilegeDto getPrivilegeDto(JsonNode jsonNode) { + return JdbcPrivilegeDto.builder() + .schemaName(jsonNode.get("OBJECTSCHEMA").asText().trim()) + .tableName(jsonNode.get("OBJECTNAME").asText()) + .build(); + } + /* Helpers */ private List obtainConnectionOptions(final JsonNode encryption) { diff --git a/airbyte-integrations/connectors/source-db2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/Db2SourceAcceptanceTest.java b/airbyte-integrations/connectors/source-db2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/Db2SourceAcceptanceTest.java index 9365b374a337a..73f5b39bb0b6c 100644 --- a/airbyte-integrations/connectors/source-db2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/Db2SourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-db2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/Db2SourceAcceptanceTest.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.io.airbyte.integration_tests.sources; +import static org.junit.jupiter.api.Assertions.assertEquals; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -14,6 +16,8 @@ import io.airbyte.integrations.source.db2.Db2Source; import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -22,16 +26,22 @@ import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaPrimitive; import io.airbyte.protocol.models.SyncMode; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; +import org.junit.jupiter.api.Test; import org.testcontainers.containers.Db2Container; public class Db2SourceAcceptanceTest extends SourceAcceptanceTest { private static final String SCHEMA_NAME = "SOURCE_INTEGRATION_TEST"; + private static final String LESS_PERMITTED_USER = "db2inst2"; + private static final String USER_WITH_OUT_PERMISSIONS = "db2inst3"; private static final String STREAM_NAME1 = "ID_AND_NAME1"; private static final String STREAM_NAME2 = "ID_AND_NAME2"; + private static final String STREAM_NAME3 = "ID_AND_NAME3"; + public static final String PASSWORD = "password"; private Db2Container db; private JsonNode config; @@ -52,6 +62,19 @@ protected JsonNode getConfig() { return config; } + private JsonNode getConfig(String userName, String password) { + return Jsons.jsonNode(ImmutableMap.builder() + .put("host", db.getHost()) + .put("port", db.getFirstMappedPort()) + .put("db", db.getDatabaseName()) + .put("username", userName) + .put("password", password) + .put("encryption", Jsons.jsonNode(ImmutableMap.builder() + .put("encryption_method", "unencrypted") + .build())) + .build()); + } + @Override protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception { return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( @@ -86,16 +109,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc db = new Db2Container("ibmcom/db2:11.5.5.0").acceptLicense(); db.start(); - config = Jsons.jsonNode(ImmutableMap.builder() - .put("host", db.getHost()) - .put("port", db.getFirstMappedPort()) - .put("db", db.getDatabaseName()) - .put("username", db.getUsername()) - .put("password", db.getPassword()) - .put("encryption", Jsons.jsonNode(ImmutableMap.builder() - .put("encryption_method", "unencrypted") - .build())) - .build()); + config = getConfig(db.getUsername(), db.getPassword()); database = Databases.createJdbcDatabase( config.get("username").asText(), @@ -111,18 +125,29 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc .format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME1); final String createTableQuery2 = String .format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME2); + final String createTableQuery3 = String + .format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME3); final String insertIntoTableQuery1 = String .format("INSERT INTO %s.%s (ID, NAME) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash')", SCHEMA_NAME, STREAM_NAME1); final String insertIntoTableQuery2 = String .format("INSERT INTO %s.%s (ID, NAME) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash')", SCHEMA_NAME, STREAM_NAME2); + final String grantSelect1 = String + .format("GRANT SELECT ON TABLE %s.%s TO %s", + SCHEMA_NAME, STREAM_NAME1, LESS_PERMITTED_USER); + final String grantSelect2 = String + .format("GRANT SELECT ON TABLE %s.%s TO %s", + SCHEMA_NAME, STREAM_NAME3, LESS_PERMITTED_USER); database.execute(createSchemaQuery); database.execute(createTableQuery1); database.execute(createTableQuery2); + database.execute(createTableQuery3); database.execute(insertIntoTableQuery1); database.execute(insertIntoTableQuery2); + database.execute(grantSelect1); + database.execute(grantSelect2); database.close(); } @@ -132,4 +157,40 @@ protected void tearDown(final TestDestinationEnv testEnv) { db.close(); } + @Test + public void testCheckPrivilegesForUserWithLessPerm() throws Exception { + createUser(LESS_PERMITTED_USER); + JsonNode config = getConfig(LESS_PERMITTED_USER, PASSWORD); + + final List actualNamesWithPermission = getActualNamesWithPermission(config); + List expected = List.of(STREAM_NAME3, STREAM_NAME1); + assertEquals(expected.size(), actualNamesWithPermission.size()); + assertEquals(expected, actualNamesWithPermission); + } + + @Test + public void testCheckPrivilegesForUserWithoutPerm() throws Exception { + createUser(USER_WITH_OUT_PERMISSIONS); + + JsonNode config = getConfig(USER_WITH_OUT_PERMISSIONS, PASSWORD); + + final List actualNamesWithPermission = getActualNamesWithPermission(config); + List expected = Collections.emptyList(); + assertEquals(0, actualNamesWithPermission.size()); + assertEquals(expected, actualNamesWithPermission); + } + + private void createUser(String lessPermittedUser) throws IOException, InterruptedException { + String encryptedPassword = db.execInContainer("openssl", "passwd", PASSWORD).getStdout().replaceAll("\n", ""); + db.execInContainer("useradd", lessPermittedUser, "-p", encryptedPassword); + } + + private List getActualNamesWithPermission(JsonNode config) throws Exception{ + AirbyteCatalog airbyteCatalog = new Db2Source().discover(config); + return airbyteCatalog + .getStreams() + .stream() + .map(AirbyteStream::getName) + .toList(); + } }