bulker: bigquery switch to compressed json
eventslog: added clickhouse events log
ingress-manager: added CleanupCerts method
absorbb committed Feb 28, 2024
1 parent 0def546 commit 6d9dcf9
Showing 30 changed files with 1,554 additions and 396 deletions.
14 changes: 13 additions & 1 deletion bulkerapp/app/app.go
@@ -59,12 +59,24 @@ func (a *Context) InitContext(settings *appbase.AppSettings) error {
 	a.cron = NewCron(a.config)
 
 	a.eventsLogService = &eventslog.DummyEventsLogService{}
+	elServices := []eventslog.EventsLogService{}
+	if a.config.ClickhouseURL != "" {
+		chEventsLogService, err := eventslog.NewClickhouseEventsLog(a.config.EventsLogConfig)
+		if err != nil {
+			return err
+		}
+		elServices = append(elServices, chEventsLogService)
+	}
 	eventsLogRedisUrl := utils.NvlString(a.config.EventsLogRedisURL, a.config.RedisURL)
 	if eventsLogRedisUrl != "" {
-		a.eventsLogService, err = eventslog.NewRedisEventsLog(eventsLogRedisUrl, a.config.RedisTLSCA, a.config.EventsLogMaxSize)
+		redisEventsLogService, err := eventslog.NewRedisEventsLog(eventsLogRedisUrl, a.config.RedisTLSCA, a.config.EventsLogMaxSize)
 		if err != nil {
 			return err
 		}
+		elServices = append(elServices, redisEventsLogService)
 	}
+	if len(elServices) > 0 {
+		a.eventsLogService = &eventslog.MultiEventsLogService{Services: elServices}
+	}
 
 	a.fastStore, err = NewFastStore(a.config)
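InitContext now collects every configured backend into elServices and wraps them in eventslog.MultiEventsLogService, whose definition is not among the files shown here. A minimal fan-out sketch of what it plausibly does (the EventsLogService interface also carries at least GetEvents, omitted here; this is inferred, not the committed implementation):

// Hypothetical sketch; the real type lives in the eventslog package.
type MultiEventsLogService struct {
	Services []EventsLogService
}

// PostAsync fans each event out to every configured backend (e.g. ClickHouse and Redis).
func (m *MultiEventsLogService) PostAsync(event *ActorEvent) {
	for _, s := range m.Services {
		s.PostAsync(event)
	}
}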
4 changes: 3 additions & 1 deletion bulkerapp/app/app_config.go
@@ -1,6 +1,7 @@
 package app
 
 import (
+	"github.com/jitsucom/bulker/eventslog"
 	"github.com/jitsucom/bulker/jitsubase/appbase"
 	"github.com/jitsucom/bulker/jitsubase/utils"
 	"github.com/jitsucom/bulker/kafkabase"
@@ -16,7 +17,8 @@ import (
 type Config struct {
 	appbase.Config        `mapstructure:",squash"`
 	kafkabase.KafkaConfig `mapstructure:",squash"`
-
+	// # EVENTS LOG CONFIG - settings for events log
+	eventslog.EventsLogConfig `mapstructure:",squash"`
 	// For ingest endpoint only
 	GlobalHashSecret string `mapstructure:"GLOBAL_HASH_SECRET" default:"dea42a58-acf4-45af-85bb-e77e94bd5025"`
 	// For ingest endpoint only
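eventslog.EventsLogConfig itself is not among the shown files. Given that app.go reads a.config.ClickhouseURL through this squashed embed, it plausibly carries the ClickHouse connection settings, roughly like this (field names and mapstructure keys are assumptions):

// Hypothetical sketch; the real struct lives in the eventslog package.
type EventsLogConfig struct {
	// Referenced as a.config.ClickhouseURL in app.go above
	ClickhouseURL string `mapstructure:"CLICKHOUSE_URL"`
	// ... presumably credentials, database and other ClickHouse settings
}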
5 changes: 3 additions & 2 deletions bulkerapp/app/batch_consumer.go
@@ -307,10 +307,11 @@ func (bc *BatchConsumerImpl) postEventsLog(state bulker.State, processedObjectSample
 		state.SetError(batchErr)
 	}
 	batchState := BatchState{State: state, LastMappedRow: processedObjectSample}
-	bc.eventsLogService.PostAsync(&eventslog.ActorEvent{eventslog.EventTypeBatchAll, bc.destinationId, batchState})
+	level := eventslog.LevelInfo
 	if batchErr != nil {
-		bc.eventsLogService.PostAsync(&eventslog.ActorEvent{eventslog.EventTypeBatchError, bc.destinationId, batchState})
+		level = eventslog.LevelError
 	}
+	bc.eventsLogService.PostAsync(&eventslog.ActorEvent{EventType: eventslog.EventTypeBatch, Level: level, ActorId: bc.destinationId, Event: batchState})
 }
 
 type BatchState struct {
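The consumer now posts a single EventTypeBatch record and carries the outcome in a Level field instead of posting separate batch-all/batch-error events. The ActorEvent struct itself is not in this diff; from the call sites it presumably gained a Level field roughly like this (the positional literal in stream_consumer.go below implies this field order; concrete types are assumptions):

// Hypothetical shape inferred from the call sites; the real definition is in the eventslog package.
type ActorEvent struct {
	EventType EventType
	Level     Level // e.g. eventslog.LevelInfo or eventslog.LevelError
	ActorId   string
	Event     any
}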
108 changes: 6 additions & 102 deletions bulkerapp/app/router.go
@@ -4,7 +4,6 @@ import (
"bufio"
"bytes"
"crypto/sha512"
"encoding/json"
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/gin-gonic/gin"
@@ -62,7 +61,6 @@ func NewRouter(appContext *Context) *Router {
 	fast := engine.Group("")
 	fast.Use(timeout.Timeout(timeout.WithTimeout(10 * time.Second)))
 	fast.POST("/post/:destinationId", router.EventsHandler)
-	fast.POST("/ingest", router.IngestHandler)
 	fast.POST("/test", router.TestConnectionHandler)
 	fast.GET("/log/:eventType/:actorId", router.EventsLogHandler)
 	fast.GET("/ready", router.Health)
@@ -250,103 +248,6 @@ func maskWriteKey(wk string) string {
 	}
 }
 
-func (r *Router) IngestHandler(c *gin.Context) {
-	domain := ""
-	// TODO: use workspaceId as default for all stream identification errors
-	var eventsLogId string
-	var rError *appbase.RouterError
-	var body []byte
-	var asyncDestinations []string
-	var tagsDestinations []string
-
-	defer func() {
-		if len(body) > 0 {
-			_ = r.backupsLogger.Log(utils.DefaultString(eventsLogId, "UNKNOWN"), body)
-		}
-		if rError != nil {
-			obj := map[string]any{"body": string(body), "error": rError.PublicError.Error(), "status": "FAILED"}
-			r.eventsLogService.PostAsync(&eventslog.ActorEvent{EventType: eventslog.EventTypeIncomingError, ActorId: eventsLogId, Event: obj})
-			r.eventsLogService.PostAsync(&eventslog.ActorEvent{EventType: eventslog.EventTypeIncomingAll, ActorId: eventsLogId, Event: obj})
-			metrics.IngestHandlerRequests(domain, "error", rError.ErrorType).Inc()
-			_ = r.producer.ProduceAsync(r.config.KafkaDestinationsDeadLetterTopicName, uuid.New(), body, map[string]string{"error": rError.Error.Error()})
-		} else {
-			obj := map[string]any{"body": string(body), "asyncDestinations": asyncDestinations, "tags": tagsDestinations}
-			if len(asyncDestinations) > 0 || len(tagsDestinations) > 0 {
-				obj["status"] = "SUCCESS"
-			} else {
-				obj["status"] = "SKIPPED"
-				obj["error"] = "no destinations found for stream"
-			}
-			r.eventsLogService.PostAsync(&eventslog.ActorEvent{EventType: eventslog.EventTypeIncomingAll, ActorId: eventsLogId, Event: obj})
-			metrics.IngestHandlerRequests(domain, "success", "").Inc()
-		}
-	}()
-	c.Set(appbase.ContextLoggerName, "ingest")
-	body, err := io.ReadAll(c.Request.Body)
-	if err != nil {
-		rError = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, true)
-		return
-	}
-	ingestMessage := IngestMessage{}
-	err = jsoniter.Unmarshal(body, &ingestMessage)
-	if err != nil {
-		rError = r.ResponseError(c, http.StatusBadRequest, "error parsing IngestMessage", false, fmt.Errorf("%v: %s", err, string(body)), true)
-		return
-	}
-	messageId := ingestMessage.MessageId
-	c.Set(appbase.ContextMessageId, messageId)
-	domain = utils.DefaultString(ingestMessage.Origin.Slug, ingestMessage.Origin.Domain)
-	c.Set(appbase.ContextDomain, domain)
-
-	stream := r.getStream(ingestMessage)
-	if stream == nil {
-		rError = r.ResponseError(c, http.StatusBadRequest, "stream not found", false, nil, true)
-		return
-	}
-	eventsLogId = stream.Stream.Id
-	if len(stream.AsynchronousDestinations) == 0 && len(stream.SynchronousDestinations) == 0 {
-		c.JSON(http.StatusOK, gin.H{"message": "no destinations found for stream"})
-		return
-	}
-	asyncDestinations = utils.ArrayMap(stream.AsynchronousDestinations, func(d ShortDestinationConfig) string { return d.ConnectionId })
-	tagsDestinations = utils.ArrayMap(stream.SynchronousDestinations, func(d ShortDestinationConfig) string { return d.ConnectionId })
-
-	r.Debugf("[ingest] Message ID: %s Domain: %s to Connections: [%s] Tags: [%s]", messageId, domain,
-		strings.Join(asyncDestinations, ", "), strings.Join(tagsDestinations, ", "))
-	for _, destination := range stream.AsynchronousDestinations {
-		messageCopy := ingestMessage
-		messageCopy.ConnectionId = destination.ConnectionId
-		//multithreading, ok := destination.Options["multithreading"].(bool)
-		topic := r.config.KafkaDestinationsTopicName
-		messageKey := uuid.New()
-		payload, err := json.Marshal(messageCopy)
-		r.Debugf("[ingest] Message ID: %s Producing for: %s topic: %s key: %s", messageId, destination.ConnectionId, topic, messageKey)
-		if err != nil {
-			metrics.IngestedMessages(destination.ConnectionId, "error", "message marshal error").Inc()
-			rError = r.ResponseError(c, http.StatusBadRequest, "message marshal error", false, err, true)
-			continue
-		}
-		err = r.producer.ProduceAsync(topic, messageKey, payload, nil)
-		if err != nil {
-			metrics.IngestedMessages(destination.ConnectionId, "error", "producer error").Inc()
-			rError = r.ResponseError(c, http.StatusInternalServerError, "producer error", true, err, true)
-			continue
-		}
-		metrics.IngestedMessages(destination.ConnectionId, "success", "").Inc()
-	}
-	if len(stream.SynchronousDestinations) == 0 {
-		c.JSON(http.StatusOK, gin.H{"ok": true})
-		return
-	}
-
-	tags := make(map[string]TagDestinationConfig, len(stream.SynchronousDestinations))
-	for _, destination := range stream.SynchronousDestinations {
-		tags[destination.Id] = destination.TagDestinationConfig
-		metrics.IngestedMessages(destination.ConnectionId, "success", "").Inc()
-	}
-	c.JSON(http.StatusOK, gin.H{"ok": true, "tags": tags})
-}
-
 func (r *Router) FailedHandler(c *gin.Context) {
 	destinationId := c.Param("destinationId")
 	status := utils.DefaultString(c.Query("status"), "dead")
@@ -454,7 +355,7 @@ func (r *Router) TestConnectionHandler(c *gin.Context) {

 // EventsLogHandler - gets events log by EventType, actor id. Filtered by date range and cursorId
 func (r *Router) EventsLogHandler(c *gin.Context) {
-	eventType := c.Param("eventType")
+	eventKey := c.Param("eventType")
 	actorId := c.Param("actorId")
 	beforeId := c.Query("beforeId")
 	start := c.Query("start")
@@ -494,8 +395,11 @@ func (r *Router) EventsLogHandler(c *gin.Context) {
 			iLimit = iLimit2
 		}
 	}
+	parts := strings.Split(eventKey, ".")
+	eventType := parts[0]
+	level := parts[1]
 	eventsLogFilter.BeforeId = eventslog.EventsLogRecordId(beforeId)
-	records, err := r.eventsLogService.GetEvents(eventslog.EventType(eventType), actorId, eventsLogFilter, iLimit)
+	records, err := r.eventsLogService.GetEvents(eventslog.EventType(eventType), actorId, level, eventsLogFilter, iLimit)
 	if err != nil {
 		c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get events log: " + err.Error()})
 		return
@@ -539,7 +443,7 @@
 }
 
 func maskWriteKeyInObj(eventType string, record eventslog.EventsLogRecord) {
-	if strings.HasPrefix(eventType, "incoming.") {
+	if eventType == "incoming" {
 		o, ok := record.Content.(map[string]any)
 		if ok {
 			b, ok := o["body"].(string)
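EventsLogHandler now treats the :eventType path parameter as a compound key such as incoming.all or batch.error, splitting it into a type and a level. As written, parts[1] assumes a dot is always present; a key without one would panic with an index-out-of-range. A defensive variant (an editor's sketch using only the standard library, not the committed code; strings is already imported in router.go):

// splitEventKey tolerates both "incoming" and "incoming.error" style keys.
func splitEventKey(eventKey string) (eventType, level string) {
	parts := strings.SplitN(eventKey, ".", 2)
	eventType = parts[0]
	if len(parts) > 1 {
		level = parts[1]
	}
	return
}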
5 changes: 3 additions & 2 deletions bulkerapp/app/stream_consumer.go
@@ -313,10 +313,11 @@ func (sc *StreamConsumerImpl) postEventsLog(message []byte, representation any,
object["mappedData"] = processedObject
}

level := eventslog.LevelInfo
if processedErr != nil {
object["error"] = processedErr.Error()
object["status"] = "FAILED"
sc.eventsLogService.PostAsync(&eventslog.ActorEvent{eventslog.EventTypeProcessedError, sc.destination.Id(), object})
level = eventslog.LevelError
}
sc.eventsLogService.PostAsync(&eventslog.ActorEvent{eventslog.EventTypeProcessedAll, sc.destination.Id(), object})
sc.eventsLogService.PostAsync(&eventslog.ActorEvent{eventslog.EventTypeProcessed, level, sc.destination.Id(), object})
}
2 changes: 1 addition & 1 deletion bulkerapp/go.mod
@@ -5,7 +5,7 @@ go 1.21
 require (
 	github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
 	github.com/gin-gonic/gin v1.9.1
-	github.com/go-co-op/gocron/v2 v2.1.0
+	github.com/go-co-op/gocron/v2 v2.2.4
 	github.com/gomodule/redigo v1.8.9
 	github.com/hashicorp/go-multierror v1.1.1
 	github.com/hjson/hjson-go/v4 v4.3.1
2 changes: 1 addition & 1 deletion bulkerlib/implementations/file_storage/abstract.go
@@ -98,7 +98,7 @@ func (ps *AbstractFileStorageStream) init(ctx context.Context) error {

 func (ps *AbstractFileStorageStream) preprocess(object types2.Object) (types2.Object, error) {
 	if ps.flatten {
-		flatObject, err := implementations2.NewFlattener(false).FlattenObject(object, nil)
+		flatObject, err := implementations2.NewFlattener(false, false).FlattenObject(object, nil)
 		if err != nil {
 			return nil, err
 		} else {
23 changes: 15 additions & 8 deletions bulkerlib/implementations/flattener.go
@@ -13,11 +13,14 @@ type Flattener interface {

 type FlattenerImpl struct {
 	omitNilValues bool
+	// stringifyObjects: object types like JSON and arrays are stringified before being sent to the warehouse (the warehouse parses them back)
+	stringifyObjects bool
 }
 
-func NewFlattener(omitNilValues bool) Flattener {
+func NewFlattener(omitNilValues, stringifyObjects bool) Flattener {
 	return &FlattenerImpl{
-		omitNilValues: omitNilValues,
+		omitNilValues:    omitNilValues,
+		stringifyObjects: stringifyObjects,
 	}
 }
 
@@ -53,13 +56,17 @@ func (f *FlattenerImpl) flatten(key string, value any, destination map[string]any
 	case reflect.Map:
 		unboxed := value.(map[string]any)
 		if _, ok := sqlTypeHints[key]; ok {
-			// if there is a sql type hint for a nested object - we don't flatten it.
-			// Instead, we marshal it to a json string, hoping that the database cast function will do the job
-			b, err := jsoniter.Marshal(value)
-			if err != nil {
-				return fmt.Errorf("error marshaling json object with key %s: %v", key, err)
-			}
-			destination[key] = string(b)
+			if f.stringifyObjects {
+				// if there is a sql type hint for a nested object - we don't flatten it.
+				// Instead, we marshal it to a json string, hoping that the database cast function will do the job
+				b, err := jsoniter.Marshal(value)
+				if err != nil {
+					return fmt.Errorf("error marshaling json object with key %s: %v", key, err)
+				}
+				destination[key] = string(b)
+			} else {
+				destination[key] = unboxed
+			}
 			return nil
 		}
 		for k, v := range unboxed {
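A quick illustration of what the new stringifyObjects flag changes for a nested field carrying a SQL type hint. This is an editor's sketch, not code from the repository; the hint-map type (types.SQLTypes keyed by column name, with types.SQLColumn values) is inferred from processor.go and abstract.go in this commit:

event := map[string]any{"id": 1, "payload": map[string]any{"a": 1}}
hints := types.SQLTypes{"payload": types.SQLColumn{DataType: types.JSON}}

// stringifyObjects == true: the nested object is marshaled to a JSON string,
// and the warehouse is expected to cast/parse it back.
flat, _ := implementations.NewFlattener(false, true).FlattenObject(event, hints)
// flat["payload"] == `{"a":1}`

// stringifyObjects == false: the nested object is kept as-is, which suits
// BigQuery's NDJSON loads now that its adapter reports StringifyObjects() == false.
raw, _ := implementations.NewFlattener(false, false).FlattenObject(event, hints)
// raw["payload"] == map[string]any{"a": 1}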
10 changes: 7 additions & 3 deletions bulkerlib/implementations/sql/abstract.go
@@ -63,7 +63,7 @@ func (ps *AbstractSQLStream) preprocess(object types.Object) (*Table, types.Object
 	if ps.state.Status != bulker.Active {
 		return nil, nil, fmt.Errorf("stream is not active. Status: %s", ps.state.Status)
 	}
-	batchHeader, processedObject, err := ProcessEvents(ps.tableName, object, ps.customTypes, ps.omitNils)
+	batchHeader, processedObject, err := ProcessEvents(ps.tableName, object, ps.customTypes, ps.omitNils, ps.sqlAdapter.StringifyObjects())
 	if err != nil {
 		return nil, nil, err
 	}
@@ -182,8 +182,12 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable,
 		jsonSQLType, _ := ps.sqlAdapter.GetSQLType(types.JSON)
 		added := utils.MapPutIfAbsent(current, ps.sqlAdapter.ColumnName(unmappedDataColumn), types.SQLColumn{DataType: types.JSON, Type: jsonSQLType})
 		columnsAdded = columnsAdded || added
-		b, _ := jsoniter.Marshal(unmappedObj)
-		values[ps.sqlAdapter.ColumnName(unmappedDataColumn)] = string(b)
+		if ps.sqlAdapter.StringifyObjects() {
+			b, _ := jsoniter.Marshal(unmappedObj)
+			values[ps.sqlAdapter.ColumnName(unmappedDataColumn)] = string(b)
+		} else {
+			values[ps.sqlAdapter.ColumnName(unmappedDataColumn)] = unmappedObj
+		}
 	}
 	currentTable.Columns = current
 	return columnsAdded
12 changes: 7 additions & 5 deletions bulkerlib/implementations/sql/abstract_transactional.go
@@ -65,10 +65,6 @@ func (ps *AbstractTransactionalSQLStream) init(ctx context.Context) (err error)
 	}
 	localBatchFile := localBatchFileOption.Get(&ps.options)
 	if localBatchFile != "" && ps.batchFile == nil {
-		ps.batchFile, err = os.CreateTemp("", localBatchFile)
-		if err != nil {
-			return err
-		}
 		ps.marshaller, _ = types.NewMarshaller(types.FileFormatNDJSON, types.FileCompressionNONE)
 		ps.targetMarshaller, err = types.NewMarshaller(ps.sqlAdapter.GetBatchFileFormat(), ps.sqlAdapter.GetBatchFileCompression())
 		if err != nil {
@@ -78,6 +74,10 @@ func (ps *AbstractTransactionalSQLStream) init(ctx context.Context) (err error)
 			//without merge we can write file with compression - no need to convert
 			ps.marshaller, _ = types.NewMarshaller(ps.sqlAdapter.GetBatchFileFormat(), ps.sqlAdapter.GetBatchFileCompression())
 		}
+		ps.batchFile, err = os.CreateTemp("", localBatchFile+"_*"+ps.marshaller.FileExtension())
+		if err != nil {
+			return err
+		}
 	}
 	err = ps.AbstractSQLStream.init(ctx)
 	if err != nil {
@@ -148,7 +148,7 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (s
 		needToConvert = true
 	}
 	if len(ps.batchFileSkipLines) > 0 || needToConvert {
-		workingFile, err = os.CreateTemp("", path.Base(ps.batchFile.Name())+"_2")
+		workingFile, err = os.CreateTemp("", path.Base(ps.batchFile.Name())+"_*"+ps.targetMarshaller.FileExtension())
 		if err != nil {
 			return nil, errorj.Decorate(err, "failed to create tmp file for deduplication")
 		}
@@ -195,6 +195,8 @@
 		}
 		ps.targetMarshaller.Flush()
 		workingFile.Sync()
+	} else {
+		ps.marshaller.Flush()
 	}
 	if needToConvert {
 		logging.Infof("[%s] Converted batch file from %s(%s) to %s(%s) in %s", ps.id, ps.marshaller.Format(), ps.marshaller.Compression(), ps.targetMarshaller.Format(), ps.targetMarshaller.Compression(), time.Now().Sub(convertStart))
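Both CreateTemp calls above now put a "*" before the marshaller's extension. os.CreateTemp replaces the last "*" in the pattern with its random component, so whatever FileExtension() returns (presumably something like ".ndjson.gz" for gzipped NDJSON; the marshaller is not in this diff) survives as a real file suffix instead of being buried mid-name. The stdlib mechanism in isolation:

// Sketch: the last "*" is replaced with a random string;
// text after the "*" becomes the file's suffix.
f, err := os.CreateTemp("", "batch_*.ndjson.gz")
if err != nil {
	return err // inside a function that returns error
}
fmt.Println(f.Name()) // e.g. /tmp/batch_2676507066.ndjson.gz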
8 changes: 6 additions & 2 deletions bulkerlib/implementations/sql/bigquery.go
@@ -148,11 +148,15 @@ func (bq *BigQuery) CreateStream(id, tableName string, mode bulker.BulkMode, str
 }
 
 func (bq *BigQuery) GetBatchFileFormat() types2.FileFormat {
-	return types2.FileFormatCSV
+	return types2.FileFormatNDJSON
 }
 
 func (bq *BigQuery) GetBatchFileCompression() types2.FileCompression {
-	return types2.FileCompressionNONE
+	return types2.FileCompressionGZIP
 }
 
+func (bq *BigQuery) StringifyObjects() bool {
+	return false
+}
+
 func (bq *BigQuery) validateOptions(streamOptions []bulker.StreamOption) error {
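With the batch format switched from plain CSV to gzipped NDJSON, and StringifyObjects() returning false, the BigQuery batch file becomes one JSON object per line inside a gzip stream, with nested objects left nested rather than escaped into strings. The marshaller implementation is not part of this diff; in stdlib terms the write path amounts to roughly this sketch:

// Sketch of a gzipped-NDJSON writer using only the standard library;
// the actual marshaller lives in the bulker types package.
package main

import (
	"compress/gzip"
	"encoding/json"
	"os"
)

func main() {
	f, _ := os.CreateTemp("", "batch_*.ndjson.gz")
	gz := gzip.NewWriter(f)
	enc := json.NewEncoder(gz) // Encode appends a newline after each object -> NDJSON
	_ = enc.Encode(map[string]any{"id": 1, "payload": map[string]any{"a": 1}})
	_ = enc.Encode(map[string]any{"id": 2})
	_ = gz.Close() // flush the gzip stream before handing the file to the load job
	_ = f.Close()
}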
4 changes: 2 additions & 2 deletions bulkerlib/implementations/sql/processor.go
@@ -13,15 +13,15 @@ const SqlTypePrefix = "__sql_type"
 // ProcessEvents processes events objects without applying mapping rules
 // returns table header and an array of processed objects
 // or an error if at least one occurred
-func ProcessEvents(tableName string, event types.Object, customTypes types.SQLTypes, omitNils bool) (*TypesHeader, types.Object, error) {
+func ProcessEvents(tableName string, event types.Object, customTypes types.SQLTypes, omitNils bool, stringifyObjects bool) (*TypesHeader, types.Object, error) {
 	sqlTypesHints, err := extractSQLTypesHints(event)
 	if err != nil {
 		return nil, nil, err
 	}
 	for k, v := range customTypes {
 		sqlTypesHints[k] = v
 	}
-	flatObject, err := implementations.NewFlattener(omitNils).FlattenObject(event, sqlTypesHints)
+	flatObject, err := implementations.NewFlattener(omitNils, stringifyObjects).FlattenObject(event, sqlTypesHints)
 	if err != nil {
 		return nil, nil, err
 	}
[Diffs for the remaining changed files are not shown.]
