diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
index a419efdc6b6d..6b723ff5053d 100644
--- a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
@@ -59,6 +59,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  *
@@ -128,6 +129,7 @@ public void close() throws IOException
     serverProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath);
     serverProperties.put("zookeeper.session.timeout.ms", "10000");
     serverProperties.put("zookeeper.sync.time.ms", "200");
+    serverProperties.put("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));
 
     kafkaConfig = new KafkaConfig(serverProperties);
 
diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java
index bd271d599eeb..8c8e3be841b9 100644
--- a/processing/src/main/java/io/druid/segment/IndexMerger.java
+++ b/processing/src/main/java/io/druid/segment/IndexMerger.java
@@ -214,24 +214,8 @@ public File mergeQueryableIndex(
       ProgressIndicator progress
   ) throws IOException
   {
-    // We are materializing the list for performance reasons. Lists.transform
-    // only creates a "view" of the original list, meaning the function gets
-    // applied every time you access an element.
-    List<IndexableAdapter> indexAdapteres = Lists.newArrayList(
-        Iterables.transform(
-            indexes,
-            new Function<QueryableIndex, IndexableAdapter>()
-            {
-              @Override
-              public IndexableAdapter apply(final QueryableIndex input)
-              {
-                return new QueryableIndexIndexableAdapter(input);
-              }
-            }
-        )
-    );
     return merge(
-        indexAdapteres,
+        toIndexableAdapters(indexes),
         rollup,
         metricAggs,
         outDir,
@@ -268,6 +252,26 @@ public Iterable<Rowboat> apply(@Nullable IndexableAdapter input)
     );
   }
 
+  private static List<IndexableAdapter> toIndexableAdapters(List<QueryableIndex> indexes)
+  {
+    // We are materializing the list for performance reasons. Lists.transform
+    // only creates a "view" of the original list, meaning the function gets
+    // applied every time you access an element.
+    return Lists.newArrayList(
+        Iterables.transform(
+            indexes,
+            new Function<QueryableIndex, IndexableAdapter>()
+            {
+              @Override
+              public IndexableAdapter apply(final QueryableIndex input)
+              {
+                return new QueryableIndexIndexableAdapter(input);
+              }
+            }
+        )
+    );
+  }
+
   private static List<String> getLongestSharedDimOrder(List<IndexableAdapter> indexes)
   {
     int maxSize = 0;
@@ -303,6 +307,11 @@ private static List<String> getLongestSharedDimOrder(List<IndexableAdapter> indexes)
     return ImmutableList.copyOf(orderingCandidate);
   }
 
+  public static List<String> getMergedDimensionsFromQueryableIndexes(List<QueryableIndex> indexes)
+  {
+    return getMergedDimensions(toIndexableAdapters(indexes));
+  }
+
   public static List<String> getMergedDimensions(List<IndexableAdapter> indexes)
   {
     if (indexes.size() == 0) {
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index c2bfdf1e9b93..4e3062a5d0cf 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -575,11 +575,9 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink
         tuningConfig.getIndexSpec()
     );
 
-    QueryableIndex index = indexIO.loadIndex(mergedFile);
-
     DataSegment segment = dataSegmentPusher.push(
         mergedFile,
-        sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
+        sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))
     );
 
     objectMapper.writeValue(descriptorFile, segment);
@@ -925,6 +923,14 @@ public Object apply(@Nullable Object input)
         if (cache != null) {
           cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
         }
+        try {
+          hydrant.getSegment().close();
+        }
+        catch (IOException e) {
+          log.makeAlert(e, "Failed to explicitly close segment[%s]", schema.getDataSource())
+             .addData("identifier", hydrant.getSegment().getIdentifier())
+             .emit();
+        }
       }
 
       if (removeOnDiskData) {
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
index 161cc6a3905a..1e1b3cb18092 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -42,10 +42,10 @@
 import io.druid.concurrent.TaskThreadPriority;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
-import io.druid.java.util.common.granularity.Granularity;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.concurrent.ScheduledExecutors;
+import io.druid.java.util.common.granularity.Granularity;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerFactoryConglomerate;
@@ -424,12 +424,11 @@ public void doRun()
               metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime);
               metrics.incrementMergeTimeMillis(mergeStopwatch.elapsed(TimeUnit.MILLISECONDS));
 
-              QueryableIndex index = indexIO.loadIndex(mergedFile);
               log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier());
 
               DataSegment segment = dataSegmentPusher.push(
                   mergedFile,
-                  sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
+                  sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))
               );
               log.info("Inserting [%s] to the metadata store", sink.getSegment().getIdentifier());
               segmentPublisher.publishSegment(segment);
@@ -861,6 +860,7 @@ protected void abandonSegment(final long truncatedTime, final Sink sink)
     );
     for (FireHydrant hydrant : sink) {
       cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
+      hydrant.getSegment().close();
     }
     synchronized (handoffCondition) {
       handoffCondition.notifyAll();