
GOBBLIN-1715: Support for Vectorized row batch pooling #3574

Merged 1 commit on Oct 11, 2022
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Properties;

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.orc.OrcConf;
@@ -90,8 +91,11 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {

private final OrcValueWriter<D> valueWriter;
@VisibleForTesting
final VectorizedRowBatch rowBatch;
VectorizedRowBatch rowBatch;
private final TypeDescription typeDescription;
private final Writer orcFileWriter;
private final RowBatchPool rowBatchPool;
private final boolean enableRowBatchPool;

// the close method may be invoked multiple times, but the underlying writer only supports close being called once
private volatile boolean closed = false;
@@ -101,7 +105,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
protected final S inputSchema;

/**
* There are couple of parameters in ORC writer that requires manual tuning based on record size given that executor
* There are a couple of parameters in ORC writer that requires manual tuning based on record size given that executor
* for running these ORC writers has limited heap space. This helper function wrap them and has side effect for the
* argument {@param properties}.
*
@@ -153,14 +157,18 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)

// Create value-writer which is essentially a record-by-record-converter with buffering in batch.
this.inputSchema = builder.getSchema();
TypeDescription typeDescription = getOrcSchema();
this.typeDescription = getOrcSchema();
this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, properties);
this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
this.rowBatch = typeDescription.createRowBatch(this.batchSize);
this.rowBatchPool = RowBatchPool.instance(properties);
Contributor:

Is GobblinBaseOrcWriter created / GC'ed often during the lifetime of the pipeline, or is it just created once at the start? Not sure if it's overkill, but if the recycling is frequent enough it doesn't hurt to use https://www.baeldung.com/java-singleton-double-checked-locking instead.

Contributor:

I did some digging, and it seems like we use the GobblinOrcWriter in 2 spots.

  1. Fork
  2. AbstractJobLauncher

In (1) we init a writer per fork, and we may have multiple forks in a Task, especially in the streaming-model task runner. From my understanding, a fork is long-running, so we only create a new ORC writer during the initial run of a task and on retries.
In (2) we init once during the job launch.

I think I am missing something here though, because we wouldn't need to recycle the row batches via a pool if they were created once and forgotten. 🤔

Contributor (author):

Yes, they are not created just once. A new GobblinBaseOrcWriter is created whenever we flush a file in FI, which happens every 5 mins. Also, a new ORC writer will be created if we are writing 2+ partitions at once. See PartitionedDataWriter.

Contributor:

Considering the frequency of recycles, what's your opinion on https://www.baeldung.com/java-singleton-double-checked-locking for the singleton? It reduces the need for as many lock acquisitions.
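
For reference, a minimal sketch of the double-checked-locking variant being suggested (illustrative only; the merged code keeps the plain synchronized `instance()` method shown further down):

```java
public class RowBatchPool {
    // volatile so that a fully constructed instance is visible to all threads
    private static volatile RowBatchPool instance;

    // ... fields and private RowBatchPool(State properties) as in this PR ...

    public static RowBatchPool instance(State properties) {
        if (instance == null) {                      // first check: no lock on the hot path
            synchronized (RowBatchPool.class) {
                if (instance == null) {              // second check: only one thread constructs
                    instance = new RowBatchPool(properties);
                }
            }
        }
        return instance;
    }
}
```

As with the merged version, the first caller's properties win; later callers get the already-built pool.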

this.enableRowBatchPool = properties.getPropAsBoolean(RowBatchPool.ENABLE_ROW_BATCH_POOL, false);
this.rowBatch = enableRowBatchPool ? rowBatchPool.getRowBatch(typeDescription, batchSize) : typeDescription.createRowBatch(batchSize);
this.deepCleanBatch = properties.getPropAsBoolean(ORC_WRITER_DEEP_CLEAN_EVERY_BATCH, false);

log.info("Created ORC writer, batch size: {}, {}: {}",
batchSize, OrcConf.ROWS_BETWEEN_CHECKS.name(), properties.getProp(OrcConf.ROWS_BETWEEN_CHECKS.name(),
batchSize, OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
Contributor:

Is the reason for changing this that getAttribute() is more general and name() is specific to Hive? I noticed that for this specific enum value they are the same string.

Contributor (author):

OrcConf.ROWS_BETWEEN_CHECKS.getAttribute() is the right key to use, not the other one.

properties.getProp(
OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue().toString()));

// Create file-writer
@@ -235,6 +243,9 @@ private synchronized void closeInternal()
this.flush();
this.orcFileWriter.close();
this.closed = true;
if (enableRowBatchPool) {
rowBatchPool.recycle(typeDescription, rowBatch);
}
} else {
// Throw fatal exception if there's outstanding buffered data since there's risk losing data if proceeds.
if (rowBatch.size > 0) {
@@ -269,6 +280,7 @@ public void commit()
@Override
public void write(D record)
throws IOException {
Preconditions.checkState(!closed, "Writer already closed");
Contributor:

When did you see this edge case happen? And does this cause the fork to immediately terminate?

Contributor (author):

Given there's so much usage, and the abstract classes this extends, I wanted to be sure this condition is preserved.

valueWriter.write(record, rowBatch);
if (rowBatch.size == this.batchSize) {
orcFileWriter.addRowBatch(rowBatch);
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.writer;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.State;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/***
* Maintains a pool of row batches per orc schema.
* Expires row batches which have not been accessed for {@code ROW_BATCH_EXPIRY_INTERVAL}
*/
@Slf4j
public class RowBatchPool {
static final String PREFIX = "orc.row.batch.";
static final String ENABLE_ROW_BATCH_POOL = PREFIX + "enable";

static final String ROW_BATCH_EXPIRY_INTERVAL = PREFIX + "expiry.interval.secs";
static final int DEFAULT_ROW_BATCH_EXPIRY_INTERVAL = 10;

static final String ROW_BATCH_EXPIRY_PERIOD = PREFIX + "expiry.period.secs";
static final int DEFAULT_ROW_BATCH_EXPIRY_PERIOD = 1;

Contributor:

Really minor style nit: typically we follow a pattern of having a shared prefix variable for orc.row.batch.expiry. (or even just orc.row.batch. as the prefix), and then we start the default values with DEFAULT_.

See

public static final String TOKEN_RENEW_INTERVAL_IN_MINUTES = GOBBLIN_YARN_PREFIX + "token.renew.interval.minutes";
public static final Long DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES = Long.MAX_VALUE;
for an example.

Also, maybe add the word pool to these settings, since they are specific to the batch pool and not to regular row batches?
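
To illustrate the suggestion, the constants might look like this (hypothetical names, not what this PR commits):

```java
// Hypothetical renaming per the reviewer's convention; the PR keeps
// PREFIX = "orc.row.batch." and key names without "pool".
static final String ROW_BATCH_POOL_PREFIX = "orc.row.batch.pool.";
static final String ENABLE_ROW_BATCH_POOL = ROW_BATCH_POOL_PREFIX + "enable";
static final String ROW_BATCH_POOL_EXPIRY_INTERVAL_SECS = ROW_BATCH_POOL_PREFIX + "expiry.interval.secs";
static final int DEFAULT_ROW_BATCH_POOL_EXPIRY_INTERVAL_SECS = 10;
```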

private static RowBatchPool INSTANCE;

private final Map<TypeDescription, LinkedList<RowBatchHolder>> rowBatches;
private final ScheduledExecutorService rowBatchExpiryThread;
private final long rowBatchExpiryInterval;

private RowBatchPool(State properties) {
rowBatches = Maps.newHashMap();
rowBatchExpiryThread = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setDaemon(true).build());
// expire row batches older than N secs
rowBatchExpiryInterval = properties.getPropAsLong(ROW_BATCH_EXPIRY_INTERVAL, DEFAULT_ROW_BATCH_EXPIRY_INTERVAL);
// check every N secs
long rowBatchExpiryPeriod = properties.getPropAsLong(ROW_BATCH_EXPIRY_PERIOD, DEFAULT_ROW_BATCH_EXPIRY_PERIOD);
rowBatchExpiryThread.scheduleAtFixedRate(
rowBatchExpiryFn(), rowBatchExpiryPeriod, rowBatchExpiryPeriod, TimeUnit.SECONDS);
}

private Runnable rowBatchExpiryFn() {
return () -> {
synchronized (rowBatches) {
for (Map.Entry<TypeDescription, LinkedList<RowBatchHolder>> e : rowBatches.entrySet()) {
LinkedList<RowBatchHolder> val = e.getValue();
val.removeIf(this::candidateForRemoval);
}
}
};
}

private boolean candidateForRemoval(RowBatchHolder batch) {
long expiryInterval = TimeUnit.SECONDS.toMillis(rowBatchExpiryInterval);
long interval = System.currentTimeMillis() - batch.lastUsed;
if (interval > expiryInterval) {
log.info("Expiring row batch {} as it has not been accessed since {} ms",
System.identityHashCode(batch.rowBatch), interval);
return true;
} else {
return false;
}
}

private static class RowBatchHolder {
long lastUsed;
VectorizedRowBatch rowBatch;

private RowBatchHolder(VectorizedRowBatch rowBatch, long currentTimeMillis) {
this.rowBatch = rowBatch;
this.lastUsed = currentTimeMillis;
}
}

public synchronized static RowBatchPool instance(State properties) {
if (INSTANCE == null) {
INSTANCE = new RowBatchPool(properties);
}
return INSTANCE;
}

public VectorizedRowBatch getRowBatch(TypeDescription schema, int batchSize) {
synchronized (rowBatches) {
LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
VectorizedRowBatch rowBatch;

if (vals == null || vals.size() == 0) {
Contributor:

Dumb question, but do we have a preference between Apache Commons CollectionUtils.isEmpty and a basic null check like this?

Contributor (author):

I'm not keen on adding more dependencies here given this is a small change.

rowBatch = schema.createRowBatch(batchSize);
log.info("Creating new row batch {}", System.identityHashCode(rowBatch));
} else {
rowBatch = vals.removeLast().rowBatch;
log.info("Using existing row batch {}", System.identityHashCode(rowBatch));
}
return rowBatch;
}
}

public void recycle(TypeDescription schema, VectorizedRowBatch rowBatch) {
log.info("Recycling row batch {}", System.identityHashCode(rowBatch));
synchronized (rowBatches) {
rowBatches.computeIfAbsent(schema, ignore -> Lists.newLinkedList());
LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
vals.add(new RowBatchHolder(rowBatch, System.currentTimeMillis()));
}
}
}
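
For context, a minimal end-to-end sketch of how a writer borrows and returns a batch with the API added here (setup simplified; the opt-in property is set directly since the config key is package-private):

```java
State properties = WorkUnit.createEmpty();
// Opt in to pooling; corresponds to RowBatchPool.ENABLE_ROW_BATCH_POOL, default false.
properties.setProp("orc.row.batch.enable", "true");

RowBatchPool pool = RowBatchPool.instance(properties);
TypeDescription schema = TypeDescription.fromString("struct<a:int,b:string>");

// Borrow: reuses a pooled batch for this schema if one exists, else creates one.
VectorizedRowBatch rowBatch = pool.getRowBatch(schema, 1024);
// ... fill rowBatch via the OrcValueWriter and flush it to the ORC file writer ...

// Return: makes the batch available to the next writer with the same schema
// until the expiry thread evicts it.
pool.recycle(schema, rowBatch);
```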
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.writer;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.testng.Assert;
import org.testng.annotations.Test;

public class RowBatchPoolTest {
@Test
public void testExpiry() throws Exception {
State state = WorkUnit.createEmpty();
RowBatchPool instance = RowBatchPool.instance(state);
TypeDescription schema = TypeDescription.fromString("struct<a:int,b:string>");
VectorizedRowBatch rowBatch1 = instance.getRowBatch(schema, 1024);
instance.recycle(schema, rowBatch1);
VectorizedRowBatch rowBatch2 = instance.getRowBatch(schema, 1024);
// existing rowbatch is fetched from pool
Assert.assertEquals(rowBatch1, rowBatch2);

// since the pool has no existing rowbatch, a new one is created
VectorizedRowBatch rowBatch3 = instance.getRowBatch(schema, 1024);
Assert.assertNotEquals(rowBatch1, rowBatch3);

// recycle fetched row batches
instance.recycle(schema, rowBatch2);
instance.recycle(schema, rowBatch3);

// wait for their expiry
Thread.sleep(RowBatchPool.DEFAULT_ROW_BATCH_EXPIRY_INTERVAL * 1000L);
VectorizedRowBatch rowBatch4 = instance.getRowBatch(schema, 1024);
// new rowbatch is created, all old ones are expired
Assert.assertNotEquals(rowBatch1, rowBatch4);
}
}