Skip to content

Commit

Permalink
Merge pull request #17 from yzp0n/scatter-local-executor2
Browse files Browse the repository at this point in the history
Support ScatteredExecutor
  • Loading branch information
yuokada authored Feb 25, 2019
2 parents 728c10d + 85a8276 commit 82c7f44
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 23 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ dependencies {
compile "org.embulk:embulk-core:0.8.34"
provided "org.embulk:embulk-core:0.8.34"

compile "org.apache.orc:orc:1.4.4"
compile "org.apache.orc:orc-core:1.4.4"
compile "org.apache.orc:orc:1.5.4"
compile "org.apache.orc:orc-core:1.5.4"
compile "org.apache.hadoop:hadoop-hdfs:2.7.5"

compile 'org.embulk.input.s3:embulk-util-aws-credentials:0.2.8'
Expand Down
80 changes: 59 additions & 21 deletions src/main/java/org/embulk/output/orc/OrcOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.util.VersionInfo;
import org.apache.orc.CompressionKind;
import org.apache.orc.MemoryManager;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
Expand Down Expand Up @@ -163,6 +164,7 @@ private Writer createWriter(PluginTask task, Schema schema, int processorIndex)
writer = OrcFile.createWriter(
new Path(buildPath(task, processorIndex)),
writerOptions.setSchema(oschema)
.memory(new WriterLocalMemoryManager())
.version(OrcFile.Version.V_0_12)
);
}
Expand Down Expand Up @@ -201,33 +203,31 @@ public OrcTransactionalPageOutput(PageReader reader, Writer writer, PluginTask t
@Override
public void add(Page page)
{
synchronized (this) {
try {
// int size = page.getStringReferences().size();
final TypeDescription schema = getSchema(reader.getSchema());
final VectorizedRowBatch batch = schema.createRowBatch();
// batch.size = size;

reader.setPage(page);
while (reader.nextRecord()) {
final int row = batch.size++;
reader.getSchema().visitColumns(
new OrcColumnVisitor(reader, batch, row)
);
if (batch.size >= batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
}
if (batch.size != 0) {
try {
// int size = page.getStringReferences().size();
final TypeDescription schema = getSchema(reader.getSchema());
final VectorizedRowBatch batch = schema.createRowBatch();
// batch.size = size;

reader.setPage(page);
while (reader.nextRecord()) {
final int row = batch.size++;
reader.getSchema().visitColumns(
new OrcColumnVisitor(reader, batch, row)
);
if (batch.size >= batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
}
catch (IOException e) {
e.printStackTrace();
if (batch.size != 0) {
writer.addRowBatch(batch);
batch.reset();
}
}
catch (IOException e) {
e.printStackTrace();
}
}

@Override
Expand Down Expand Up @@ -257,4 +257,42 @@ public TaskReport commit()
return Exec.newTaskReport();
}
}

// We avoid using orc.MemoryManagerImpl since it is not threadsafe, but embulk is multi-threaded.
// Embulk creates and uses multiple instances of TransactionalPageOutput in worker threads.
// As a workaround, WriterLocalMemoryManager is bound to a single orc.Writer instance, and
// notifies checkMemory() only to that instance.
private static class WriterLocalMemoryManager implements MemoryManager
{
final long rowsBetweenChecks = 10000;

private int rowsAddedSinceCheck = 0;
Callback boundCallback = null;

@Override
public void addWriter(Path path, long requestedAllocation, Callback callback) throws IOException
{
if (boundCallback != null) {
throw new IllegalStateException("WriterLocalMemoryManager should be bound to a single orc.Writer instance.");
}

boundCallback = callback;
}

@Override
public void removeWriter(Path path) throws IOException
{
boundCallback = null;
}

@Override
public void addedRow(int rows) throws IOException
{
rowsAddedSinceCheck += rows;
if (rowsAddedSinceCheck > rowsBetweenChecks) {
boundCallback.checkMemory(1);
rowsAddedSinceCheck = 0;
}
}
}
}

0 comments on commit 82c7f44

Please sign in to comment.