diff --git a/go/vt/vttablet/endtoend/config_test.go b/go/vt/vttablet/endtoend/config_test.go
index 60303cf4bf5..9eef54bd0bb 100644
--- a/go/vt/vttablet/endtoend/config_test.go
+++ b/go/vt/vttablet/endtoend/config_test.go
@@ -108,64 +108,88 @@ func TestDisableConsolidator(t *testing.T) {
 }
 
 func TestConsolidatorReplicasOnly(t *testing.T) {
-	totalConsolidationsTag := "Waits/Histograms/Consolidations/Count"
-	initial := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
-	var wg sync.WaitGroup
-	wg.Add(2)
-	go func() {
-		framework.NewClient().Execute("select sleep(0.5) from dual", nil)
-		wg.Done()
-	}()
-	go func() {
-		framework.NewClient().Execute("select sleep(0.5) from dual", nil)
-		wg.Done()
-	}()
-	wg.Wait()
-	afterOne := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
-	assert.Equal(t, initial+1, afterOne, "expected one consolidation")
-
-	revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary)
-	defer revert()
-
-	// primary should not do query consolidation
-	var wg2 sync.WaitGroup
-	wg2.Add(2)
-	go func() {
-		framework.NewClient().Execute("select sleep(0.5) from dual", nil)
-		wg2.Done()
-	}()
-	go func() {
-		framework.NewClient().Execute("select sleep(0.5) from dual", nil)
-		wg2.Done()
-	}()
-	wg2.Wait()
-	noNewConsolidations := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
-	assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")
-
-	// become a replica, where query consolidation should happen
-	client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA)
-
-	err := client.SetServingType(topodatapb.TabletType_REPLICA)
-	require.NoError(t, err)
-	defer func() {
-		err = client.SetServingType(topodatapb.TabletType_PRIMARY)
-		require.NoError(t, err)
-	}()
+	type executeFn func(
+		query string, bindvars map[string]*querypb.BindVariable,
+	) (*sqltypes.Result, error)
+
+	testCases := []struct {
+		name                   string
+		getExecuteFn           func(qc *framework.QueryClient) executeFn
+		totalConsolidationsTag string
+	}{
+		{
+			name:                   "Execute",
+			getExecuteFn:           func(qc *framework.QueryClient) executeFn { return qc.Execute },
+			totalConsolidationsTag: "Waits/Histograms/Consolidations/Count",
+		},
+		{
+			name:                   "StreamExecute",
+			getExecuteFn:           func(qc *framework.QueryClient) executeFn { return qc.StreamExecute },
+			totalConsolidationsTag: "Waits/Histograms/StreamConsolidations/Count",
+		},
+	}
 
