Skip to content

Commit

Permalink
sql/pgwire: populate table and column ID in RowDescription
Browse files Browse the repository at this point in the history
The RowDescription in the wire protocol contains metadata that points to
the source of the returned column. Drivers such as pgjdbc rely on this
metadata in order for certain functionality to work.

Release note (sql change): The RowDescription message of the pgwire
protocol now contains the table OID and column ID for each column in the
result set. These values correspond to pg_attribute.attrelid and
pg_attribute.attnum. If a result column does not refer to a simple table
or view, these values will be zero, as they were before.
  • Loading branch information
rafiss committed May 12, 2020
1 parent b899959 commit 1a085c0
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 29 deletions.
7 changes: 6 additions & 1 deletion pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,12 @@ func newCopyMachine(
}
c.resultColumns = make(sqlbase.ResultColumns, len(cols))
for i := range cols {
c.resultColumns[i] = sqlbase.ResultColumn{Typ: &cols[i].Type}
c.resultColumns[i] = sqlbase.ResultColumn{
Name: cols[i].Name,
Typ: &cols[i].Type,
TableID: tableDesc.GetID(),
PGAttributeNum: cols[i].ID,
}
}
c.rowsMemAcc = c.p.extendedEvalCtx.Mon.MakeBoundAccount()
c.bufMemAcc = c.p.extendedEvalCtx.Mon.MakeBoundAccount()
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1343,7 +1343,7 @@ func MakeTableDesc(
// Now that we've constructed our columns, we pop into any of our computed
// columns so that we can dequalify any column references.
sourceInfo := sqlbase.NewSourceInfoForSingleTable(
n.Table, sqlbase.ResultColumnsFromColDescs(desc.Columns),
n.Table, sqlbase.ResultColumnsFromColDescs(desc.GetID(), desc.Columns),
)

for i := range desc.Columns {
Expand Down Expand Up @@ -2134,7 +2134,10 @@ func MakeCheckConstraint(
sort.Sort(sqlbase.ColumnIDs(colIDs))

sourceInfo := sqlbase.NewSourceInfoForSingleTable(
tableName, sqlbase.ResultColumnsFromColDescs(desc.TableDesc().AllNonDropColumns()),
tableName, sqlbase.ResultColumnsFromColDescs(
desc.GetID(),
desc.TableDesc().AllNonDropColumns(),
),
)

expr, err = dequalifyColumnRefs(ctx, sourceInfo, d.Expr)
Expand Down
34 changes: 22 additions & 12 deletions pkg/sql/opt_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (ef *execFactory) ConstructVirtualScan(table cat.Table) (exec.Node, error)
if err != nil {
return nil, err
}
columns, constructor := virtual.getPlanInfo()
columns, constructor := virtual.getPlanInfo(table.(*optVirtualTable).desc.TableDesc())

return &delayedNode{
columns: columns,
Expand Down Expand Up @@ -220,9 +220,9 @@ func (ef *execFactory) ConstructSimpleProject(
for i, col := range cols {
v := rb.r.ivarHelper.IndexedVar(int(col))
if colNames == nil {
rb.addExpr(v, inputCols[col].Name)
rb.addExpr(v, inputCols[col].Name, inputCols[col].TableID, inputCols[col].PGAttributeNum)
} else {
rb.addExpr(v, colNames[i])
rb.addExpr(v, colNames[i], 0 /* tableID */, 0 /* pgAttributeNum */)
}
}
return rb.res, nil
Expand All @@ -247,7 +247,7 @@ func (ef *execFactory) ConstructRender(
rb.init(n, reqOrdering, len(exprs))
for i, expr := range exprs {
expr = rb.r.ivarHelper.Rebind(expr, false /* alsoReset */, true /* normalizeToNonNil */)
rb.addExpr(expr, colNames[i])
rb.addExpr(expr, colNames[i], 0 /* tableID */, 0 /* pgAttributeNum */)
}
return rb.res, nil
}
Expand Down Expand Up @@ -565,7 +565,7 @@ func (ef *execFactory) ConstructIndexJoin(
input: input.(planNode),
table: tableScan,
cols: colDescs,
resultColumns: sqlbase.ResultColumnsFromColDescs(colDescs),
resultColumns: sqlbase.ResultColumnsFromColDescs(tabDesc.GetID(), colDescs),
reqOrdering: ReqOrdering(reqOrdering),
}

Expand Down Expand Up @@ -1175,7 +1175,7 @@ func (ef *execFactory) ConstructInsert(
// If rows are not needed, no columns are returned.
if rowsNeeded {
returnColDescs := makeColDescList(table, returnColOrdSet)
ins.columns = sqlbase.ResultColumnsFromColDescs(returnColDescs)
ins.columns = sqlbase.ResultColumnsFromColDescs(tabDesc.GetID(), returnColDescs)

// Set the tabColIdxToRetIdx for the mutation. Insert always returns
// non-mutation columns in the same order they are defined in the table.
Expand Down Expand Up @@ -1249,7 +1249,7 @@ func (ef *execFactory) ConstructInsertFastPath(
// If rows are not needed, no columns are returned.
if rowsNeeded {
returnColDescs := makeColDescList(table, returnColOrdSet)
ins.columns = sqlbase.ResultColumnsFromColDescs(returnColDescs)
ins.columns = sqlbase.ResultColumnsFromColDescs(tabDesc.GetID(), returnColDescs)

// Set the tabColIdxToRetIdx for the mutation. Insert always returns
// non-mutation columns in the same order they are defined in the table.
Expand Down Expand Up @@ -1371,7 +1371,7 @@ func (ef *execFactory) ConstructUpdate(
if rowsNeeded {
returnColDescs := makeColDescList(table, returnColOrdSet)

upd.columns = sqlbase.ResultColumnsFromColDescs(returnColDescs)
upd.columns = sqlbase.ResultColumnsFromColDescs(tabDesc.GetID(), returnColDescs)
// Add the passthrough columns to the returning columns.
upd.columns = append(upd.columns, passthrough...)

Expand Down Expand Up @@ -1527,7 +1527,7 @@ func (ef *execFactory) ConstructUpsert(
// If rows are not needed, no columns are returned.
if rowsNeeded {
returnColDescs := makeColDescList(table, returnColOrdSet)
ups.columns = sqlbase.ResultColumnsFromColDescs(returnColDescs)
ups.columns = sqlbase.ResultColumnsFromColDescs(tabDesc.GetID(), returnColDescs)

// Update the tabColIdxToRetIdx for the mutation. Upsert returns
// non-mutation columns specified, in the same order they are defined
Expand Down Expand Up @@ -1626,7 +1626,7 @@ func (ef *execFactory) ConstructDelete(
returnColDescs := makeColDescList(table, returnColOrdSet)
// Delete returns the non-mutation columns specified, in the same
// order they are defined in the table.
del.columns = sqlbase.ResultColumnsFromColDescs(returnColDescs)
del.columns = sqlbase.ResultColumnsFromColDescs(tabDesc.GetID(), returnColDescs)

del.run.rowIdxToRetIdx = row.ColMapping(rd.FetchCols, returnColDescs)
del.run.rowsNeeded = true
Expand Down Expand Up @@ -1885,9 +1885,19 @@ func (rb *renderBuilder) init(n exec.Node, reqOrdering exec.OutputOrdering, cap
}

// addExpr adds a new render expression with the given name.
func (rb *renderBuilder) addExpr(expr tree.TypedExpr, colName string) {
func (rb *renderBuilder) addExpr(
expr tree.TypedExpr, colName string, tableID sqlbase.ID, pgAttributeNum sqlbase.ColumnID,
) {
rb.r.render = append(rb.r.render, expr)
rb.r.columns = append(rb.r.columns, sqlbase.ResultColumn{Name: colName, Typ: expr.ResolvedType()})
rb.r.columns = append(
rb.r.columns,
sqlbase.ResultColumn{
Name: colName,
Typ: expr.ResolvedType(),
TableID: tableID,
PGAttributeNum: pgAttributeNum,
},
)
}

// makeColDescList returns a list of table column descriptors. Columns are
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1284,8 +1284,8 @@ func (c *conn) writeRowDescription(
}
c.msgBuilder.writeTerminatedString(column.Name)
typ := pgTypeForParserType(column.Typ)
c.msgBuilder.putInt32(0) // Table OID (optional).
c.msgBuilder.putInt16(0) // Column attribute ID (optional).
c.msgBuilder.putInt32(int32(column.TableID)) // Table OID (optional).
c.msgBuilder.putInt16(int16(column.PGAttributeNum)) // Column attribute ID (optional).
c.msgBuilder.putInt32(int32(mapResultOid(typ.oid)))
c.msgBuilder.putInt16(int16(typ.size))
// The type modifier (atttypmod) is used to include various extra information
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/pgwire/testdata/pgtest/int_size
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Query {"String": "SELECT * FROM t1"}
until
ReadyForQuery
----
{"Type":"RowDescription","Fields":[{"Name":"a","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":20,"DataTypeSize":8,"TypeModifier":-1,"Format":0},{"Name":"b","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":23,"DataTypeSize":4,"TypeModifier":-1,"Format":0},{"Name":"c","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":20,"DataTypeSize":8,"TypeModifier":-1,"Format":0}]}
{"Type":"RowDescription","Fields":[{"Name":"a","TableOID":52,"TableAttributeNumber":1,"DataTypeOID":20,"DataTypeSize":8,"TypeModifier":-1,"Format":0},{"Name":"b","TableOID":52,"TableAttributeNumber":2,"DataTypeOID":23,"DataTypeSize":4,"TypeModifier":-1,"Format":0},{"Name":"c","TableOID":52,"TableAttributeNumber":3,"DataTypeOID":20,"DataTypeSize":8,"TypeModifier":-1,"Format":0}]}
{"Type":"CommandComplete","CommandTag":"SELECT 0"}
{"Type":"ReadyForQuery","TxStatus":"I"}

Expand Down Expand Up @@ -80,7 +80,7 @@ Query {"String": "SELECT * FROM t2"}
until
ReadyForQuery
----
{"Type":"RowDescription","Fields":[{"Name":"a","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":23,"DataTypeSize":4,"TypeModifier":-1,"Format":0},{"Name":"b","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":23,"DataTypeSize":4,"TypeModifier":-1,"Format":0},{"Name":"c","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":20,"DataTypeSize":8,"TypeModifier":-1,"Format":0}]}
{"Type":"RowDescription","Fields":[{"Name":"a","TableOID":53,"TableAttributeNumber":1,"DataTypeOID":23,"DataTypeSize":4,"TypeModifier":-1,"Format":0},{"Name":"b","TableOID":53,"TableAttributeNumber":2,"DataTypeOID":23,"DataTypeSize":4,"TypeModifier":-1,"Format":0},{"Name":"c","TableOID":53,"TableAttributeNumber":3,"DataTypeOID":20,"DataTypeSize":8,"TypeModifier":-1,"Format":0}]}
{"Type":"CommandComplete","CommandTag":"SELECT 0"}
{"Type":"ReadyForQuery","TxStatus":"I"}

Expand All @@ -92,6 +92,6 @@ Query {"String": "SELECT * FROM t1"}
until
ReadyForQuery
----
{"Type":"RowDescription","Fields":[{"Name":"a","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":20,"DataTypeSize":8,"TypeModifier":-1,"Format":0},{"Name":"b","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":23,"DataTypeSize":4,"TypeModifier":-1,"Format":0},{"Name":"c","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":20,"DataTypeSize":8,"TypeModifier":-1,"Format":0}]}
{"Type":"RowDescription","Fields":[{"Name":"a","TableOID":52,"TableAttributeNumber":1,"DataTypeOID":20,"DataTypeSize":8,"TypeModifier":-1,"Format":0},{"Name":"b","TableOID":52,"TableAttributeNumber":2,"DataTypeOID":23,"DataTypeSize":4,"TypeModifier":-1,"Format":0},{"Name":"c","TableOID":52,"TableAttributeNumber":3,"DataTypeOID":20,"DataTypeSize":8,"TypeModifier":-1,"Format":0}]}
{"Type":"CommandComplete","CommandTag":"SELECT 0"}
{"Type":"ReadyForQuery","TxStatus":"I"}
89 changes: 89 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/row_description
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# This test verifies that we're populating the TableID and PGAttributeNum in the
# RowDescription message of the wire protocol. The IDs should remain consistent
# even when joining tables or when using views.

send
Query {"String": "CREATE TABLE tab1 (a int primary key, b int)"}
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "CREATE TABLE tab2 (c int primary key, tab1_a int REFERENCES tab1(a))"}
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "INSERT INTO tab1 VALUES(1,2)"}
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"INSERT 0 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "INSERT INTO tab2 VALUES(4,1)"}
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"INSERT 0 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "CREATE VIEW v (v1, v2) AS SELECT a, tab1_a FROM tab1 JOIN tab2 ON tab1.a = tab2.tab1_a"}
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"CREATE VIEW"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "SELECT a FROM tab1"}
----

until
ReadyForQuery
----
{"Type":"RowDescription","Fields":[{"Name":"a","TableOID":55,"TableAttributeNumber":1,"DataTypeOID":20,"DataTypeSize":8,"TypeModifier":-1,"Format":0}]}
{"Type":"DataRow","Values":[{"text":"1"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "SELECT tab1.a, tab2.c FROM tab1 JOIN tab2 ON tab1.a = tab2.tab1_a"}
----

until
ReadyForQuery
----
{"Type":"RowDescription","Fields":[{"Name":"a","TableOID":55,"TableAttributeNumber":1,"DataTypeOID":20,"DataTypeSize":8,"TypeModifier":-1,"Format":0},{"Name":"c","TableOID":56,"TableAttributeNumber":1,"DataTypeOID":20,"DataTypeSize":8,"TypeModifier":-1,"Format":0}]}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"4"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "SELECT * FROM v WHERE v1 = 1"}
----

until
ReadyForQuery
----
{"Type":"RowDescription","Fields":[{"Name":"v1","TableOID":55,"TableAttributeNumber":1,"DataTypeOID":20,"DataTypeSize":8,"TypeModifier":-1,"Format":0},{"Name":"v2","TableOID":56,"TableAttributeNumber":2,"DataTypeOID":20,"DataTypeSize":8,"TypeModifier":-1,"Format":0}]}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}
2 changes: 1 addition & 1 deletion pkg/sql/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (n *scanNode) initDescDefaults(planDeps planDependencies, colCfg scanColumn
}

// Set up the rest of the scanNode.
n.resultColumns = sqlbase.ResultColumnsFromColDescs(n.cols)
n.resultColumns = sqlbase.ResultColumnsFromColDescs(n.desc.GetID(), n.cols)
n.colIdxMap = make(map[sqlbase.ColumnID]int, len(n.cols))
for i, c := range n.cols {
n.colIdxMap[c.ID] = i
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sqlbase/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewEvalCheckHelper(
c.cols = tableDesc.AllNonDropColumns()
c.sourceInfo = NewSourceInfoForSingleTable(
tree.MakeUnqualifiedTableName(tree.Name(tableDesc.Name)),
ResultColumnsFromColDescs(c.cols),
ResultColumnsFromColDescs(tableDesc.GetID(), c.cols),
)

c.Exprs = make([]tree.TypedExpr, len(tableDesc.ActiveChecks()))
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/sqlbase/computed_exprs.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,14 @@ func MakeComputedExprs(
iv := &descContainer{tableDesc.Columns}
ivarHelper := tree.MakeIndexedVarHelper(iv, len(tableDesc.Columns))

source := NewSourceInfoForSingleTable(*tn, ResultColumnsFromColDescs(tableDesc.Columns))
source := NewSourceInfoForSingleTable(*tn, ResultColumnsFromColDescs(tableDesc.GetID(), tableDesc.Columns))
semaCtx := tree.MakeSemaContext()
semaCtx.IVarContainer = iv

addColumnInfo := func(col *ColumnDescriptor) {
ivarHelper.AppendSlot()
iv.cols = append(iv.cols, *col)
newCols := ResultColumnsFromColDescs([]ColumnDescriptor{*col})
newCols := ResultColumnsFromColDescs(tableDesc.GetID(), []ColumnDescriptor{*col})
source.SourceColumns = append(source.SourceColumns, newCols...)
}

Expand Down
19 changes: 17 additions & 2 deletions pkg/sql/sqlbase/result_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,20 @@ type ResultColumn struct {

// If set, this is an implicit column; used internally.
Hidden bool

// TableID/PGAttributeNum identify the source of the column, if it is a simple
// reference to a column of a base table (or view). If it is not a simple
// reference, these fields are zeroes.
TableID ID // OID of column's source table (pg_attribute.attrelid).
PGAttributeNum ColumnID // Column's number in source table (pg_attribute.attnum).
}

// ResultColumns is the type used throughout the sql module to
// describe the column types of a table.
type ResultColumns []ResultColumn

// ResultColumnsFromColDescs converts ColumnDescriptors to ResultColumns.
func ResultColumnsFromColDescs(colDescs []ColumnDescriptor) ResultColumns {
func ResultColumnsFromColDescs(tableID ID, colDescs []ColumnDescriptor) ResultColumns {
cols := make(ResultColumns, 0, len(colDescs))
for i := range colDescs {
// Convert the ColumnDescriptor to ResultColumn.
Expand All @@ -42,7 +48,16 @@ func ResultColumnsFromColDescs(colDescs []ColumnDescriptor) ResultColumns {
}

hidden := colDesc.Hidden
cols = append(cols, ResultColumn{Name: colDesc.Name, Typ: typ, Hidden: hidden})
cols = append(
cols,
ResultColumn{
Name: colDesc.Name,
Typ: typ,
Hidden: hidden,
TableID: tableID,
PGAttributeNum: colDesc.ID,
},
)
}
return cols
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/sql/virtual_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,17 @@ func newInvalidVirtualDefEntryError() error {
// valuesNode for the virtual table. We use deferred construction here
// so as to avoid populating a RowContainer during query preparation,
// where we can't guarantee it will be Close()d in case of error.
func (e virtualDefEntry) getPlanInfo() (sqlbase.ResultColumns, virtualTableConstructor) {
func (e virtualDefEntry) getPlanInfo(
table *sqlbase.TableDescriptor,
) (sqlbase.ResultColumns, virtualTableConstructor) {
var columns sqlbase.ResultColumns
for i := range e.desc.Columns {
col := &e.desc.Columns[i]
columns = append(columns, sqlbase.ResultColumn{
Name: col.Name,
Typ: &col.Type,
Name: col.Name,
Typ: &col.Type,
TableID: table.GetID(),
PGAttributeNum: col.ID,
})
}

Expand Down

0 comments on commit 1a085c0

Please sign in to comment.