Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.2: sql: add COPY to sampled_query logging #86717

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,12 +596,12 @@ func (s *Server) GetExecutorConfig() *ExecutorConfig {
//
// Args:
// args: The initial session parameters. They are validated by SetupConn
// and an error is returned if this validation fails.
// and an error is returned if this validation fails.
// stmtBuf: The incoming statement for the new connExecutor.
// clientComm: The interface through which the new connExecutor is going to
// produce results for the client.
// produce results for the client.
// memMetrics: The metrics that statements executed on this connection will
// contribute to.
// contribute to.
func (s *Server) SetupConn(
ctx context.Context,
args SessionArgs,
Expand Down Expand Up @@ -1556,7 +1556,8 @@ func (ex *connExecutor) sessionData() *sessiondata.SessionData {
// Args:
// parentMon: The root monitor.
// reserved: Memory reserved for the connection. The connExecutor takes
// ownership of this memory.
//
// ownership of this memory.
func (ex *connExecutor) activate(
ctx context.Context, parentMon *mon.BytesMonitor, reserved mon.BoundAccount,
) {
Expand Down Expand Up @@ -2110,19 +2111,13 @@ func isCopyToExternalStorage(cmd CopyIn) bool {
func (ex *connExecutor) execCopyIn(
ctx context.Context, cmd CopyIn,
) (_ fsm.Event, retPayload fsm.EventPayload, retErr error) {
logStatements := logStatementsExecuteEnabled.Get(ex.planner.execCfg.SV())

ex.incrementStartedStmtCounter(cmd.Stmt)
defer func() {
if retErr != nil {
log.SqlExec.Errorf(ctx, "error executing %s: %+v", cmd, retErr)
}
}()

if logStatements {
log.SqlExec.Infof(ctx, "executing %s", cmd)
}

// When we're done, unblock the network connection.
defer cmd.CopyDone.Done()

Expand Down Expand Up @@ -2180,8 +2175,36 @@ func (ex *connExecutor) execCopyIn(
ex.initPlanner(ctx, p)
ex.resetPlanner(ctx, p, txn, stmtTS)
}

ex.planner.stmt = Statement{
Statement: cmd.ParsedStmt,
}
ann := tree.MakeAnnotations(0)
ex.planner.extendedEvalCtx.Annotations = &ann
ex.planner.extendedEvalCtx.Placeholders = &tree.PlaceholderInfo{}
ex.planner.curPlan.stmt = &ex.planner.stmt
var cm copyMachineInterface
var err error
defer func() {
var numInsertedRows int
if cm != nil {
numInsertedRows = cm.numInsertedRows()
}
// These fields are not available in COPY, so use the empty value.
var stmtFingerprintID roachpb.StmtFingerprintID
ex.planner.maybeLogStatement(
ctx,
ex.executorType,
ex.extraTxnState.autoRetryCounter,
ex.extraTxnState.txnCounter,
numInsertedRows,
err,
ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
&ex.extraTxnState.hasAdminRoleCache,
ex.server.TelemetryLoggingMetrics,
stmtFingerprintID,
)
}()
if isCopyToExternalStorage(cmd) {
cm, err = newFileUploadMachine(ctx, cmd.Conn, cmd.Stmt, txnOpt, ex.server.cfg)
} else {
Expand All @@ -2199,7 +2222,8 @@ func (ex *connExecutor) execCopyIn(
payload := eventNonRetriableErrPayload{err: err}
return ev, payload, nil
}
if err := cm.run(ctx); err != nil {

if err = cm.run(ctx); err != nil {
// TODO(andrei): We don't have a retriable error story for the copy machine.
// When running outside of a txn, the copyMachine should probably do retries
// internally. When not, it's unclear what we should do. For now, we abort
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ var _ Command = Flush{}

// CopyIn is the command for execution of the Copy-in pgwire subprotocol.
type CopyIn struct {
Stmt *tree.CopyFrom
ParsedStmt parser.Statement
Stmt *tree.CopyFrom
// Conn is the network connection. Execution of the CopyFrom statement takes
// control of the connection.
Conn pgwirebase.Conn
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

type copyMachineInterface interface {
run(ctx context.Context) error
numInsertedRows() int
}

// copyMachine supports the Copy-in pgwire subprotocol (COPY...FROM STDIN). The
Expand Down Expand Up @@ -222,6 +223,13 @@ func newCopyMachine(
return c, nil
}

func (c *copyMachine) numInsertedRows() int {
if c == nil {
return 0
}
return c.insertedRows
}

// copyTxnOpt contains information about the transaction in which the copying
// should take place. Can be empty, in which case the copyMachine is responsible
// for managing its own transactions.
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/copy_file_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ func CopyInFileStmt(destination, schema, table string) string {
)
}

func (f *fileUploadMachine) numInsertedRows() int {
if f == nil {
return 0
}
return f.c.numInsertedRows()
}

func (f *fileUploadMachine) run(ctx context.Context) error {
err := f.c.run(ctx)
if err != nil && f.cancel != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,9 @@ func logEventInternalForSQLStatements(
// Inject the common fields into the payload provided by the caller.
injectCommonFields := func(entry eventLogEntry) error {
event := entry.event
event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime
if txn != nil {
event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime
}
sqlCommon, ok := event.(eventpb.EventWithCommonSQLPayload)
if !ok {
return errors.AssertionFailedf("unknown event type: %T", event)
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,13 +548,13 @@ func (c *conn) serveImpl(
//
// Args:
// ac: An interface used by the authentication process to receive password data
// and to ultimately declare the authentication successful.
// and to ultimately declare the authentication successful.
// reserved: Reserved memory. This method takes ownership.
// cancelConn: A function to be called when this goroutine exits. Its goal is to
// cancel the connection's context, thus stopping the connection's goroutine.
// The returned channel is also closed before this goroutine dies, but the
// connection's goroutine is not expected to be reading from that channel
// (instead, it's expected to always be monitoring the network connection).
// cancel the connection's context, thus stopping the connection's goroutine.
// The returned channel is also closed before this goroutine dies, but the
// connection's goroutine is not expected to be reading from that channel
// (instead, it's expected to always be monitoring the network connection).
func (c *conn) processCommandsAsync(
ctx context.Context,
authOpt authOptions,
Expand Down Expand Up @@ -788,7 +788,7 @@ func (c *conn) handleSimpleQuery(
}
copyDone := sync.WaitGroup{}
copyDone.Add(1)
if err := c.stmtBuf.Push(ctx, sql.CopyIn{Conn: c, Stmt: cp, CopyDone: &copyDone}); err != nil {
if err := c.stmtBuf.Push(ctx, sql.CopyIn{Conn: c, ParsedStmt: stmts[i], Stmt: cp, CopyDone: &copyDone}); err != nil {
return err
}
copyDone.Wait()
Expand Down