Skip to content

Commit

Permalink
SSH for Postgres Source (#5742)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Sep 2, 2021
1 parent 175d379 commit 7bf531a
Show file tree
Hide file tree
Showing 25 changed files with 1,110 additions and 127 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ jobs:
MIXPANEL_INTEGRATION_TEST_CREDS: ${{ secrets.MIXPANEL_INTEGRATION_TEST_CREDS }}
MSSQL_RDS_TEST_CREDS: ${{ secrets.MSSQL_RDS_TEST_CREDS }}
PAYPAL_TRANSACTION_CREDS: ${{ secrets.SOURCE_PAYPAL_TRANSACTION_CREDS }}
POSTGRES_SSH_KEY_TEST_CREDS: ${{ secrets.POSTGRES_SSH_KEY_TEST_CREDS }}
POSTGRES_SSH_PWD_TEST_CREDS: ${{ secrets.POSTGRES_SSH_PWD_TEST_CREDS }}
POSTHOG_TEST_CREDS: ${{ secrets.POSTHOG_TEST_CREDS }}
PIPEDRIVE_INTEGRATION_TESTS_CREDS: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS }}
RECHARGE_INTEGRATION_TEST_CREDS: ${{ secrets.RECHARGE_INTEGRATION_TEST_CREDS }}
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ jobs:
MIXPANEL_INTEGRATION_TEST_CREDS: ${{ secrets.MIXPANEL_INTEGRATION_TEST_CREDS }}
MSSQL_RDS_TEST_CREDS: ${{ secrets.MSSQL_RDS_TEST_CREDS }}
PAYPAL_TRANSACTION_CREDS: ${{ secrets.SOURCE_PAYPAL_TRANSACTION_CREDS }}
POSTGRES_SSH_KEY_TEST_CREDS: ${{ secrets.POSTGRES_SSH_KEY_TEST_CREDS }}
POSTGRES_SSH_PWD_TEST_CREDS: ${{ secrets.POSTGRES_SSH_PWD_TEST_CREDS }}
POSTHOG_TEST_CREDS: ${{ secrets.POSTHOG_TEST_CREDS }}
PIPEDRIVE_INTEGRATION_TESTS_CREDS: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS }}
RECHARGE_INTEGRATION_TEST_CREDS: ${{ secrets.RECHARGE_INTEGRATION_TEST_CREDS }}
Expand Down
87 changes: 73 additions & 14 deletions airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.stream.MoreStreams;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public class Jsons {
Expand All @@ -49,42 +53,42 @@ public class Jsons {
private static final ObjectMapper OBJECT_MAPPER = MoreMappers.initMapper();
private static final ObjectWriter OBJECT_WRITER = OBJECT_MAPPER.writer(new JsonPrettyPrinter());

public static <T> String serialize(T object) {
public static <T> String serialize(final T object) {
try {
return OBJECT_MAPPER.writeValueAsString(object);
} catch (JsonProcessingException e) {
} catch (final JsonProcessingException e) {
throw new RuntimeException(e);
}
}

public static <T> T deserialize(final String jsonString, final Class<T> klass) {
try {
return OBJECT_MAPPER.readValue(jsonString, klass);
} catch (IOException e) {
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

public static JsonNode deserialize(final String jsonString) {
try {
return OBJECT_MAPPER.readTree(jsonString);
} catch (IOException e) {
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

public static <T> Optional<T> tryDeserialize(final String jsonString, final Class<T> klass) {
try {
return Optional.of(OBJECT_MAPPER.readValue(jsonString, klass));
} catch (IOException e) {
} catch (final IOException e) {
return Optional.empty();
}
}

public static Optional<JsonNode> tryDeserialize(final String jsonString) {
try {
return Optional.of(OBJECT_MAPPER.readTree(jsonString));
} catch (IOException e) {
} catch (final IOException e) {
return Optional.empty();
}
}
Expand All @@ -108,15 +112,15 @@ public static <T> T object(final JsonNode jsonNode, final TypeReference<T> typeR
public static <T> Optional<T> tryObject(final JsonNode jsonNode, final Class<T> klass) {
try {
return Optional.of(OBJECT_MAPPER.convertValue(jsonNode, klass));
} catch (Exception e) {
} catch (final Exception e) {
return Optional.empty();
}
}

public static <T> Optional<T> tryObject(final JsonNode jsonNode, final TypeReference<T> typeReference) {
try {
return Optional.of(OBJECT_MAPPER.convertValue(jsonNode, typeReference));
} catch (Exception e) {
} catch (final Exception e) {
return Optional.empty();
}
}
Expand All @@ -126,30 +130,85 @@ public static <T> T clone(final T object) {
return (T) deserialize(serialize(object), object.getClass());
}

public static byte[] toBytes(JsonNode jsonNode) {
public static byte[] toBytes(final JsonNode jsonNode) {
return serialize(jsonNode).getBytes(Charsets.UTF_8);
}

public static Set<String> keys(JsonNode jsonNode) {
public static Set<String> keys(final JsonNode jsonNode) {
if (jsonNode.isObject()) {
return Jsons.object(jsonNode, new TypeReference<Map<String, Object>>() {}).keySet();
} else {
return new HashSet<>();
}
}

public static List<JsonNode> children(JsonNode jsonNode) {
public static List<JsonNode> children(final JsonNode jsonNode) {
return MoreStreams.toStream(jsonNode.elements()).collect(Collectors.toList());
}

public static String toPrettyString(JsonNode jsonNode) {
public static String toPrettyString(final JsonNode jsonNode) {
try {
return OBJECT_WRITER.writeValueAsString(jsonNode) + "\n";
} catch (JsonProcessingException e) {
} catch (final JsonProcessingException e) {
throw new RuntimeException(e);
}
}

public static JsonNode navigateTo(JsonNode node, final List<String> keys) {
for (final String key : keys) {
node = node.get(key);
}
return node;
}

public static void replaceNestedString(final JsonNode json, final List<String> keys, final String replacement) {
replaceNested(json, keys, (node, finalKey) -> node.put(finalKey, replacement));
}

public static void replaceNestedInt(final JsonNode json, final List<String> keys, final int replacement) {
replaceNested(json, keys, (node, finalKey) -> node.put(finalKey, replacement));
}

private static void replaceNested(final JsonNode json, final List<String> keys, final BiConsumer<ObjectNode, String> typedReplacement) {
Preconditions.checkArgument(keys.size() > 0, "Must pass at least one key");
final JsonNode nodeContainingFinalKey = navigateTo(json, keys.subList(0, keys.size() - 1));
typedReplacement.accept((ObjectNode) nodeContainingFinalKey, keys.get(keys.size() - 1));
}

public static Optional<JsonNode> getOptional(final JsonNode json, final String... keys) {
return getOptional(json, Arrays.asList(keys));
}

public static Optional<JsonNode> getOptional(JsonNode json, final List<String> keys) {
for (final String key : keys) {
if (json == null) {
return Optional.empty();
}

json = json.get(key);
}

return Optional.ofNullable(json);
}

public static String getStringOrNull(final JsonNode json, final String... keys) {
return getStringOrNull(json, Arrays.asList(keys));
}

public static String getStringOrNull(final JsonNode json, final List<String> keys) {
final Optional<JsonNode> optional = getOptional(json, keys);
return optional.map(JsonNode::asText).orElse(null);
}

public static int getIntOrZero(final JsonNode json, final String... keys) {
return getIntOrZero(json, Arrays.asList(keys));
}

public static int getIntOrZero(final JsonNode json, final List<String> keys) {
final Optional<JsonNode> optional = getOptional(json, keys);
return optional.map(JsonNode::asInt).orElse(0);
}

/**
* By the Jackson DefaultPrettyPrinter prints objects with an extra space as follows: {"name" :
* "airbyte"}. We prefer {"name": "airbyte"}.
Expand All @@ -165,7 +224,7 @@ public DefaultPrettyPrinter createInstance() {

// override the method that inserts the extra space.
@Override
public DefaultPrettyPrinter withSeparators(Separators separators) {
public DefaultPrettyPrinter withSeparators(final Separators separators) {
_separators = separators;
_objectFieldValueSeparatorWithSpaces = separators.getObjectFieldValueSeparator() + " ";
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@

public class Strings {

public static String join(Iterable<?> iterable, CharSequence separator) {
public static String join(final Iterable<?> iterable, final CharSequence separator) {
return Streams.stream(iterable)
.map(Object::toString)
.collect(Collectors.joining(separator));
}

public static String addRandomSuffix(String base, String separator, int suffixLength) {
public static String addRandomSuffix(final String base, final String separator, final int suffixLength) {
return base + separator + RandomStringUtils.randomAlphabetic(suffixLength).toLowerCase();
}

public static String safeTrim(final String string) {
return string == null ? null : string.trim();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -241,6 +242,32 @@ void testToPrettyString() {
assertEquals(expectedOutput, Jsons.toPrettyString(jsonNode));
}

@Test
void testGetOptional() {
final JsonNode json = Jsons.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": {}, \"mno\": \"pqr\", \"stu\": null }");

assertEquals(Optional.of(Jsons.jsonNode("ghi")), Jsons.getOptional(json, "abc", "def"));
assertEquals(Optional.of(Jsons.emptyObject()), Jsons.getOptional(json, "jkl"));
assertEquals(Optional.of(Jsons.jsonNode("pqr")), Jsons.getOptional(json, "mno"));
assertEquals(Optional.of(Jsons.jsonNode(null)), Jsons.getOptional(json, "stu"));
assertEquals(Optional.empty(), Jsons.getOptional(json, "xyz"));
assertEquals(Optional.empty(), Jsons.getOptional(json, "abc", "xyz"));
assertEquals(Optional.empty(), Jsons.getOptional(json, "abc", "def", "xyz"));
assertEquals(Optional.empty(), Jsons.getOptional(json, "abc", "jkl", "xyz"));
assertEquals(Optional.empty(), Jsons.getOptional(json, "stu", "xyz"));
}

@Test
void testGetStringOrNull() {
final JsonNode json = Jsons.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": \"mno\", \"pqr\": 1 }");

assertEquals("ghi", Jsons.getStringOrNull(json, "abc", "def"));
assertEquals("mno", Jsons.getStringOrNull(json, "jkl"));
assertEquals("1", Jsons.getStringOrNull(json, "pqr"));
assertNull(Jsons.getStringOrNull(json, "abc", "def", "xyz"));
assertNull(Jsons.getStringOrNull(json, "xyz"));
}

private static class ToClass {

@JsonProperty("str")
Expand All @@ -254,21 +281,21 @@ private static class ToClass {

public ToClass() {}

public ToClass(String str, Integer num, long numLong) {
public ToClass(final String str, final Integer num, final long numLong) {
this.str = str;
this.num = num;
this.numLong = numLong;
}

@Override
public boolean equals(Object o) {
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ToClass toClass = (ToClass) o;
final ToClass toClass = (ToClass) o;
return numLong == toClass.numLong
&& Objects.equals(str, toClass.str)
&& Objects.equals(num, toClass.num);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.3.10
dockerImageTag: 0.3.11
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
- sourceDefinitionId: 9fa5862c-da7c-11eb-8d19-0242ac130003
Expand Down
6 changes: 6 additions & 0 deletions airbyte-integrations/bases/base-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ plugins {

dependencies {
implementation 'commons-cli:commons-cli:1.4'
implementation 'org.apache.sshd:sshd-mina:2.7.0'
// bouncycastle is pinned to version-match the transitive dependency from kubernetes client-java
// because a version conflict causes "parameter object not a ECParameterSpec" on ssh tunnel initiation
implementation 'org.bouncycastle:bcprov-jdk15on:1.66'
implementation 'org.bouncycastle:bcpkix-jdk15on:1.66'
implementation 'org.bouncycastle:bctls-jdk15on:1.66'

implementation project(':airbyte-protocol:models')
implementation project(":airbyte-json-validation")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.base.ssh;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.io.IOException;

public class SshHelpers {

public static ConnectorSpecification getSpecAndInjectSsh() throws IOException {
final ConnectorSpecification originalSpec = Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class);
return injectSshIntoSpec(originalSpec);
}

public static ConnectorSpecification injectSshIntoSpec(final ConnectorSpecification connectorSpecification) throws IOException {
final ConnectorSpecification originalSpec = Jsons.clone(connectorSpecification);
final ObjectNode propNode = (ObjectNode) originalSpec.getConnectionSpecification().get("properties");
propNode.set("tunnel_method", Jsons.deserialize(MoreResources.readResource("ssh-tunnel-spec.json")));
return originalSpec;
}

}
Loading

0 comments on commit 7bf531a

Please sign in to comment.