diff --git a/router/router.go b/router/router.go index ef08a56373..62697d36b3 100644 --- a/router/router.go +++ b/router/router.go @@ -663,10 +663,24 @@ func (worker *workerT) processDestinationJobs() { if worker.rt.transformerProxy { jobID := destinationJob.JobMetadataArray[0].JobID pkgLogger.Debugf(`[TransformerProxy] (Dest-%[1]v) {Job - %[2]v} Request started`, worker.rt.destName, jobID) + + // setting metadata + firstJobMetadata := destinationJob.JobMetadataArray[0] proxyReqparams := &transformer.ProxyRequestParams{ - DestName: worker.rt.destName, - JobID: jobID, - ResponseData: val, + DestName: worker.rt.destName, + JobID: jobID, + ResponseData: transformer.ProxyRequestPayload{ + PostParametersT: val, + Metadata: transformer.ProxyRequestMetadata{ + SourceID: firstJobMetadata.SourceID, + DestinationID: firstJobMetadata.DestinationID, + WorkspaceID: firstJobMetadata.WorkspaceID, + JobID: firstJobMetadata.JobID, + AttemptNum: firstJobMetadata.AttemptNum, + DestInfo: firstJobMetadata.DestInfo, + Secret: firstJobMetadata.Secret, + }, + }, } rtlTime := time.Now() respStatusCode, respBodyTemp, respContentType = worker.rt.transformer.ProxyRequest(ctx, proxyReqparams) @@ -682,7 +696,7 @@ func (worker *workerT) processDestinationJobs() { workerID: worker.workerID, trRespStCd: respStatusCode, trRespBody: respBodyTemp, - secret: destinationJob.JobMetadataArray[0].Secret, + secret: firstJobMetadata.Secret, }) } } else { diff --git a/router/transformer/transformer.go b/router/transformer/transformer.go index 6ea1f3d1a0..ff389fbf5c 100644 --- a/router/transformer/transformer.go +++ b/router/transformer/transformer.go @@ -5,6 +5,7 @@ package transformer import ( "bytes" "context" + "encoding/json" "fmt" "io" "net/http" @@ -48,8 +49,23 @@ type handle struct { logger logger.Logger } +type ProxyRequestMetadata struct { + JobID int64 `json:"jobId"` + AttemptNum int `json:"attemptNum"` + UserID string `json:"userId"` + SourceID string `json:"sourceId"` + DestinationID string `json:"destinationId"` + WorkspaceID string `json:"workspaceId"` + Secret json.RawMessage `json:"secret"` + DestInfo json.RawMessage `json:"destInfo,omitempty"` +} + +type ProxyRequestPayload struct { + integrations.PostParametersT + Metadata ProxyRequestMetadata `json:"metadata,omitempty"` +} type ProxyRequestParams struct { - ResponseData integrations.PostParametersT + ResponseData ProxyRequestPayload DestName string JobID int64 BaseUrl string diff --git a/router/transformer/transformer_test.go b/router/transformer/transformer_test.go index 87dc0a89c1..40ae656b4a 100644 --- a/router/transformer/transformer_test.go +++ b/router/transformer/transformer_test.go @@ -71,7 +71,7 @@ func TestProxyRequest(t *testing.T) { // For http client timeout scenarios, we need to have a proxyTimeout which is > rtTimeout + rtTimeout time.Duration // Transformed response that needs to be sent to destination - postParameters integrations.PostParametersT + postParameters ProxyRequestPayload context proxyContext } @@ -93,21 +93,23 @@ func TestProxyRequest(t *testing.T) { timeout: 0, }, rtTimeout: 10 * time.Millisecond, - postParameters: integrations.PostParametersT{ - Type: "REST", - URL: "http://www.good_dest.domain.com", - RequestMethod: http.MethodPost, - QueryParams: map[string]interface{}{}, - Body: map[string]interface{}{ - "JSON": map[string]interface{}{ - "key_1": "val_1", - "key_2": "val_2", + postParameters: ProxyRequestPayload{ + PostParametersT: integrations.PostParametersT{ + Type: "REST", + URL: "http://www.good_dest.domain.com", + RequestMethod: http.MethodPost, + QueryParams: map[string]interface{}{}, + Body: map[string]interface{}{ + "JSON": map[string]interface{}{ + "key_1": "val_1", + "key_2": "val_2", + }, + "FORM": map[string]interface{}{}, + "JSON_ARRAY": map[string]interface{}{}, + "XML": map[string]interface{}{}, }, - "FORM": map[string]interface{}{}, - "JSON_ARRAY": map[string]interface{}{}, - "XML": map[string]interface{}{}, + Files: map[string]interface{}{}, }, - Files: map[string]interface{}{}, }, }, { @@ -125,21 +127,23 @@ func TestProxyRequest(t *testing.T) { timeout: time.Duration(1.2 * 1e9), }, rtTimeout: 8 * time.Millisecond, - postParameters: integrations.PostParametersT{ - Type: "REST", - URL: "http://www.good_dest_1.domain.com", - RequestMethod: http.MethodPost, - QueryParams: map[string]interface{}{}, - Body: map[string]interface{}{ - "JSON": map[string]interface{}{ - "key_1": "val_1", - "key_2": "val_2", + postParameters: ProxyRequestPayload{ + PostParametersT: integrations.PostParametersT{ + Type: "REST", + URL: "http://www.good_dest_1.domain.com", + RequestMethod: http.MethodPost, + QueryParams: map[string]interface{}{}, + Body: map[string]interface{}{ + "JSON": map[string]interface{}{ + "key_1": "val_1", + "key_2": "val_2", + }, + "FORM": map[string]interface{}{}, + "JSON_ARRAY": map[string]interface{}{}, + "XML": map[string]interface{}{}, }, - "FORM": map[string]interface{}{}, - "JSON_ARRAY": map[string]interface{}{}, - "XML": map[string]interface{}{}, + Files: map[string]interface{}{}, }, - Files: map[string]interface{}{}, }, }, { @@ -159,21 +163,23 @@ func TestProxyRequest(t *testing.T) { context: proxyContext{ timeout: 2 * time.Millisecond, }, - postParameters: integrations.PostParametersT{ - Type: "REST", - URL: "http://www.ctx_timeout_dest.domain.com", - RequestMethod: http.MethodPost, - QueryParams: map[string]interface{}{}, - Body: map[string]interface{}{ - "JSON": map[string]interface{}{ - "key_1": "val_1", - "key_2": "val_2", + postParameters: ProxyRequestPayload{ + PostParametersT: integrations.PostParametersT{ + Type: "REST", + URL: "http://www.ctx_timeout_dest.domain.com", + RequestMethod: http.MethodPost, + QueryParams: map[string]interface{}{}, + Body: map[string]interface{}{ + "JSON": map[string]interface{}{ + "key_1": "val_1", + "key_2": "val_2", + }, + "FORM": map[string]interface{}{}, + "JSON_ARRAY": map[string]interface{}{}, + "XML": map[string]interface{}{}, }, - "FORM": map[string]interface{}{}, - "JSON_ARRAY": map[string]interface{}{}, - "XML": map[string]interface{}{}, + Files: map[string]interface{}{}, }, - Files: map[string]interface{}{}, }, }, { @@ -192,21 +198,23 @@ func TestProxyRequest(t *testing.T) { context: proxyContext{ cancel: true, }, - postParameters: integrations.PostParametersT{ - Type: "REST", - URL: "http://www.ctx_timeout_dest.domain.com", - RequestMethod: http.MethodPost, - QueryParams: map[string]interface{}{}, - Body: map[string]interface{}{ - "JSON": map[string]interface{}{ - "key_1": "val_1", - "key_2": "val_2", + postParameters: ProxyRequestPayload{ + PostParametersT: integrations.PostParametersT{ + Type: "REST", + URL: "http://www.ctx_timeout_dest.domain.com", + RequestMethod: http.MethodPost, + QueryParams: map[string]interface{}{}, + Body: map[string]interface{}{ + "JSON": map[string]interface{}{ + "key_1": "val_1", + "key_2": "val_2", + }, + "FORM": map[string]interface{}{}, + "JSON_ARRAY": map[string]interface{}{}, + "XML": map[string]interface{}{}, }, - "FORM": map[string]interface{}{}, - "JSON_ARRAY": map[string]interface{}{}, - "XML": map[string]interface{}{}, + Files: map[string]interface{}{}, }, - Files: map[string]interface{}{}, }, }, { @@ -223,21 +231,23 @@ func TestProxyRequest(t *testing.T) { response: `Not Found`, }, rtTimeout: 10 * time.Millisecond, - postParameters: integrations.PostParametersT{ - Type: "REST", - URL: "http://www.not_found_dest.domain.com", - RequestMethod: http.MethodPost, - QueryParams: map[string]interface{}{}, - Body: map[string]interface{}{ - "JSON": map[string]interface{}{ - "key_1": "val_1", - "key_2": "val_2", + postParameters: ProxyRequestPayload{ + PostParametersT: integrations.PostParametersT{ + Type: "REST", + URL: "http://www.not_found_dest.domain.com", + RequestMethod: http.MethodPost, + QueryParams: map[string]interface{}{}, + Body: map[string]interface{}{ + "JSON": map[string]interface{}{ + "key_1": "val_1", + "key_2": "val_2", + }, + "FORM": map[string]interface{}{}, + "JSON_ARRAY": map[string]interface{}{}, + "XML": map[string]interface{}{}, }, - "FORM": map[string]interface{}{}, - "JSON_ARRAY": map[string]interface{}{}, - "XML": map[string]interface{}{}, + Files: map[string]interface{}{}, }, - Files: map[string]interface{}{}, }, }, } diff --git a/router/types/types.go b/router/types/types.go index d56e28554d..e011ab7624 100644 --- a/router/types/types.go +++ b/router/types/types.go @@ -55,6 +55,7 @@ type JobMetadataT struct { Secret json.RawMessage `json:"secret"` JobT *jobsdb.JobT `json:"jobsT"` WorkerAssignedTime time.Time `json:"workerAssignedTime"` + DestInfo json.RawMessage `json:"destInfo,omitempty"` } // TransformMessageT is used to pass message to the transformer workers