diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/physical-plan/src/metrics/baseline.rs
index dc345cd8cdcd..b26a08dd0fad 100644
--- a/datafusion/physical-plan/src/metrics/baseline.rs
+++ b/datafusion/physical-plan/src/metrics/baseline.rs
@@ -56,7 +56,7 @@ pub struct BaselineMetrics {
 }
 
 impl BaselineMetrics {
-    /// Create a new BaselineMetric structure, and set  `start_time` to now
+    /// Create a new BaselineMetric structure, and set `start_time` to now
     pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
         let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
         start_time.record();
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index e92a57493141..e0041194016c 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -26,6 +26,7 @@ use std::sync::Arc;
 
 use crate::common::spawn_buffered;
 use crate::expressions::PhysicalSortExpr;
+use crate::limit::LimitStream;
 use crate::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
@@ -51,6 +52,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::LexOrdering;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement;
 
 use futures::{StreamExt, TryStreamExt};
 use log::{debug, trace};
@@ -737,9 +739,22 @@ impl SortExec {
     /// This can reduce the memory pressure required by the sort
     /// operation since rows that are not going to be included
     /// can be dropped.
-    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
-        self.fetch = fetch;
-        self
+    pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
+        let mut cache = self.cache.clone();
+        if fetch.is_some() && self.cache.execution_mode == ExecutionMode::Unbounded {
+            // When a theoretically unnecessary sort becomes a top-K (which
+            // sometimes arises as an intermediate state before full removal),
+            // its execution mode should become `Bounded`.
+            cache.execution_mode = ExecutionMode::Bounded;
+        }
+        SortExec {
+            input: Arc::clone(&self.input),
+            expr: self.expr.clone(),
+            metrics_set: self.metrics_set.clone(),
+            preserve_partitioning: self.preserve_partitioning,
+            fetch,
+            cache,
+        }
     }
 
     /// Input schema
@@ -775,6 +790,16 @@ impl SortExec {
         sort_exprs: LexOrdering,
         preserve_partitioning: bool,
     ) -> PlanProperties {
+        // Determine execution mode:
+        let sort_satisfied = input.equivalence_properties().ordering_satisfy_requirement(
+            PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()).as_slice(),
+        );
+        let mode = match input.execution_mode() {
+            ExecutionMode::Unbounded if sort_satisfied => ExecutionMode::Unbounded,
+            ExecutionMode::Bounded => ExecutionMode::Bounded,
+            _ => ExecutionMode::PipelineBreaking,
+        };
+
         // Calculate equivalence properties; i.e. reset the ordering equivalence
         // class with the new ordering:
         let eq_properties = input
@@ -786,14 +811,6 @@ impl SortExec {
         let output_partitioning =
             Self::output_partitioning_helper(input, preserve_partitioning);
 
-        // Determine execution mode:
-        let mode = match input.execution_mode() {
-            ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => {
-                ExecutionMode::PipelineBreaking
-            }
-            ExecutionMode::Bounded => ExecutionMode::Bounded,
-        };
-
         PlanProperties::new(eq_properties, output_partitioning, mode)
     }
 }
@@ -874,53 +891,68 @@ impl ExecutionPlan for SortExec {
 
         trace!("End SortExec's input.execute for partition: {}", partition);
 
-        if let Some(fetch) = self.fetch.as_ref() {
-            let mut topk = TopK::try_new(
-                partition,
-                input.schema(),
-                self.expr.clone(),
-                *fetch,
-                context.session_config().batch_size(),
-                context.runtime_env(),
-                &self.metrics_set,
-                partition,
-            )?;
-
-            Ok(Box::pin(RecordBatchStreamAdapter::new(
-                self.schema(),
-                futures::stream::once(async move {
-                    while let Some(batch) = input.next().await {
-                        let batch = batch?;
-                        topk.insert_batch(batch)?;
-                    }
-                    topk.emit()
-                })
-                .try_flatten(),
-            )))
-        } else {
-            let mut sorter = ExternalSorter::new(
-                partition,
-                input.schema(),
-                self.expr.clone(),
-                context.session_config().batch_size(),
-                self.fetch,
-                execution_options.sort_spill_reservation_bytes,
-                execution_options.sort_in_place_threshold_bytes,
-                &self.metrics_set,
-                context.runtime_env(),
+        let sort_satisfied = self
+            .input
+            .equivalence_properties()
+            .ordering_satisfy_requirement(
+                PhysicalSortRequirement::from_sort_exprs(self.expr.iter()).as_slice(),
             );
 
-            Ok(Box::pin(RecordBatchStreamAdapter::new(
-                self.schema(),
-                futures::stream::once(async move {
-                    while let Some(batch) = input.next().await {
-                        let batch = batch?;
-                        sorter.insert_batch(batch).await?;
-                    }
-                    sorter.sort()
-                })
-                .try_flatten(),
-            )))
+        match (sort_satisfied, self.fetch.as_ref()) {
+            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
+                input,
+                0,
+                Some(*fetch),
+                BaselineMetrics::new(&self.metrics_set, partition),
+            ))),
+            (true, None) => Ok(input),
+            (false, Some(fetch)) => {
+                let mut topk = TopK::try_new(
+                    partition,
+                    input.schema(),
+                    self.expr.clone(),
+                    *fetch,
+                    context.session_config().batch_size(),
+                    context.runtime_env(),
+                    &self.metrics_set,
+                    partition,
+                )?;
+                Ok(Box::pin(RecordBatchStreamAdapter::new(
+                    self.schema(),
+                    futures::stream::once(async move {
+                        while let Some(batch) = input.next().await {
+                            let batch = batch?;
+                            topk.insert_batch(batch)?;
+                        }
+                        topk.emit()
+                    })
+                    .try_flatten(),
+                )))
+            }
+            (false, None) => {
+                let mut sorter = ExternalSorter::new(
+                    partition,
+                    input.schema(),
+                    self.expr.clone(),
+                    context.session_config().batch_size(),
+                    self.fetch,
+                    execution_options.sort_spill_reservation_bytes,
+                    execution_options.sort_in_place_threshold_bytes,
+                    &self.metrics_set,
+                    context.runtime_env(),
+                );
+                Ok(Box::pin(RecordBatchStreamAdapter::new(
+                    self.schema(),
+                    futures::stream::once(async move {
+                        while let Some(batch) = input.next().await {
+                            let batch = batch?;
+                            sorter.insert_batch(batch).await?;
+                        }
+                        sorter.sort()
+                    })
+                    .try_flatten(),
+                )))
+            }
         }
     }
 
