Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert some classes to record in Kinesis #21770

Merged
merged 4 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.function.Supplier;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.kinesis.KinesisCompressionCodec.UNCOMPRESSED;
import static java.util.Objects.requireNonNull;

public class KinesisMetadata
Expand Down Expand Up @@ -73,15 +74,15 @@ public KinesisTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
return new KinesisTableHandle(
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
table.getStreamName(),
getDataFormat(table.getMessage()),
table.getMessage().getCompressionCodec());
table.streamName(),
getDataFormat(table.message()),
table.message().compressionCodec().orElse(UNCOMPRESSED));
}

@Override
public ConnectorTableMetadata getTableMetadata(ConnectorSession connectorSession, ConnectorTableHandle tableHandle)
{
return getTableMetadata(((KinesisTableHandle) tableHandle).toSchemaTableName());
return getTableMetadata(((KinesisTableHandle) tableHandle).schemaTableName());
}

@Override
Expand All @@ -102,21 +103,21 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession connectorSess
{
KinesisTableHandle kinesisTableHandle = (KinesisTableHandle) tableHandle;

KinesisStreamDescription kinesisStreamDescription = tableDescriptionSupplier.get().get(kinesisTableHandle.toSchemaTableName());
KinesisStreamDescription kinesisStreamDescription = tableDescriptionSupplier.get().get(kinesisTableHandle.schemaTableName());
if (kinesisStreamDescription == null) {
throw new TableNotFoundException(kinesisTableHandle.toSchemaTableName());
throw new TableNotFoundException(kinesisTableHandle.schemaTableName());
}

ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();

int index = 0;
// Note: partition key and related fields are handled by internalFieldDescriptions below
KinesisStreamFieldGroup message = kinesisStreamDescription.getMessage();
KinesisStreamFieldGroup message = kinesisStreamDescription.message();
if (message != null) {
List<KinesisStreamFieldDescription> fields = message.getFields();
List<KinesisStreamFieldDescription> fields = message.fields();
if (fields != null) {
for (KinesisStreamFieldDescription kinesisStreamFieldDescription : fields) {
columnHandles.put(kinesisStreamFieldDescription.getName(), kinesisStreamFieldDescription.getColumnHandle(index++));
columnHandles.put(kinesisStreamFieldDescription.name(), kinesisStreamFieldDescription.columnHandle(index++));
}
}
}
Expand Down Expand Up @@ -161,7 +162,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess

private static String getDataFormat(KinesisStreamFieldGroup fieldGroup)
{
return (fieldGroup == null) ? DummyRowDecoder.NAME : fieldGroup.getDataFormat();
return (fieldGroup == null) ? DummyRowDecoder.NAME : fieldGroup.dataFormat();
}

private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName)
Expand All @@ -173,12 +174,12 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName)

ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();

