Skip to content

Commit

Permalink
Allow concurrent writes into a bucketed table
Browse files (browse the repository at this point in the history)
  • Loading branch information
arhimondr committed Sep 5, 2019
1 parent 59a230a commit c594611
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public final class SystemSessionProperties
public static final String MAX_CONCURRENT_MATERIALIZATIONS = "max_concurrent_materializations";
public static final String PUSHDOWN_SUBFIELDS_ENABLED = "pushdown_subfields_enabled";
public static final String TABLE_WRITER_MERGE_OPERATOR_ENABLED = "table_writer_merge_operator_enabled";
public static final String CONCURRENT_WRITES_TO_PARTITIONED_TABLE_ENABLED = "concurrent_writes_to_partitioned_table_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -632,6 +633,11 @@ public SystemSessionProperties(
TABLE_WRITER_MERGE_OPERATOR_ENABLED,
"Experimental: enable table writer merge operator",
featuresConfig.isTableWriterMergeOperatorEnabled(),
false),
booleanProperty(
CONCURRENT_WRITES_TO_PARTITIONED_TABLE_ENABLED,
"Experimental: enable concurrent writes to partitioned table",
featuresConfig.isConcurrentWritesToPartitionedTableEnabled(),
false));
}

Expand Down Expand Up @@ -1074,4 +1080,9 @@ public static boolean isTableWriterMergeOperatorEnabled(Session session)
{
return session.getSystemProperty(TABLE_WRITER_MERGE_OPERATOR_ENABLED, Boolean.class);
}

/**
 * Looks up the {@code CONCURRENT_WRITES_TO_PARTITIONED_TABLE_ENABLED} system
 * property on the given session.
 *
 * @param session the session whose system properties are consulted
 * @return the property value, read as a {@code Boolean} and unboxed
 */
