diff --git a/core/trino-main/src/main/java/io/trino/operator/FilterAndProjectOperator.java b/core/trino-main/src/main/java/io/trino/operator/FilterAndProjectOperator.java index c419f6e56d8e..339cc27e958b 100644 --- a/core/trino-main/src/main/java/io/trino/operator/FilterAndProjectOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/FilterAndProjectOperator.java @@ -22,6 +22,7 @@ import io.trino.operator.BasicWorkProcessorOperatorAdapter.BasicAdapterWorkProcessorOperatorFactory; import io.trino.operator.project.PageProcessor; import io.trino.spi.Page; +import io.trino.spi.connector.ConnectorSession; import io.trino.spi.type.Type; import io.trino.sql.planner.plan.PlanNodeId; @@ -52,10 +53,11 @@ private FilterAndProjectOperator( { AggregatedMemoryContext localAggregatedMemoryContext = newSimpleAggregatedMemoryContext(); LocalMemoryContext outputMemoryContext = localAggregatedMemoryContext.newLocalMemoryContext(FilterAndProjectOperator.class.getSimpleName()); + ConnectorSession connectorSession = session.toConnectorSession(); this.pages = sourcePages .flatMap(page -> pageProcessor.createWorkProcessor( - session.toConnectorSession(), + connectorSession, yieldSignal, outputMemoryContext, page, diff --git a/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java b/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java index e03f5f16a8f7..1613011d11b5 100644 --- a/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java @@ -34,6 +34,7 @@ import io.trino.spi.PageBuilder; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.EmptyPageSource; import io.trino.spi.connector.RecordCursor; @@ -287,11 +288,12 @@ WorkProcessor processColumnSource() WorkProcessor processPageSource() { + ConnectorSession connectorSession = session.toConnectorSession(); return WorkProcessor .create(new ConnectorPageSourceToPages(pageSourceMemoryContext)) .yielding(yieldSignal::isSet) .flatMap(page -> pageProcessor.createWorkProcessor( - session.toConnectorSession(), + connectorSession, yieldSignal, outputMemoryContext, page, @@ -304,7 +306,7 @@ WorkProcessor processPageSource() private class RecordCursorToPages implements WorkProcessor.Process { - final Session session; + final ConnectorSession session; final DriverYieldSignal yieldSignal; final CursorProcessor cursorProcessor; final PageBuilder pageBuilder; @@ -321,7 +323,7 @@ private class RecordCursorToPages LocalMemoryContext pageSourceMemoryContext, LocalMemoryContext outputMemoryContext) { - this.session = session; + this.session = session.toConnectorSession(); this.yieldSignal = yieldSignal; this.cursorProcessor = cursorProcessor; this.pageBuilder = new PageBuilder(types); @@ -333,7 +335,7 @@ private class RecordCursorToPages public ProcessState process() { if (!finished) { - CursorProcessorOutput output = cursorProcessor.process(session.toConnectorSession(), yieldSignal, cursor, pageBuilder); + CursorProcessorOutput output = cursorProcessor.process(session, yieldSignal, cursor, pageBuilder); pageSourceMemoryContext.setBytes(cursor.getSystemMemoryUsage()); processedPositions += output.getProcessedRows();