Skip to content

Commit

Permalink
db2 source: discover only permitted tables
Browse files Browse the repository at this point in the history
  • Loading branch information
sashaNeshcheret committed Jan 28, 2022
1 parent c8ee3f8 commit d56b566
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,25 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.Db2JdbcStreamingQueryConfiguration;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import java.io.IOException;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -78,6 +85,29 @@ public Set<String> getExcludedInternalNameSpaces() {
"SYSPROC", "SYSPUBLIC", "SYSSTAT", "SYSTOOLS");
}

@Override
public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException {
return database
.query(getPrivileges(database), sourceOperations::rowToJson)
.map(this::getPrivilegeDto)
.collect(Collectors.toSet());
}

private CheckedFunction<Connection, PreparedStatement, SQLException> getPrivileges(JdbcDatabase database) {
return connection -> {
final PreparedStatement ps = connection.prepareStatement(
"SELECT DISTINCT OBJECTNAME, OBJECTSCHEMA FROM SYSIBMADM.PRIVILEGES WHERE OBJECTTYPE = 'TABLE' AND PRIVILEGE = 'SELECT' AND AUTHID = SESSION_USER");
return ps;
};
}

private JdbcPrivilegeDto getPrivilegeDto(JsonNode jsonNode) {
return JdbcPrivilegeDto.builder()
.schemaName(jsonNode.get("OBJECTSCHEMA").asText().trim())
.tableName(jsonNode.get("OBJECTNAME").asText())
.build();
}

/* Helpers */

private List<String> obtainConnectionOptions(final JsonNode encryption) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -14,6 +16,8 @@
import io.airbyte.integrations.source.db2.Db2Source;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand All @@ -22,16 +26,22 @@
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Db2Container;

public class Db2SourceAcceptanceTest extends SourceAcceptanceTest {

private static final String SCHEMA_NAME = "SOURCE_INTEGRATION_TEST";
private static final String LESS_PERMITTED_USER = "db2inst2";
private static final String USER_WITH_OUT_PERMISSIONS = "db2inst3";
private static final String STREAM_NAME1 = "ID_AND_NAME1";
private static final String STREAM_NAME2 = "ID_AND_NAME2";
private static final String STREAM_NAME3 = "ID_AND_NAME3";
public static final String PASSWORD = "password";

private Db2Container db;
private JsonNode config;
Expand All @@ -52,6 +62,19 @@ protected JsonNode getConfig() {
return config;
}

private JsonNode getConfig(String userName, String password) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("port", db.getFirstMappedPort())
.put("db", db.getDatabaseName())
.put("username", userName)
.put("password", password)
.put("encryption", Jsons.jsonNode(ImmutableMap.builder()
.put("encryption_method", "unencrypted")
.build()))
.build());
}

@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
Expand Down Expand Up @@ -86,16 +109,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
db = new Db2Container("ibmcom/db2:11.5.5.0").acceptLicense();
db.start();

config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("port", db.getFirstMappedPort())
.put("db", db.getDatabaseName())
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("encryption", Jsons.jsonNode(ImmutableMap.builder()
.put("encryption_method", "unencrypted")
.build()))
.build());
config = getConfig(db.getUsername(), db.getPassword());

database = Databases.createJdbcDatabase(
config.get("username").asText(),
Expand All @@ -111,18 +125,29 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME1);
final String createTableQuery2 = String
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME2);
final String createTableQuery3 = String
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME3);
final String insertIntoTableQuery1 = String
.format("INSERT INTO %s.%s (ID, NAME) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash')",
SCHEMA_NAME, STREAM_NAME1);
final String insertIntoTableQuery2 = String
.format("INSERT INTO %s.%s (ID, NAME) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash')",
SCHEMA_NAME, STREAM_NAME2);
final String grantSelect1 = String
.format("GRANT SELECT ON TABLE %s.%s TO %s",
SCHEMA_NAME, STREAM_NAME1, LESS_PERMITTED_USER);
final String grantSelect2 = String
.format("GRANT SELECT ON TABLE %s.%s TO %s",
SCHEMA_NAME, STREAM_NAME3, LESS_PERMITTED_USER);

database.execute(createSchemaQuery);
database.execute(createTableQuery1);
database.execute(createTableQuery2);
database.execute(createTableQuery3);
database.execute(insertIntoTableQuery1);
database.execute(insertIntoTableQuery2);
database.execute(grantSelect1);
database.execute(grantSelect2);

