explicitly unmap hydrant files when abandonSegment to recycle mmap memory (#4341)

* fix TestKafkaExtractionCluster failure due to the port already being in use

* explicitly unmap hydrant files when abandonSegment to recycle mmap memory

* address review comments

* apply the same change to AppenderatorImpl
kaijianding authored and leventov committed Jun 1, 2017
1 parent 1150bf7 commit 0efd182
Showing 4 changed files with 40 additions and 23 deletions.
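
The motivation for the main change: a hydrant's segment is backed by memory-mapped index files, and a MappedByteBuffer keeps its mapping alive until it is garbage-collected, so simply dropping the segment when it is abandoned leaves the mapped memory in place indefinitely. The patch therefore closes each hydrant's segment explicitly (see the abandonSegment hunks below). The following is a minimal, standalone illustration of that mmap lifetime behaviour — it is not Druid code, and the class name is made up for the example:

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Illustration only: shows why dropping references is not enough to
// release mmap'd files, which is what this commit fixes in Druid.
public class MmapLifetime
{
  public static void main(String[] args) throws Exception
  {
    try (RandomAccessFile file = new RandomAccessFile(args[0], "r");
         FileChannel channel = file.getChannel()) {
      MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
      System.out.println("mapped " + buffer.capacity() + " bytes");
      // Closing the channel at the end of this try block does NOT release the
      // mapping; per the FileChannel.map javadoc it stays valid until the
      // buffer is garbage-collected. Hence the commit closes each hydrant's
      // segment explicitly instead of waiting for GC to reclaim the mmap.
    }
  }
}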
@@ -59,6 +59,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

/**
*
@@ -128,6 +129,7 @@ public void close() throws IOException
serverProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath);
serverProperties.put("zookeeper.session.timeout.ms", "10000");
serverProperties.put("zookeeper.sync.time.ms", "200");
serverProperties.put("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));

kafkaConfig = new KafkaConfig(serverProperties);

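
The hunk above is the Kafka test fix from the commit message: instead of a fixed broker port, the test now picks a pseudo-random one. nextInt(9999) is uniform over [0, 9998], so the broker binds somewhere in [10000, 19998], making it unlikely that two concurrent test runs collide on the same port. A tiny standalone sketch of the same idea (class name is illustrative only):

import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative only: the port-randomization trick used in the test above.
public class RandomBrokerPort
{
  public static void main(String[] args)
  {
    Properties serverProperties = new Properties();
    // nextInt(9999) returns [0, 9998], so the chosen port lands in [10000, 19998],
    // avoiding the "port already in use" failures mentioned in the commit message.
    serverProperties.put("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));
    System.out.println("embedded broker port = " + serverProperties.getProperty("port"));
  }
}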
43 changes: 26 additions & 17 deletions processing/src/main/java/io/druid/segment/IndexMerger.java
@@ -214,24 +214,8 @@ public File mergeQueryableIndex(
ProgressIndicator progress
) throws IOException
{
// We are materializing the list for performance reasons. Lists.transform
// only creates a "view" of the original list, meaning the function gets
// applied every time you access an element.
List<IndexableAdapter> indexAdapteres = Lists.newArrayList(
Iterables.transform(
indexes,
new Function<QueryableIndex, IndexableAdapter>()
{
@Override
public IndexableAdapter apply(final QueryableIndex input)
{
return new QueryableIndexIndexableAdapter(input);
}
}
)
);
return merge(
indexAdapteres,
toIndexableAdapters(indexes),
rollup,
metricAggs,
outDir,
@@ -268,6 +252,26 @@ public Iterable<String> apply(@Nullable IndexableAdapter input)
);
}

private static List<IndexableAdapter> toIndexableAdapters(List<QueryableIndex> indexes)
{
// We are materializing the list for performance reasons. Lists.transform
// only creates a "view" of the original list, meaning the function gets
// applied every time you access an element.
return Lists.newArrayList(
Iterables.transform(
indexes,
new Function<QueryableIndex, IndexableAdapter>()
{
@Override
public IndexableAdapter apply(final QueryableIndex input)
{
return new QueryableIndexIndexableAdapter(input);
}
}
)
);
}

private static List<String> getLongestSharedDimOrder(List<IndexableAdapter> indexes)
{
int maxSize = 0;
@@ -303,6 +307,11 @@ private static List<String> getLongestSharedDimOrder(List<IndexableAdapter> indexes)
return ImmutableList.copyOf(orderingCandidate);
}

public static List<String> getMergedDimensionsFromQueryableIndexes(List<QueryableIndex> indexes)
{
return getMergedDimensions(toIndexableAdapters(indexes));
}

public static List<String> getMergedDimensions(List<IndexableAdapter> indexes)
{
if (indexes.size() == 0) {
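
The IndexMerger changes are a supporting refactor: the adapter-wrapping block is extracted into a private toIndexableAdapters helper, and the new public getMergedDimensionsFromQueryableIndexes lets callers compute the merged dimension list straight from the per-hydrant QueryableIndexes. The two callers below use it to drop their indexIO.loadIndex(mergedFile) call, so the freshly merged file is no longer memory-mapped a second time just to read its dimension names. A hypothetical call site, where everything except the IndexMerger and DataSegment API is invented for the example:

import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.timeline.DataSegment;

import java.util.List;

// Hypothetical caller: derive the pushed segment's dimensions from the
// hydrants' already-mapped QueryableIndexes instead of re-loading the
// merged file just to call getAvailableDimensions on it.
public class MergedDimensionsExample
{
  public static DataSegment withMergedDimensions(DataSegment segment, List<QueryableIndex> hydrantIndexes)
  {
    return segment.withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(hydrantIndexes));
  }
}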
@@ -575,11 +575,9 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink
tuningConfig.getIndexSpec()
);

QueryableIndex index = indexIO.loadIndex(mergedFile);

DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))
);

objectMapper.writeValue(descriptorFile, segment);
@@ -925,6 +923,14 @@ public Object apply(@Nullable Object input)
if (cache != null) {
cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
}
try {
hydrant.getSegment().close();
}
catch (IOException e) {
log.makeAlert(e, "Failed to explicitly close segment[%s]", schema.getDataSource())
.addData("identifier", hydrant.getSegment().getIdentifier())
.emit();
}
}

if (removeOnDiskData) {
@@ -42,10 +42,10 @@
import io.druid.concurrent.TaskThreadPriority;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactoryConglomerate;
@@ -424,12 +424,11 @@ public void doRun()
metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime);
metrics.incrementMergeTimeMillis(mergeStopwatch.elapsed(TimeUnit.MILLISECONDS));

QueryableIndex index = indexIO.loadIndex(mergedFile);
log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier());

DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))
);
log.info("Inserting [%s] to the metadata store", sink.getSegment().getIdentifier());
segmentPublisher.publishSegment(segment);
@@ -861,6 +860,7 @@ protected void abandonSegment(final long truncatedTime, final Sink sink)
);
for (FireHydrant hydrant : sink) {
cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
hydrant.getSegment().close();
}
synchronized (handoffCondition) {
handoffCondition.notifyAll();
