Skip to content

Commit

Permalink
Extract pluggable KafkaTableDescriptionSupplier
Browse files Browse the repository at this point in the history
  • Loading branch information
kokosing committed Feb 22, 2020
1 parent d5ee56c commit 0e379a3
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,27 @@

import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.json.JsonModule;
import io.prestosql.spi.NodeManager;
import io.prestosql.spi.connector.Connector;
import io.prestosql.spi.connector.ConnectorContext;
import io.prestosql.spi.connector.ConnectorFactory;
import io.prestosql.spi.connector.ConnectorHandleResolver;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.type.TypeManager;

import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;

public class KafkaConnectorFactory
implements ConnectorFactory
{
private final Module extension;
private final Optional<Supplier<Map<SchemaTableName, KafkaTopicDescription>>> tableDescriptionSupplier;

KafkaConnectorFactory(Module extension, Optional<Supplier<Map<SchemaTableName, KafkaTopicDescription>>> tableDescriptionSupplier)
KafkaConnectorFactory(Module extension)
{
this.extension = requireNonNull(extension, "extension is null");
this.tableDescriptionSupplier = requireNonNull(tableDescriptionSupplier, "tableDescriptionSupplier is null");
}

@Override
Expand Down Expand Up @@ -71,13 +64,6 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
binder.bind(ClassLoader.class).toInstance(KafkaConnectorFactory.class.getClassLoader());
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
binder.bind(NodeManager.class).toInstance(context.getNodeManager());

if (tableDescriptionSupplier.isPresent()) {
binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, KafkaTopicDescription>>>() {}).toInstance(tableDescriptionSupplier.get());
}
else {
binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, KafkaTopicDescription>>>() {}).to(KafkaTableDescriptionSupplier.class).in(Scopes.SINGLETON);
}
});

Injector injector = app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import javax.inject.Inject;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.json.JsonBinder.jsonBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
Expand All @@ -50,6 +51,7 @@ public void configure(Binder binder)
binder.bind(KafkaConnector.class).in(Scopes.SINGLETON);

configBinder(binder).bindConfig(KafkaConfig.class);
newSetBinder(binder, TableDescriptionSupplier.class).addBinding().toProvider(KafkaTableDescriptionSupplier.class).in(Scopes.SINGLETON);

jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
jsonCodecBinder(binder).bindJsonCodec(KafkaTopicDescription.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.prestosql.decoder.dummy.DummyRowDecoder;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ColumnMetadata;
Expand All @@ -33,9 +32,10 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.prestosql.plugin.kafka.KafkaHandleResolver.convertColumnHandle;
import static io.prestosql.plugin.kafka.KafkaHandleResolver.convertTableHandle;
import static java.util.Objects.requireNonNull;
Expand All @@ -49,46 +49,41 @@ public class KafkaMetadata
implements ConnectorMetadata
{
private final boolean hideInternalColumns;
private final Map<SchemaTableName, KafkaTopicDescription> tableDescriptions;
private final Set<TableDescriptionSupplier> tableDescriptions;

@Inject
public KafkaMetadata(
KafkaConfig kafkaConfig,
Supplier<Map<SchemaTableName, KafkaTopicDescription>> kafkaTableDescriptionSupplier)
Set<TableDescriptionSupplier> tableDescriptions)
{
requireNonNull(kafkaConfig, "kafkaConfig is null");
this.hideInternalColumns = kafkaConfig.isHideInternalColumns();

requireNonNull(kafkaTableDescriptionSupplier, "kafkaTableDescriptionSupplier is null");
this.tableDescriptions = kafkaTableDescriptionSupplier.get();
this.tableDescriptions = requireNonNull(tableDescriptions, "tableDescriptions is null");
}

@Override
public List<String> listSchemaNames(ConnectorSession session)
{
ImmutableSet.Builder<String> builder = ImmutableSet.builder();
for (SchemaTableName tableName : tableDescriptions.keySet()) {
builder.add(tableName.getSchemaName());
}
return ImmutableList.copyOf(builder.build());
return tableDescriptions.stream()
.map(TableDescriptionSupplier::listTables)
.flatMap(Set::stream)
.map(SchemaTableName::getSchemaName)
.collect(toImmutableList());
}

@Override
public KafkaTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
{
KafkaTopicDescription table = tableDescriptions.get(schemaTableName);
if (table == null) {
return null;
}

return new KafkaTableHandle(
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
table.getTopicName(),
getDataFormat(table.getKey()),
getDataFormat(table.getMessage()),
table.getKey().flatMap(KafkaTopicFieldGroup::getDataSchema),
table.getMessage().flatMap(KafkaTopicFieldGroup::getDataSchema));
return getTopicDescription(schemaTableName)
.map(kafkaTopicDescription -> new KafkaTableHandle(
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
kafkaTopicDescription.getTopicName(),
getDataFormat(kafkaTopicDescription.getKey()),
getDataFormat(kafkaTopicDescription.getMessage()),
kafkaTopicDescription.getKey().flatMap(KafkaTopicFieldGroup::getDataSchema),
kafkaTopicDescription.getMessage().flatMap(KafkaTopicFieldGroup::getDataSchema)))
.orElse(null);
}

private static String getDataFormat(Optional<KafkaTopicFieldGroup> fieldGroup)
Expand All @@ -105,14 +100,11 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
@Override
public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
{
ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
for (SchemaTableName tableName : tableDescriptions.keySet()) {
if (schemaName.map(tableName.getSchemaName()::equals).orElse(true)) {
builder.add(tableName);
}
}

return builder.build();
return tableDescriptions.stream()
.map(TableDescriptionSupplier::listTables)
.flatMap(Set::stream)
.filter(tableName -> schemaName.map(tableName.getSchemaName()::equals).orElse(true))
.collect(toImmutableList());
}

@SuppressWarnings("ValueOfIncrementOrDecrementUsed")
Expand All @@ -121,10 +113,8 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
{
KafkaTableHandle kafkaTableHandle = convertTableHandle(tableHandle);

KafkaTopicDescription kafkaTopicDescription = tableDescriptions.get(kafkaTableHandle.toSchemaTableName());
if (kafkaTopicDescription == null) {
throw new TableNotFoundException(kafkaTableHandle.toSchemaTableName());
}
SchemaTableName schemaTableName = kafkaTableHandle.toSchemaTableName();
KafkaTopicDescription kafkaTopicDescription = getRequiredTopicDescription(schemaTableName);

ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();

Expand Down Expand Up @@ -191,10 +181,7 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
@SuppressWarnings("ValueOfIncrementOrDecrementUsed")
private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName)
{
KafkaTopicDescription table = tableDescriptions.get(schemaTableName);
if (table == null) {
throw new TableNotFoundException(schemaTableName);
}
KafkaTopicDescription table = getRequiredTopicDescription(schemaTableName);

ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();

Expand Down Expand Up @@ -234,4 +221,18 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
{
return new ConnectorTableProperties();
}

private KafkaTopicDescription getRequiredTopicDescription(SchemaTableName schemaTableName)
{
return getTopicDescription(schemaTableName).orElseThrow(() -> new TableNotFoundException(schemaTableName));
}

private Optional<KafkaTopicDescription> getTopicDescription(SchemaTableName schemaTableName)
{
return tableDescriptions.stream()
.map(kafkaTableDescriptionSupplier -> kafkaTableDescriptionSupplier.getTopicDescription(schemaTableName))
.filter(Optional::isPresent)
.map(Optional::get)
.findFirst();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,10 @@
*/
package io.prestosql.plugin.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.inject.Module;
import io.prestosql.spi.Plugin;
import io.prestosql.spi.connector.ConnectorFactory;
import io.prestosql.spi.connector.SchemaTableName;

import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;

Expand All @@ -34,7 +28,6 @@ public class KafkaPlugin
};

private final Module extension;
private Optional<Supplier<Map<SchemaTableName, KafkaTopicDescription>>> tableDescriptionSupplier = Optional.empty();

public KafkaPlugin()
{
Expand All @@ -46,15 +39,9 @@ public KafkaPlugin(Module extension)
this.extension = requireNonNull(extension, "extension is null");
}

@VisibleForTesting
public synchronized void setTableDescriptionSupplier(Supplier<Map<SchemaTableName, KafkaTopicDescription>> tableDescriptionSupplier)
{
this.tableDescriptionSupplier = Optional.of(requireNonNull(tableDescriptionSupplier, "tableDescriptionSupplier is null"));
}

@Override
public synchronized Iterable<ConnectorFactory> getConnectorFactories()
{
return ImmutableList.of(new KafkaConnectorFactory(extension, tableDescriptionSupplier));
return ImmutableList.of(new KafkaConnectorFactory(extension));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.prestosql.spi.connector.SchemaTableName;

import javax.inject.Inject;
import javax.inject.Provider;

import java.io.File;
import java.io.IOException;
Expand All @@ -31,7 +32,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
Expand All @@ -40,7 +40,7 @@
import static java.util.Objects.requireNonNull;

public class KafkaTableDescriptionSupplier
implements Supplier<Map<SchemaTableName, KafkaTopicDescription>>
implements Provider<TableDescriptionSupplier>
{
private static final Logger log = Logger.get(KafkaTableDescriptionSupplier.class);

Expand All @@ -53,15 +53,20 @@ public class KafkaTableDescriptionSupplier
KafkaTableDescriptionSupplier(KafkaConfig kafkaConfig, JsonCodec<KafkaTopicDescription> topicDescriptionCodec)
{
this.topicDescriptionCodec = requireNonNull(topicDescriptionCodec, "topicDescriptionCodec is null");

requireNonNull(kafkaConfig, "kafkaConfig is null");
this.tableDescriptionDir = kafkaConfig.getTableDescriptionDir();
this.defaultSchema = kafkaConfig.getDefaultSchema();
this.tableNames = ImmutableSet.copyOf(kafkaConfig.getTableNames());
}

@Override
public Map<SchemaTableName, KafkaTopicDescription> get()
public TableDescriptionSupplier get()
{
Map<SchemaTableName, KafkaTopicDescription> tables = populateTables();
return new MapBasedTableDescriptionSupplier(tables);
}

private Map<SchemaTableName, KafkaTopicDescription> populateTables()
{
ImmutableMap.Builder<SchemaTableName, KafkaTopicDescription> builder = ImmutableMap.builder();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.prestosql.plugin.kafka;

import com.google.common.collect.ImmutableMap;
import io.prestosql.spi.connector.SchemaTableName;

import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static java.util.Objects.requireNonNull;

public class MapBasedTableDescriptionSupplier
implements TableDescriptionSupplier
{
private final Map<SchemaTableName, KafkaTopicDescription> map;

public MapBasedTableDescriptionSupplier(Map<SchemaTableName, KafkaTopicDescription> map)
{
this.map = ImmutableMap.copyOf(requireNonNull(map, "map is null"));
}

@Override
public Set<SchemaTableName> listTables()
{
return map.keySet();
}

@Override
public Optional<KafkaTopicDescription> getTopicDescription(SchemaTableName schemaTableName)
{
return Optional.ofNullable(map.get(schemaTableName));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.prestosql.plugin.kafka;

import io.prestosql.spi.connector.SchemaTableName;

import java.util.Optional;
import java.util.Set;

public interface TableDescriptionSupplier
{
Set<SchemaTableName> listTables();

Optional<KafkaTopicDescription> getTopicDescription(SchemaTableName schemaTableName);
}
Loading

0 comments on commit 0e379a3

Please sign in to comment.