Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Db2 source: discover only permitted tables #9875

Merged
merged 8 commits into from
Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "447e0381-3780-4b46-bb62-00a4e3c8b8e2",
"name": "IBM Db2",
"dockerRepository": "airbyte/source-db2",
"dockerImageTag": "0.1.3",
"dockerImageTag": "0.1.5",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/db2",
"icon": "db2.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@
- name: IBM Db2
sourceDefinitionId: 447e0381-3780-4b46-bb62-00a4e3c8b8e2
dockerRepository: airbyte/source-db2
dockerImageTag: 0.1.4
dockerImageTag: 0.1.5
documentationUrl: https://docs.airbyte.io/integrations/sources/db2
icon: db2.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3185,7 +3185,7 @@
- - "client_secret"
oauthFlowOutputParameters:
- - "refresh_token"
- dockerImage: "airbyte/source-db2:0.1.4"
- dockerImage: "airbyte/source-db2:0.1.5"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/db2"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-db2-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-db2-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-db2/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-db2

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/source-db2
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,25 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.Db2JdbcStreamingQueryConfiguration;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import java.io.IOException;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,6 +33,8 @@ public class Db2Source extends AbstractJdbcSource<JDBCType> implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(Db2Source.class);
public static final String DRIVER_CLASS = "com.ibm.db2.jcc.DB2Driver";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
private static Db2SourceOperations operations;

private static final String KEY_STORE_PASS = RandomStringUtils.randomAlphanumeric(8);
Expand All @@ -51,8 +60,8 @@ public JsonNode toDatabaseConfig(final JsonNode config) {

var result = Jsons.jsonNode(ImmutableMap.builder()
.put("jdbc_url", jdbcUrl.toString())
.put("username", config.get("username").asText())
.put("password", config.get("password").asText())
.put(USERNAME, config.get(USERNAME).asText())
.put(PASSWORD, config.get(PASSWORD).asText())
.build());

// assume ssl if not explicitly mentioned.
Expand All @@ -62,8 +71,8 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
jdbcUrl.append(";");
result = Jsons.jsonNode(ImmutableMap.builder()
.put("jdbc_url", jdbcUrl.toString())
.put("username", config.get("username").asText())
.put("password", config.get("password").asText())
.put(USERNAME, config.get(USERNAME).asText())
.put(PASSWORD, config.get(PASSWORD).asText())
.put("connection_properties", additionalParams)
.build());
}
Expand All @@ -78,6 +87,26 @@ public Set<String> getExcludedInternalNameSpaces() {
"SYSPROC", "SYSPUBLIC", "SYSSTAT", "SYSTOOLS");
}

@Override
public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException {
return database
.query(getPrivileges(), sourceOperations::rowToJson)
.map(this::getPrivilegeDto)
.collect(Collectors.toSet());
}

private CheckedFunction<Connection, PreparedStatement, SQLException> getPrivileges() {
return connection -> connection.prepareStatement(
"SELECT DISTINCT OBJECTNAME, OBJECTSCHEMA FROM SYSIBMADM.PRIVILEGES WHERE OBJECTTYPE = 'TABLE' AND PRIVILEGE = 'SELECT' AND AUTHID = SESSION_USER");
}

private JdbcPrivilegeDto getPrivilegeDto(JsonNode jsonNode) {
return JdbcPrivilegeDto.builder()
.schemaName(jsonNode.get("OBJECTSCHEMA").asText().trim())
.tableName(jsonNode.get("OBJECTNAME").asText())
.build();
}

/* Helpers */

private List<String> obtainConnectionOptions(final JsonNode encryption) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -14,6 +16,8 @@
import io.airbyte.integrations.source.db2.Db2Source;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand All @@ -22,14 +26,22 @@
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Db2Container;

public class Db2SourceAcceptanceTest extends SourceAcceptanceTest {

private static final String SCHEMA_NAME = "SOURCE_INTEGRATION_TEST";
private static final String LESS_PERMITTED_USER = "db2inst2";
private static final String USER_WITH_OUT_PERMISSIONS = "db2inst3";
private static final String STREAM_NAME1 = "ID_AND_NAME1";
private static final String STREAM_NAME2 = "ID_AND_NAME2";
private static final String STREAM_NAME3 = "ID_AND_NAME3";
public static final String PASSWORD = "password";

private Db2Container db;
private JsonNode config;
Expand All @@ -50,6 +62,19 @@ protected JsonNode getConfig() {
return config;
}

private JsonNode getConfig(String userName, String password) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("port", db.getFirstMappedPort())
.put("db", db.getDatabaseName())
.put("username", userName)
.put("password", password)
.put("encryption", Jsons.jsonNode(ImmutableMap.builder()
.put("encryption_method", "unencrypted")
.build()))
.build());
}

