GOBBLIN-1715: Support vectorized row batch pooling
rdsr committed Oct 7, 2022
1 parent a246e23 commit 02cf437
Showing 3 changed files with 201 additions and 5 deletions.
GobblinBaseOrcWriter.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Properties;

+ import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.orc.OrcConf;
@@ -90,8 +91,11 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {

private final OrcValueWriter<D> valueWriter;
@VisibleForTesting
- final VectorizedRowBatch rowBatch;
+ VectorizedRowBatch rowBatch;
+ private final TypeDescription typeDescription;
private final Writer orcFileWriter;
+ private final RowBatchPool rowBatchPool;
+ private final boolean enableRowBatchPool;

// the close method may be invoked multiple times, but the underlying writer only supports close being called once
private volatile boolean closed = false;
@@ -101,7 +105,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
protected final S inputSchema;

/**
- * There are couple of parameters in ORC writer that requires manual tuning based on record size given that executor
+ * There are a couple of parameters in the ORC writer that require manual tuning based on record size, given that the executor
* running these ORC writers has limited heap space. This helper function wraps them and has a side effect on the
* argument {@param properties}.
*
@@ -153,14 +157,18 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)

// Create the value-writer, which is essentially a record-by-record converter with batch buffering.
this.inputSchema = builder.getSchema();
- TypeDescription typeDescription = getOrcSchema();
+ this.typeDescription = getOrcSchema();
this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, properties);
this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
- this.rowBatch = typeDescription.createRowBatch(this.batchSize);
+ this.rowBatchPool = RowBatchPool.instance(properties);
+ this.enableRowBatchPool = properties.getPropAsBoolean(RowBatchPool.ENABLE_ROW_BATCH_POOL, false);
+ this.rowBatch = enableRowBatchPool ? rowBatchPool.getRowBatch(typeDescription, batchSize) : typeDescription.createRowBatch(batchSize);
this.deepCleanBatch = properties.getPropAsBoolean(ORC_WRITER_DEEP_CLEAN_EVERY_BATCH, false);

log.info("Created ORC writer, batch size: {}, {}: {}",
- batchSize, OrcConf.ROWS_BETWEEN_CHECKS.name(), properties.getProp(OrcConf.ROWS_BETWEEN_CHECKS.name(),
+ batchSize, OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
+ properties.getProp(
+ OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue().toString()));

// Create file-writer
@@ -235,6 +243,9 @@ private synchronized void closeInternal()
this.flush();
this.orcFileWriter.close();
this.closed = true;
+ if (enableRowBatchPool) {
+ rowBatchPool.recycle(typeDescription, rowBatch);
+ }
} else {
// Throw a fatal exception if there's outstanding buffered data, since there's a risk of losing data if we proceed.
if (rowBatch.size > 0) {
@@ -269,6 +280,7 @@ public void commit()
@Override
public void write(D record)
throws IOException {
+ Preconditions.checkState(!closed, "Writer already closed");
valueWriter.write(record, rowBatch);
if (rowBatch.size == this.batchSize) {
orcFileWriter.addRowBatch(rowBatch);
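Pooling is opt-in: the writer borrows from RowBatchPool only when orc.row.batch.enable is set, and otherwise allocates a fresh VectorizedRowBatch per writer as before. A minimal sketch of how a job might turn it on (the property keys come from RowBatchPool below; the values and the way properties reach the writer are illustrative):

import org.apache.gobblin.configuration.State;

State properties = new State();
properties.setProp("orc.row.batch.enable", "true"); // RowBatchPool.ENABLE_ROW_BATCH_POOL
properties.setProp("orc.row.batch.expiry.interval.secs", "30"); // evict pooled batches idle for more than 30 secs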
RowBatchPool.java
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.writer;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.State;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Maintains a pool of row batches per ORC schema.
* Expires row batches which have not been accessed for {@code ROW_BATCH_EXPIRY_INTERVAL} seconds.
*/
@Slf4j
public class RowBatchPool {
static final String PREFIX = "orc.row.batch.";
static final String ENABLE_ROW_BATCH_POOL = PREFIX + "enable";

static final String ROW_BATCH_EXPIRY_INTERVAL = PREFIX + "expiry.interval.secs";
static final int DEFAULT_ROW_BATCH_EXPIRY_INTERVAL = 10;

static final String ROW_BATCH_EXPIRY_PERIOD = PREFIX + "expiry.period.secs";
static final int DEFAULT_ROW_BATCH_EXPIRY_PERIOD = 1;

private static RowBatchPool INSTANCE;

private final Map<TypeDescription, LinkedList<RowBatchHolder>> rowBatches;
private final ScheduledExecutorService rowBatchExpiryThread;
private final long rowBatchExpiryInterval;

private RowBatchPool(State properties) {
rowBatches = Maps.newHashMap();
rowBatchExpiryThread = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setDaemon(true).build());
// expire row batches that have been idle for more than N secs
rowBatchExpiryInterval = properties.getPropAsLong(ROW_BATCH_EXPIRY_INTERVAL, DEFAULT_ROW_BATCH_EXPIRY_INTERVAL);
// check every N secs
long rowBatchExpiryPeriod = properties.getPropAsLong(ROW_BATCH_EXPIRY_PERIOD, DEFAULT_ROW_BATCH_EXPIRY_PERIOD);
rowBatchExpiryThread.scheduleAtFixedRate(
rowBatchExpiryFn(), rowBatchExpiryPeriod, rowBatchExpiryPeriod, TimeUnit.SECONDS);
}

private Runnable rowBatchExpiryFn() {
return () -> {
synchronized (rowBatches) {
for (Map.Entry<TypeDescription, LinkedList<RowBatchHolder>> e : rowBatches.entrySet()) {
LinkedList<RowBatchHolder> val = e.getValue();
val.removeIf(this::candidateForRemoval);
}
}
};
}

private boolean candidateForRemoval(RowBatchHolder batch) {
long expiryInterval = TimeUnit.SECONDS.toMillis(rowBatchExpiryInterval);
long interval = System.currentTimeMillis() - batch.lastUsed;
if (interval > expiryInterval) {
log.info("Expiring row batch {} as it has not been accessed since {} ms",
System.identityHashCode(batch.rowBatch), interval);
return true;
} else {
return false;
}
}

private static class RowBatchHolder {
long lastUsed;
VectorizedRowBatch rowBatch;

private RowBatchHolder(VectorizedRowBatch rowBatch, long currentTimeMillis) {
this.rowBatch = rowBatch;
this.lastUsed = currentTimeMillis;
}
}

public static synchronized RowBatchPool instance(State properties) {
if (INSTANCE == null) {
INSTANCE = new RowBatchPool(properties);
}
return INSTANCE;
}

public VectorizedRowBatch getRowBatch(TypeDescription schema, int batchSize) {
synchronized (rowBatches) {
LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
VectorizedRowBatch rowBatch;

if (vals == null || vals.isEmpty()) {
rowBatch = schema.createRowBatch(batchSize);
log.info("Creating new row batch {}", System.identityHashCode(rowBatch));
} else {
rowBatch = vals.removeLast().rowBatch;
log.info("Using existing row batch {}", System.identityHashCode(rowBatch));
}
return rowBatch;
}
}

public void recycle(TypeDescription schema, VectorizedRowBatch rowBatch) {
log.info("Recycling row batch {}", System.identityHashCode(rowBatch));
synchronized (rowBatches) {
LinkedList<RowBatchHolder> vals = rowBatches.computeIfAbsent(schema, ignore -> Lists.newLinkedList());
vals.add(new RowBatchHolder(rowBatch, System.currentTimeMillis()));
}
}
}
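The pool is a process-wide singleton keyed by TypeDescription: getRowBatch reuses an idle batch for the schema when one exists, and recycle returns a batch with a fresh lastUsed timestamp so the expiry sweep can age it out. A rough sketch of the borrow/return lifecycle (not code from this commit; the schema string and batch size are illustrative):

import org.apache.gobblin.configuration.State;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

State state = new State();
RowBatchPool pool = RowBatchPool.instance(state);
TypeDescription schema = TypeDescription.fromString("struct<id:bigint,name:string>");
VectorizedRowBatch batch = pool.getRowBatch(schema, 1024); // borrow (or create) a batch sized for this schema
try {
    // fill the batch and hand it to an ORC writer ...
} finally {
    pool.recycle(schema, batch); // return it so later writers with the same schema can reuse it
}

One consequence of the singleton: instance(State) reads its expiry settings only on first use, so later callers share whatever configuration the first caller supplied.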
RowBatchPoolTest.java
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.writer;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.testng.Assert;
import org.testng.annotations.Test;

public class RowBatchPoolTest {
@Test
public void testExpiry() throws Exception {
State state = WorkUnit.createEmpty();
RowBatchPool instance = RowBatchPool.instance(state);
TypeDescription schema = TypeDescription.fromString("struct<a:int,b:string>");
VectorizedRowBatch rowBatch1 = instance.getRowBatch(schema, 1024);
instance.recycle(schema, rowBatch1);
VectorizedRowBatch rowBatch2 = instance.getRowBatch(schema, 1024);
// existing rowbatch is fetched from pool
Assert.assertEquals(rowBatch1, rowBatch2);

// since the pool has no existing rowbatch, a new one is created
VectorizedRowBatch rowBatch3 = instance.getRowBatch(schema, 1024);
Assert.assertNotEquals(rowBatch1, rowBatch3);

// recycle the fetched rowbatches
instance.recycle(schema, rowBatch2);
instance.recycle(schema, rowBatch3);

// wait for their expiry, plus a small buffer: eviction requires idle time strictly greater than
// the expiry interval, and the sweeper only runs every DEFAULT_ROW_BATCH_EXPIRY_PERIOD secs
Thread.sleep((RowBatchPool.DEFAULT_ROW_BATCH_EXPIRY_INTERVAL + 2) * 1000L);
VectorizedRowBatch rowBatch4 = instance.getRowBatch(schema, 1024);
// new rowbatch is created, all old ones are expired
Assert.assertNotEquals(rowBatch1, rowBatch4);
}
}
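The sleep length in testExpiry matters: a pooled batch is evicted only once its idle time strictly exceeds ROW_BATCH_EXPIRY_INTERVAL (10 secs by default) and the background sweeper, which fires every ROW_BATCH_EXPIRY_PERIOD secs (1 sec by default), has run since then. With the defaults, a batch recycled at t=0 is gone by roughly t=11 secs at the latest, hence the buffer past the bare interval.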