KinesisStreamFieldGroup message = kinesisStreamDescription.getMessage();
KinesisStreamFieldGroup message = kinesisStreamDescription.message();
if (message != null) {
List<KinesisStreamFieldDescription> fields = message.getFields();
List<KinesisStreamFieldDescription> fields = message.fields();
if (fields != null) {
for (KinesisStreamFieldDescription fieldDescription : fields) {
builder.add(fieldDescription.getColumnMetadata());
builder.add(fieldDescription.columnMetadata());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,14 @@ public ConnectorSplitSource getSplits(
{
KinesisTableHandle kinesisTableHandle = (KinesisTableHandle) table;

InternalStreamDescription description = this.getStreamDescription(kinesisTableHandle.getStreamName());
InternalStreamDescription description = this.getStreamDescription(kinesisTableHandle.streamName());

ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder();
for (Shard shard : description.getShards()) {
KinesisSplit split = new KinesisSplit(
kinesisTableHandle.getStreamName(),
kinesisTableHandle.getMessageDataFormat(),
kinesisTableHandle.getCompressionCodec(),
kinesisTableHandle.streamName(),
kinesisTableHandle.messageDataFormat(),
kinesisTableHandle.compressionCodec(),
shard.getShardId(),
shard.getSequenceNumberRange().getStartingSequenceNumber(),
shard.getSequenceNumberRange().getEndingSequenceNumber());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,70 +13,22 @@
*/
package io.trino.plugin.kinesis;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.requireNonNull;

/**
* This Class maintains all the details of Kinesis stream like name, fields of data, Trino table stream is mapping to, tables's schema name
*/
public class KinesisStreamDescription
public record KinesisStreamDescription(
String tableName,
String schemaName,
String streamName,
KinesisStreamFieldGroup message)
{
private final String tableName;
private final String streamName;
private final String schemaName;
private final KinesisStreamFieldGroup message;

@JsonCreator
public KinesisStreamDescription(
@JsonProperty("tableName") String tableName,
@JsonProperty("schemaName") String schemaName,
@JsonProperty("streamName") String streamName,
@JsonProperty("message") KinesisStreamFieldGroup message)
public KinesisStreamDescription
{
checkArgument(!isNullOrEmpty(tableName), "tableName is null or is empty");
this.tableName = tableName;
this.streamName = requireNonNull(streamName, "streamName is null");
this.schemaName = schemaName;
this.message = message;
}

@JsonProperty
public String getTableName()
{
return tableName;
}

@JsonProperty
public String getStreamName()
{
return streamName;
}

@JsonProperty
public String getSchemaName()
{
return schemaName;
}

@JsonProperty
public KinesisStreamFieldGroup getMessage()
{
return message;
}

@Override
public String toString()
{
return toStringHelper(this)
.add("tableName", tableName)
.add("streamName", streamName)
.add("schemaName", schemaName)
.add("message", message)
.toString();
requireNonNull(streamName, "streamName is null");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,149 +13,50 @@
*/
package io.trino.plugin.kinesis;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.type.Type;

import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.requireNonNull;

public class KinesisStreamFieldDescription
public record KinesisStreamFieldDescription(
String name,
Type type,
String mapping,
String comment,
String dataFormat,
String formatHint,
boolean hidden)
{
private final String name;
private final Type type;
private final String mapping;
private final String comment;
private final String dataFormat;
private final String formatHint;
private final boolean hidden;

@JsonCreator
public KinesisStreamFieldDescription(
@JsonProperty("name") String name,
@JsonProperty("type") Type type,
@JsonProperty("mapping") String mapping,
@JsonProperty("comment") String comment,
@JsonProperty("dataFormat") String dataFormat,
@JsonProperty("formatHint") String formatHint,
@JsonProperty("hidden") boolean hidden)
public KinesisStreamFieldDescription
{
checkArgument(!isNullOrEmpty(name), "name is null or is empty");
this.name = name;
this.type = requireNonNull(type, "type is null");
this.mapping = mapping;
this.comment = comment;
this.dataFormat = dataFormat;
this.formatHint = formatHint;
this.hidden = hidden;
}

@JsonProperty
public String getName()
{
return name;
}

@JsonProperty
public Type getType()
{
return type;
}

@JsonProperty
public String getMapping()
{
return mapping;
}

@JsonProperty
public String getComment()
{
return comment;
}

@JsonProperty
public String getDataFormat()
{
return dataFormat;
requireNonNull(type, "type is null");
}

@JsonProperty
public String getFormatHint()
{
return formatHint;
}

@JsonProperty
public boolean isHidden()
{
return hidden;
}

KinesisColumnHandle getColumnHandle(int index)
KinesisColumnHandle columnHandle(int index)
{
return new KinesisColumnHandle(
index,
getName(),
getType(),
getMapping(),
getDataFormat(),
getFormatHint(),
isHidden(),
name(),
type(),
mapping(),
dataFormat(),
formatHint(),
hidden(),
false);
}

ColumnMetadata getColumnMetadata()
ColumnMetadata columnMetadata()
{
return ColumnMetadata.builder()
.setName(getName())
.setType(getType())
.setComment(Optional.ofNullable(getComment()))
.setHidden(isHidden())
.setName(name())
.setType(type())
.setComment(Optional.ofNullable(comment()))
.setHidden(hidden())
.build();
}

@Override
public int hashCode()
{
return Objects.hash(name, type, mapping, dataFormat, formatHint, hidden);
}

@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}

KinesisStreamFieldDescription other = (KinesisStreamFieldDescription) obj;
return Objects.equals(this.name, other.name) &&
Objects.equals(this.type, other.type) &&
Objects.equals(this.mapping, other.mapping) &&
Objects.equals(this.dataFormat, other.dataFormat) &&
Objects.equals(this.formatHint, other.formatHint) &&
Objects.equals(this.hidden, other.hidden);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("name", name)
.add("type", type)
.add("mapping", mapping)
.add("dataFormat", dataFormat)
.add("formatHint", formatHint)
.add("hidden", hidden)
.toString();
}
}
Loading