Skip to content

Commit

Permalink
Hoist computation of ConnectorSession
Browse files Browse the repository at this point in the history
It was being performed once per page in some cases, which matters
for scenarios where there are lots of small pages or when ramping
up page size in WorkProcessor.
  • Loading branch information
martint committed Oct 15, 2021
1 parent 292a7ae commit e2fafde
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.operator.BasicWorkProcessorOperatorAdapter.BasicAdapterWorkProcessorOperatorFactory;
import io.trino.operator.project.PageProcessor;
import io.trino.spi.Page;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.type.Type;
import io.trino.sql.planner.plan.PlanNodeId;

Expand Down Expand Up @@ -52,10 +53,11 @@ private FilterAndProjectOperator(
{
AggregatedMemoryContext localAggregatedMemoryContext = newSimpleAggregatedMemoryContext();
LocalMemoryContext outputMemoryContext = localAggregatedMemoryContext.newLocalMemoryContext(FilterAndProjectOperator.class.getSimpleName());
ConnectorSession connectorSession = session.toConnectorSession();

this.pages = sourcePages
.flatMap(page -> pageProcessor.createWorkProcessor(
session.toConnectorSession(),
connectorSession,
yieldSignal,
outputMemoryContext,
page,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.trino.spi.PageBuilder;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.connector.RecordCursor;
Expand Down Expand Up @@ -287,11 +288,12 @@ WorkProcessor<Page> processColumnSource()

WorkProcessor<Page> processPageSource()
{
ConnectorSession connectorSession = session.toConnectorSession();
return WorkProcessor
.create(new ConnectorPageSourceToPages(pageSourceMemoryContext))
.yielding(yieldSignal::isSet)
.flatMap(page -> pageProcessor.createWorkProcessor(
session.toConnectorSession(),
connectorSession,
yieldSignal,
outputMemoryContext,
page,
Expand All @@ -304,7 +306,7 @@ WorkProcessor<Page> processPageSource()
private class RecordCursorToPages
implements WorkProcessor.Process<Page>
{
final Session session;
final ConnectorSession session;
final DriverYieldSignal yieldSignal;
final CursorProcessor cursorProcessor;
final PageBuilder pageBuilder;
Expand All @@ -321,7 +323,7 @@ private class RecordCursorToPages
LocalMemoryContext pageSourceMemoryContext,
LocalMemoryContext outputMemoryContext)
{
this.session = session;
this.session = session.toConnectorSession();
this.yieldSignal = yieldSignal;
this.cursorProcessor = cursorProcessor;
this.pageBuilder = new PageBuilder(types);
Expand All @@ -333,7 +335,7 @@ private class RecordCursorToPages
public ProcessState<Page> process()
{
if (!finished) {
CursorProcessorOutput output = cursorProcessor.process(session.toConnectorSession(), yieldSignal, cursor, pageBuilder);
CursorProcessorOutput output = cursorProcessor.process(session, yieldSignal, cursor, pageBuilder);
pageSourceMemoryContext.setBytes(cursor.getSystemMemoryUsage());

processedPositions += output.getProcessedRows();
Expand Down

0 comments on commit e2fafde

Please sign in to comment.