diff --git a/pkg/sql/pgwire/command_result.go b/pkg/sql/pgwire/command_result.go index 5180a3069b56..9a9f61f21a73 100644 --- a/pkg/sql/pgwire/command_result.go +++ b/pkg/sql/pgwire/command_result.go @@ -507,6 +507,13 @@ func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error { if err := r.conn.Flush(r.pos); err != nil { return err } + case sql.Flush: + // Flush has no client response, so just advance the position and flush + // any existing results. + r.conn.stmtBuf.AdvanceOne() + if err := r.conn.Flush(r.pos); err != nil { + return err + } default: // If the portal is immediately followed by a COMMIT, we can proceed and // let the portal be destroyed at the end of the transaction. diff --git a/pkg/sql/pgwire/testdata/pgtest/portals b/pkg/sql/pgwire/testdata/pgtest/portals index def589e3c10c..8f641e91fe46 100644 --- a/pkg/sql/pgwire/testdata/pgtest/portals +++ b/pkg/sql/pgwire/testdata/pgtest/portals @@ -114,6 +114,7 @@ ReadyForQuery {"Type":"ReadyForQuery","TxStatus":"I"} # Execute a portal with limited rows inside a transaction. +# This also tests that we can handle Flush during the portal execution. send Query {"String": "BEGIN"} @@ -174,6 +175,71 @@ ReadyForQuery {"Type":"CommandComplete","CommandTag":"COMMIT"} {"Type":"ReadyForQuery","TxStatus":"I"} +# Execute a portal with limited rows inside a transaction, and verify that we +# allow the Flush message. + +send +Query {"String": "BEGIN"} +Parse {"Query": "SELECT * FROM generate_series(1, 2)"} +Bind +Execute {"MaxRows": 1} +Flush +Sync +---- + +until +ReadyForQuery +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +# This is the second of 2 rows, but we don't expect a command complete +# yet. + +send +Execute {"MaxRows": 1} +Flush +Sync +---- + +until +ReadyForQuery +---- +{"Type":"DataRow","Values":[{"text":"2"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +# There were only 2 rows, so this third execute should return a command +# complete. + +send +Execute {"MaxRows": 1} +Flush +Sync +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"SELECT 0"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +send +Query {"String": "COMMIT"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"COMMIT"} +{"Type":"ReadyForQuery","TxStatus":"I"} + send Query {"String": "SELECT 'here'"} ---- @@ -354,7 +420,7 @@ Execute {"MaxRows": 2} Sync ---- -until +until ignore=NoticeResponse ReadyForQuery ReadyForQuery ReadyForQuery diff --git a/pkg/testutils/pgtest/datadriven.go b/pkg/testutils/pgtest/datadriven.go index 2062359f3fcf..212769b7838c 100644 --- a/pkg/testutils/pgtest/datadriven.go +++ b/pkg/testutils/pgtest/datadriven.go @@ -288,6 +288,8 @@ func toMessage(typ string) interface{} { return &pgproto3.ErrorResponse{} case "Execute": return &pgproto3.Execute{} + case "Flush": + return &pgproto3.Flush{} case "Parse": return &pgproto3.Parse{} case "PortalSuspended":