Skip to content

Commit

Permalink
ingest: move 'destination not found' error to debug level
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Dec 3, 2024
1 parent 8963609 commit e9480ae
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 51 deletions.
40 changes: 20 additions & 20 deletions bulkerapp/app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,21 @@ func (r *Router) EventsHandler(c *gin.Context) {

destination := r.repository.GetDestination(destinationId)
if destination == nil {
rError = r.ResponseError(c, http.StatusNotFound, "destination not found", false, fmt.Errorf("destination not found: %s", destinationId), true)
rError = r.ResponseError(c, http.StatusNotFound, "destination not found", false, fmt.Errorf("destination not found: %s", destinationId), true, true)
return
}
mode = utils.DefaultString(modeOverride, string(destination.Mode()))
if mode != string(bulker.Batch) && mode != string(bulker.Stream) {
rError = r.ResponseError(c, http.StatusBadRequest, "invalid bulker mode", false, fmt.Errorf("invalid bulker mode: %s", mode), true)
rError = r.ResponseError(c, http.StatusBadRequest, "invalid bulker mode", false, fmt.Errorf("invalid bulker mode: %s", mode), true, true)
return
}
if tableName == "" {
rError = r.ResponseError(c, http.StatusBadRequest, "missing required parameter", false, fmt.Errorf("tableName query parameter is required"), true)
rError = r.ResponseError(c, http.StatusBadRequest, "missing required parameter", false, fmt.Errorf("tableName query parameter is required"), true, true)
return
}
topicId, err := destination.TopicId(tableName, mode, r.config.KafkaTopicPrefix)
if err != nil {
rError = r.ResponseError(c, http.StatusInternalServerError, "couldn't generate topicId", false, err, true)
rError = r.ResponseError(c, http.StatusInternalServerError, "couldn't generate topicId", false, err, true, true)
return
}
err = r.topicManager.EnsureDestinationTopic(destination, topicId)
Expand All @@ -135,20 +135,20 @@ func (r *Router) EventsHandler(c *gin.Context) {
if ok && kafkaErr.Code() == kafka.ErrTopicAlreadyExists {
r.Warnf("Topic %s already exists", topicId)
} else {
rError = r.ResponseError(c, http.StatusInternalServerError, "couldn't create topic", false, fmt.Errorf("topicId %s: %v", topicId, err), true)
rError = r.ResponseError(c, http.StatusInternalServerError, "couldn't create topic", false, fmt.Errorf("topicId %s: %v", topicId, err), true, true)
return
}
}

body, err := io.ReadAll(c.Request.Body)
if err != nil {
rError = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, true)
rError = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, true, true)
return
}
bytesRead = len(body)
err = r.producer.ProduceAsync(topicId, uuid.New(), body, map[string]string{MetricsMetaHeader: metricsMeta}, kafka.PartitionAny)
if err != nil {
rError = r.ResponseError(c, http.StatusInternalServerError, "producer error", true, err, true)
rError = r.ResponseError(c, http.StatusInternalServerError, "producer error", true, err, true, true)
return
}
c.JSON(http.StatusOK, gin.H{"message": "ok"})
Expand Down Expand Up @@ -183,12 +183,12 @@ func (r *Router) BulkHandler(c *gin.Context) {

destination := r.repository.GetDestination(destinationId)
if destination == nil {
rError = r.ResponseError(c, http.StatusNotFound, "destination not found", false, fmt.Errorf("destination not found: %s", destinationId), true)
rError = r.ResponseError(c, http.StatusNotFound, "destination not found", false, fmt.Errorf("destination not found: %s", destinationId), true, true)
return
}
mode = string(destination.Mode())
if tableName == "" {
rError = r.ResponseError(c, http.StatusBadRequest, "missing required parameter", false, fmt.Errorf("tableName query parameter is required"), true)
rError = r.ResponseError(c, http.StatusBadRequest, "missing required parameter", false, fmt.Errorf("tableName query parameter is required"), true, true)
return
}
var streamOptions []bulker.StreamOption
Expand All @@ -199,7 +199,7 @@ func (r *Router) BulkHandler(c *gin.Context) {
schema := types.Schema{}
err = jsoniter.Unmarshal([]byte(schemaHeader), &schema)
if err != nil {
rError = r.ResponseError(c, http.StatusBadRequest, "schema unmarshal error", false, err, true)
rError = r.ResponseError(c, http.StatusBadRequest, "schema unmarshal error", false, err, true, true)
return
}
if !schema.IsEmpty() {
Expand All @@ -211,7 +211,7 @@ func (r *Router) BulkHandler(c *gin.Context) {
destination.InitBulkerInstance()
bulkerStream, err := destination.bulker.CreateStream(jobId, tableName, bulkMode, streamOptions...)
if err != nil {
rError = r.ResponseError(c, http.StatusInternalServerError, "create stream error", true, err, true)
rError = r.ResponseError(c, http.StatusInternalServerError, "create stream error", true, err, true, true)
return
}
scanner := bufio.NewScanner(c.Request.Body)
Expand All @@ -221,32 +221,32 @@ func (r *Router) BulkHandler(c *gin.Context) {
eventBytes := scanner.Bytes()
if len(eventBytes) >= 5 && string(eventBytes[:5]) == "ABORT" {
state = bulkerStream.Abort(c)
rError = r.ResponseError(c, http.StatusBadRequest, "aborted", false, fmt.Errorf(string(eventBytes)), true)
rError = r.ResponseError(c, http.StatusBadRequest, "aborted", false, fmt.Errorf(string(eventBytes)), true, true)
return
}
bytesRead += len(eventBytes)
var obj types.Object
if err = jsonorder.Unmarshal(eventBytes, &obj); err != nil {
state = bulkerStream.Abort(c)
rError = r.ResponseError(c, http.StatusBadRequest, "unmarhsal error", false, err, true)
rError = r.ResponseError(c, http.StatusBadRequest, "unmarhsal error", false, err, true, true)
return
}
if _, processedObjectSample, err = bulkerStream.Consume(c, obj); err != nil {
state = bulkerStream.Abort(c)
rError = r.ResponseError(c, http.StatusBadRequest, "stream consume error", false, err, true)
rError = r.ResponseError(c, http.StatusBadRequest, "stream consume error", false, err, true, true)
return
}
consumed++
}
if err = scanner.Err(); err != nil {
state = bulkerStream.Abort(c)
rError = r.ResponseError(c, http.StatusBadRequest, "scanner error", false, err, true)
rError = r.ResponseError(c, http.StatusBadRequest, "scanner error", false, err, true, true)
return
}
if consumed > 0 {
state, err = bulkerStream.Complete(c)
if err != nil {
rError = r.ResponseError(c, http.StatusBadRequest, "stream complete error", false, err, true)
rError = r.ResponseError(c, http.StatusBadRequest, "stream complete error", false, err, true, true)
return
}
r.Infof("Bulk stream for %s mode: %s Completed. Processed: %d in %dms.", jobId, mode, state.SuccessfulRows, time.Since(start).Milliseconds())
Expand Down Expand Up @@ -299,7 +299,7 @@ func (r *Router) FailedHandler(c *gin.Context) {
err = consumer.Assign([]kafka.TopicPartition{{Topic: &topicId, Partition: 0, Offset: kafka.OffsetBeginning}})
}
if err != nil {
r.ResponseError(c, http.StatusInternalServerError, "consumer error", true, err, true)
r.ResponseError(c, http.StatusInternalServerError, "consumer error", true, err, true, true)
return
}
start := time.Now()
Expand Down Expand Up @@ -336,14 +336,14 @@ func (r *Router) FailedHandler(c *gin.Context) {
func (r *Router) TestConnectionHandler(c *gin.Context) {
body, err := io.ReadAll(c.Request.Body)
if err != nil {
_ = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, true)
_ = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, true, true)
return
}
bulkerCfg := bulker.Config{}
destinationConfig := map[string]any{}
err = utils.ParseObject(body, &destinationConfig)
if err != nil {
_ = r.ResponseError(c, http.StatusUnprocessableEntity, "parse failed", false, err, true)
_ = r.ResponseError(c, http.StatusUnprocessableEntity, "parse failed", false, err, true, true)
return
} else {
r.Debugf("[test] parsed config for destination %s: %+v", utils.MapNVL(destinationConfig, "id", ""), destinationConfig)
Expand All @@ -357,7 +357,7 @@ func (r *Router) TestConnectionHandler(c *gin.Context) {
if b != nil {
_ = b.Close()
}
_ = r.ResponseError(c, http.StatusUnprocessableEntity, "error creating bulker", false, err, true)
_ = r.ResponseError(c, http.StatusUnprocessableEntity, "error creating bulker", false, err, true, true)
return
}
_ = b.Close()
Expand Down
4 changes: 2 additions & 2 deletions ingest/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (r *Router) sendToRotor(c *gin.Context, ingestMessageBytes []byte, stream *

if stream.Throttle > 0 {
if stream.Throttle >= 100 || rand.Int31n(100) < int32(stream.Throttle) {
rError = r.ResponseError(c, http.StatusPaymentRequired, ErrThrottledType, false, fmt.Errorf(ErrThrottledDescription), sendResponse)
rError = r.ResponseError(c, http.StatusPaymentRequired, ErrThrottledType, false, fmt.Errorf(ErrThrottledDescription), sendResponse, true)
return
}
}
Expand All @@ -234,7 +234,7 @@ func (r *Router) sendToRotor(c *gin.Context, ingestMessageBytes []byte, stream *
for _, id := range asyncDestinations {
IngestedMessages(id, "error", "producer error").Inc()
}
rError = r.ResponseError(c, http.StatusInternalServerError, "producer error", true, err, sendResponse)
rError = r.ResponseError(c, http.StatusInternalServerError, "producer error", true, err, sendResponse, true)
}
for _, id := range asyncDestinations {
IngestedMessages(id, "success", "").Inc()
Expand Down
16 changes: 8 additions & 8 deletions ingest/router_batch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ func (r *Router) BatchHandler(c *gin.Context) {
}()
defer func() {
if rerr := recover(); rerr != nil {
rError = r.ResponseError(c, http.StatusInternalServerError, "panic", true, fmt.Errorf("%v", rerr), true)
rError = r.ResponseError(c, http.StatusInternalServerError, "panic", true, fmt.Errorf("%v", rerr), true, true)
}
}()
c.Set(appbase.ContextLoggerName, "batch")
if !strings.HasSuffix(c.ContentType(), "application/json") && !strings.HasSuffix(c.ContentType(), "text/plain") {
rError = r.ResponseError(c, http.StatusBadRequest, "invalid content type", false, fmt.Errorf("%s. Expected: application/json", c.ContentType()), true)
rError = r.ResponseError(c, http.StatusBadRequest, "invalid content type", false, fmt.Errorf("%s. Expected: application/json", c.ContentType()), true, true)
return
}
bodyReader := c.Request.Body
Expand All @@ -45,7 +45,7 @@ func (r *Router) BatchHandler(c *gin.Context) {
}
if err != nil {
err = fmt.Errorf("Client Ip: %s: %v", utils.NvlString(c.GetHeader("X-Real-Ip"), c.GetHeader("X-Forwarded-For"), c.ClientIP()), err)
rError = r.ResponseError(c, http.StatusBadRequest, "error parsing message", false, err, true)
rError = r.ResponseError(c, http.StatusBadRequest, "error parsing message", false, err, true, true)
return
}
if c.FullPath() == "/api/s/s2s/batch" {
Expand All @@ -54,15 +54,15 @@ func (r *Router) BatchHandler(c *gin.Context) {
}
loc, err := r.getDataLocator(c, IngestTypeWriteKeyDefined, func() string { return payload.WriteKey })
if err != nil {
rError = r.ResponseError(c, http.StatusBadRequest, "error processing message", false, err, true)
rError = r.ResponseError(c, http.StatusBadRequest, "error processing message", false, err, true, true)
return
}
domain = utils.DefaultString(loc.Slug, loc.Domain)
c.Set(appbase.ContextDomain, domain)

stream := r.getStream(&loc, true, s2sEndpoint)
if stream == nil {
rError = r.ResponseError(c, http.StatusUnauthorized, "stream not found", false, fmt.Errorf("for: %+v", loc), true)
rError = r.ResponseError(c, http.StatusUnauthorized, "stream not found", false, fmt.Errorf("for: %+v", loc), true, true)
return
}
s2sEndpoint = s2sEndpoint || loc.IngestType == IngestTypeS2S
Expand All @@ -85,12 +85,12 @@ func (r *Router) BatchHandler(c *gin.Context) {
var asyncDestinations, tagsDestinations []string
if err1 == nil {
if len(stream.AsynchronousDestinations) == 0 {
rError = r.ResponseError(c, http.StatusOK, ErrNoDst, false, fmt.Errorf(stream.Stream.Id), false)
rError = r.ResponseError(c, http.StatusOK, ErrNoDst, false, fmt.Errorf(stream.Stream.Id), false, false)
} else {
asyncDestinations, tagsDestinations, rError = r.sendToRotor(c, ingestMessageBytes, stream, false)
}
} else {
rError = r.ResponseError(c, http.StatusOK, "event error", false, err1, false)
rError = r.ResponseError(c, http.StatusOK, "event error", false, err1, false, true)
}
if len(ingestMessageBytes) >= 0 {
_ = r.backupsLogger.Log(utils.DefaultString(eventsLogId, "UNKNOWN"), ingestMessageBytes)
Expand All @@ -108,7 +108,7 @@ func (r *Router) BatchHandler(c *gin.Context) {
obj["status"] = "SUCCESS"
} else {
obj["status"] = "SKIPPED"
obj["error"] = "no destinations found for stream"
obj["error"] = ErrNoDst
errors = append(errors, fmt.Sprintf("Message ID: %s: %v", messageId, rError.PublicError))
}
r.eventsLogService.PostAsync(&eventslog.ActorEvent{EventType: eventslog.EventTypeIncoming, Level: eventslog.LevelInfo, ActorId: eventsLogId, Event: obj})
Expand Down
18 changes: 9 additions & 9 deletions ingest/router_ingest_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,20 @@ func (r *Router) IngestHandler(c *gin.Context) {
obj["status"] = "SUCCESS"
} else {
obj["status"] = "SKIPPED"
obj["error"] = "no destinations found for stream"
obj["error"] = ErrNoDst
}
r.eventsLogService.PostAsync(&eventslog.ActorEvent{EventType: eventslog.EventTypeIncoming, Level: eventslog.LevelInfo, ActorId: eventsLogId, Event: obj})
IngestHandlerRequests(domain, "success", "").Inc()
}
}()
defer func() {
if rerr := recover(); rerr != nil {
rError = r.ResponseError(c, http.StatusInternalServerError, "panic", true, fmt.Errorf("%v", rerr), true)
rError = r.ResponseError(c, http.StatusInternalServerError, "panic", true, fmt.Errorf("%v", rerr), true, true)
}
}()
c.Set(appbase.ContextLoggerName, "ingest")
if !strings.HasSuffix(c.ContentType(), "application/json") && !strings.HasSuffix(c.ContentType(), "text/plain") {
rError = r.ResponseError(c, http.StatusBadRequest, "invalid content type", false, fmt.Errorf("%s. Expected: application/json", c.ContentType()), true)
rError = r.ResponseError(c, http.StatusBadRequest, "invalid content type", false, fmt.Errorf("%s. Expected: application/json", c.ContentType()), true, true)
return
}
if c.FullPath() == "/api/s/s2s/:tp" {
Expand All @@ -87,13 +87,13 @@ func (r *Router) IngestHandler(c *gin.Context) {
body, err := io.ReadAll(c.Request.Body)
if err != nil {
err = fmt.Errorf("Client Ip: %s: %v", utils.NvlString(c.GetHeader("X-Real-Ip"), c.GetHeader("X-Forwarded-For"), c.ClientIP()), err)
rError = r.ResponseError(c, utils.Ternary(s2sEndpoint, http.StatusBadRequest, http.StatusOK), "error reading HTTP body", false, err, true)
rError = r.ResponseError(c, utils.Ternary(s2sEndpoint, http.StatusBadRequest, http.StatusOK), "error reading HTTP body", false, err, true, true)
return
}
var message types.Json
err = jsonorder.Unmarshal(body, &message)
if err != nil {
rError = r.ResponseError(c, utils.Ternary(s2sEndpoint, http.StatusBadRequest, http.StatusOK), "error parsing message", false, fmt.Errorf("%v: %s", err, string(body)), true)
rError = r.ResponseError(c, utils.Ternary(s2sEndpoint, http.StatusBadRequest, http.StatusOK), "error parsing message", false, fmt.Errorf("%v: %s", err, string(body)), true, true)
return
}
messageId := message.GetS("messageId")
Expand All @@ -106,7 +106,7 @@ func (r *Router) IngestHandler(c *gin.Context) {
//func() string { wk, _ := message["writeKey"].(string); return wk }
loc, err := r.getDataLocator(c, ingestType, nil)
if err != nil {
rError = r.ResponseError(c, utils.Ternary(s2sEndpoint, http.StatusBadRequest, http.StatusOK), "error processing message", false, fmt.Errorf("%v: %s", err, string(body)), true)
rError = r.ResponseError(c, utils.Ternary(s2sEndpoint, http.StatusBadRequest, http.StatusOK), "error processing message", false, fmt.Errorf("%v: %s", err, string(body)), true, true)
return
}

Expand All @@ -115,7 +115,7 @@ func (r *Router) IngestHandler(c *gin.Context) {

stream := r.getStream(&loc, false, s2sEndpoint)
if stream == nil {
rError = r.ResponseError(c, utils.Ternary(s2sEndpoint, http.StatusUnauthorized, http.StatusOK), "stream not found", false, fmt.Errorf("for: %+v", loc), true)
rError = r.ResponseError(c, utils.Ternary(s2sEndpoint, http.StatusUnauthorized, http.StatusOK), "stream not found", false, fmt.Errorf("for: %+v", loc), true, true)
return
}
s2sEndpoint = s2sEndpoint || loc.IngestType == IngestTypeS2S
Expand All @@ -126,11 +126,11 @@ func (r *Router) IngestHandler(c *gin.Context) {
//}
ingestMessage, ingestMessageBytes, err := r.buildIngestMessage(c, messageId, message, nil, tp, loc, stream)
if err != nil {
rError = r.ResponseError(c, utils.Ternary(s2sEndpoint, http.StatusBadRequest, http.StatusOK), "event error", false, err, true)
rError = r.ResponseError(c, utils.Ternary(s2sEndpoint, http.StatusBadRequest, http.StatusOK), "event error", false, err, true, true)
return
}
if len(stream.AsynchronousDestinations) == 0 && (len(stream.SynchronousDestinations) == 0 || s2sEndpoint) {
rError = r.ResponseError(c, http.StatusOK, ErrNoDst, false, fmt.Errorf(stream.Stream.Id), true)
rError = r.ResponseError(c, http.StatusOK, ErrNoDst, false, fmt.Errorf(stream.Stream.Id), true, false)
return
}
asyncDestinations, tagsDestinations, rError = r.sendToRotor(c, ingestMessageBytes, stream, true)
Expand Down
14 changes: 7 additions & 7 deletions ingest/router_pixel_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ func (r *Router) PixelHandler(c *gin.Context) {
obj["status"] = "SUCCESS"
} else {
obj["status"] = "SKIPPED"
obj["error"] = "no destinations found for stream"
obj["error"] = ErrNoDst
}
r.eventsLogService.PostAsync(&eventslog.ActorEvent{EventType: eventslog.EventTypeIncoming, Level: eventslog.LevelInfo, ActorId: eventsLogId, Event: obj})
IngestHandlerRequests(domain, "success", "").Inc()
}
}()
defer func() {
if rerr := recover(); rerr != nil {
rError = r.ResponseError(c, http.StatusOK, "panic", true, fmt.Errorf("%v", rerr), true)
rError = r.ResponseError(c, http.StatusOK, "panic", true, fmt.Errorf("%v", rerr), true, true)
}
}()
// disable cache
Expand All @@ -73,7 +73,7 @@ func (r *Router) PixelHandler(c *gin.Context) {
tp := c.Param("tp")
message, err := r.parsePixelEvent(c, tp)
if err != nil {
rError = r.ResponseError(c, http.StatusOK, "error parsing message", false, err, true)
rError = r.ResponseError(c, http.StatusOK, "error parsing message", false, err, true, true)
return
}
messageId := message.GetS("messageId")
Expand All @@ -86,7 +86,7 @@ func (r *Router) PixelHandler(c *gin.Context) {
//func() string { wk, _ := message["writeKey"].(string); return wk }
loc, err := r.getDataLocator(c, ingestType, nil)
if err != nil {
rError = r.ResponseError(c, http.StatusOK, "error processing message", false, err, true)
rError = r.ResponseError(c, http.StatusOK, "error processing message", false, err, true, true)
return
}

Expand All @@ -95,19 +95,19 @@ func (r *Router) PixelHandler(c *gin.Context) {

stream := r.getStream(&loc, false, false)
if stream == nil {
rError = r.ResponseError(c, http.StatusOK, "stream not found", false, fmt.Errorf("for: %+v", loc), true)
rError = r.ResponseError(c, http.StatusOK, "stream not found", false, fmt.Errorf("for: %+v", loc), true, true)
return
}

eventsLogId = stream.Stream.Id
//}
_, ingestMessageBytes, err = r.buildIngestMessage(c, messageId, message, nil, tp, loc, stream)
if err != nil {
rError = r.ResponseError(c, http.StatusOK, "event error", false, err, true)
rError = r.ResponseError(c, http.StatusOK, "event error", false, err, true, true)
return
}
if len(stream.AsynchronousDestinations) == 0 {
rError = r.ResponseError(c, http.StatusOK, ErrNoDst, false, fmt.Errorf(stream.Stream.Id), true)
rError = r.ResponseError(c, http.StatusOK, ErrNoDst, false, fmt.Errorf(stream.Stream.Id), true, false)
return
}
asyncDestinations, _, rError = r.sendToRotor(c, ingestMessageBytes, stream, true)
Expand Down
Loading

0 comments on commit e9480ae

Please sign in to comment.