bulker: support WithSchema option
bulker: bulk endpoint reads schema from X-Jitsu-Schema header
sync-sidecar: pass stream schema to bulker
sync-sidecar: support customized table_name for streams
absorbb committed Mar 27, 2024
1 parent 63b9090 commit de811f6
Showing 15 changed files with 240 additions and 34 deletions.
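In short: callers of the /bulk endpoint can now declare the target table schema up front by passing a JSON-serialized types.Schema in the X-Jitsu-Schema header; declared fields become columns even when incoming rows omit them. A minimal client-side sketch follows (illustrative, not part of this commit: host, port, connection id, token, and the JSON spellings of the field types are assumptions, since the exact encoding depends on how types.DataType marshals):

package main

import (
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Two ndjson events; note that neither carries a "name" column.
	body := strings.NewReader(
		`{"_timestamp": "2022-08-18T14:17:22.375Z", "id": 1}` + "\n" +
			`{"_timestamp": "2022-08-18T14:17:22.375Z", "id": 2}` + "\n")
	// Placeholders: bulker base URL, connection id, task id.
	req, err := http.NewRequest("POST",
		"http://localhost:3042/bulk/my-connection?tableName=my_table&mode=batch&taskId=task-1&pk=id", body)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer <token>")
	// The new header: declared fields are created as columns even if rows omit them.
	req.Header.Set("X-Jitsu-Schema",
		`{"name":"my_table","fields":[{"name":"_timestamp","type":"timestamp"},{"name":"id","type":"int"},{"name":"name","type":"string"}]}`)
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()
	fmt.Println(res.Status)
}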
15 changes: 14 additions & 1 deletion bulkerapp/app/router.go
@@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"crypto/sha512"
"encoding/json"
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/gin-gonic/gin"
@@ -158,9 +159,10 @@ func (r *Router) BulkHandler(c *gin.Context) {
jobId := c.DefaultQuery("jobId", fmt.Sprintf("%s_%s_%s", destinationId, tableName, taskId))
bulkMode := bulker.BulkMode(c.DefaultQuery("mode", string(bulker.ReplaceTable)))
pkeys := c.QueryArray("pk")

+	schemaHeader := c.GetHeader("X-Jitsu-Schema")
mode := ""
bytesRead := 0
var err error
var rError *appbase.RouterError
var processedObjectSample types.Object
var state bulker.State
@@ -190,6 +192,17 @@ func (r *Router) BulkHandler(c *gin.Context) {
if len(pkeys) > 0 {
streamOptions = append(streamOptions, bulker.WithPrimaryKey(pkeys...), bulker.WithDeduplicate())
}
if schemaHeader != "" {
schema := types.Schema{}
err = json.Unmarshal([]byte(schemaHeader), &schema)
if err != nil {
rError = r.ResponseError(c, http.StatusBadRequest, "schema unmarshal error", false, err, true)
return
}
if !schema.IsEmpty() {
streamOptions = append(streamOptions, bulker.WithSchema(schema))
}
}
//streamOptions = append(streamOptions, sql.WithoutOmitNils())
destination.InitBulkerInstance()
bulkerStream, err := destination.bulker.CreateStream(jobId, tableName, bulkMode, streamOptions...)
22 changes: 14 additions & 8 deletions bulkerlib/implementations/sql/abstract.go
@@ -17,14 +17,15 @@ import (
const unmappedDataColumn = "_unmapped_data"

type AbstractSQLStream struct {
-	id          string
-	sqlAdapter  SQLAdapter
-	mode        bulker.BulkMode
-	options     bulker.StreamOptions
-	tableName   string
-	merge       bool
-	mergeWindow int
-	omitNils    bool
+	id                string
+	sqlAdapter        SQLAdapter
+	mode              bulker.BulkMode
+	options           bulker.StreamOptions
+	tableName         string
+	merge             bool
+	mergeWindow       int
+	omitNils          bool
+	schemaFromOptions *Table

state bulker.State
inited bool
@@ -56,6 +57,11 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu
ps.timestampColumn = bulker.TimestampOption.Get(&ps.options)
ps.omitNils = OmitNilsOption.Get(&ps.options)

+	schema := bulker.SchemaOption.Get(&ps.options)
+	if !schema.IsEmpty() {
+		ps.schemaFromOptions = ps.sqlAdapter.TableHelper().MapSchema(ps.sqlAdapter, schema)
+	}

//TODO: max column?
ps.state = bulker.State{Status: bulker.Active}
ps.customTypes = customFields
3 changes: 3 additions & 0 deletions bulkerlib/implementations/sql/autocommit_stream.go
@@ -31,6 +31,9 @@ func (ps *AutoCommitStream) Consume(ctx context.Context, object types.Object) (s
return
}
table, processedObject, err := ps.preprocess(object)
+	if ps.schemaFromOptions != nil {
+		ps.adjustTableColumnTypes(table, nil, ps.schemaFromOptions, object)
+	}
if err != nil {
return
}
27 changes: 27 additions & 0 deletions bulkerlib/implementations/sql/bulker_test.go
@@ -384,6 +384,33 @@ func TestBasics(t *testing.T) {
configIds: allBulkerConfigs,
streamOptions: []bulker.StreamOption{bulker.WithTimestamp("_timestamp")},
},
+		{
+			name:              "schema_option",
+			modes:             []bulker.BulkMode{bulker.Batch, bulker.Stream, bulker.ReplaceTable, bulker.ReplacePartition},
+			expectPartitionId: true,
+			dataFile:          "test_data/schema_option.ndjson",
+			expectedTable: ExpectedTable{
+				Columns: justColumns("_timestamp", "column1", "column2", "column3", "id", "name"),
+			},
+			expectedRowsCount: 2,
+			expectedRows: []map[string]any{
+				{"_timestamp": constantTime, "id": 1, "name": nil, "column1": nil, "column2": nil, "column3": nil},
+				{"_timestamp": constantTime, "id": 2, "name": nil, "column1": nil, "column2": nil, "column3": nil},
+			},
+			expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported},
+			configIds:      allBulkerConfigs,
+			streamOptions: []bulker.StreamOption{bulker.WithSchema(types2.Schema{
+				Name: "schema_option",
+				Fields: []types2.SchemaField{
+					{Name: "_timestamp", Type: types2.TIMESTAMP},
+					{Name: "id", Type: types2.INT64},
+					{Name: "name", Type: types2.STRING},
+					{Name: "column1", Type: types2.STRING},
+					{Name: "column2", Type: types2.STRING},
+					{Name: "column3", Type: types2.STRING},
+				},
+			})},
+		},
}
for _, tt := range tests {
tt := tt
3 changes: 3 additions & 0 deletions bulkerlib/implementations/sql/replacepartition_stream.go
@@ -35,6 +35,9 @@ func newReplacePartitionStream(id string, p SQLAdapter, tableName string, stream
ps.tmpTableFunc = func(ctx context.Context, tableForObject *Table, object types.Object) (table *Table) {
dstTable := tableForObject
ps.adjustTableColumnTypes(dstTable, ps.existingTable, tableForObject, object)
+		if ps.schemaFromOptions != nil {
+			ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object)
+		}
tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(tableName, 47), time.Now().Format("060102150405"))
return &Table{
Name: tmpTableName,
6 changes: 5 additions & 1 deletion bulkerlib/implementations/sql/replacetable_stream.go
@@ -26,14 +26,18 @@ func newReplaceTableStream(id string, p SQLAdapter, tableName string, streamOpti
return nil, err
}
ps.tmpTableFunc = func(ctx context.Context, tableForObject *Table, object types.Object) (table *Table) {
-		return &Table{
+		tmpTable := &Table{
Name: fmt.Sprintf("%s_tmp%s", utils.ShortenString(ps.tableName, 47), time.Now().Format("060102150405")),
PrimaryKeyName: tableForObject.PrimaryKeyName,
//PrimaryKeyName: fmt.Sprintf("%s_%s", tableForObject.PrimaryKeyName, time.Now().Format("060102_150405")),
PKFields: tableForObject.PKFields,
Columns: tableForObject.Columns,
TimestampColumn: tableForObject.TimestampColumn,
}
+		if ps.schemaFromOptions != nil {
+			ps.adjustTableColumnTypes(tmpTable, nil, ps.schemaFromOptions, object)
+		}
+		return tmpTable
}
return &ps, nil
}
20 changes: 20 additions & 0 deletions bulkerlib/implementations/sql/table_helper.go
@@ -117,6 +117,26 @@ func (th *TableHelper) MapTableSchema(sqlAdapter SQLAdapter, batchHeader *TypesH
}
}

+// MapSchema maps types.Schema into types.Table (structure with SQL types)
+func (th *TableHelper) MapSchema(sqlAdapter SQLAdapter, schema types2.Schema) *Table {
+	table := &Table{
+		Name:    sqlAdapter.TableName(schema.Name),
+		Columns: Columns{},
+	}
+
+	for _, field := range schema.Fields {
+		colName := th.ColumnName(field.Name)
+		//map Jitsu type -> SQL type
+		sqlType, ok := sqlAdapter.GetSQLType(field.Type)
+		if ok {
+			table.Columns[colName] = types2.SQLColumn{DataType: field.Type, Type: sqlType, New: true}
+		} else {
+			logging.SystemErrorf("Unknown column type %s mapping for %s", field.Type, sqlAdapter.Type())
+		}
+	}
+	return table
+}

// EnsureTableWithCaching calls ensureTable with cacheTable = true
// it is used in stream destinations (because we don't have time to select table schema, but there is retry on error)
func (th *TableHelper) EnsureTableWithCaching(ctx context.Context, sqlAdapter SQLAdapter, destinationID string, dataSchema *Table) (*Table, error) {
2 changes: 2 additions & 0 deletions bulkerlib/implementations/sql/test_data/schema_option.ndjson
@@ -0,0 +1,2 @@
{"_timestamp": "2022-08-18T14:17:22.375Z", "id": 1}
{"_timestamp": "2022-08-18T14:17:22.375Z", "id": 2}
3 changes: 3 additions & 0 deletions bulkerlib/implementations/sql/transactional_stream.go
@@ -27,6 +27,9 @@ func newTransactionalStream(id string, p SQLAdapter, tableName string, streamOpt
ps.tmpTableFunc = func(ctx context.Context, tableForObject *Table, object types.Object) (table *Table) {
dstTable := tableForObject
ps.adjustTableColumnTypes(dstTable, ps.existingTable, tableForObject, object)
+		if ps.schemaFromOptions != nil {
+			ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object)
+		}
tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(tableName, 47), time.Now().Format("060102150405"))
return &Table{
Name: tmpTableName,
26 changes: 26 additions & 0 deletions bulkerlib/options.go
@@ -1,7 +1,9 @@
package bulkerlib

import (
"encoding/json"
"fmt"
"github.com/jitsucom/bulker/bulkerlib/types"
"github.com/jitsucom/bulker/jitsubase/utils"
)

@@ -88,6 +90,25 @@ var (
Key: "timestampColumn",
ParseFunc: utils.ParseString,
}

+	SchemaOption = ImplementationOption[types.Schema]{
+		Key: "schema",
+		ParseFunc: func(serialized any) (types.Schema, error) {
+			switch v := serialized.(type) {
+			case types.Schema:
+				return v, nil
+			case string:
+				schema := types.Schema{}
+				err := json.Unmarshal([]byte(v), &schema)
+				if err != nil {
+					return types.Schema{}, fmt.Errorf("failed to parse schema: %v", err)
+				}
+				return schema, nil
+			default:
+				return types.Schema{}, fmt.Errorf("invalid value type of schema option: %T", v)
+			}
+		},
+	}
)

func init() {
@@ -100,6 +121,7 @@ func init() {
RegisterOption(&DeduplicateOption)
RegisterOption(&PartitionIdOption)
RegisterOption(&TimestampOption)
+	RegisterOption(&SchemaOption)

dummyParse := func(_ any) (any, error) { return nil, nil }
for _, ignoredOption := range ignoredOptions {
@@ -211,3 +233,7 @@ func WithPartition(partitionId string) StreamOption {
func WithTimestamp(timestampField string) StreamOption {
return WithOption(&TimestampOption, timestampField)
}

+func WithSchema(schema types.Schema) StreamOption {
+	return WithOption(&SchemaOption, schema)
+}
15 changes: 15 additions & 0 deletions bulkerlib/types/schema.go
@@ -0,0 +1,15 @@
package types

type Schema struct {
Name string `json:"name"`
Fields []SchemaField `json:"fields"`
}

type SchemaField struct {
Name string `json:"name"`
Type DataType `json:"type"`
}

func (s Schema) IsEmpty() bool {
return len(s.Fields) == 0
}
14 changes: 14 additions & 0 deletions jitsubase/utils/bool.go
@@ -3,6 +3,7 @@ package utils
import (
"fmt"
"strconv"
"strings"
)

// ParseBool parses value of string, int or bool into a bool.
@@ -19,6 +20,19 @@ func ParseBool(value any) (bool, error) {
}
}

+func IsTruish(value any) bool {
+	switch v := value.(type) {
+	case string:
+		return strings.ToLower(v) == "true" || v == "1"
+	case int:
+		return v != 0
+	case bool:
+		return v
+	default:
+		return false
+	}
+}

func BoolPointer(b bool) *bool {
return &b
}
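IsTruish is a lenient counterpart to ParseBool that never returns an error. A small behavior sketch derived from the cases above:

package main

import (
	"fmt"

	"github.com/jitsucom/bulker/jitsubase/utils"
)

func main() {
	fmt.Println(utils.IsTruish("True")) // true: string match is case-insensitive
	fmt.Println(utils.IsTruish("1"))    // true
	fmt.Println(utils.IsTruish(2))      // true: any non-zero int
	fmt.Println(utils.IsTruish("yes"))  // false: only "true" and "1" count
	fmt.Println(utils.IsTruish(nil))    // false: unsupported types default to false
}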
25 changes: 9 additions & 16 deletions sync-sidecar/main.go
@@ -1,16 +1,15 @@
package main

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jitsucom/bulker/bulkerlib/types"
"github.com/jitsucom/bulker/sync-sidecar/db"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"
@@ -145,25 +144,19 @@ func (s *AbstractSideCar) sendLog(logger, level string, message string) error {
return db.InsertTaskLog(s.dbpool, uuid.New().String(), level, logger, message, s.syncId, s.taskId, time.Now())
}

-func (s *AbstractSideCar) bulkerEvent(connection, tableName string, payload any) error {
-	v, err := json.Marshal(payload)
-	if err != nil {
-		return fmt.Errorf("error marshalling event payload %v for %s: %v", payload, tableName, err)
-	}
-	body := bytes.NewReader(v)
-	_, err = s.bulkerRequest(fmt.Sprintf("%s/post/%s?tableName=%s", s.bulkerURL, connection, url.QueryEscape(tableName)), body)
-	if err != nil {
-		return fmt.Errorf("error sending event to %s: %v", tableName, err)
-	}
-	return nil
-}
-
-func (s *AbstractSideCar) bulkerRequest(url string, payload io.Reader) ([]byte, error) {
+func (s *AbstractSideCar) bulkerRequest(url string, payload io.Reader, schema types.Schema) ([]byte, error) {
req, err := http.NewRequest("POST", url, payload)
if err != nil {
return nil, fmt.Errorf("error creating POST %s request: %v", url, err)
}
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.bulkerAuthToken))
+	if !schema.IsEmpty() {
+		schemaBytes, err := json.Marshal(schema)
+		if err != nil {
+			return nil, fmt.Errorf("error marshalling schema: %v", err)
+		}
+		req.Header.Set("X-Jitsu-Schema", string(schemaBytes))
+	}
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("POST %s error: %v", url, err)
11 changes: 6 additions & 5 deletions sync-sidecar/read.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"github.com/jitsucom/bulker/jitsubase/pg"
"github.com/jitsucom/bulker/jitsubase/utils"
"github.com/jitsucom/bulker/sync-sidecar/db"
"io"
"net/url"
@@ -41,7 +42,7 @@ func (s *StreamStat) Merge(chunk *StreamStat) {
type ActiveStream struct {
name string
mode string
// write stream of io.Pipe. Bulker reads from this read stream of this pipe
// write stream of io.Pipe. Bulker reads from the read stream of this pipe
writer *io.PipeWriter
// wait group to wait for stream to finish HTTP request to bulker fully completed after closing writer
waitGroup sync.WaitGroup
@@ -312,14 +313,14 @@ func (s *ReadSideCar) openStream(streamName string, previousStats *StreamStat) *
// switch from replace_table to batch mode, to continue adding data to the table
mode = "batch"
}

bulkerConnFunc := func(streamReader *io.PipeReader) {
s.log("Creating bulker stream: %s mode: %s primary keys: %s", streamName, mode, str.GetPrimaryKeys())
bulkerUrl := fmt.Sprintf("%s/bulk/%s?tableName=%s&mode=%s&taskId=%s", s.bulkerURL, s.syncId, url.QueryEscape(s.tableNamePrefix+streamName), mode, s.taskId)
tableName := utils.NvlString(str.TableName, s.tableNamePrefix+streamName)
s.log("Creating bulker stream: %s table: %s mode: %s primary keys: %s", streamName, tableName, mode, str.GetPrimaryKeys())
bulkerUrl := fmt.Sprintf("%s/bulk/%s?tableName=%s&mode=%s&taskId=%s", s.bulkerURL, s.syncId, url.QueryEscape(tableName), mode, s.taskId)
for _, v := range str.GetPrimaryKeys() {
bulkerUrl += fmt.Sprintf("&pk=%s", url.QueryEscape(v))
}
-			_, err := s.bulkerRequest(bulkerUrl, streamReader)
+			_, err := s.bulkerRequest(bulkerUrl, streamReader, str.ToSchema())
if err != nil {
s.err("error sending bulk: %v", err)
return
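The table-name resolution above is the sidecar half of the customized table_name support: a stream's configured table name takes precedence over the prefixed stream name. A sketch assuming utils.NvlString returns its first non-empty argument:

package main

import (
	"fmt"
	"net/url"

	"github.com/jitsucom/bulker/jitsubase/utils"
)

func main() {
	streamName, prefix := "users", "src_"
	configured := "crm_users" // the stream's customized table_name; "" when not set
	tableName := utils.NvlString(configured, prefix+streamName)
	fmt.Println(tableName) // crm_users ("src_users" if configured were empty)
	fmt.Printf("/bulk/conn-1?tableName=%s&mode=batch&taskId=task-1\n", url.QueryEscape(tableName))
}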