Skip to content

Commit

Permalink
Fixed Test part 1
Browse files Browse the repository at this point in the history
  • Loading branch information
eaba committed Oct 10, 2024
1 parent 5eb18d6 commit 66a07b5
Show file tree
Hide file tree
Showing 40 changed files with 1,197 additions and 48 deletions.
48 changes: 47 additions & 1 deletion plugin/trino-pulsar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
<parent>
<groupId>io.trino</groupId>
<artifactId>trino-root</artifactId>
<version>461-SNAPSHOT</version>
<version>458</version>
<!--<version>461-SNAPSHOT</version>-->
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -403,9 +404,36 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-exchange-filesystem</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-exchange-filesystem</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-main</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-main</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-plugin-toolkit</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

Expand All @@ -422,12 +450,30 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing-containers</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing-services</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-tpch</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino.tpch</groupId>
<artifactId>tpch</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,24 +103,28 @@ public String getConnectorId()
}

@JsonProperty
@Override
public String getName()
{
return name;
}

@JsonProperty
@Override
public String getMapping()
{
return mapping;
}

@Override
@JsonProperty
public String getDataFormat()
{
return dataFormat;
}

@JsonProperty
@Override
public Type getType()
{
return type;
Expand All @@ -133,12 +137,14 @@ public boolean isHidden()
}

@JsonProperty
@Override
public boolean isInternal()
{
return internal;
}

