Skip to content

Commit

Permalink
Add FixedSplitSource constructors
Browse files Browse the repository at this point in the history
Add a single element constructor and `emptyFixedSource` static creator.
No functional changes, just a slightly more elegant code.
  • Loading branch information
skrzypo987 committed Mar 9, 2023
1 parent defd6ee commit 50f0db8
Show file tree
Hide file tree
Showing 11 changed files with 24 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ public ConnectorSplitSource getSplits(
DynamicFilter dynamicFilter,
Constraint constraint)
{
return new FixedSplitSource(ImmutableList.of(MOCK_CONNECTOR_SPLIT));
return new FixedSplitSource(MOCK_CONNECTOR_SPLIT);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ public class FixedSplitSource
private final Optional<List<Object>> tableExecuteSplitsInfo;
private int offset;

public static FixedSplitSource emptySplitSource()
{
return new FixedSplitSource(List.of());
}

public FixedSplitSource(ConnectorSplit split)
{
this(List.of(split));
}

public FixedSplitSource(Iterable<? extends ConnectorSplit> splits)
{
this(splits, Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ protected static Optional<ColumnMapping> mapToUnboundedVarchar(JdbcTypeHandle ty
@Override
public ConnectorSplitSource getSplits(ConnectorSession session, JdbcTableHandle tableHandle)
{
return new FixedSplitSource(ImmutableList.of(new JdbcSplit(Optional.empty())));
return new FixedSplitSource(new JdbcSplit(Optional.empty()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public ConnectorSplitSource getSplits(

if (!bigQueryTableHandle.isNamedRelation()) {
List<BigQueryColumnHandle> columns = bigQueryTableHandle.getProjectedColumns().orElse(ImmutableList.of());
return new FixedSplitSource(ImmutableList.of(BigQuerySplit.forViewStream(columns, filter)));
return new FixedSplitSource(BigQuerySplit.forViewStream(columns, filter));
}

TableId remoteTableId = bigQueryTableHandle.asPlainTable().getRemoteTableName().toTableId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Set;

import static io.trino.plugin.cassandra.CassandraSessionProperties.getSplitsPerNode;
import static io.trino.spi.connector.FixedSplitSource.emptySplitSource;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -103,7 +104,7 @@ public ConnectorSplitSource getSplits(

if (partitions.isEmpty()) {
log.debug("No partitions matched predicates for table %s", connectorTableHandle);
return new FixedSplitSource(ImmutableList.of());
return emptySplitSource();
}

// if this is an unpartitioned table, split into equal ranges
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxSplitSize;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializePartitionValue;
import static io.trino.spi.connector.FixedSplitSource.emptySplitSource;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -104,7 +105,7 @@ public ConnectorSplitSource getSplits(
if (deltaLakeTableHandle.isRecordScannedFiles()) {
return new FixedSplitSource(ImmutableList.of(), ImmutableList.of());
}
return new FixedSplitSource(ImmutableList.of());
return emptySplitSource();
}

DeltaLakeSplitSource splitSource = new DeltaLakeSplitSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.plugin.elasticsearch;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.elasticsearch.client.ElasticsearchClient;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
Expand Down Expand Up @@ -55,7 +54,7 @@ public ConnectorSplitSource getSplits(
ElasticsearchTableHandle tableHandle = (ElasticsearchTableHandle) table;

if (tableHandle.getType().equals(QUERY)) {
return new FixedSplitSource(ImmutableList.of(new ElasticsearchSplit(tableHandle.getIndex(), 0, Optional.empty())));
return new FixedSplitSource(new ElasticsearchSplit(tableHandle.getIndex(), 0, Optional.empty()));
}
List<ElasticsearchSplit> splits = client.getSearchShards(tableHandle.getIndex()).stream()
.map(shard -> new ElasticsearchSplit(shard.getIndex(), shard.getId(), shard.getAddress()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.SERVER_SHUTTING_DOWN;
import static io.trino.spi.connector.FixedSplitSource.emptySplitSource;
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.util.Collections.emptyIterator;
Expand Down Expand Up @@ -225,7 +226,7 @@ public ConnectorSplitSource getSplits(
if (hiveTable.isRecordScannedFiles()) {
return new FixedSplitSource(ImmutableList.of(), ImmutableList.of());
}
return new FixedSplitSource(ImmutableList.of());
return emptySplitSource();
}

// get buckets from first partition (arbitrary)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import static io.trino.plugin.iceberg.IcebergSessionProperties.getDynamicFilteringWaitTimeout;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static io.trino.spi.connector.FixedSplitSource.emptySplitSource;
import static java.util.Objects.requireNonNull;

public class IcebergSplitManager
Expand Down Expand Up @@ -66,7 +67,7 @@ public ConnectorSplitSource getSplits(
if (table.isRecordScannedFiles()) {
return new FixedSplitSource(ImmutableList.of(), ImmutableList.of());
}
return new FixedSplitSource(ImmutableList.of());
return emptySplitSource();
}

Table icebergTable = transactionManager.get(transaction, session.getIdentity()).getIcebergTable(session, table.getSchemaTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.plugin.mongodb;

import com.google.common.collect.ImmutableList;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
Expand Down Expand Up @@ -49,6 +48,6 @@ public ConnectorSplitSource getSplits(
{
MongoSplit split = new MongoSplit(addresses);

return new FixedSplitSource(ImmutableList.of(split));
return new FixedSplitSource(split);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import static io.trino.plugin.pinot.PinotSplit.createSegmentSplit;
import static io.trino.spi.ErrorType.USER_ERROR;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;

public class PinotSplitManager
Expand All @@ -66,7 +65,7 @@ public PinotSplitManager(PinotClient pinotClient)

protected ConnectorSplitSource generateSplitForBrokerBasedScan(PinotTableHandle pinotTableHandle)
{
return new FixedSplitSource(singletonList(createBrokerSplit()));
return new FixedSplitSource(createBrokerSplit());
}

protected ConnectorSplitSource generateSplitsForSegmentBasedScan(
Expand Down

0 comments on commit 50f0db8

Please sign in to comment.