@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
Expand Down Expand Up @@ -84,16 +109,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
db = new Db2Container("ibmcom/db2:11.5.5.0").acceptLicense();
db.start();

config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("port", db.getFirstMappedPort())
.put("db", db.getDatabaseName())
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("encryption", Jsons.jsonNode(ImmutableMap.builder()
.put("encryption_method", "unencrypted")
.build()))
.build());
config = getConfig(db.getUsername(), db.getPassword());

database = Databases.createJdbcDatabase(
config.get("username").asText(),
Expand All @@ -109,18 +125,29 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME1);
final String createTableQuery2 = String
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME2);
final String createTableQuery3 = String
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME3);
final String insertIntoTableQuery1 = String
.format("INSERT INTO %s.%s (ID, NAME) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash')",
SCHEMA_NAME, STREAM_NAME1);
final String insertIntoTableQuery2 = String
.format("INSERT INTO %s.%s (ID, NAME) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash')",
SCHEMA_NAME, STREAM_NAME2);
final String grantSelect1 = String
.format("GRANT SELECT ON TABLE %s.%s TO %s",
SCHEMA_NAME, STREAM_NAME1, LESS_PERMITTED_USER);
final String grantSelect2 = String
.format("GRANT SELECT ON TABLE %s.%s TO %s",
SCHEMA_NAME, STREAM_NAME3, LESS_PERMITTED_USER);

database.execute(createSchemaQuery);
database.execute(createTableQuery1);
database.execute(createTableQuery2);
database.execute(createTableQuery3);
database.execute(insertIntoTableQuery1);
database.execute(insertIntoTableQuery2);
database.execute(grantSelect1);
database.execute(grantSelect2);

database.close();
}
Expand All @@ -130,4 +157,41 @@ protected void tearDown(final TestDestinationEnv testEnv) {
db.close();
}

@Test
public void testCheckPrivilegesForUserWithLessPerm() throws Exception {
createUser(LESS_PERMITTED_USER);
JsonNode config = getConfig(LESS_PERMITTED_USER, PASSWORD);

final List<String> actualNamesWithPermission = getActualNamesWithPermission(config);
List<String> expected = List.of(STREAM_NAME3, STREAM_NAME1);
assertEquals(expected.size(), actualNamesWithPermission.size());
assertEquals(expected, actualNamesWithPermission);
}

@Test
public void testCheckPrivilegesForUserWithoutPerm() throws Exception {
createUser(USER_WITH_OUT_PERMISSIONS);

JsonNode config = getConfig(USER_WITH_OUT_PERMISSIONS, PASSWORD);

final List<String> actualNamesWithPermission = getActualNamesWithPermission(config);
List<String> expected = Collections.emptyList();
assertEquals(0, actualNamesWithPermission.size());
assertEquals(expected, actualNamesWithPermission);
}

private void createUser(String lessPermittedUser) throws IOException, InterruptedException {
String encryptedPassword = db.execInContainer("openssl", "passwd", PASSWORD).getStdout().replaceAll("\n", "");
db.execInContainer("useradd", lessPermittedUser, "-p", encryptedPassword);
}

private List<String> getActualNamesWithPermission(JsonNode config) throws Exception {
AirbyteCatalog airbyteCatalog = new Db2Source().discover(config);
return airbyteCatalog
.getStreams()
.stream()
.map(AirbyteStream::getName)
.toList();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private Predicate<JsonNode> excludeNotAccessibleTables(final Set<String> interna
final Set<JdbcPrivilegeDto> tablesWithSelectGrantPrivilege) {
return jsonNode -> {
if (tablesWithSelectGrantPrivilege.isEmpty()) {
return !internalSchemas.contains(jsonNode.get(INTERNAL_SCHEMA_NAME).asText());
return false;
}
return tablesWithSelectGrantPrivilege.stream()
.anyMatch(e -> e.getSchemaName().equals(jsonNode.get(INTERNAL_SCHEMA_NAME).asText()))
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/db2.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ You can also enter your own password for the keystore, but if you don't, the pas

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.5 | 2022-02-01 | [9875](https://github.com/airbytehq/airbyte/pull/9875) | Discover only permitted for user tables |
| 0.1.4 | 2021-12-30 | [9187](https://github.com/airbytehq/airbyte/pull/9187) [8749](https://github.com/airbytehq/airbyte/pull/8749) | Add support of JdbcType.ARRAY to JdbcSourceOperations. |
| 0.1.3 | 2021-11-05 | [7670](https://github.com/airbytehq/airbyte/pull/7670) | Updated unique DB2 types transformation |
| 0.1.2 | 2021-10-25 | [7355](https://github.com/airbytehq/airbyte/pull/7355) | Added ssl support |
Expand Down