database.close();
}
Expand All @@ -132,4 +157,40 @@ protected void tearDown(final TestDestinationEnv testEnv) {
db.close();
}

@Test
public void testCheckPrivilegesForUserWithLessPerm() throws Exception {
createUser(LESS_PERMITTED_USER);
JsonNode config = getConfig(LESS_PERMITTED_USER, PASSWORD);

final List<String> actualNamesWithPermission = getActualNamesWithPermission(config);
List<String> expected = List.of(STREAM_NAME3, STREAM_NAME1);
assertEquals(expected.size(), actualNamesWithPermission.size());
assertEquals(expected, actualNamesWithPermission);
}

@Test
public void testCheckPrivilegesForUserWithoutPerm() throws Exception {
createUser(USER_WITH_OUT_PERMISSIONS);

JsonNode config = getConfig(USER_WITH_OUT_PERMISSIONS, PASSWORD);

final List<String> actualNamesWithPermission = getActualNamesWithPermission(config);
List<String> expected = Collections.emptyList();
assertEquals(0, actualNamesWithPermission.size());
assertEquals(expected, actualNamesWithPermission);
}

private void createUser(String lessPermittedUser) throws IOException, InterruptedException {
String encryptedPassword = db.execInContainer("openssl", "passwd", PASSWORD).getStdout().replaceAll("\n", "");
db.execInContainer("useradd", lessPermittedUser, "-p", encryptedPassword);
}

private List<String> getActualNamesWithPermission(JsonNode config) throws Exception{
AirbyteCatalog airbyteCatalog = new Db2Source().discover(config);
return airbyteCatalog
.getStreams()
.stream()
.map(AirbyteStream::getName)
.toList();
}
}

1 comment on commit d56b566

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SonarQube Report

SonarQube report for Airbyte Connectors Source Db2(#9875)

Measures

Name Value Name Value Name Value
Vulnerabilities 0 Bugs 1 Security Rating A
Duplicated Lines (%) 0.0 Reliability Rating C Code Smells 10
Coverage 0.0 Quality Gate Status OK Lines to Cover 100
Duplicated Blocks 0 Lines of Code 190 Blocker Issues 0
Critical Issues 4 Major Issues 6 Minor Issues 1

Detected Issues

Rule File Description Message
java:S1172 (MAJOR) io.airbyte.integrations.source.db2/Db2Source.java:96 Unused method parameters should be removed Remove this unused method parameter "database".
java:S1488 (MINOR) io.airbyte.integrations.source.db2/Db2Source.java:98 Local variables should not be declared and then immediately returned or thrown Immediately return this expression instead of assigning it to the temporary variable "ps".
java:S2142 (MAJOR) io.airbyte.integrations.source.db2/Db2Source.java:121 "InterruptedException" should not be ignored Either re-interrupt this method or rethrow the "InterruptedException" that can be caught here.
java:S1068 (MAJOR) io.airbyte.integrations.source.db2/Db2Source.java:36 Unused "private" fields should be removed Remove this unused "operations" private field.
java:S1598 (CRITICAL) io.airbyte.integrations.source.db2/Db2SourceOperations.java:5 Package declaration should match source file directory This file "Db2SourceOperations.java" should be located in "io/airbyte/integrations/source/db2" directory, not in "/github/workspace/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2"(Do not use dots in directory names).
java:S1068 (MAJOR) io.airbyte.integrations.source.db2/Db2SourceOperations.java:20 Unused "private" fields should be removed Remove this unused "LOGGER" private field.
java:S1192 (CRITICAL) io.airbyte.integrations.source.db2/Db2Source.java:61 String literals should not be duplicated Define a constant instead of duplicating this literal "username" 4 times.
java:S1192 (CRITICAL) io.airbyte.integrations.source.db2/Db2Source.java:62 String literals should not be duplicated Define a constant instead of duplicating this literal "password" 4 times.
java:S112 (MAJOR) io.airbyte.integrations.source.db2/Db2Source.java:122 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) io.airbyte.integrations.source.db2/Db2Source.java:156 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S1598 (CRITICAL) io.airbyte.integrations.source.db2/Db2Source.java:5 Package declaration should match source file directory This file "Db2Source.java" should be located in "io/airbyte/integrations/source/db2" directory, not in "/github/workspace/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2"(Do not use dots in directory names).

Coverage (0.0%)

File Coverage File Coverage
src/main/java/io.airbyte.integrations.source.db2/Db2Source.java 0.0 src/main/java/io.airbyte.integrations.source.db2/Db2SourceOperations.java 0.0

Please sign in to comment.