Skip to content

Commit

Permalink
fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
max-hoffman committed Dec 19, 2024
1 parent 68d431c commit 5fd4a67
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 19 deletions.
3 changes: 2 additions & 1 deletion enginetest/enginetests.go
Original file line number Diff line number Diff line change
Expand Up @@ -5743,6 +5743,7 @@ func TestTypesOverWire(t *testing.T, harness ClientHarness, sessionBuilder serve
require.NoError(t, err)
expectedRowSet := script.Results[queryIdx]
expectedRowIdx := 0
buf := sql.NewByteBuffer(1000)
var engineRow sql.Row
for engineRow, err = engineIter.Next(ctx); err == nil; engineRow, err = engineIter.Next(ctx) {
if !assert.True(t, r.Next()) {
Expand All @@ -5760,7 +5761,7 @@ func TestTypesOverWire(t *testing.T, harness ClientHarness, sessionBuilder serve
break
}
expectedEngineRow := make([]*string, len(engineRow))
row, err := server.RowToSQL(ctx, sch, engineRow, nil, nil)
row, err := server.RowToSQL(ctx, sch, engineRow, nil, buf)
if !assert.NoError(t, err) {
break
}
Expand Down
38 changes: 20 additions & 18 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,9 +442,10 @@ func (h *Handler) doQuery(
var r *sqltypes.Result
var processedAtLeastOneBatch bool

buf := sql.SingletonBuf
buf := sql.ByteBufPool.Get().(*sql.ByteBuffer)
defer func() {
sql.SingletonBuf.Reset()
buf.Reset()
sql.ByteBufPool.Put(buf)
}()

// zero/single return schema use spooling shortcut
Expand Down Expand Up @@ -930,28 +931,35 @@ func updateMaxUsedConnectionsStatusVariable() {
}()
}

func toSqlHelper(ctx *sql.Context, typ sql.Type, buf *sql.ByteBuffer, val interface{}) (sqltypes.Value, error) {
if buf == nil {
return typ.SQL(ctx, buf.Get(), val)
}
spare := buf.Spare()
ret, err := typ.SQL(ctx, buf.Get(), val)
if ret.Len() > spare {
buf.Double()
} else {
buf.Advance(ret.Len())
}
return ret, err
}

func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Expression, buf *sql.ByteBuffer) ([]sqltypes.Value, error) {
// need to make sure the schema is not null as some plan schema is defined as null (e.g. IfElseBlock)
if len(sch) == 0 {
return []sqltypes.Value{}, nil
}

outVals := make([]sqltypes.Value, len(sch))
var err error
if len(projs) == 0 {
for i, col := range sch {
if row[i] == nil {
outVals[i] = sqltypes.NULL
continue
}
var err error
spare := buf.Spare()
outVals[i], err = col.Type.SQL(ctx, buf.Get(), row[i])
if outVals[i].Len() > spare {
buf.Double()
} else {
buf.Advance(outVals[i].Len())
}

outVals[i], err = toSqlHelper(ctx, col.Type, buf, row[i])
if err != nil {
return nil, err
}
Expand All @@ -968,13 +976,7 @@ func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Express
outVals[i] = sqltypes.NULL
continue
}
spare := buf.Spare()
outVals[i], err = col.Type.SQL(ctx, buf.Get(), field)
if outVals[i].Len() > spare {
buf.Double()
} else {
buf.Advance(outVals[i].Len())
}
outVals[i], err = toSqlHelper(ctx, col.Type, buf, field)
if err != nil {
return nil, err
}
Expand Down
15 changes: 15 additions & 0 deletions sql/byte_buffer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
package sql

import (
"sync"
)

var SingletonBuf = NewByteBuffer(16000)

var defaultByteBuffCap = 1000

var ByteBufPool = sync.Pool{
New: func() any {
// The Pool's New function should generally only return pointer
// types, since a pointer can be put into the return interface
// value without an allocation:
return NewByteBuffer(defaultByteBuffCap)
},
}

type ByteBuffer struct {
buf []byte
i int
Expand Down

0 comments on commit 5fd4a67

Please sign in to comment.