From 7d79011382c5d0c35b9d74f5f3cf82b738332e90 Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Wed, 4 Dec 2024 13:59:49 +0000 Subject: [PATCH] chore: clean the codebase --- .../{blob_storage.go => blobstorage.go} | 0 pkg/service/pipeline.go | 28 ++++++++++++------- pkg/service/pipelinerun.go | 28 +++++++++---------- pkg/service/utils.go | 3 +- 4 files changed, 33 insertions(+), 26 deletions(-) rename pkg/service/{blob_storage.go => blobstorage.go} (100%) diff --git a/pkg/service/blob_storage.go b/pkg/service/blobstorage.go similarity index 100% rename from pkg/service/blob_storage.go rename to pkg/service/blobstorage.go diff --git a/pkg/service/pipeline.go b/pkg/service/pipeline.go index dceec629c..80a2c8a69 100644 --- a/pkg/service/pipeline.go +++ b/pkg/service/pipeline.go @@ -831,6 +831,14 @@ func (s *service) UpdateNamespacePipelineIDByID(ctx context.Context, ns resource return s.converter.ConvertPipelineToPB(ctx, dbPipeline, pipelinepb.Pipeline_VIEW_FULL, true, true) } +// preTriggerPipeline does the following: +// 1. Upload pipeline input data to minio if the data is blob data. +// 2. New workflow memory. +// 2-1. Set the default values for the variables for memory data and uploading pipeline data. +// 2-2. Set the data with data.Value for the memory data, which will be used for pipeline running. +// 2-3. Upload "uploading pipeline data" to minio for pipeline run logger. +// 3. Map the settings in recipe to the format in workflow memory. +// 4. Enable the streaming mode when the header contains "text/event-stream" func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, r *datamodel.Recipe, pipelineTriggerID string, pipelineData []*pipelinepb.TriggerData) error { batchSize := len(pipelineData) if batchSize > constant.MaxBatchSize { @@ -845,6 +853,13 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, defaultValueMap[k] = v.Default } + requesterUID, _ := resourcex.GetRequesterUIDAndUserUID(ctx) + + expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, requesterUID.String()) + if err != nil { + return fmt.Errorf("get expiry rule tag: %w", err) + } + errors := []string{} for idx, data := range pipelineData { @@ -1362,17 +1377,10 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, } } - requesterUID, _ := resourcex.GetRequesterUIDAndUserUID(ctx) - - expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, requesterUID.String()) - if err != nil { - return fmt.Errorf("get expiry rule tag: %w", err) - } - err = s.uploadPipelineRunInputsToMinio(ctx, uploadPipelineRunInputsToMinioParam{ - PipelineTriggerID: pipelineTriggerID, - ExpiryRuleTag: expiryRuleTag, - PipelineData: uploadingPipelineData, + pipelineTriggerID: pipelineTriggerID, + expiryRuleTag: expiryRuleTag, + pipelineData: uploadingPipelineData, }) if err != nil { return fmt.Errorf("pipeline run inputs to minio: %w", err) diff --git a/pkg/service/pipelinerun.go b/pkg/service/pipelinerun.go index 20f904fb0..c198b747e 100644 --- a/pkg/service/pipelinerun.go +++ b/pkg/service/pipelinerun.go @@ -93,7 +93,7 @@ func (s *service) ListPipelineRuns(ctx context.Context, req *pb.ListPipelineRuns return nil, err } - isOwner := dbPipeline.OwnerUID().String() == requesterUID.String() + isOwner := dbPipeline.OwnerUID() == requesterUID pipelineRuns, totalCount, err := s.repository.GetPaginatedPipelineRunsWithPermissions(ctx, requesterUID.String(), dbPipeline.UID.String(), page, pageSize, filter, orderBy, isOwner) @@ -103,7 +103,7 @@ func (s *service) ListPipelineRuns(ctx context.Context, req *pb.ListPipelineRuns var referenceIDs []string for _, pipelineRun := range pipelineRuns { - if CanViewPrivateData(pipelineRun.RequesterUID.String(), requesterUID.String()) { + if canViewPrivateData(pipelineRun.RequesterUID, requesterUID) { for _, input := range pipelineRun.Inputs { referenceIDs = append(referenceIDs, input.Name) } @@ -154,7 +154,7 @@ func (s *service) ListPipelineRuns(ctx context.Context, req *pb.ListPipelineRuns pbRun.RequesterId = *requesterID } - if CanViewPrivateData(run.RequesterUID.String(), requesterUID.String()) { + if canViewPrivateData(run.RequesterUID, requesterUID) { if len(run.Inputs) == 1 { key := run.Inputs[0].Name pbRun.Inputs, err = parseMetadataToStructArray(metadataMap, key) @@ -213,7 +213,7 @@ func (s *service) ListComponentRuns(ctx context.Context, req *pb.ListComponentRu return nil, fmt.Errorf("failed to get pipeline by UID: %s. error: %s", dbPipelineRun.PipelineUID.String(), err.Error()) } - isOwner := dbPipeline.OwnerUID().String() == requesterUID.String() + isOwner := dbPipeline.OwnerUID() == requesterUID if !isOwner && requesterUID.String() != dbPipelineRun.RequesterUID.String() { return nil, fmt.Errorf("requester is not pipeline owner/credit owner. they are not allowed to view these component runs") @@ -226,7 +226,7 @@ func (s *service) ListComponentRuns(ctx context.Context, req *pb.ListComponentRu var referenceIDs []string for _, pipelineRun := range componentRuns { - if CanViewPrivateData(dbPipelineRun.RequesterUID.String(), requesterUID.String()) { + if canViewPrivateData(dbPipelineRun.RequesterUID, requesterUID) { for _, input := range pipelineRun.Inputs { referenceIDs = append(referenceIDs, input.Name) } @@ -255,7 +255,7 @@ func (s *service) ListComponentRuns(ctx context.Context, req *pb.ListComponentRu return nil, fmt.Errorf("failed to convert component run: %w", err) } - if CanViewPrivateData(dbPipelineRun.RequesterUID.String(), requesterUID.String()) { + if canViewPrivateData(dbPipelineRun.RequesterUID, requesterUID) { if len(run.Inputs) == 1 { key := run.Inputs[0].Name pbRun.Inputs, err = parseMetadataToStructArray(metadataMap, key) @@ -384,26 +384,26 @@ func (s *service) ListPipelineRunsByRequester(ctx context.Context, req *pb.ListP } type uploadPipelineRunInputsToMinioParam struct { - PipelineTriggerID string - ExpiryRuleTag string - PipelineData []map[string]any + pipelineTriggerID string + expiryRuleTag string + pipelineData []map[string]any } func (s *service) uploadPipelineRunInputsToMinio(ctx context.Context, param uploadPipelineRunInputsToMinioParam) error { logger, _ := logger.GetZapLogger(ctx) minioClient := s.minioClient - objectName := fmt.Sprintf("pipeline-runs/input/%s.json", param.PipelineTriggerID) + objectName := fmt.Sprintf("pipeline-runs/input/%s.json", param.pipelineTriggerID) logger.Info("uploadPipelineRunInputsToMinio started", zap.String("objectName", objectName), - zap.Any("pipelineData", param.PipelineData), + zap.Any("pipelineData", param.pipelineData), ) url, objectInfo, err := minioClient.UploadFile(ctx, logger, &miniox.UploadFileParam{ FilePath: objectName, - FileContent: param.PipelineData, + FileContent: param.pipelineData, FileMimeType: constant.ContentTypeJSON, - ExpiryRuleTag: param.ExpiryRuleTag, + ExpiryRuleTag: param.expiryRuleTag, }) if err != nil { return fmt.Errorf("upload pipeline run inputs to minio: %w", err) @@ -416,7 +416,7 @@ func (s *service) uploadPipelineRunInputsToMinio(ctx context.Context, param uplo URL: url, }} - err = s.repository.UpdatePipelineRun(ctx, param.PipelineTriggerID, &datamodel.PipelineRun{Inputs: inputs}) + err = s.repository.UpdatePipelineRun(ctx, param.pipelineTriggerID, &datamodel.PipelineRun{Inputs: inputs}) if err != nil { logger.Error("save pipeline run input data", zap.Error(err)) return err diff --git a/pkg/service/utils.go b/pkg/service/utils.go index 8dda24704..d4eb776b5 100644 --- a/pkg/service/utils.go +++ b/pkg/service/utils.go @@ -179,8 +179,7 @@ func (s *service) convertComponentRunToPB(run datamodel.ComponentRun) (*pipeline return result, nil } -// CanViewPrivateData - only with requester ns could users see their input/output data -func CanViewPrivateData(namespace, requesterUID string) bool { +func canViewPrivateData(namespace, requesterUID uuid.UUID) bool { return namespace == requesterUID }