Skip to content

Commit

Permalink
Db2 source: discover only permitted for user tables (#9875)
Browse files Browse the repository at this point in the history
* db2 source: discover only permitted tables

* db2 source: handle case for user without permitted tables

* db2 source: sonar fixes

* db2 source: format fixes

* db2 source: sonar fixes

* db2 source: bump versions

* db2 source: generated seeds
  • Loading branch information
sashaNeshcheret authored Feb 1, 2022
1 parent 5133ce6 commit 98b5aab
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "447e0381-3780-4b46-bb62-00a4e3c8b8e2",
"name": "IBM Db2",
"dockerRepository": "airbyte/source-db2",
"dockerImageTag": "0.1.3",
"dockerImageTag": "0.1.5",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/db2",
"icon": "db2.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@
- name: IBM Db2
sourceDefinitionId: 447e0381-3780-4b46-bb62-00a4e3c8b8e2
dockerRepository: airbyte/source-db2
dockerImageTag: 0.1.4
dockerImageTag: 0.1.5
documentationUrl: https://docs.airbyte.io/integrations/sources/db2
icon: db2.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3138,7 +3138,7 @@
- - "client_secret"
oauthFlowOutputParameters:
- - "refresh_token"
- dockerImage: "airbyte/source-db2:0.1.4"
- dockerImage: "airbyte/source-db2:0.1.5"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/db2"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-db2-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-db2-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-db2/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-db2

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/source-db2
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,25 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.Db2JdbcStreamingQueryConfiguration;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import java.io.IOException;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,6 +33,8 @@ public class Db2Source extends AbstractJdbcSource<JDBCType> implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(Db2Source.class);
public static final String DRIVER_CLASS = "com.ibm.db2.jcc.DB2Driver";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
private static Db2SourceOperations operations;

private static final String KEY_STORE_PASS = RandomStringUtils.randomAlphanumeric(8);
Expand All @@ -51,8 +60,8 @@ public JsonNode toDatabaseConfig(final JsonNode config) {

var result = Jsons.jsonNode(ImmutableMap.builder()
.put("jdbc_url", jdbcUrl.toString())
.put("username", config.get("username").asText())
.put("password", config.get("password").asText())
.put(USERNAME, config.get(USERNAME).asText())
.put(PASSWORD, config.get(PASSWORD).asText())
.build());

// assume ssl if not explicitly mentioned.
Expand All @@ -62,8 +71,8 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
jdbcUrl.append(";");
result = Jsons.jsonNode(ImmutableMap.builder()
.put("jdbc_url", jdbcUrl.toString())
.put("username", config.get("username").asText())
.put("password", config.get("password").asText())
.put(USERNAME, config.get(USERNAME).asText())
.put(PASSWORD, config.get(PASSWORD).asText())
.put("connection_properties", additionalParams)
.build());
}
Expand All @@ -78,6 +87,26 @@ public Set<String> getExcludedInternalNameSpaces() {
"SYSPROC", "SYSPUBLIC", "SYSSTAT", "SYSTOOLS");
}

@Override
public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException {
return database
.query(getPrivileges(), sourceOperations::rowToJson)
.map(this::getPrivilegeDto)
.collect(Collectors.toSet());
}

private CheckedFunction<Connection, PreparedStatement, SQLException> getPrivileges() {
return connection -> connection.prepareStatement(
"SELECT DISTINCT OBJECTNAME, OBJECTSCHEMA FROM SYSIBMADM.PRIVILEGES WHERE OBJECTTYPE = 'TABLE' AND PRIVILEGE = 'SELECT' AND AUTHID = SESSION_USER");
}

private JdbcPrivilegeDto getPrivilegeDto(JsonNode jsonNode) {
return JdbcPrivilegeDto.builder()
.schemaName(jsonNode.get("OBJECTSCHEMA").asText().trim())
.tableName(jsonNode.get("OBJECTNAME").asText())
.build();
}

/* Helpers */

private List<String> obtainConnectionOptions(final JsonNode encryption) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -14,6 +16,8 @@
import io.airbyte.integrations.source.db2.Db2Source;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand All @@ -22,14 +26,22 @@
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Db2Container;

public class Db2SourceAcceptanceTest extends SourceAcceptanceTest {

private static final String SCHEMA_NAME = "SOURCE_INTEGRATION_TEST";
private static final String LESS_PERMITTED_USER = "db2inst2";
private static final String USER_WITH_OUT_PERMISSIONS = "db2inst3";
private static final String STREAM_NAME1 = "ID_AND_NAME1";
private static final String STREAM_NAME2 = "ID_AND_NAME2";
private static final String STREAM_NAME3 = "ID_AND_NAME3";
public static final String PASSWORD = "password";

private Db2Container db;
private JsonNode config;
Expand All @@ -50,6 +62,19 @@ protected JsonNode getConfig() {
return config;
}

private JsonNode getConfig(String userName, String password) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("port", db.getFirstMappedPort())
.put("db", db.getDatabaseName())
.put("username", userName)
.put("password", password)
.put("encryption", Jsons.jsonNode(ImmutableMap.builder()
.put("encryption_method", "unencrypted")
.build()))
.build());
}