@@ -933,14 +965,7 @@ impl ExecutionPlan for SortExec {
     }
 
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
-        Some(Arc::new(SortExec {
-            input: Arc::clone(&self.input),
-            expr: self.expr.clone(),
-            metrics_set: self.metrics_set.clone(),
-            preserve_partitioning: self.preserve_partitioning,
-            fetch: limit,
-            cache: self.cache.clone(),
-        }))
+        Some(Arc::new(SortExec::with_fetch(self, limit)))
     }
 
     fn fetch(&self) -> Option<usize> {
@@ -951,6 +976,8 @@ impl ExecutionPlan for SortExec {
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
+    use std::pin::Pin;
+    use std::task::{Context, Poll};
 
     use super::*;
     use crate::coalesce_partitions::CoalescePartitionsExec;
@@ -965,12 +992,124 @@ mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::*;
     use datafusion_common::cast::as_primitive_array;
+    use datafusion_common::{assert_batches_eq, Result, ScalarValue};
     use datafusion_execution::config::SessionConfig;
     use datafusion_execution::runtime_env::RuntimeEnvBuilder;
+    use datafusion_execution::RecordBatchStream;
+    use datafusion_physical_expr::expressions::{Column, Literal};
+    use datafusion_physical_expr::EquivalenceProperties;
+
+    use futures::{FutureExt, Stream};
+
+    #[derive(Debug, Clone)]
+    pub struct SortedUnboundedExec {
+        schema: Schema,
+        batch_size: u64,
+        cache: PlanProperties,
+    }
+
+    impl DisplayAs for SortedUnboundedExec {
+        fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+            match t {
+                DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                    write!(f, "UnboundableExec",).unwrap()
+                }
+            }
+            Ok(())
+        }
+    }
+
+    impl SortedUnboundedExec {
+        fn compute_properties(schema: SchemaRef) -> PlanProperties {
+            let mut eq_properties = EquivalenceProperties::new(schema);
+            eq_properties.add_new_orderings(vec![vec![PhysicalSortExpr::new(
+                Arc::new(Column::new("c1", 0)),
+                SortOptions::default(),
+            )]]);
+            let mode = ExecutionMode::Unbounded;
+            PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1), mode)
+        }
+    }
+
+    impl ExecutionPlan for SortedUnboundedExec {
+        fn name(&self) -> &'static str {
+            Self::static_name()
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn properties(&self) -> &PlanProperties {
+            &self.cache
+        }
+
+        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+            vec![]
+        }
+
+        fn with_new_children(
+            self: Arc<Self>,
+            _: Vec<Arc<dyn ExecutionPlan>>,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            Ok(self)
+        }
+
+        fn execute(
+            &self,
+            _partition: usize,
+            _context: Arc<TaskContext>,
+        ) -> Result<SendableRecordBatchStream> {
+            Ok(Box::pin(SortedUnboundedStream {
+                schema: Arc::new(self.schema.clone()),
+                batch_size: self.batch_size,
+                offset: 0,
+            }))
+        }
+    }
+
+    #[derive(Debug)]
+    pub struct SortedUnboundedStream {
+        schema: SchemaRef,
+        batch_size: u64,
+        offset: u64,
+    }
 
