Skip to content

Commit

Permalink
fix: async framework destination debugger
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Feb 5, 2025
1 parent c6e0225 commit 7d6ae36
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 3 deletions.
81 changes: 81 additions & 0 deletions mocks/services/debugger/destination/eventDeliveryStatusUploader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 56 additions & 1 deletion router/batchrouter/handle_async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
mocksJobsDB "github.com/rudderlabs/rudder-server/mocks/jobsdb"
mockdestinationdebugger "github.com/rudderlabs/rudder-server/mocks/services/debugger/destination"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
routerutils "github.com/rudderlabs/rudder-server/router/utils"
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
Expand Down Expand Up @@ -52,7 +53,6 @@ func (m mockAsyncDestinationManager) GetUploadStats(common.GetUploadStatsInput)
func defaultHandle(destType string) *Handle {
batchRouter := &Handle{}
batchRouter.destType = destType
batchRouter.debugger = destinationdebugger.NewNoOpService()
batchRouter.destinationsMap = make(map[string]*routerutils.DestinationWithSources)
batchRouter.uploadIntervalMap = make(map[string]time.Duration)
batchRouter.asyncDestinationStruct = make(map[string]*common.AsyncDestinationStruct)
Expand Down Expand Up @@ -110,9 +110,11 @@ func TestAsyncDestinationManager(t *testing.T) {

mockCtrl := gomock.NewController(t)
mockBatchRouterJobsDB := mocksJobsDB.NewMockJobsDB(mockCtrl)
mockDestinationDebugger := mockdestinationdebugger.NewMockDestinationDebugger(mockCtrl)

batchRouter := defaultHandle(destType)
batchRouter.jobsDB = mockBatchRouterJobsDB
batchRouter.debugger = mockDestinationDebugger

mockBatchRouterJobsDB.EXPECT().GetImporting(
gomock.Any(),
Expand Down Expand Up @@ -160,6 +162,15 @@ func TestAsyncDestinationManager(t *testing.T) {
mockBatchRouterJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) {
_ = f(jobsdb.EmptyUpdateSafeTx())
}).Return(nil)
mockDestinationDebugger.EXPECT().RecordEventDeliveryStatus(gomock.Any(), gomock.Any()).Times(1).Do(func(destinationID string, deliveryStatus *destinationdebugger.DeliveryStatusT) {
require.Equal(t, "3 events", deliveryStatus.EventName)
require.Empty(t, deliveryStatus.EventType)
require.JSONEq(t, `{"success":"1 events","failed":"2 events"}`, string(deliveryStatus.Payload))
require.Equal(t, 1, deliveryStatus.AttemptNum)
require.Equal(t, jobsdb.Failed.State, deliveryStatus.JobState)
require.Equal(t, "500", deliveryStatus.ErrorCode)
require.NotEmpty(t, deliveryStatus.ErrorResponse)
}).Return(true)

for _, source := range sources {
for _, destination := range source.Destinations {
Expand Down Expand Up @@ -204,9 +215,11 @@ func TestAsyncDestinationManager(t *testing.T) {

mockCtrl := gomock.NewController(t)
mockBatchRouterJobsDB := mocksJobsDB.NewMockJobsDB(mockCtrl)
mockDestinationDebugger := mockdestinationdebugger.NewMockDestinationDebugger(mockCtrl)

batchRouter := defaultHandle(destType)
batchRouter.jobsDB = mockBatchRouterJobsDB
batchRouter.debugger = mockDestinationDebugger

statsStore, err := memstats.New()
require.NoError(t, err)
Expand Down Expand Up @@ -252,6 +265,15 @@ func TestAsyncDestinationManager(t *testing.T) {
mockBatchRouterJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) {
_ = f(jobsdb.EmptyUpdateSafeTx())
}).Return(nil)
mockDestinationDebugger.EXPECT().RecordEventDeliveryStatus(gomock.Any(), gomock.Any()).Times(1).Do(func(destinationID string, deliveryStatus *destinationdebugger.DeliveryStatusT) {
require.Equal(t, "1 events", deliveryStatus.EventName)
require.Empty(t, deliveryStatus.EventType)
require.JSONEq(t, `{"success":"0 events","failed":"1 events"}`, string(deliveryStatus.Payload))
require.Equal(t, 1, deliveryStatus.AttemptNum)
require.Equal(t, jobsdb.Failed.State, deliveryStatus.JobState)
require.Equal(t, "500", deliveryStatus.ErrorCode)
require.NotEmpty(t, deliveryStatus.ErrorResponse)
}).Return(true)

for _, source := range sources {
for _, destination := range source.Destinations {
Expand Down Expand Up @@ -288,9 +310,11 @@ func TestAsyncDestinationManager(t *testing.T) {

mockCtrl := gomock.NewController(t)
mockBatchRouterJobsDB := mocksJobsDB.NewMockJobsDB(mockCtrl)
mockDestinationDebugger := mockdestinationdebugger.NewMockDestinationDebugger(mockCtrl)

batchRouter := defaultHandle(destType)
batchRouter.jobsDB = mockBatchRouterJobsDB
batchRouter.debugger = mockDestinationDebugger

statsStore, err := memstats.New()
require.NoError(t, err)
Expand Down Expand Up @@ -336,6 +360,15 @@ func TestAsyncDestinationManager(t *testing.T) {
mockBatchRouterJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) {
_ = f(jobsdb.EmptyUpdateSafeTx())
}).Return(nil)
mockDestinationDebugger.EXPECT().RecordEventDeliveryStatus(gomock.Any(), gomock.Any()).Times(1).Do(func(destinationID string, deliveryStatus *destinationdebugger.DeliveryStatusT) {
require.Equal(t, "1 events", deliveryStatus.EventName)
require.Empty(t, deliveryStatus.EventType)
require.JSONEq(t, `{"success":"1 events","failed":"0 events"}`, string(deliveryStatus.Payload))
require.Equal(t, 1, deliveryStatus.AttemptNum)
require.Equal(t, jobsdb.Succeeded.State, deliveryStatus.JobState)
require.Equal(t, "200", deliveryStatus.ErrorCode)
require.JSONEq(t, `{"success":"OK"}`, string(deliveryStatus.ErrorResponse))
}).Return(true)

for _, source := range sources {
for _, destination := range source.Destinations {
Expand Down Expand Up @@ -375,13 +408,15 @@ func TestAsyncDestinationManager(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockBatchRouterJobsDB := mocksJobsDB.NewMockJobsDB(mockCtrl)
mockErrJobsDB := mocksJobsDB.NewMockJobsDB(mockCtrl)
mockDestinationDebugger := mockdestinationdebugger.NewMockDestinationDebugger(mockCtrl)

batchRouter := defaultHandle(destType)
batchRouter.now = func() time.Time {
return time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
}
batchRouter.jobsDB = mockBatchRouterJobsDB
batchRouter.errorDB = mockErrJobsDB
batchRouter.debugger = mockDestinationDebugger

mockErrJobsDB.EXPECT().Store(
gomock.Any(), gomock.Any(),
Expand Down Expand Up @@ -462,6 +497,15 @@ func TestAsyncDestinationManager(t *testing.T) {
mockBatchRouterJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) {
_ = f(jobsdb.EmptyUpdateSafeTx())
}).Return(nil)
mockDestinationDebugger.EXPECT().RecordEventDeliveryStatus(gomock.Any(), gomock.Any()).Times(1).Do(func(destinationID string, deliveryStatus *destinationdebugger.DeliveryStatusT) {
require.Equal(t, "4 events", deliveryStatus.EventName)
require.Empty(t, deliveryStatus.EventType)
require.JSONEq(t, `{"success":"2 events","failed":"2 events"}`, string(deliveryStatus.Payload))
require.Equal(t, 1, deliveryStatus.AttemptNum)
require.Equal(t, jobsdb.Failed.State, deliveryStatus.JobState)
require.Equal(t, "500", deliveryStatus.ErrorCode)
require.NotEmpty(t, deliveryStatus.ErrorResponse)
}).Return(true)

for _, source := range sources {
for _, destination := range source.Destinations {
Expand Down Expand Up @@ -521,12 +565,14 @@ func TestAsyncDestinationManager(t *testing.T) {

mockCtrl := gomock.NewController(t)
mockBatchRouterJobsDB := mocksJobsDB.NewMockJobsDB(mockCtrl)
mockDestinationDebugger := mockdestinationdebugger.NewMockDestinationDebugger(mockCtrl)

batchRouter := defaultHandle(destType)
batchRouter.now = func() time.Time {
return time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
}
batchRouter.jobsDB = mockBatchRouterJobsDB
batchRouter.debugger = mockDestinationDebugger

statsStore, err := memstats.New()
require.NoError(t, err)
Expand Down Expand Up @@ -572,6 +618,15 @@ func TestAsyncDestinationManager(t *testing.T) {
mockBatchRouterJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) {
_ = f(jobsdb.EmptyUpdateSafeTx())
}).Return(nil)
mockDestinationDebugger.EXPECT().RecordEventDeliveryStatus(gomock.Any(), gomock.Any()).Times(1).Do(func(destinationID string, deliveryStatus *destinationdebugger.DeliveryStatusT) {
require.Equal(t, "1 events", deliveryStatus.EventName)
require.Empty(t, deliveryStatus.EventType)
require.JSONEq(t, `{"success":"0 events","failed":"1 events"}`, string(deliveryStatus.Payload))
require.Equal(t, 1, deliveryStatus.AttemptNum)
require.Equal(t, jobsdb.Failed.State, deliveryStatus.JobState)
require.Equal(t, "500", deliveryStatus.ErrorCode)
require.NotEmpty(t, deliveryStatus.ErrorResponse)
}).Return(true)

for _, source := range sources {
for _, destination := range source.Destinations {
Expand Down
9 changes: 7 additions & 2 deletions router/batchrouter/handle_observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/tidwall/sjson"

"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/jobsdb"
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
"github.com/rudderlabs/rudder-server/services/diagnostics"
Expand Down Expand Up @@ -79,13 +80,17 @@ func (brt *Handle) recordAsyncDestinationDeliveryStatus(sourceID, destinationID
)

for _, status := range statusList {
if status.JobState == jobsdb.Succeeded.State {
switch status.JobState {
case jobsdb.Succeeded.State:
successCount++
} else {
case jobsdb.Failed.State, jobsdb.Aborted.State:
failureCount++
failedReason = string(status.ErrorResponse)
}
}
if failureCount == 0 && successCount == 0 {
return
}

if failureCount > 0 {
jobState = jobsdb.Failed.State
Expand Down
2 changes: 2 additions & 0 deletions services/debugger/destination/eventDeliveryStatusUploader.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:generate mockgen -destination=../../../mocks/services/debugger/destination/eventDeliveryStatusUploader.go -package mock_destinationdebugger github.com/rudderlabs/rudder-server/services/debugger/destination/ DestinationDebugger

package destinationdebugger

import (
Expand Down

0 comments on commit 7d6ae36

Please sign in to comment.