diff --git a/bulkerapp/app/router.go b/bulkerapp/app/router.go
index 583fafd..bc8559e 100644
--- a/bulkerapp/app/router.go
+++ b/bulkerapp/app/router.go
@@ -4,6 +4,7 @@ import (
     "bufio"
     "bytes"
     "crypto/sha512"
+    "encoding/json"
     "fmt"
     "github.com/confluentinc/confluent-kafka-go/v2/kafka"
     "github.com/gin-gonic/gin"
@@ -158,9 +159,10 @@ func (r *Router) BulkHandler(c *gin.Context) {
     jobId := c.DefaultQuery("jobId", fmt.Sprintf("%s_%s_%s", destinationId, tableName, taskId))
     bulkMode := bulker.BulkMode(c.DefaultQuery("mode", string(bulker.ReplaceTable)))
     pkeys := c.QueryArray("pk")
-
+    schemaHeader := c.GetHeader("X-Jitsu-Schema")
     mode := ""
     bytesRead := 0
+    var err error
     var rError *appbase.RouterError
     var processedObjectSample types.Object
     var state bulker.State
@@ -190,6 +192,17 @@ func (r *Router) BulkHandler(c *gin.Context) {
     if len(pkeys) > 0 {
         streamOptions = append(streamOptions, bulker.WithPrimaryKey(pkeys...), bulker.WithDeduplicate())
     }
+    if schemaHeader != "" {
+        schema := types.Schema{}
+        err = json.Unmarshal([]byte(schemaHeader), &schema)
+        if err != nil {
+            rError = r.ResponseError(c, http.StatusBadRequest, "schema unmarshal error", false, err, true)
+            return
+        }
+        if !schema.IsEmpty() {
+            streamOptions = append(streamOptions, bulker.WithSchema(schema))
+        }
+    }
     //streamOptions = append(streamOptions, sql.WithoutOmitNils())
     destination.InitBulkerInstance()
     bulkerStream, err := destination.bulker.CreateStream(jobId, tableName, bulkMode, streamOptions...)
diff --git a/bulkerlib/implementations/sql/abstract.go b/bulkerlib/implementations/sql/abstract.go
index 08ce29b..56f1592 100644
--- a/bulkerlib/implementations/sql/abstract.go
+++ b/bulkerlib/implementations/sql/abstract.go
@@ -17,14 +17,15 @@ import (
 const unmappedDataColumn = "_unmapped_data"
 
 type AbstractSQLStream struct {
-    id          string
-    sqlAdapter  SQLAdapter
-    mode        bulker.BulkMode
-    options     bulker.StreamOptions
-    tableName   string
-    merge       bool
-    mergeWindow int
-    omitNils    bool
+    id                string
+    sqlAdapter        SQLAdapter
+    mode              bulker.BulkMode
+    options           bulker.StreamOptions
+    tableName         string
+    merge             bool
+    mergeWindow       int
+    omitNils          bool
+    schemaFromOptions *Table
 
     state  bulker.State
     inited bool
@@ -56,6 +57,11 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu
     ps.timestampColumn = bulker.TimestampOption.Get(&ps.options)
     ps.omitNils = OmitNilsOption.Get(&ps.options)
 
+    schema := bulker.SchemaOption.Get(&ps.options)
+    if !schema.IsEmpty() {
+        ps.schemaFromOptions = ps.sqlAdapter.TableHelper().MapSchema(ps.sqlAdapter, schema)
+    }
+
     //TODO: max column?
     ps.state = bulker.State{Status: bulker.Active}
     ps.customTypes = customFields
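Reviewer note: below is a minimal client sketch of the header-based flow that the router and abstract-stream changes above enable. `bulkerBase`, `destinationID`, the port, and `token` are placeholders, and the JSON encoding of each field's `type` is whatever `types.DataType`'s marshaller produces; this is an illustration, not part of the PR.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"strings"

	"github.com/jitsucom/bulker/bulkerlib/types"
)

func main() {
	// Declare the expected table structure up front (same shape the router
	// unmarshals from the X-Jitsu-Schema header).
	schema := types.Schema{
		Name: "users",
		Fields: []types.SchemaField{
			{Name: "id", Type: types.INT64},
			{Name: "name", Type: types.STRING},
		},
	}
	schemaJSON, _ := json.Marshal(schema)

	// NDJSON payload: one event per line, which is what BulkHandler streams from the body.
	body := strings.NewReader(`{"id": 1, "name": "a"}` + "\n" + `{"id": 2}` + "\n")

	// Placeholders: adjust to a real deployment.
	bulkerBase, destinationID, token := "http://localhost:3042", "my-destination", "secret"
	u := fmt.Sprintf("%s/bulk/%s?tableName=%s&mode=batch", bulkerBase, destinationID, url.QueryEscape("users"))

	req, err := http.NewRequest("POST", u, body)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("X-Jitsu-Schema", string(schemaJSON)) // the header added by this PR
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()
	fmt.Println(res.Status)
}
```

Even though the second event lacks `name`, the column is still created, because the declared schema is merged into the stream's table header.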
diff --git a/bulkerlib/implementations/sql/autocommit_stream.go b/bulkerlib/implementations/sql/autocommit_stream.go
index d373a1b..adeddaa 100644
--- a/bulkerlib/implementations/sql/autocommit_stream.go
+++ b/bulkerlib/implementations/sql/autocommit_stream.go
@@ -31,6 +31,9 @@ func (ps *AutoCommitStream) Consume(ctx context.Context, object types.Object) (s
         return
     }
     table, processedObject, err := ps.preprocess(object)
+    if ps.schemaFromOptions != nil {
+        ps.adjustTableColumnTypes(table, nil, ps.schemaFromOptions, object)
+    }
     if err != nil {
         return
     }
diff --git a/bulkerlib/implementations/sql/bulker_test.go b/bulkerlib/implementations/sql/bulker_test.go
index 129883e..9f945b1 100644
--- a/bulkerlib/implementations/sql/bulker_test.go
+++ b/bulkerlib/implementations/sql/bulker_test.go
@@ -384,6 +384,33 @@ func TestBasics(t *testing.T) {
             configIds:     allBulkerConfigs,
             streamOptions: []bulker.StreamOption{bulker.WithTimestamp("_timestamp")},
         },
+        {
+            name:              "schema_option",
+            modes:             []bulker.BulkMode{bulker.Batch, bulker.Stream, bulker.ReplaceTable, bulker.ReplacePartition},
+            expectPartitionId: true,
+            dataFile:          "test_data/schema_option.ndjson",
+            expectedTable: ExpectedTable{
+                Columns: justColumns("_timestamp", "column1", "column2", "column3", "id", "name"),
+            },
+            expectedRowsCount: 2,
+            expectedRows: []map[string]any{
+                {"_timestamp": constantTime, "id": 1, "name": nil, "column1": nil, "column2": nil, "column3": nil},
+                {"_timestamp": constantTime, "id": 2, "name": nil, "column1": nil, "column2": nil, "column3": nil},
+            },
+            expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported},
+            configIds:      allBulkerConfigs,
+            streamOptions: []bulker.StreamOption{bulker.WithSchema(types2.Schema{
+                Name: "schema_option",
+                Fields: []types2.SchemaField{
+                    {Name: "_timestamp", Type: types2.TIMESTAMP},
+                    {Name: "id", Type: types2.INT64},
+                    {Name: "name", Type: types2.STRING},
+                    {Name: "column1", Type: types2.STRING},
+                    {Name: "column2", Type: types2.STRING},
+                    {Name: "column3", Type: types2.STRING},
+                },
+            })},
+        },
     }
     for _, tt := range tests {
         tt := tt
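In miniature, what this test asserts: the two rows in schema_option.ndjson carry only `_timestamp` and `id`, yet all six declared columns materialize (the absent ones as NULLs). `adjustTableColumnTypes` is not shown in this diff, so the sketch below uses a hypothetical `unionColumns` helper purely to illustrate the merge semantics, not the real implementation.

```go
package main

import "fmt"

// unionColumns is a hypothetical helper: an order-preserving union of column
// names. Conceptually this is the effect of merging schemaFromOptions into the
// table header derived from the data.
func unionColumns(fromData, fromSchema []string) []string {
	seen := map[string]bool{}
	var out []string
	for _, c := range append(fromData, fromSchema...) {
		if !seen[c] {
			seen[c] = true
			out = append(out, c)
		}
	}
	return out
}

func main() {
	fromData := []string{"_timestamp", "id"} // fields present in schema_option.ndjson
	fromSchema := []string{"_timestamp", "id", "name", "column1", "column2", "column3"}
	fmt.Println(unionColumns(fromData, fromSchema))
	// Output: [_timestamp id name column1 column2 column3]
}
```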
diff --git a/bulkerlib/implementations/sql/replacepartition_stream.go b/bulkerlib/implementations/sql/replacepartition_stream.go
index 4e1ade0..5b1e309 100644
--- a/bulkerlib/implementations/sql/replacepartition_stream.go
+++ b/bulkerlib/implementations/sql/replacepartition_stream.go
@@ -35,6 +35,9 @@ func newReplacePartitionStream(id string, p SQLAdapter, tableName string, stream
     ps.tmpTableFunc = func(ctx context.Context, tableForObject *Table, object types.Object) (table *Table) {
         dstTable := tableForObject
         ps.adjustTableColumnTypes(dstTable, ps.existingTable, tableForObject, object)
+        if ps.schemaFromOptions != nil {
+            ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object)
+        }
         tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(tableName, 47), time.Now().Format("060102150405"))
         return &Table{
             Name: tmpTableName,
diff --git a/bulkerlib/implementations/sql/replacetable_stream.go b/bulkerlib/implementations/sql/replacetable_stream.go
index 22b8c2f..9dba6d8 100644
--- a/bulkerlib/implementations/sql/replacetable_stream.go
+++ b/bulkerlib/implementations/sql/replacetable_stream.go
@@ -26,7 +26,7 @@ func newReplaceTableStream(id string, p SQLAdapter, tableName string, streamOpti
         return nil, err
     }
     ps.tmpTableFunc = func(ctx context.Context, tableForObject *Table, object types.Object) (table *Table) {
-        return &Table{
+        tmpTable := &Table{
             Name:           fmt.Sprintf("%s_tmp%s", utils.ShortenString(ps.tableName, 47), time.Now().Format("060102150405")),
             PrimaryKeyName: tableForObject.PrimaryKeyName,
             //PrimaryKeyName: fmt.Sprintf("%s_%s", tableForObject.PrimaryKeyName, time.Now().Format("060102_150405")),
@@ -34,6 +34,10 @@ func newReplaceTableStream(id string, p SQLAdapter, tableName string, streamOpti
             Columns:         tableForObject.Columns,
             TimestampColumn: tableForObject.TimestampColumn,
         }
+        if ps.schemaFromOptions != nil {
+            ps.adjustTableColumnTypes(tmpTable, nil, ps.schemaFromOptions, object)
+        }
+        return tmpTable
     }
     return &ps, nil
 }
diff --git a/bulkerlib/implementations/sql/table_helper.go b/bulkerlib/implementations/sql/table_helper.go
index b9f57d2..643f6ad 100644
--- a/bulkerlib/implementations/sql/table_helper.go
+++ b/bulkerlib/implementations/sql/table_helper.go
@@ -117,6 +117,26 @@ func (th *TableHelper) MapTableSchema(sqlAdapter SQLAdapter, batchHeader *TypesH
     }
 }
 
+// MapSchema maps types.Schema into types.Table (structure with SQL types)
+func (th *TableHelper) MapSchema(sqlAdapter SQLAdapter, schema types2.Schema) *Table {
+    table := &Table{
+        Name:    sqlAdapter.TableName(schema.Name),
+        Columns: Columns{},
+    }
+
+    for _, field := range schema.Fields {
+        colName := th.ColumnName(field.Name)
+        //map Jitsu type -> SQL type
+        sqlType, ok := sqlAdapter.GetSQLType(field.Type)
+        if ok {
+            table.Columns[colName] = types2.SQLColumn{DataType: field.Type, Type: sqlType, New: true}
+        } else {
+            logging.SystemErrorf("Unknown column type %s mapping for %s", field.Type, sqlAdapter.Type())
+        }
+    }
+    return table
+}
+
 // EnsureTableWithCaching calls ensureTable with cacheTable = true
 // it is used in stream destinations (because we don't have time to select table schema, but there is retry on error)
 func (th *TableHelper) EnsureTableWithCaching(ctx context.Context, sqlAdapter SQLAdapter, destinationID string, dataSchema *Table) (*Table, error) {
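A sketch of what `MapSchema` produces: every declared field is translated through the adapter's `GetSQLType`, and unknown types are logged and skipped. The mapping table below is hypothetical; real mappings live in each `SQLAdapter` implementation (Postgres, BigQuery, and so on).

```go
package main

import "fmt"

// getSQLType is a stand-in for SQLAdapter.GetSQLType, with made-up
// Postgres-flavored mappings, for illustration only.
func getSQLType(dataType string) (string, bool) {
	m := map[string]string{
		"INT64":     "bigint",
		"STRING":    "text",
		"TIMESTAMP": "timestamp",
	}
	t, ok := m[dataType]
	return t, ok
}

func main() {
	for _, f := range []string{"TIMESTAMP", "INT64", "STRING", "UNKNOWN"} {
		if sqlType, ok := getSQLType(f); ok {
			fmt.Printf("%-9s -> %s (stored as SQLColumn{New: true})\n", f, sqlType)
		} else {
			// MapSchema calls logging.SystemErrorf here and omits the column.
			fmt.Printf("%-9s -> no mapping, logged and skipped\n", f)
		}
	}
}
```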
"timestampColumn", ParseFunc: utils.ParseString, } + + SchemaOption = ImplementationOption[types.Schema]{ + Key: "schema", + ParseFunc: func(serialized any) (types.Schema, error) { + switch v := serialized.(type) { + case types.Schema: + return v, nil + case string: + schema := types.Schema{} + err := json.Unmarshal([]byte(v), &schema) + if err != nil { + return types.Schema{}, fmt.Errorf("failed to parse schema: %v", err) + } + return schema, nil + default: + return types.Schema{}, fmt.Errorf("invalid value type of schema option: %T", v) + } + }, + } ) func init() { @@ -100,6 +121,7 @@ func init() { RegisterOption(&DeduplicateOption) RegisterOption(&PartitionIdOption) RegisterOption(&TimestampOption) + RegisterOption(&SchemaOption) dummyParse := func(_ any) (any, error) { return nil, nil } for _, ignoredOption := range ignoredOptions { @@ -211,3 +233,7 @@ func WithPartition(partitionId string) StreamOption { func WithTimestamp(timestampField string) StreamOption { return WithOption(&TimestampOption, timestampField) } + +func WithSchema(schema types.Schema) StreamOption { + return WithOption(&SchemaOption, schema) +} diff --git a/bulkerlib/types/schema.go b/bulkerlib/types/schema.go new file mode 100644 index 0000000..9c90a55 --- /dev/null +++ b/bulkerlib/types/schema.go @@ -0,0 +1,15 @@ +package types + +type Schema struct { + Name string `json:"name"` + Fields []SchemaField `json:"fields"` +} + +type SchemaField struct { + Name string `json:"name"` + Type DataType `json:"type"` +} + +func (s Schema) IsEmpty() bool { + return len(s.Fields) == 0 +} diff --git a/jitsubase/utils/bool.go b/jitsubase/utils/bool.go index fe9547c..ce47f6a 100644 --- a/jitsubase/utils/bool.go +++ b/jitsubase/utils/bool.go @@ -3,6 +3,7 @@ package utils import ( "fmt" "strconv" + "strings" ) // ParseBool parses value of string, int or bool into a bool. 
diff --git a/jitsubase/utils/bool.go b/jitsubase/utils/bool.go
index fe9547c..ce47f6a 100644
--- a/jitsubase/utils/bool.go
+++ b/jitsubase/utils/bool.go
@@ -3,6 +3,7 @@ package utils
 import (
     "fmt"
     "strconv"
+    "strings"
 )
 
 // ParseBool parses value of string, int or bool into a bool.
@@ -19,6 +20,19 @@ func ParseBool(value any) (bool, error) {
     }
 }
 
+func IsTruish(value any) bool {
+    switch v := value.(type) {
+    case string:
+        return strings.ToLower(v) == "true" || v == "1"
+    case int:
+        return v != 0
+    case bool:
+        return v
+    default:
+        return false
+    }
+}
+
 func BoolPointer(b bool) *bool {
     return &b
 }
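Usage notes for the new helper; the behavior follows directly from the switch above (`IsTruish` is not referenced elsewhere in this diff):

```go
package main

import (
	"fmt"

	"github.com/jitsucom/bulker/jitsubase/utils"
)

func main() {
	fmt.Println(utils.IsTruish("True")) // true: string match is case-insensitive
	fmt.Println(utils.IsTruish("1"))    // true: "1" is accepted as-is
	fmt.Println(utils.IsTruish("yes"))  // false: only "true"/"1" count for strings
	fmt.Println(utils.IsTruish(42))     // true: any non-zero int
	fmt.Println(utils.IsTruish(nil))    // false: unhandled types fall through to false
}
```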
diff --git a/sync-sidecar/main.go b/sync-sidecar/main.go
index f791e77..3598b9f 100644
--- a/sync-sidecar/main.go
+++ b/sync-sidecar/main.go
@@ -1,16 +1,15 @@
 package main
 
 import (
-    "bytes"
     "encoding/json"
     "errors"
     "fmt"
     "github.com/google/uuid"
     "github.com/jackc/pgx/v5/pgxpool"
+    "github.com/jitsucom/bulker/bulkerlib/types"
     "github.com/jitsucom/bulker/sync-sidecar/db"
     "io"
     "net/http"
-    "net/url"
     "os"
     "strings"
     "time"
@@ -145,25 +144,19 @@ func (s *AbstractSideCar) sendLog(logger, level string, message string) error {
     return db.InsertTaskLog(s.dbpool, uuid.New().String(), level, logger, message, s.syncId, s.taskId, time.Now())
 }
 
-func (s *AbstractSideCar) bulkerEvent(connection, tableName string, payload any) error {
-    v, err := json.Marshal(payload)
-    if err != nil {
-        return fmt.Errorf("error marshalling event payload %v for %s: %v", payload, tableName, err)
-    }
-    body := bytes.NewReader(v)
-    _, err = s.bulkerRequest(fmt.Sprintf("%s/post/%s?tableName=%s", s.bulkerURL, connection, url.QueryEscape(tableName)), body)
-    if err != nil {
-        return fmt.Errorf("error sending event to %s: %v", tableName, err)
-    }
-    return nil
-}
-
-func (s *AbstractSideCar) bulkerRequest(url string, payload io.Reader) ([]byte, error) {
+func (s *AbstractSideCar) bulkerRequest(url string, payload io.Reader, schema types.Schema) ([]byte, error) {
     req, err := http.NewRequest("POST", url, payload)
     if err != nil {
         return nil, fmt.Errorf("error creating POST %s request: %v", url, err)
     }
     req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.bulkerAuthToken))
+    if !schema.IsEmpty() {
+        schemaBytes, err := json.Marshal(schema)
+        if err != nil {
+            return nil, fmt.Errorf("error marshalling schema: %v", err)
+        }
+        req.Header.Set("X-Jitsu-Schema", string(schemaBytes))
+    }
     res, err := http.DefaultClient.Do(req)
     if err != nil {
         return nil, fmt.Errorf("POST %s error: %v", url, err)
diff --git a/sync-sidecar/read.go b/sync-sidecar/read.go
index a7f0d8b..a2ca260 100644
--- a/sync-sidecar/read.go
+++ b/sync-sidecar/read.go
@@ -5,6 +5,7 @@ import (
     "encoding/json"
     "fmt"
     "github.com/jitsucom/bulker/jitsubase/pg"
+    "github.com/jitsucom/bulker/jitsubase/utils"
     "github.com/jitsucom/bulker/sync-sidecar/db"
     "io"
     "net/url"
@@ -41,7 +42,7 @@ func (s *StreamStat) Merge(chunk *StreamStat) {
 type ActiveStream struct {
     name string
     mode string
-    // write stream of io.Pipe. Bulker reads from this read stream of this pipe
+    // write stream of io.Pipe. Bulker reads from the read stream of this pipe
     writer *io.PipeWriter
     // wait group to wait for stream to finish HTTP request to bulker fully completed after closing writer
     waitGroup sync.WaitGroup
@@ -312,14 +313,14 @@ func (s *ReadSideCar) openStream(streamName string, previousStats *StreamStat) *
         // switch from replace_table to batch mode, to continue adding data to the table
         mode = "batch"
     }
     bulkerConnFunc := func(streamReader *io.PipeReader) {
-        s.log("Creating bulker stream: %s mode: %s primary keys: %s", streamName, mode, str.GetPrimaryKeys())
-        bulkerUrl := fmt.Sprintf("%s/bulk/%s?tableName=%s&mode=%s&taskId=%s", s.bulkerURL, s.syncId, url.QueryEscape(s.tableNamePrefix+streamName), mode, s.taskId)
+        tableName := utils.NvlString(str.TableName, s.tableNamePrefix+streamName)
+        s.log("Creating bulker stream: %s table: %s mode: %s primary keys: %s", streamName, tableName, mode, str.GetPrimaryKeys())
+        bulkerUrl := fmt.Sprintf("%s/bulk/%s?tableName=%s&mode=%s&taskId=%s", s.bulkerURL, s.syncId, url.QueryEscape(tableName), mode, s.taskId)
         for _, v := range str.GetPrimaryKeys() {
            bulkerUrl += fmt.Sprintf("&pk=%s", url.QueryEscape(v))
         }
-        _, err := s.bulkerRequest(bulkerUrl, streamReader)
+        _, err := s.bulkerRequest(bulkerUrl, streamReader, str.ToSchema())
         if err != nil {
             s.err("error sending bulk: %v", err)
             return
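`utils.NvlString` is not part of this diff; it is assumed here to return its first non-empty argument, which is what gives a source-declared `table_name` precedence over the prefixed stream name. A standalone sketch of that precedence:

```go
package main

import "fmt"

// nvlString mimics the assumed utils.NvlString semantics: first non-empty wins.
func nvlString(values ...string) string {
	for _, v := range values {
		if v != "" {
			return v
		}
	}
	return ""
}

func main() {
	// The source catalog declared an explicit table_name:
	fmt.Println(nvlString("orders_custom", "sync1_orders")) // orders_custom
	// No table_name in the catalog, fall back to prefix+streamName:
	fmt.Println(nvlString("", "sync1_orders")) // sync1_orders
}
```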
diff --git a/sync-sidecar/types.go b/sync-sidecar/types.go
index fa331ee..ac58574 100644
--- a/sync-sidecar/types.go
+++ b/sync-sidecar/types.go
@@ -1,5 +1,11 @@
 package main
 
+import (
+    "fmt"
+    "github.com/jitsucom/bulker/bulkerlib/types"
+    "github.com/jitsucom/bulker/jitsubase/utils"
+)
+
 const (
     LogType              = "LOG"
     ConnectionStatusType = "CONNECTION_STATUS"
@@ -81,9 +87,79 @@ type Catalog struct {
 }
 
 type StreamMeta struct {
-    Name        string     `json:"name"`
-    Namespace   string     `json:"namespace"`
-    PrimaryKeys [][]string `json:"source_defined_primary_key"`
+    Name        string           `json:"name"`
+    Namespace   string           `json:"namespace"`
+    TableName   string           `json:"table_name,omitempty"`
+    JSONSchema  StreamJsonSchema `json:"json_schema"`
+    PrimaryKeys [][]string       `json:"source_defined_primary_key"`
+}
+
+type StreamJsonSchema struct {
+    Properties map[string]StreamSchemaProperty `json:"properties"`
+}
+
+func (s *StreamMeta) ToSchema() types.Schema {
+    fields := make([]types.SchemaField, 0, len(s.JSONSchema.Properties))
+    for name, prop := range s.JSONSchema.Properties {
+        fields = append(fields, types.SchemaField{
+            Name: name,
+            Type: prop.ToDataType(),
+        })
+    }
+    return types.Schema{
+        Name:   s.Name,
+        Fields: fields,
+    }
+}
+
+type StreamSchemaProperty struct {
+    Type        any    `json:"type"`
+    Format      string `json:"format"`
+    AirbyteType string `json:"airbyte_type"`
+    OneOf       []any  `json:"oneOf"`
+}
+
+func (ssp *StreamSchemaProperty) ToDataType() types.DataType {
+    if len(ssp.OneOf) > 0 {
+        return types.STRING
+    }
+    var tp string
+    switch v := ssp.Type.(type) {
+    case string:
+        tp = v
+    case []string:
+        a := utils.ArrayExcluding(v, "null")
+        if len(a) > 0 {
+            tp = a[0]
+        }
+    case []any:
+        a := utils.ArrayExcluding(v, "null")
+        if len(a) > 0 {
+            tp = fmt.Sprint(a[0])
+        }
+    }
+    switch tp {
+    case "string":
+        if ssp.Format == "date-time" {
+            return types.TIMESTAMP
+        }
+        return types.STRING
+    case "boolean":
+        return types.BOOL
+    case "integer":
+        return types.INT64
+    case "number":
+        if ssp.AirbyteType == "integer" {
+            return types.INT64
+        }
+        return types.FLOAT64
+    case "array":
+        return types.JSON
+    case "object":
+        return types.JSON
+    default:
+        return types.STRING
+    }
 }
 
 func (s *StreamMeta) GetPrimaryKeys() []string {
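Spot checks for the Airbyte JSON-schema to `DataType` mapping, written as a hypothetical test (not part of this PR) that would sit next to types.go in sync-sidecar's `package main`:

```go
package main

import (
	"testing"

	"github.com/jitsucom/bulker/bulkerlib/types"
)

func TestToDataType(t *testing.T) {
	cases := []struct {
		prop StreamSchemaProperty
		want types.DataType
	}{
		{StreamSchemaProperty{Type: "string", Format: "date-time"}, types.TIMESTAMP},
		{StreamSchemaProperty{Type: []any{"null", "integer"}}, types.INT64}, // "null" is stripped first
		{StreamSchemaProperty{Type: "number", AirbyteType: "integer"}, types.INT64},
		{StreamSchemaProperty{Type: "number"}, types.FLOAT64},
		{StreamSchemaProperty{Type: "object"}, types.JSON},
		{StreamSchemaProperty{OneOf: []any{map[string]any{}}}, types.STRING}, // unions collapse to STRING
	}
	for _, c := range cases {
		if got := c.prop.ToDataType(); got != c.want {
			t.Errorf("ToDataType(%+v) = %v, want %v", c.prop, got, c.want)
		}
	}
}
```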