Skip to content

Commit

Permalink
pgwire: allow Flush message during portal exeuction
Browse files Browse the repository at this point in the history
Release note (bug fix): A Flush message sent during portal execution in
the pgwire extended protocol no longer results in an error.
  • Loading branch information
rafiss committed Jun 30, 2022
1 parent 9d21103 commit 9413d38
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 1 deletion.
7 changes: 7 additions & 0 deletions pkg/sql/pgwire/command_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,13 @@ func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error {
if err := r.conn.Flush(r.pos); err != nil {
return err
}
case sql.Flush:
// Flush has no client response, so just advance the position and flush
// any existing results.
r.conn.stmtBuf.AdvanceOne()
if err := r.conn.Flush(r.pos); err != nil {
return err
}
default:
// If the portal is immediately followed by a COMMIT, we can proceed and
// let the portal be destroyed at the end of the transaction.
Expand Down
68 changes: 67 additions & 1 deletion pkg/sql/pgwire/testdata/pgtest/portals
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ ReadyForQuery
{"Type":"ReadyForQuery","TxStatus":"I"}

# Execute a portal with limited rows inside a transaction.
# This also tests that we can handle Flush during the portal execution.

send
Query {"String": "BEGIN"}
Expand Down Expand Up @@ -174,6 +175,71 @@ ReadyForQuery
{"Type":"CommandComplete","CommandTag":"COMMIT"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Execute a portal with limited rows inside a transaction, and verify that we
# allow the Flush message.

send
Query {"String": "BEGIN"}
Parse {"Query": "SELECT * FROM generate_series(1, 2)"}
Bind
Execute {"MaxRows": 1}
Flush
Sync
----

until
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"BEGIN"}
{"Type":"ReadyForQuery","TxStatus":"T"}
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"T"}

# This is the second of 2 rows, but we don't expect a command complete
# yet.

send
Execute {"MaxRows": 1}
Flush
Sync
----

until
ReadyForQuery
----
{"Type":"DataRow","Values":[{"text":"2"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"T"}

# There were only 2 rows, so this third execute should return a command
# complete.

send
Execute {"MaxRows": 1}
Flush
Sync
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"SELECT 0"}
{"Type":"ReadyForQuery","TxStatus":"T"}

send
Query {"String": "COMMIT"}
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"COMMIT"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "SELECT 'here'"}
----
Expand Down Expand Up @@ -354,7 +420,7 @@ Execute {"MaxRows": 2}
Sync
----

until
until ignore=NoticeResponse
ReadyForQuery
ReadyForQuery
ReadyForQuery
Expand Down
2 changes: 2 additions & 0 deletions pkg/testutils/pgtest/datadriven.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ func toMessage(typ string) interface{} {
return &pgproto3.ErrorResponse{}
case "Execute":
return &pgproto3.Execute{}
case "Flush":
return &pgproto3.Flush{}
case "Parse":
return &pgproto3.Parse{}
case "PortalSuspended":
Expand Down

0 comments on commit 9413d38

Please sign in to comment.