@JsonProperty
@Override
public String getFormatHint()
{
return formatHint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -51,9 +52,9 @@
public class PulsarConnectorCache
{
private static final Logger log = Logger.get(PulsarConnectorCache.class);
private static final String OFFLOADERS_DIRECTOR = "offloadersDirectory";
private static final String MANAGED_LEDGER_OFFLOAD_DRIVER = "managedLedgerOffloadDriver";
private static final String MANAGED_LEDGER_OFFLOAD_MAX_THREADS = "managedLedgerOffloadMaxThreads";
// static final String OFFLOADERS_DIRECTOR = "offloadersDirectory";
///private static final String MANAGED_LEDGER_OFFLOAD_DRIVER = "managedLedgerOffloadDriver";
//private static final String MANAGED_LEDGER_OFFLOAD_MAX_THREADS = "managedLedgerOffloadMaxThreads";
@VisibleForTesting
static PulsarConnectorCache instance;
private final MetadataStoreExtended metadataStore;
Expand Down Expand Up @@ -188,10 +189,10 @@ private LedgerOffloader initManagedLedgerOffloader(OffloadPoliciesImpl offloadPo
offloadPolicies.getManagedLedgerOffloadDriver());

try {
return offloaderFactory.create(offloadPolicies, ImmutableMap.of(LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()), this.offloaderScheduler, this.offloaderStats);
return offloaderFactory.create(offloadPolicies, ImmutableMap.of(LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(Locale.getDefault()), PulsarVersion.getVersion(), LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(Locale.getDefault()), PulsarVersion.getGitSha()), this.offloaderScheduler, this.offloaderStats);
}
catch (IOException ioe) {
log.error("Failed to create offloader: ", ioe);
log.error("Failed to create offloader: %s", ioe);
throw new RuntimeException(ioe.getMessage(), ioe.getCause());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ public PulsarConnectorConfig setZookeeperUri(String zookeeperUri)
return this;
}

@NotNull
public String getMetadataUrl()
{
return this.metadataUrl;
Expand All @@ -184,7 +183,6 @@ public PulsarConnectorConfig setMetadataUrl(String metadataUrl)
return this;
}

@NotNull
public int getMaxEntryReadBatchSize()
{
return this.entryReadBatchSize;
Expand All @@ -197,7 +195,6 @@ public PulsarConnectorConfig setMaxEntryReadBatchSize(int batchSize)
return this;
}

@NotNull
public int getTargetNumSplits()
{
return this.targetNumSplits;
Expand All @@ -210,7 +207,6 @@ public PulsarConnectorConfig setTargetNumSplits(int targetNumSplits)
return this;
}

@NotNull
public int getMaxSplitMessageQueueSize()
{
return this.maxSplitMessageQueueSize;
Expand All @@ -223,7 +219,6 @@ public PulsarConnectorConfig setMaxSplitMessageQueueSize(int maxSplitMessageQueu
return this;
}

@NotNull
public int getMaxSplitEntryQueueSize()
{
return this.maxSplitEntryQueueSize;
Expand All @@ -236,7 +231,6 @@ public PulsarConnectorConfig setMaxSplitEntryQueueSize(int maxSplitEntryQueueSiz
return this;
}

@NotNull
public long getMaxSplitQueueSizeBytes()
{
return this.maxSplitQueueSizeBytes;
Expand All @@ -249,7 +243,6 @@ public PulsarConnectorConfig setMaxSplitQueueSizeBytes(long maxSplitQueueSizeByt
return this;
}

@NotNull
public String getStatsProvider()
{
return statsProvider;
Expand All @@ -262,7 +255,6 @@ public PulsarConnectorConfig setStatsProvider(String statsProvider)
return this;
}

@NotNull
public Map<String, String> getStatsProviderConfigs()
{
return statsProviderConfigs;
Expand Down Expand Up @@ -573,7 +565,6 @@ public PulsarConnectorConfig setNarExtractionDirectory(String narExtractionDirec
return this;
}

@NotNull
public PulsarAdmin getPulsarAdmin()
throws PulsarClientException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public static String restoreNamespaceDelimiterIfNeeded(String namespace, PulsarC

public static long roundToTrinoTime(long timestamp)
{
Instant.ofEpochMilli(timestamp);
var unused = Instant.ofEpochMilli(timestamp);
LocalDateTime date = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
int roundedNanos = toIntExact(round(date.getNano(), 6));
LocalDateTime rounded = date
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ static class ChunkedMessageCtx
{
private static final Recycler<ChunkedMessageCtx> RECYCLER = new Recycler<ChunkedMessageCtx>()
{
@Override
protected ChunkedMessageCtx newObject(Recycler.Handle<ChunkedMessageCtx> handle)
{
return new ChunkedMessageCtx(handle);
Expand Down Expand Up @@ -846,9 +847,9 @@ public void run()
readOnlyCursorImpl.getCurrentLedgerInfo().getLedgerId(), pulsarSplit.getTableName());

long numEntries = readOnlyCursorImpl.getCurrentLedgerInfo().getEntries();
long entriesToSkip =
(numEntries - cursor.getReadPosition().getEntryId()) + 1;
cursor.skipEntries(toIntExact((entriesToSkip)));
long entriesToSkip = (numEntries - cursor.getReadPosition().getEntryId()) + 1;

cursor.skipEntries(toIntExact(entriesToSkip));

entriesProcessed += entriesToSkip;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class PulsarRecordSetProvider

private final PulsarDispatchingRowDecoderFactory decoderFactory;

private final PulsarConnectorCache pulsarConnectorManagedLedgerFactory;
//private final PulsarConnectorCache pulsarConnectorManagedLedgerFactory;

@Inject
public PulsarRecordSetProvider(
Expand All @@ -44,7 +44,7 @@ public PulsarRecordSetProvider(
{
this.decoderFactory = requireNonNull(decoderFactory, "decoderFactory is null");
this.pulsarConnectorConfig = requireNonNull(pulsarConnectorConfig, "pulsarConnectorConfig is null");
this.pulsarConnectorManagedLedgerFactory = requireNonNull(pulsarConnectorManagedLedgerFactory, "pulsarConnectorManagedLedgerFactory is null");
//this.pulsarConnectorManagedLedgerFactory = requireNonNull(pulsarConnectorManagedLedgerFactory, "pulsarConnectorManagedLedgerFactory is null");
requireNonNull(this.pulsarConnectorConfig.getWebServiceUrl(), "web-service-url is null");
requireNonNull(this.pulsarConnectorConfig.getMetadataUrl(), "metadata-uri is null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion)
return completedFuture(cache.get(BytesSchemaVersion.of(schemaVersion)));
}
catch (ExecutionException e) {
LOG.error("Can't get generic schema for topic {} schema version {}",
LOG.error("Can't get generic schema for topic %s schema version %s: %s",
topicName.toString(), new String(schemaVersion, StandardCharsets.UTF_8), e);
return FutureUtil.failedFuture(e.getCause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.airlift.log.Logger;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSplit;
Expand All @@ -40,7 +39,7 @@
public class PulsarSplit
implements ConnectorSplit
{
private static final Logger log = Logger.get(PulsarSplit.class);
//private static final Logger log = Logger.get(PulsarSplit.class);

private final long splitId;
private final String connectorId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ public class PulsarSplitManager
private static final Logger log = Logger.get(PulsarSplitManager.class);
private final String connectorId;
private final PulsarConnectorConfig pulsarConnectorConfig;
private final PulsarConnectorCache pulsarConnectorManagedLedgerFactory;
//private final PulsarConnectorCache pulsarConnectorManagedLedgerFactory;
private final ObjectMapper objectMapper = new ObjectMapper();

@Inject
public PulsarSplitManager(PulsarConnectorId connectorId, PulsarConnectorConfig pulsarConnectorConfig, PulsarConnectorCache pulsarConnectorManagedLedgerFactory)
{
this.pulsarConnectorConfig = requireNonNull(pulsarConnectorConfig, "pulsarConnectorConfig is null");
this.pulsarConnectorManagedLedgerFactory = requireNonNull(pulsarConnectorManagedLedgerFactory, "pulsarConnectorManagedLedgerFactory is null");
//this.pulsarConnectorManagedLedgerFactory = requireNonNull(pulsarConnectorManagedLedgerFactory, "pulsarConnectorManagedLedgerFactory is null");
this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
}

Expand Down Expand Up @@ -180,7 +180,7 @@ else if (e.getStatusCode() == 404) {
}
catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
log.error(e); //e.printStackTrace();
}
return new FixedSplitSource(splits);
}
Expand Down Expand Up @@ -229,7 +229,7 @@ private List<Integer> getPredicatedPartitions(TopicName topicName, TupleDomain<C
{
int numPartitions;
try (PulsarAdmin pulsarAdmin = PulsarAdminClientProvider.getPulsarAdmin(pulsarConnectorConfig)) {
numPartitions = (pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString())).partitions;
numPartitions = pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString()).partitions;
}
catch (PulsarAdminException e) {
if (e.getStatusCode() == 401) {
Expand Down Expand Up @@ -389,13 +389,13 @@ Collection<PulsarSplit> getSplitsForTopic(String topicNamePersistenceEncoding,
private static class PredicatePushdownInfo
{
private final ImmutablePositionImpl startPosition;
private final ImmutablePositionImpl endPosition;
//private final ImmutablePositionImpl endPosition;
private final long numOfEntries;

private PredicatePushdownInfo(ImmutablePositionImpl startPosition, ImmutablePositionImpl endPosition, long numOfEntries)
private PredicatePushdownInfo(ImmutablePositionImpl startPosition, long numOfEntries)
{
this.startPosition = startPosition;
this.endPosition = endPosition;
//var _endPosition = endPosition;
this.numOfEntries = numOfEntries;
}

Expand Down Expand Up @@ -472,7 +472,7 @@ public static PredicatePushdownInfo getPredicatePushdownInfo(String connectorId,
long numOfEntries = readOnlyCursor.getNumberOfEntries(posRange) - 1;

PredicatePushdownInfo predicatePushdownInfo =
new PredicatePushdownInfo(overallStartPos, overallEndPos, numOfEntries);
new PredicatePushdownInfo(overallStartPos, numOfEntries);
log.debug("Predicate pushdown optimization calculated: %s", predicatePushdownInfo);
return predicatePushdownInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private static Block serializeObject(BlockBuilder builder, Object value, Type ty
return serializeRow(builder, value, type, columnName);
}
if (type instanceof DecimalType && !((DecimalType) type).isShort()) {
return serializeLongDecimal(builder, value, type, columnName);
return serializeLongDecimal(builder, value, type);
}
serializePrimitive(builder, value, type, columnName);
return null;
Expand Down Expand Up @@ -194,8 +194,9 @@ private static Block serializeList(BlockBuilder parentBlockBuilder, Object value
}

private static Block serializeLongDecimal(
BlockBuilder parentBlockBuilder, Object value, Type type, String columnName)
BlockBuilder parentBlockBuilder, Object value, Type type)
{
//var neme = columnName;
final BlockBuilder blockBuilder;
if (parentBlockBuilder != null) {
blockBuilder = parentBlockBuilder;
Expand All @@ -204,6 +205,7 @@ private static Block serializeLongDecimal(
blockBuilder = type.createBlockBuilder(null, 1);
}
final ByteBuffer buffer = (ByteBuffer) value;
buffer.arrayOffset();
type.writeObject(blockBuilder, Int128.fromBigEndian(buffer.array()));
if (parentBlockBuilder == null) {
return ((Block) blockBuilder).getSingleValueBlock(0);
Expand Down
Loading

0 comments on commit 66a07b5

Please sign in to comment.