@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
Expand Down Expand Up @@ -84,16 +109,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
db = new Db2Container("ibmcom/db2:11.5.5.0").acceptLicense();
db.start();

config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("port", db.getFirstMappedPort())
.put("db", db.getDatabaseName())
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("encryption", Jsons.jsonNode(ImmutableMap.builder()
.put("encryption_method", "unencrypted")
.build()))
.build());
config = getConfig(db.getUsername(), db.getPassword());

database = Databases.createJdbcDatabase(
config.get("username").asText(),
Expand All @@ -109,18 +125,29 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME1);
final String createTableQuery2 = String
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME2);
final String createTableQuery3 = String
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME3);
final String insertIntoTableQuery1 = String
.format("INSERT INTO %s.%s (ID, NAME) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash')",
SCHEMA_NAME, STREAM_NAME1);
final String insertIntoTableQuery2 = String
.format("INSERT INTO %s.%s (ID, NAME) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash')",
SCHEMA_NAME, STREAM_NAME2);
final String grantSelect1 = String
.format("GRANT SELECT ON TABLE %s.%s TO %s",
SCHEMA_NAME, STREAM_NAME1, LESS_PERMITTED_USER);
final String grantSelect2 = String
.format("GRANT SELECT ON TABLE %s.%s TO %s",
SCHEMA_NAME, STREAM_NAME3, LESS_PERMITTED_USER);

database.execute(createSchemaQuery);
database.execute(createTableQuery1);
database.execute(createTableQuery2);
database.execute(createTableQuery3);
database.execute(insertIntoTableQuery1);
database.execute(insertIntoTableQuery2);
database.execute(grantSelect1);
database.execute(grantSelect2);

database.close();
}
Expand All @@ -130,4 +157,41 @@ protected void tearDown(final TestDestinationEnv testEnv) {
db.close();
}

@Test
public void testCheckPrivilegesForUserWithLessPerm() throws Exception {
createUser(LESS_PERMITTED_USER);
JsonNode config = getConfig(LESS_PERMITTED_USER, PASSWORD);

final List<String> actualNamesWithPermission = getActualNamesWithPermission(config);
List<String> expected = List.of(STREAM_NAME3, STREAM_NAME1);
assertEquals(expected.size(), actualNamesWithPermission.size());
assertEquals(expected, actualNamesWithPermission);
}

@Test
public void testCheckPrivilegesForUserWithoutPerm() throws Exception {
createUser(USER_WITH_OUT_PERMISSIONS);

JsonNode config = getConfig(USER_WITH_OUT_PERMISSIONS, PASSWORD);

final List<String> actualNamesWithPermission = getActualNamesWithPermission(config);
List<String> expected = Collections.emptyList();
assertEquals(0, actualNamesWithPermission.size());
assertEquals(expected, actualNamesWithPermission);
}

private void createUser(String lessPermittedUser) throws IOException, InterruptedException {
String encryptedPassword = db.execInContainer("openssl", "passwd", PASSWORD).getStdout().replaceAll("\n", "");
db.execInContainer("useradd", lessPermittedUser, "-p", encryptedPassword);
}

private List<String> getActualNamesWithPermission(JsonNode config) throws Exception {
AirbyteCatalog airbyteCatalog = new Db2Source().discover(config);
return airbyteCatalog
.getStreams()
.stream()
.map(AirbyteStream::getName)
.toList();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private Predicate<JsonNode> excludeNotAccessibleTables(final Set<String> interna
final Set<JdbcPrivilegeDto> tablesWithSelectGrantPrivilege) {
return jsonNode -> {
if (tablesWithSelectGrantPrivilege.isEmpty()) {
return !internalSchemas.contains(jsonNode.get(INTERNAL_SCHEMA_NAME).asText());
return false;
}
return tablesWithSelectGrantPrivilege.stream()
.anyMatch(e -> e.getSchemaName().equals(jsonNode.get(INTERNAL_SCHEMA_NAME).asText()))
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/db2.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ You can also enter your own password for the keystore, but if you don't, the pas

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.5 | 2022-02-01 | [9875](https://github.com/airbytehq/airbyte/pull/9875) | Discover only permitted for user tables |
| 0.1.4 | 2021-12-30 | [9187](https://github.com/airbytehq/airbyte/pull/9187) [8749](https://github.com/airbytehq/airbyte/pull/8749) | Add support of JdbcType.ARRAY to JdbcSourceOperations. |
| 0.1.3 | 2021-11-05 | [7670](https://github.com/airbytehq/airbyte/pull/7670) | Updated unique DB2 types transformation |
| 0.1.2 | 2021-10-25 | [7355](https://github.com/airbytehq/airbyte/pull/7355) | Added ssl support |
Expand Down

0 comments on commit 98b5aab

Please sign in to comment.