Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(warehouse): syncs issues #2732

Merged
merged 7 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 62 additions & 27 deletions warehouse/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"strings"
"time"

"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/validations"

Expand Down Expand Up @@ -106,6 +110,7 @@ const (
TriggeredSuccessfully = "Triggered successfully"
NoPendingEvents = "No pending events to sync for this destination"
DownloadFileNamePattern = "downloadfile.*.tmp"
NoSuchSync = "No such sync exist"
)

func InitWarehouseAPI(dbHandle *sql.DB, log logger.Logger) error {
Expand Down Expand Up @@ -144,7 +149,7 @@ func InitWarehouseAPI(dbHandle *sql.DB, log logger.Logger) error {

func (uploadsReq *UploadsReqT) validateReq() error {
if !uploadsReq.API.enabled || uploadsReq.API.log == nil || uploadsReq.API.dbHandle == nil {
return errors.New(`warehouse api's are not initialized`)
return errors.New(`warehouse api are not initialized`)
}
if uploadsReq.Limit < 1 {
uploadsReq.Limit = 10
Expand Down Expand Up @@ -250,16 +255,21 @@ func (uploadsReq *UploadsReqT) TriggerWhUploads() (response *proto.TriggerWhUplo
func (uploadReq UploadReqT) GetWHUpload() (*proto.WHUploadResponse, error) {
err := uploadReq.validateReq()
if err != nil {
return &proto.WHUploadResponse{}, err
return &proto.WHUploadResponse{}, status.Errorf(codes.Code(code.Code_INVALID_ARGUMENT), err.Error())
}

query := uploadReq.generateQuery(`id, source_id, destination_id, destination_type, namespace, status, error, created_at, first_event_at, last_event_at, last_exec_at, updated_at, timings, metadata->>'nextRetryTime', metadata->>'archivedStagingAndLoadFiles'`)
uploadReq.API.log.Debug(query)
var upload proto.WHUploadResponse
var nextRetryTimeStr sql.NullString
var firstEventAt, lastEventAt, createdAt, lastExecAt, updatedAt sql.NullTime
var timingsObject sql.NullString
var uploadError string
var isUploadArchived sql.NullBool

var (
upload proto.WHUploadResponse
nextRetryTimeStr sql.NullString
firstEventAt, lastEventAt, createdAt, lastExecAt, updatedAt sql.NullTime
timingsObject sql.NullString
uploadError string
isUploadArchived sql.NullBool
)

row := uploadReq.API.dbHandle.QueryRow(query)
err = row.Scan(
&upload.Id,
Expand All @@ -278,14 +288,19 @@ func (uploadReq UploadReqT) GetWHUpload() (*proto.WHUploadResponse, error) {
&nextRetryTimeStr,
&isUploadArchived,
)
if err == sql.ErrNoRows {
return &proto.WHUploadResponse{}, status.Errorf(codes.Code(code.Code_NOT_FOUND), "sync not found")
}
if err != nil {
uploadReq.API.log.Errorf(err.Error())
return &proto.WHUploadResponse{}, err
return &proto.WHUploadResponse{}, status.Errorf(codes.Code(code.Code_INTERNAL), err.Error())
}

if !uploadReq.authorizeSource(upload.SourceId) {
pkgLogger.Errorf(`Unauthorized request for upload:%d with sourceId:%s in workspaceId:%s`, uploadReq.UploadId, upload.SourceId, uploadReq.WorkspaceID)
return &proto.WHUploadResponse{}, errors.New("unauthorized request")
return &proto.WHUploadResponse{}, status.Error(codes.Code(code.Code_UNAUTHENTICATED), "unauthorized request")
}

upload.CreatedAt = timestamppb.New(createdAt.Time)
upload.FirstEventAt = timestamppb.New(firstEventAt.Time)
upload.LastEventAt = timestamppb.New(lastEventAt.Time)
Expand Down Expand Up @@ -317,16 +332,17 @@ func (uploadReq UploadReqT) GetWHUpload() (*proto.WHUploadResponse, error) {
} else {
upload.Duration = int32(timeutil.Now().Sub(lastExecAt.Time) / time.Second)
}

tableUploadReq := TableUploadReqT{
UploadID: upload.Id,
Name: "",
API: uploadReq.API,
}
tables, err := tableUploadReq.GetWhTableUploads()
upload.Tables, err = tableUploadReq.GetWhTableUploads()
if err != nil {
return &proto.WHUploadResponse{}, err
return &proto.WHUploadResponse{}, status.Errorf(codes.Code(code.Code_INTERNAL), err.Error())
}
upload.Tables = tables

return &upload, nil
}

Expand All @@ -343,10 +359,14 @@ func (uploadReq UploadReqT) TriggerWHUpload() (response *proto.TriggerWhUploadsR
if err != nil {
return
}

var (
uploadJobT UploadJobT
upload Upload
)

query := uploadReq.generateQuery(`id, source_id, destination_id, metadata`)
uploadReq.API.log.Debug(query)
var uploadJobT UploadJobT
var upload Upload

row := uploadReq.API.dbHandle.QueryRow(query)
err = row.Scan(
Expand All @@ -355,15 +375,23 @@ func (uploadReq UploadReqT) TriggerWHUpload() (response *proto.TriggerWhUploadsR
&upload.DestinationID,
&upload.Metadata,
)
if err == sql.ErrNoRows {
return &proto.TriggerWhUploadsResponse{
Message: NoSuchSync,
StatusCode: http.StatusOK,
}, nil
}
if err != nil {
uploadReq.API.log.Errorf(err.Error())
return
}

if !uploadReq.authorizeSource(upload.SourceID) {
pkgLogger.Errorf(`Unauthorized request for upload:%d with sourceId:%s in workspaceId:%s`, uploadReq.UploadId, upload.SourceID, uploadReq.WorkspaceID)
err = errors.New("unauthorized request")
return
}

uploadJobT.upload = &upload
uploadJobT.dbHandle = uploadReq.API.dbHandle
err = uploadJobT.triggerUploadNow()
Expand Down Expand Up @@ -449,7 +477,7 @@ func (tableUploadReq TableUploadReqT) generateQuery(selectFields string) string

func (tableUploadReq TableUploadReqT) validateReq() error {
if !tableUploadReq.API.enabled || tableUploadReq.API.log == nil || tableUploadReq.API.dbHandle == nil {
return errors.New("warehouse api's are not initialized")
return errors.New("warehouse api are not initialized")
}
if tableUploadReq.UploadID == 0 {
return errors.New("upload_id is empty or should be greater than 0")
Expand All @@ -474,7 +502,7 @@ func (uploadReq UploadReqT) generateQuery(selectedFields string) string {

func (uploadReq UploadReqT) validateReq() error {
if !uploadReq.API.enabled || uploadReq.API.log == nil || uploadReq.API.dbHandle == nil {
return errors.New("warehouse api's are not initialized")
return errors.New("warehouse api are not initialized")
}
if uploadReq.UploadId < 1 {
return errors.New(`upload_id is empty or should be greater than 0 `)
Expand Down Expand Up @@ -628,11 +656,16 @@ func (uploadsReq *UploadsReqT) getTotalUploadCount(whereClause string) (int32, e

// for hosted workspaces - we get the uploads and the total upload count using the same query
func (uploadsReq *UploadsReqT) warehouseUploadsForHosted(authorizedSourceIDs []string, selectFields string) (uploadsRes *proto.WHUploadsResponse, err error) {
var uploads []*proto.WHUploadResponse
var totalUploadCount int32
var (
uploads []*proto.WHUploadResponse
totalUploadCount int32
whereClauses []string
subQuery string
query string
)

// create query
subQuery := fmt.Sprintf(`
subQuery = fmt.Sprintf(`
SELECT
%s,
COUNT(*) OVER() AS total_uploads
Expand All @@ -643,7 +676,6 @@ func (uploadsReq *UploadsReqT) warehouseUploadsForHosted(authorizedSourceIDs []s
selectFields,
warehouseutils.WarehouseUploadsTable,
)
var whereClauses []string
if uploadsReq.SourceID == "" {
whereClauses = append(whereClauses, fmt.Sprintf(`source_id IN (%v)`, misc.SingleQuoteLiteralJoin(authorizedSourceIDs)))
} else if misc.Contains(authorizedSourceIDs, uploadsReq.SourceID) {
Expand All @@ -660,7 +692,7 @@ func (uploadsReq *UploadsReqT) warehouseUploadsForHosted(authorizedSourceIDs []s
}

subQuery = subQuery + strings.Join(whereClauses, " AND ")
query := fmt.Sprintf(`
query = fmt.Sprintf(`
SELECT
*
FROM
Expand Down Expand Up @@ -697,11 +729,16 @@ func (uploadsReq *UploadsReqT) warehouseUploadsForHosted(authorizedSourceIDs []s

// for non hosted workspaces - we get the uploads and the total upload count using separate queries
func (uploadsReq *UploadsReqT) warehouseUploads(selectFields string) (uploadsRes *proto.WHUploadsResponse, err error) {
var uploads []*proto.WHUploadResponse
var totalUploadCount int32
var (
uploads []*proto.WHUploadResponse
totalUploadCount int32
query string
whereClause string
whereClauses []string
)

// create query
query := fmt.Sprintf(`
query = fmt.Sprintf(`
select
%s
from
Expand All @@ -710,8 +747,6 @@ func (uploadsReq *UploadsReqT) warehouseUploads(selectFields string) (uploadsRes
selectFields,
warehouseutils.WarehouseUploadsTable,
)
whereClause := ""
var whereClauses []string
if uploadsReq.SourceID != "" {
whereClauses = append(whereClauses, fmt.Sprintf(`source_id = '%s'`, uploadsReq.SourceID))
}
Expand Down
29 changes: 29 additions & 0 deletions warehouse/warehousegrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ func (*warehouseGRPC) GetWHUploads(_ context.Context, request *proto.WHUploadsRe
Offset: request.Offset,
API: UploadAPI,
}
uploadsReq.API.log.Info(
"[GetWHUploads] Fetching warehouse uploads for WorkspaceId: %s, SourceId: %s, DestinationId: %s",
uploadsReq.WorkspaceID,
uploadsReq.SourceID,
uploadsReq.DestinationID,
)
res, err := uploadsReq.GetWhUploads()
return res, err
}
Expand All @@ -41,6 +47,12 @@ func (*warehouseGRPC) TriggerWHUploads(_ context.Context, request *proto.WHUploa
DestinationID: request.DestinationId,
API: UploadAPI,
}
uploadsReq.API.log.Info(
"[TriggerWHUploads] Triggering warehouse uploads for WorkspaceId: %s, SourceId: %s, DestinationId: %s",
uploadsReq.WorkspaceID,
uploadsReq.SourceID,
uploadsReq.DestinationID,
)
res, err := uploadsReq.TriggerWhUploads()
return res, err
}
Expand All @@ -51,6 +63,11 @@ func (*warehouseGRPC) GetWHUpload(_ context.Context, request *proto.WHUploadRequ
WorkspaceID: request.WorkspaceId,
API: UploadAPI,
}
uploadReq.API.log.Info(
"[GetWHUpload] Fetching warehouse upload for WorkspaceId: %s, UploadId: %d",
uploadReq.WorkspaceID,
uploadReq.UploadId,
)
res, err := uploadReq.GetWHUpload()
return res, err
}
Expand All @@ -65,6 +82,11 @@ func (*warehouseGRPC) TriggerWHUpload(_ context.Context, request *proto.WHUpload
WorkspaceID: request.WorkspaceId,
API: UploadAPI,
}
uploadReq.API.log.Info(
"[TriggerWHUpload] Triggering warehouse upload for WorkspaceId: %s, UploadId: %d",
uploadReq.WorkspaceID,
uploadReq.UploadId,
)
res, err := uploadReq.TriggerWHUpload()
return res, err
}
Expand Down Expand Up @@ -169,6 +191,13 @@ func (*warehouseGRPC) CountWHUploadsToRetry(ctx context.Context, req *proto.Retr
UploadIds: req.UploadIds,
API: UploadAPI,
}
retryReq.API.log.Info(
"[RetryWHUploads] Retrying warehouse uploads for WorkspaceId: %s, SourceId: %s, DestinationId: %s, IntervalInHours: %d",
retryReq.WorkspaceID,
retryReq.SourceID,
retryReq.DestinationID,
retryReq.IntervalInHours,
)
r, err := retryReq.UploadsToRetry(ctx)
response = &proto.RetryWHUploadsResponse{
Count: r.Count,
Expand Down