public static boolean isConcurrentWritesToPartitionedTableEnabled(Session session)
{
    Boolean concurrentWritesEnabled = session.getSystemProperty(CONCURRENT_WRITES_TO_PARTITIONED_TABLE_ENABLED, Boolean.class);
    return concurrentWritesEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public class FeaturesConfig
private boolean pushdownSubfieldsEnabled;

private boolean tableWriterMergeOperatorEnabled = true;
private boolean concurrentWritesToPartitionedTableEnabled = true;

public enum JoinReorderingStrategy
{
Expand Down Expand Up @@ -1058,4 +1059,16 @@ public FeaturesConfig setTableWriterMergeOperatorEnabled(boolean tableWriterMerg
this.tableWriterMergeOperatorEnabled = tableWriterMergeOperatorEnabled;
return this;
}

/**
 * Reports the configured value of the concurrent-writes-to-partitioned-table
 * feature flag held by this config object.
 *
 * @return the current value of {@code concurrentWritesToPartitionedTableEnabled}
 */
public boolean isConcurrentWritesToPartitionedTableEnabled()
{
    return this.concurrentWritesToPartitionedTableEnabled;
}

/**
 * Sets the concurrent-writes-to-partitioned-table feature flag.
 * <p>
 * The {@code @Config} annotation associates this setter with the
 * {@code experimental.concurrent-writes-to-partitioned-table-enabled} key.
 *
 * @param concurrentWritesToPartitionedTableEnabled the new flag value
 * @return this {@code FeaturesConfig}, so calls can be chained
 */
@Config("experimental.concurrent-writes-to-partitioned-table-enabled")
public FeaturesConfig setConcurrentWritesToPartitionedTableEnabled(boolean concurrentWritesToPartitionedTableEnabled)
{
this.concurrentWritesToPartitionedTableEnabled = concurrentWritesToPartitionedTableEnabled;
// Return the receiver to support fluent configuration chains.
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@
import static com.facebook.presto.SystemSessionProperties.getFilterAndProjectMinOutputPageSize;
import static com.facebook.presto.SystemSessionProperties.getTaskConcurrency;
import static com.facebook.presto.SystemSessionProperties.getTaskWriterCount;
import static com.facebook.presto.SystemSessionProperties.isConcurrentWritesToPartitionedTableEnabled;
import static com.facebook.presto.SystemSessionProperties.isExchangeCompressionEnabled;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.operator.DistinctLimitOperator.DistinctLimitOperatorFactory;
Expand Down Expand Up @@ -2144,7 +2145,7 @@ public PhysicalOperation visitSemiJoin(SemiJoinNode node, LocalExecutionPlanCont
public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context)
{
// Set table writer count
if (node.getPartitioningScheme().isPresent()) {
if (node.getPartitioningScheme().isPresent() && !isConcurrentWritesToPartitionedTableEnabled(session)) {
context.setDriverInstanceCount(1);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@

import static com.facebook.presto.SystemSessionProperties.getQueryMaxStageCount;
import static com.facebook.presto.SystemSessionProperties.getTaskWriterCount;
import static com.facebook.presto.SystemSessionProperties.isConcurrentWritesToPartitionedTableEnabled;
import static com.facebook.presto.SystemSessionProperties.isDynamicScheduleForGroupedExecution;
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForEligibleTableScansEnabled;
Expand All @@ -112,6 +113,7 @@
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPLICATE;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.gatheringExchange;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.partitionedExchange;
import static com.facebook.presto.sql.planner.planPrinter.PlanPrinter.jsonFragmentPlan;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -701,36 +703,47 @@ private TableFinishNode createTemporaryTableWrite(
SchemaTableName temporaryTableName = metadata.getTableMetadata(session, tableHandle).getTable();
InsertHandle insertHandle = new InsertHandle(insertTableHandle, new SchemaTableName(temporaryTableName.getSchemaName(), temporaryTableName.getTableName()));

PartitioningScheme partitioningScheme = new PartitioningScheme(
Partitioning.create(partitioningHandle, partitioningVariables),
outputs,
Optional.empty(),
false,
Optional.empty());

ExchangeNode writerRemoteSource = new ExchangeNode(
idAllocator.getNextId(),
REPARTITION,
REMOTE_STREAMING,
partitioningScheme,
sources,
inputs,
Optional.empty());

ExchangeNode writerSource;
if (getTaskWriterCount(session) == 1 || !isConcurrentWritesToPartitionedTableEnabled(session)) {
writerSource = gatheringExchange(
idAllocator.getNextId(),
LOCAL,
writerRemoteSource);
}
else {
writerSource = partitionedExchange(
idAllocator.getNextId(),
LOCAL,
writerRemoteSource,
partitioningScheme);
}

TableWriterNode tableWriter = new TableWriterNode(
idAllocator.getNextId(),
gatheringExchange(
idAllocator.getNextId(),
LOCAL,
new ExchangeNode(
idAllocator.getNextId(),
REPARTITION,
REMOTE_STREAMING,
new PartitioningScheme(
Partitioning.create(partitioningHandle, partitioningVariables),
outputs,
Optional.empty(),
false,
Optional.empty()),
sources,
inputs,
Optional.empty())),
writerSource,
insertHandle,
variableAllocator.newVariable("partialrows", BIGINT),
variableAllocator.newVariable("partialfragments", VARBINARY),
variableAllocator.newVariable("partialtablecommitcontext", VARBINARY),
outputs,
outputColumnNames,
Optional.of(new PartitioningScheme(
Partitioning.create(partitioningHandle, partitioningVariables),
outputs,
Optional.empty(),
false,
Optional.empty())),
Optional.of(partitioningScheme),
Optional.empty());

PlanNode tableWriterMerge = tableWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@

import static com.facebook.presto.SystemSessionProperties.getTaskConcurrency;
import static com.facebook.presto.SystemSessionProperties.getTaskWriterCount;
import static com.facebook.presto.SystemSessionProperties.isConcurrentWritesToPartitionedTableEnabled;
import static com.facebook.presto.SystemSessionProperties.isDistributedSortEnabled;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled;
Expand All @@ -87,6 +88,7 @@
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.gatheringExchange;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.mergingExchange;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.partitionedExchange;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.roundRobinExchange;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.systemPartitionedExchange;
import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -482,7 +484,8 @@ public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode node, StreamPrefe
@Override
public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNode, StreamPreferredProperties parentPreferences)
{
if (getTaskWriterCount(session) == 1) {
if (getTaskWriterCount(session) == 1
|| (originalTableWriterNode.getPartitioningScheme().isPresent() && !isConcurrentWritesToPartitionedTableEnabled(session))) {
return planAndEnforceChildren(originalTableWriterNode, singleStream(), defaultParallelism(session));
}

Expand All @@ -496,20 +499,47 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo
variableAllocator,
metadata.getFunctionManager()));

PlanWithProperties tableWriter = planAndEnforceChildren(
new TableWriterNode(
originalTableWriterNode.getId(),
originalTableWriterNode.getSource(),
originalTableWriterNode.getTarget(),
variableAllocator.newVariable("partialrowcount", BIGINT),
variableAllocator.newVariable("partialfragments", VARBINARY),
variableAllocator.newVariable("partialcontext", VARBINARY),
originalTableWriterNode.getColumns(),
originalTableWriterNode.getColumnNames(),
originalTableWriterNode.getPartitioningScheme(),
statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation)),
fixedParallelism(),
fixedParallelism());
PlanWithProperties tableWriter;

if (!originalTableWriterNode.getPartitioningScheme().isPresent()) {
tableWriter = planAndEnforceChildren(
new TableWriterNode(
originalTableWriterNode.getId(),
originalTableWriterNode.getSource(),
originalTableWriterNode.getTarget(),
variableAllocator.newVariable("partialrowcount", BIGINT),
variableAllocator.newVariable("partialfragments", VARBINARY),
variableAllocator.newVariable("partialcontext", VARBINARY),
originalTableWriterNode.getColumns(),
originalTableWriterNode.getColumnNames(),
originalTableWriterNode.getPartitioningScheme(),
statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation)),
fixedParallelism(),
fixedParallelism());
}
else {
PlanWithProperties source = originalTableWriterNode.getSource().accept(this, fixedParallelism());
PlanWithProperties exchange = deriveProperties(
partitionedExchange(
idAllocator.getNextId(),
LOCAL,
source.getNode(),
originalTableWriterNode.getPartitioningScheme().get()),
source.getProperties());
tableWriter = deriveProperties(
new TableWriterNode(
originalTableWriterNode.getId(),
exchange.getNode(),
originalTableWriterNode.getTarget(),
variableAllocator.newVariable("partialrowcount", BIGINT),
variableAllocator.newVariable("partialfragments", VARBINARY),
variableAllocator.newVariable("partialcontext", VARBINARY),
originalTableWriterNode.getColumns(),
originalTableWriterNode.getColumnNames(),
originalTableWriterNode.getPartitioningScheme(),
statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation)),
exchange.getProperties());
}

PlanWithProperties gatheringExchange = deriveProperties(
gatheringExchange(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public void testDefaults()
.setPushLimitThroughOuterJoin(true)
.setMaxConcurrentMaterializations(3)
.setPushdownSubfieldsEnabled(false)
.setTableWriterMergeOperatorEnabled(true));
.setTableWriterMergeOperatorEnabled(true)
.setConcurrentWritesToPartitionedTableEnabled(true));
}

@Test
Expand Down Expand Up @@ -196,6 +197,7 @@ public void testExplicitPropertyMappings()
.put("max-concurrent-materializations", "5")
.put("experimental.pushdown-subfields-enabled", "true")
.put("experimental.table-writer-merge-operator-enabled", "false")
.put("experimental.concurrent-writes-to-partitioned-table-enabled", "false")
.build();

FeaturesConfig expected = new FeaturesConfig()
Expand Down Expand Up @@ -269,7 +271,8 @@ public void testExplicitPropertyMappings()
.setPushLimitThroughOuterJoin(false)
.setMaxConcurrentMaterializations(5)
.setPushdownSubfieldsEnabled(true)
.setTableWriterMergeOperatorEnabled(false);
.setTableWriterMergeOperatorEnabled(false)
.setConcurrentWritesToPartitionedTableEnabled(false);
assertFullMapping(properties, expected);
}

Expand Down

0 comments on commit c594611

Please sign in to comment.