-    use datafusion_common::ScalarValue;
-    use datafusion_physical_expr::expressions::Literal;
-    use futures::FutureExt;
+    impl Stream for SortedUnboundedStream {
+        type Item = Result<RecordBatch>;
+
+        fn poll_next(
+            mut self: Pin<&mut Self>,
+            _cx: &mut Context<'_>,
+        ) -> Poll<Option<Self::Item>> {
+            let batch = SortedUnboundedStream::create_record_batch(
+                Arc::clone(&self.schema),
+                self.offset,
+                self.batch_size,
+            );
+            self.offset += self.batch_size;
+            Poll::Ready(Some(Ok(batch)))
+        }
+    }
+
+    impl RecordBatchStream for SortedUnboundedStream {
+        fn schema(&self) -> SchemaRef {
+            Arc::clone(&self.schema)
+        }
+    }
+
+    impl SortedUnboundedStream {
+        fn create_record_batch(
+            schema: SchemaRef,
+            offset: u64,
+            batch_size: u64,
+        ) -> RecordBatch {
+            let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
+            let array = UInt64Array::from(values);
+            let array_ref: ArrayRef = Arc::new(array);
+            RecordBatch::try_new(schema, vec![array_ref]).unwrap()
+        }
+    }
 
     #[tokio::test]
     async fn test_in_mem_sort() -> Result<()> {
@@ -1414,4 +1553,42 @@ mod tests {
         let result = sort_batch(&batch, &expressions, None).unwrap();
         assert_eq!(result.num_rows(), 1);
     }
+
+    #[tokio::test]
+    async fn topk_unbounded_source() -> Result<()> {
+        let task_ctx = Arc::new(TaskContext::default());
+        let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
+        let source = SortedUnboundedExec {
+            schema: schema.clone(),
+            batch_size: 2,
+            cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
+        };
+        let mut plan = SortExec::new(
+            vec![PhysicalSortExpr::new(
+                Arc::new(Column::new("c1", 0)),
+                SortOptions::default(),
+            )],
+            Arc::new(source),
+        );
+        plan = plan.with_fetch(Some(9));
+
+        let batches = collect(Arc::new(plan), task_ctx).await?;
+        #[rustfmt::skip]
+        let expected = [
+            "+----+",
+            "| c1 |",
+            "+----+",
+            "| 0  |",
+            "| 1  |",
+            "| 2  |",
+            "| 3  |",
+            "| 4  |",
+            "| 5  |",
+            "| 6  |",
+            "| 7  |",
+            "| 8  |",
+            "+----+",];
+        assert_batches_eq!(expected, &batches);
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index c1bcd83a6fd2..001e134581c0 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -551,7 +551,7 @@ impl PartitionSearcher for LinearSearch {
         window_expr: &[Arc<dyn WindowExpr>],
     ) -> Result<Vec<(PartitionKey, RecordBatch)>> {
         let partition_bys =
-            self.evaluate_partition_by_column_values(record_batch, window_expr)?;
+            evaluate_partition_by_column_values(record_batch, window_expr)?;
         // NOTE: In Linear or PartiallySorted modes, we are sure that
         //       `partition_bys` are not empty.
         // Calculate indices for each partition and construct a new record
@@ -618,25 +618,6 @@ impl LinearSearch {
         }
     }
 
-    /// Calculates partition by expression results for each window expression
-    /// on `record_batch`.
-    fn evaluate_partition_by_column_values(
-        &self,
-        record_batch: &RecordBatch,
-        window_expr: &[Arc<dyn WindowExpr>],
-    ) -> Result<Vec<ArrayRef>> {
-        window_expr[0]
-            .partition_by()
-            .iter()
-            .map(|item| match item.evaluate(record_batch)? {
-                ColumnarValue::Array(array) => Ok(array),
-                ColumnarValue::Scalar(scalar) => {
-                    scalar.to_array_of_size(record_batch.num_rows())
-                }
-            })
-            .collect()
-    }
-
     /// Calculate indices of each partition (according to PARTITION BY expression)
     /// `columns` contain partition by expression results.
     fn get_per_partition_indices(
@@ -683,7 +664,7 @@ impl LinearSearch {
         window_expr: &[Arc<dyn WindowExpr>],
     ) -> Result<Vec<(PartitionKey, Vec<u32>)>> {
         let partition_by_columns =
-            self.evaluate_partition_by_column_values(input_buffer, window_expr)?;
+            evaluate_partition_by_column_values(input_buffer, window_expr)?;
         // Reset the row_map state:
         self.row_map_out.clear();
         let mut partition_indices: Vec<(PartitionKey, Vec<u32>)> = vec![];
@@ -852,6 +833,24 @@ impl SortedSearch {
     }
 }
 
+/// Calculates partition by expression results for each window expression
+/// on `record_batch`.
+fn evaluate_partition_by_column_values(
+    record_batch: &RecordBatch,
+    window_expr: &[Arc<dyn WindowExpr>],
+) -> Result<Vec<ArrayRef>> {
+    window_expr[0]
+        .partition_by()
+        .iter()
+        .map(|item| match item.evaluate(record_batch)? {
+            ColumnarValue::Array(array) => Ok(array),
+            ColumnarValue::Scalar(scalar) => {
+                scalar.to_array_of_size(record_batch.num_rows())
+            }
+        })
+        .collect()
+}
+
 /// Stream for the bounded window aggregation plan.
 pub struct BoundedWindowAggStream {
     schema: SchemaRef,