Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding metadata in the proxy request #2622

Merged
merged 18 commits into from
Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,10 +663,24 @@ func (worker *workerT) processDestinationJobs() {
if worker.rt.transformerProxy {
jobID := destinationJob.JobMetadataArray[0].JobID
pkgLogger.Debugf(`[TransformerProxy] (Dest-%[1]v) {Job - %[2]v} Request started`, worker.rt.destName, jobID)

// setting metadata
firstJobMetadata := destinationJob.JobMetadataArray[0]
saikumarrs marked this conversation as resolved.
Show resolved Hide resolved
proxyReqparams := &transformer.ProxyRequestParams{
DestName: worker.rt.destName,
JobID: jobID,
ResponseData: val,
DestName: worker.rt.destName,
JobID: jobID,
ResponseData: transformer.ProxyRequestPayload{
PostParametersT: val,
Metadata: transformer.ProxyRequestMetadata{
SourceID: firstJobMetadata.SourceID,
DestinationID: firstJobMetadata.DestinationID,
WorkspaceID: firstJobMetadata.WorkspaceID,
JobID: firstJobMetadata.JobID,
AttemptNum: firstJobMetadata.AttemptNum,
DestInfo: firstJobMetadata.DestInfo,
Secret: firstJobMetadata.Secret,
},
},
}
rtlTime := time.Now()
respStatusCode, respBodyTemp, respContentType = worker.rt.transformer.ProxyRequest(ctx, proxyReqparams)
Expand All @@ -682,7 +696,7 @@ func (worker *workerT) processDestinationJobs() {
workerID: worker.workerID,
trRespStCd: respStatusCode,
trRespBody: respBodyTemp,
secret: destinationJob.JobMetadataArray[0].Secret,
secret: firstJobMetadata.Secret,
})
}
} else {
Expand Down
18 changes: 17 additions & 1 deletion router/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package transformer
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -48,8 +49,23 @@ type handle struct {
logger logger.Logger
}

type ProxyRequestMetadata struct {
JobID int64 `json:"jobId"`
AttemptNum int `json:"attemptNum"`
UserID string `json:"userId"`
SourceID string `json:"sourceId"`
DestinationID string `json:"destinationId"`
WorkspaceID string `json:"workspaceId"`
Secret json.RawMessage `json:"secret"`
DestInfo json.RawMessage `json:"destInfo,omitempty"`
}

type ProxyRequestPayload struct {
integrations.PostParametersT
Metadata ProxyRequestMetadata `json:"metadata,omitempty"`
}
type ProxyRequestParams struct {
ResponseData integrations.PostParametersT
ResponseData ProxyRequestPayload
DestName string
JobID int64
BaseUrl string
Expand Down
142 changes: 76 additions & 66 deletions router/transformer/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestProxyRequest(t *testing.T) {
// For http client timeout scenarios, we need to have a proxyTimeout which is > rtTimeout + <timeout_at_router_transform>
rtTimeout time.Duration
// Transformed response that needs to be sent to destination
postParameters integrations.PostParametersT
postParameters ProxyRequestPayload
context proxyContext
}

Expand All @@ -93,21 +93,23 @@ func TestProxyRequest(t *testing.T) {
timeout: 0,
},
rtTimeout: 10 * time.Millisecond,
postParameters: integrations.PostParametersT{
Type: "REST",
URL: "http://www.good_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
postParameters: ProxyRequestPayload{
PostParametersT: integrations.PostParametersT{
Type: "REST",
URL: "http://www.good_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
Files: map[string]interface{}{},
},
Files: map[string]interface{}{},
},
},
{
Expand All @@ -125,21 +127,23 @@ func TestProxyRequest(t *testing.T) {
timeout: time.Duration(1.2 * 1e9),
},
rtTimeout: 8 * time.Millisecond,
postParameters: integrations.PostParametersT{
Type: "REST",
URL: "http://www.good_dest_1.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
postParameters: ProxyRequestPayload{
PostParametersT: integrations.PostParametersT{
Type: "REST",
URL: "http://www.good_dest_1.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
Files: map[string]interface{}{},
},
Files: map[string]interface{}{},
},
},
{
Expand All @@ -159,21 +163,23 @@ func TestProxyRequest(t *testing.T) {
context: proxyContext{
timeout: 2 * time.Millisecond,
},
postParameters: integrations.PostParametersT{
Type: "REST",
URL: "http://www.ctx_timeout_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
postParameters: ProxyRequestPayload{
PostParametersT: integrations.PostParametersT{
Type: "REST",
URL: "http://www.ctx_timeout_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
Files: map[string]interface{}{},
},
Files: map[string]interface{}{},
},
},
{
Expand All @@ -192,21 +198,23 @@ func TestProxyRequest(t *testing.T) {
context: proxyContext{
cancel: true,
},
postParameters: integrations.PostParametersT{
Type: "REST",
URL: "http://www.ctx_timeout_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
postParameters: ProxyRequestPayload{
PostParametersT: integrations.PostParametersT{
Type: "REST",
URL: "http://www.ctx_timeout_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
Files: map[string]interface{}{},
},
Files: map[string]interface{}{},
},
},
{
Expand All @@ -223,21 +231,23 @@ func TestProxyRequest(t *testing.T) {
response: `Not Found`,
},
rtTimeout: 10 * time.Millisecond,
postParameters: integrations.PostParametersT{
Type: "REST",
URL: "http://www.not_found_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
postParameters: ProxyRequestPayload{
PostParametersT: integrations.PostParametersT{
Type: "REST",
URL: "http://www.not_found_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
Files: map[string]interface{}{},
},
Files: map[string]interface{}{},
},
},
}
Expand Down
1 change: 1 addition & 0 deletions router/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type JobMetadataT struct {
Secret json.RawMessage `json:"secret"`
JobT *jobsdb.JobT `json:"jobsT"`
WorkerAssignedTime time.Time `json:"workerAssignedTime"`
DestInfo json.RawMessage `json:"destInfo,omitempty"`
chandumlg marked this conversation as resolved.
Show resolved Hide resolved
}

// TransformMessageT is used to pass message to the transformer workers
Expand Down