-	initial = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
-	var wg3 sync.WaitGroup
-	wg3.Add(2)
-	go func() {
-		client.Execute("select sleep(0.5) from dual", nil)
-		wg3.Done()
-	}()
-	go func() {
-		client.Execute("select sleep(0.5) from dual", nil)
-		wg3.Done()
-	}()
-	wg3.Wait()
-	afterOne = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
-	assert.Equal(t, initial+1, afterOne, "expected another consolidation")
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			initial := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
+			var wg sync.WaitGroup
+			wg.Add(2)
+			go func() {
+				testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
+				wg.Done()
+			}()
+			go func() {
+				testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
+				wg.Done()
+			}()
+			wg.Wait()
+			afterOne := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
+			assert.Equal(t, initial+1, afterOne, "expected one consolidation")
+
+			revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary)
+			defer revert()
+
+			// primary should not do query consolidation
+			var wg2 sync.WaitGroup
+			wg2.Add(2)
+			go func() {
+				testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
+				wg2.Done()
+			}()
+			go func() {
+				testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
+				wg2.Done()
+			}()
+			wg2.Wait()
+			noNewConsolidations := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
+			assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")
+
+			// become a replica, where query consolidation should happen
+			client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA)
+
+			err := client.SetServingType(topodatapb.TabletType_REPLICA)
+			require.NoError(t, err)
+			defer func() {
+				err = client.SetServingType(topodatapb.TabletType_PRIMARY)
+				require.NoError(t, err)
+			}()
+
+			initial = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
+			var wg3 sync.WaitGroup
+			wg3.Add(2)
+			go func() {
+				testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil)
+				wg3.Done()
+			}()
+			go func() {
+				testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil)
+				wg3.Done()
+			}()
+			wg3.Wait()
+			afterOne = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
+			assert.Equal(t, initial+1, afterOne, "expected another consolidation")
+		})
+	}
 }
 
 func TestQueryPlanCache(t *testing.T) {
diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go
index 3d1d9b4b87c..c524cb3b3bd 100644
--- a/go/vt/vttablet/tabletserver/query_executor.go
+++ b/go/vt/vttablet/tabletserver/query_executor.go
@@ -340,7 +340,7 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error {
 
 	if consolidator := qre.tsv.qe.streamConsolidator; consolidator != nil {
 		if qre.connID == 0 && qre.plan.PlanID == p.PlanSelectStream && qre.shouldConsolidate() {
-			return consolidator.Consolidate(qre.logStats, sqlWithoutComments, callback,
+			return consolidator.Consolidate(qre.tsv.stats.WaitTimings, qre.logStats, sqlWithoutComments, callback,
 				func(callback StreamCallback) error {
 					dbConn, err := qre.getStreamConn()
 					if err != nil {
diff --git a/go/vt/vttablet/tabletserver/stream_consolidator.go b/go/vt/vttablet/tabletserver/stream_consolidator.go
index 497c9011040..22888419dc9 100644
--- a/go/vt/vttablet/tabletserver/stream_consolidator.go
+++ b/go/vt/vttablet/tabletserver/stream_consolidator.go
@@ -19,9 +19,11 @@ package tabletserver
 import (
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"vitess.io/vitess/go/sqltypes"
 	vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
+	"vitess.io/vitess/go/vt/servenv"
 	"vitess.io/vitess/go/vt/vterrors"
 	"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
 )
@@ -70,7 +72,7 @@ func (sc *StreamConsolidator) SetBlocking(block bool) {
 // `callback`. A `leaderCallback` must also be supplied: this function must perform the actual
 // query in the upstream MySQL server, yielding results into the modified callback that it receives
 // as an argument.
-func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error {
+func (sc *StreamConsolidator) Consolidate(waitTimings *servenv.TimingsWrapper, logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error {
 	var (
 		inflight        *streamInFlight
 		catchup         []*sqltypes.Result
@@ -100,9 +102,11 @@ func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql stri
 
 	// if we have a followChan, we're following up on a query that is already being served
 	if followChan != nil {
+		startTime := time.Now()
 		defer func() {
 			memchange := inflight.unfollow(followChan, sc.cleanup)
 			atomic.AddInt64(&sc.memory, memchange)
+			waitTimings.Record("StreamConsolidations", startTime)
 		}()
 
 		logStats.QuerySources |= tabletenv.QuerySourceConsolidator
diff --git a/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go b/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go
index 0c903933412..caa519cc477 100644
--- a/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go
+++ b/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go
@@ -28,6 +28,7 @@ import (
 
 	"github.com/stretchr/testify/require"
 
+	"vitess.io/vitess/go/vt/servenv"
 	"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
 
 	"vitess.io/vitess/go/sqltypes"
@@ -123,10 +124,12 @@ func (ct *consolidationTest) run(workers int, generateCallback func(int) (string
 
 		go func(worker int) {
 			defer wg.Done()
+			exporter := servenv.NewExporter("ConsolidatorTest", "")
+			timings := exporter.NewTimings("ConsolidatorWaits", "", "StreamConsolidations")
 			logStats := tabletenv.NewLogStats(context.Background(), "StreamConsolidation")
 			query, callback := generateCallback(worker)
 			start := time.Now()
-			err := ct.cc.Consolidate(logStats, query, func(result *sqltypes.Result) error {
+			err := ct.cc.Consolidate(timings, logStats, query, func(result *sqltypes.Result) error {
 				cr := ct.results[worker]
 				cr.items = append(cr.items, result)
 				atomic.AddInt64(&cr.count, 1)
diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go
index fa789b7144f..3247ec1d6b3 100644
--- a/go/vt/vttablet/tabletserver/tabletserver.go
+++ b/go/vt/vttablet/tabletserver/tabletserver.go
@@ -913,6 +913,7 @@ func (tsv *TabletServer) streamExecute(ctx context.Context, target *querypb.Targ
 				ctx:            ctx,
 				logStats:       logStats,
 				tsv:            tsv,
+				tabletType:     target.GetTabletType(),
 				setting:        connSetting,
 			}
 			return qre.Stream(callback)