Skip to content

Commit

Permalink
Rename TaskSource to SplitAssignment
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr authored and martint committed Jan 21, 2022
1 parent 9391711 commit a4264ea
Show file tree
Hide file tree
Showing 18 changed files with 195 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class TaskSource
public class SplitAssignment
{
private final PlanNodeId planNodeId;
private final Set<ScheduledSplit> splits;
private final Set<Lifespan> noMoreSplitsForLifespan;
private final boolean noMoreSplits;

@JsonCreator
public TaskSource(
public SplitAssignment(
@JsonProperty("planNodeId") PlanNodeId planNodeId,
@JsonProperty("splits") Set<ScheduledSplit> splits,
@JsonProperty("noMoreSplitsForLifespan") Set<Lifespan> noMoreSplitsForLifespan,
Expand All @@ -44,7 +44,7 @@ public TaskSource(
this.noMoreSplits = noMoreSplits;
}

public TaskSource(PlanNodeId planNodeId, Set<ScheduledSplit> splits, boolean noMoreSplits)
public SplitAssignment(PlanNodeId planNodeId, Set<ScheduledSplit> splits, boolean noMoreSplits)
{
this(planNodeId, splits, ImmutableSet.of(), noMoreSplits);
}
Expand Down Expand Up @@ -73,43 +73,43 @@ public boolean isNoMoreSplits()
return noMoreSplits;
}

public TaskSource update(TaskSource source)
public SplitAssignment update(SplitAssignment assignment)
{
checkArgument(planNodeId.equals(source.getPlanNodeId()), "Expected source %s, but got source %s", planNodeId, source.getPlanNodeId());
checkArgument(planNodeId.equals(assignment.getPlanNodeId()), "Expected assignment for node %s, but got assignment for node %s", planNodeId, assignment.getPlanNodeId());

if (isNewer(source)) {
// assure the new source is properly formed
// we know that either the new source one has new splits and/or it is marking the source as closed
checkArgument(!noMoreSplits || splits.containsAll(source.getSplits()), "Source %s has new splits, but no more splits already set", planNodeId);
if (isNewer(assignment)) {
// assure the new assignment is properly formed
// we know that either the new assignment one has new splits and/or it is marking the assignment as closed
checkArgument(!noMoreSplits || splits.containsAll(assignment.getSplits()), "Assignment %s has new splits, but no more splits already set", planNodeId);

Set<ScheduledSplit> newSplits = ImmutableSet.<ScheduledSplit>builder()
.addAll(splits)
.addAll(source.getSplits())
.addAll(assignment.getSplits())
.build();
Set<Lifespan> newNoMoreSplitsForDriverGroup = ImmutableSet.<Lifespan>builder()
.addAll(noMoreSplitsForLifespan)
.addAll(source.getNoMoreSplitsForLifespan())
.addAll(assignment.getNoMoreSplitsForLifespan())
.build();

return new TaskSource(
return new SplitAssignment(
planNodeId,
newSplits,
newNoMoreSplitsForDriverGroup,
source.isNoMoreSplits());
assignment.isNoMoreSplits());
}
else {
// the specified source is older than this one
// the specified assignment is older than this one
return this;
}
}

private boolean isNewer(TaskSource source)
private boolean isNewer(SplitAssignment assignment)
{
// the specified source is newer if it changes the no more
// the specified assignment is newer if it changes the no more
// splits flag or if it contains new splits
return (!noMoreSplits && source.isNoMoreSplits()) ||
(!noMoreSplitsForLifespan.containsAll(source.getNoMoreSplitsForLifespan())) ||
(!splits.containsAll(source.getSplits()));
return (!noMoreSplits && assignment.isNoMoreSplits()) ||
(!noMoreSplitsForLifespan.containsAll(assignment.getNoMoreSplitsForLifespan())) ||
(!splits.containsAll(assignment.getSplits()));
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions core/trino-main/src/main/java/io/trino/execution/SqlTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ public synchronized ListenableFuture<TaskInfo> getTaskInfo(long callersCurrentVe
public TaskInfo updateTask(
Session session,
Optional<PlanFragment> fragment,
List<TaskSource> sources,
List<SplitAssignment> splitAssignments,
OutputBuffers outputBuffers,
Map<DynamicFilterId, Domain> dynamicFilterDomains)
{
Expand Down Expand Up @@ -462,7 +462,7 @@ public TaskInfo updateTask(
}

if (taskExecution != null) {
taskExecution.addSources(sources);
taskExecution.addSplitAssignments(splitAssignments);
taskExecution.getTaskContext().addDynamicFilter(dynamicFilterDomains);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public class SqlTaskExecution

// guarded for update only
@GuardedBy("this")
private final ConcurrentMap<PlanNodeId, TaskSource> unpartitionedSources = new ConcurrentHashMap<>();
private final ConcurrentMap<PlanNodeId, SplitAssignment> unpartitionedSplitAssignments = new ConcurrentHashMap<>();

@GuardedBy("this")
private long maxAcknowledgedSplit = Long.MIN_VALUE;
Expand Down Expand Up @@ -283,17 +283,17 @@ public TaskContext getTaskContext()
return taskContext;
}

public void addSources(List<TaskSource> sources)
public void addSplitAssignments(List<SplitAssignment> splitAssignments)
{
requireNonNull(sources, "sources is null");
checkState(!Thread.holdsLock(this), "Cannot add sources while holding a lock on the %s", getClass().getSimpleName());
requireNonNull(splitAssignments, "splitAssignments is null");
checkState(!Thread.holdsLock(this), "Cannot add split assignments while holding a lock on the %s", getClass().getSimpleName());

try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
// update our record of sources and schedule drivers for new partitioned splits
Map<PlanNodeId, TaskSource> updatedUnpartitionedSources = updateSources(sources);
// update our record of split assignments and schedule drivers for new partitioned splits
Map<PlanNodeId, SplitAssignment> updatedUnpartitionedSources = updateSplitAssignments(splitAssignments);

// tell existing drivers about the new splits; it is safe to update drivers
// multiple times and out of order because sources contain full record of
// multiple times and out of order because split assignments contain full record of
// the unpartitioned splits
for (WeakReference<Driver> driverReference : drivers) {
Driver driver = driverReference.get();
Expand All @@ -308,26 +308,26 @@ public void addSources(List<TaskSource> sources)
if (sourceId.isEmpty()) {
continue;
}
TaskSource sourceUpdate = updatedUnpartitionedSources.get(sourceId.get());
if (sourceUpdate == null) {
SplitAssignment splitAssignmentUpdate = updatedUnpartitionedSources.get(sourceId.get());
if (splitAssignmentUpdate == null) {
continue;
}
driver.updateSource(sourceUpdate);
driver.updateSplitAssignment(splitAssignmentUpdate);
}

// we may have transitioned to no more splits, so check for completion
checkTaskCompletion();
}
}

private synchronized Map<PlanNodeId, TaskSource> updateSources(List<TaskSource> sources)
private synchronized Map<PlanNodeId, SplitAssignment> updateSplitAssignments(List<SplitAssignment> splitAssignments)
{
Map<PlanNodeId, TaskSource> updatedUnpartitionedSources = new HashMap<>();
Map<PlanNodeId, SplitAssignment> updatedUnpartitionedSplitAssignments = new HashMap<>();

// first remove any split that was already acknowledged
long currentMaxAcknowledgedSplit = this.maxAcknowledgedSplit;
sources = sources.stream()
.map(source -> new TaskSource(
splitAssignments = splitAssignments.stream()
.map(source -> new SplitAssignment(
source.getPlanNodeId(),
source.getSplits().stream()
.filter(scheduledSplit -> scheduledSplit.getSequenceId() > currentMaxAcknowledgedSplit)
Expand All @@ -338,13 +338,13 @@ private synchronized Map<PlanNodeId, TaskSource> updateSources(List<TaskSource>
source.isNoMoreSplits()))
.collect(toList());

// update task with new sources
for (TaskSource source : sources) {
if (driverRunnerFactoriesWithSplitLifeCycle.containsKey(source.getPlanNodeId())) {
schedulePartitionedSource(source);
// update task with new assignments
for (SplitAssignment assignment : splitAssignments) {
if (driverRunnerFactoriesWithSplitLifeCycle.containsKey(assignment.getPlanNodeId())) {
schedulePartitionedSource(assignment);
}
else {
scheduleUnpartitionedSource(source, updatedUnpartitionedSources);
scheduleUnpartitionedSource(assignment, updatedUnpartitionedSplitAssignments);
}
}

Expand All @@ -354,12 +354,12 @@ private synchronized Map<PlanNodeId, TaskSource> updateSources(List<TaskSource>
}

// update maxAcknowledgedSplit
maxAcknowledgedSplit = sources.stream()
maxAcknowledgedSplit = splitAssignments.stream()
.flatMap(source -> source.getSplits().stream())
.mapToLong(ScheduledSplit::getSequenceId)
.max()
.orElse(maxAcknowledgedSplit);
return updatedUnpartitionedSources;
return updatedUnpartitionedSplitAssignments;
}

@GuardedBy("this")
Expand Down Expand Up @@ -387,9 +387,9 @@ private void mergeIntoPendingSplits(PlanNodeId planNodeId, Set<ScheduledSplit> s
}
}

private synchronized void schedulePartitionedSource(TaskSource sourceUpdate)
private synchronized void schedulePartitionedSource(SplitAssignment splitAssignmentUpdate)
{
mergeIntoPendingSplits(sourceUpdate.getPlanNodeId(), sourceUpdate.getSplits(), sourceUpdate.getNoMoreSplitsForLifespan(), sourceUpdate.isNoMoreSplits());
mergeIntoPendingSplits(splitAssignmentUpdate.getPlanNodeId(), splitAssignmentUpdate.getSplits(), splitAssignmentUpdate.getNoMoreSplitsForLifespan(), splitAssignmentUpdate.isNoMoreSplits());

while (true) {
// SchedulingLifespanManager tracks how far each Lifespan has been scheduled. Here is an example.
Expand All @@ -409,10 +409,10 @@ private synchronized void schedulePartitionedSource(TaskSource sourceUpdate)
SchedulingLifespan schedulingLifespan = activeLifespans.next();
Lifespan lifespan = schedulingLifespan.getLifespan();

// Continue using the example from above. Let's say the sourceUpdate adds some new splits for source node B.
// Continue using the example from above. Let's say the splitAssignmentUpdate adds some new splits for source node B.
//
// For lifespan 30, it could start new drivers and assign a pending split to each.
// Pending splits could include both pre-existing pending splits, and the new ones from sourceUpdate.
// Pending splits could include both pre-existing pending splits, and the new ones from splitAssignmentUpdate.
// If there is enough driver slots to deplete pending splits, one of the below would happen.
// * If it is marked that all splits for node B in lifespan 30 has been received, SchedulingLifespanManager
// will be updated so that lifespan 30 now processes source node C. It will immediately start processing them.
Expand Down Expand Up @@ -477,27 +477,27 @@ private synchronized void schedulePartitionedSource(TaskSource sourceUpdate)
}
}

if (sourceUpdate.isNoMoreSplits()) {
schedulingLifespanManager.noMoreSplits(sourceUpdate.getPlanNodeId());
if (splitAssignmentUpdate.isNoMoreSplits()) {
schedulingLifespanManager.noMoreSplits(splitAssignmentUpdate.getPlanNodeId());
}
}

private synchronized void scheduleUnpartitionedSource(TaskSource sourceUpdate, Map<PlanNodeId, TaskSource> updatedUnpartitionedSources)
private synchronized void scheduleUnpartitionedSource(SplitAssignment splitAssignmentUpdate, Map<PlanNodeId, SplitAssignment> updatedUnpartitionedSources)
{
// create new source
TaskSource newSource;
TaskSource currentSource = unpartitionedSources.get(sourceUpdate.getPlanNodeId());
if (currentSource == null) {
newSource = sourceUpdate;
SplitAssignment newSplitAssignment;
SplitAssignment currentSplitAssignment = unpartitionedSplitAssignments.get(splitAssignmentUpdate.getPlanNodeId());
if (currentSplitAssignment == null) {
newSplitAssignment = splitAssignmentUpdate;
}
else {
newSource = currentSource.update(sourceUpdate);
newSplitAssignment = currentSplitAssignment.update(splitAssignmentUpdate);
}

// only record new source if something changed
if (newSource != currentSource) {
unpartitionedSources.put(sourceUpdate.getPlanNodeId(), newSource);
updatedUnpartitionedSources.put(sourceUpdate.getPlanNodeId(), newSource);
if (newSplitAssignment != currentSplitAssignment) {
unpartitionedSplitAssignments.put(splitAssignmentUpdate.getPlanNodeId(), newSplitAssignment);
updatedUnpartitionedSources.put(splitAssignmentUpdate.getPlanNodeId(), newSplitAssignment);
}
}

Expand Down Expand Up @@ -611,9 +611,9 @@ public synchronized Set<PlanNodeId> getNoMoreSplits()
noMoreSplits.add(entry.getKey());
}
}
for (TaskSource taskSource : unpartitionedSources.values()) {
if (taskSource.isNoMoreSplits()) {
noMoreSplits.add(taskSource.getPlanNodeId());
for (SplitAssignment splitAssignment : unpartitionedSplitAssignments.values()) {
if (splitAssignment.isNoMoreSplits()) {
noMoreSplits.add(splitAssignment.getPlanNodeId());
}
}
return noMoreSplits.build();
Expand Down Expand Up @@ -655,7 +655,7 @@ public String toString()
return toStringHelper(this)
.add("taskId", taskId)
.add("remainingDrivers", status.getRemainingDriver())
.add("unpartitionedSources", unpartitionedSources)
.add("unpartitionedSplitAssignments", unpartitionedSplitAssignments)
.toString();
}

Expand Down Expand Up @@ -947,15 +947,15 @@ public Driver createDriver(DriverContext driverContext, @Nullable ScheduledSplit

if (partitionedSplit != null) {
// TableScanOperator requires partitioned split to be added before the first call to process
driver.updateSource(new TaskSource(partitionedSplit.getPlanNodeId(), ImmutableSet.of(partitionedSplit), true));
driver.updateSplitAssignment(new SplitAssignment(partitionedSplit.getPlanNodeId(), ImmutableSet.of(partitionedSplit), true));
}

// add unpartitioned sources
Optional<PlanNodeId> sourceId = driver.getSourceId();
if (sourceId.isPresent()) {
TaskSource taskSource = unpartitionedSources.get(sourceId.get());
if (taskSource != null) {
driver.updateSource(taskSource);
SplitAssignment splitAssignment = unpartitionedSplitAssignments.get(sourceId.get());
if (splitAssignment != null) {
driver.updateSplitAssignment(splitAssignment);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,12 @@ public TaskInfo updateTask(
Session session,
TaskId taskId,
Optional<PlanFragment> fragment,
List<TaskSource> sources,
List<SplitAssignment> splitAssignments,
OutputBuffers outputBuffers,
Map<DynamicFilterId, Domain> dynamicFilterDomains)
{
try {
return versionEmbedder.embedVersion(() -> doUpdateTask(session, taskId, fragment, sources, outputBuffers, dynamicFilterDomains)).call();
return versionEmbedder.embedVersion(() -> doUpdateTask(session, taskId, fragment, splitAssignments, outputBuffers, dynamicFilterDomains)).call();
}
catch (Exception e) {
throwIfUnchecked(e);
Expand All @@ -399,14 +399,14 @@ private TaskInfo doUpdateTask(
Session session,
TaskId taskId,
Optional<PlanFragment> fragment,
List<TaskSource> sources,
List<SplitAssignment> splitAssignments,
OutputBuffers outputBuffers,
Map<DynamicFilterId, Domain> dynamicFilterDomains)
{
requireNonNull(session, "session is null");
requireNonNull(taskId, "taskId is null");
requireNonNull(fragment, "fragment is null");
requireNonNull(sources, "sources is null");
requireNonNull(splitAssignments, "splitAssignments is null");
requireNonNull(outputBuffers, "outputBuffers is null");

SqlTask sqlTask = tasks.getUnchecked(taskId);
Expand All @@ -430,7 +430,7 @@ private TaskInfo doUpdateTask(
}

sqlTask.recordHeartbeat();
return sqlTask.updateTask(session, fragment, sources, outputBuffers, dynamicFilterDomains);
return sqlTask.updateTask(session, fragment, splitAssignments, outputBuffers, dynamicFilterDomains);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ public interface TaskManager
void updateMemoryPoolAssignments(MemoryPoolAssignmentsRequest assignments);

/**
* Updates the task plan, sources and output buffers. If the task does not
* Updates the task plan, splitAssignments and output buffers. If the task does not
* already exist, it is created and then updated.
*/
TaskInfo updateTask(
Session session,
TaskId taskId,
Optional<PlanFragment> fragment,
List<TaskSource> sources,
List<SplitAssignment> splitAssignments,
OutputBuffers outputBuffers,
Map<DynamicFilterId, Domain> dynamicFilterDomains);

Expand Down
Loading

0 comments on commit a4264ea

Please sign in to comment.