diff --git a/cmd/main/main.go b/cmd/main/main.go index f1d5ff062..189919046 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -298,14 +298,16 @@ func main() { InstillCoreHost: config.Config.Server.InstillCoreHost, ComponentStore: compStore, }), - MgmtPublicServiceClient: mgmtPublicServiceClient, - MgmtPrivateServiceClient: mgmtPrivateServiceClient, - MinioClient: minioClient, - ComponentStore: compStore, - Memory: ms, - WorkerUID: workerUID, - RetentionHandler: nil, - BinaryFetcher: binaryFetcher, + MgmtPublicServiceClient: mgmtPublicServiceClient, + MgmtPrivateServiceClient: mgmtPrivateServiceClient, + MinioClient: minioClient, + ComponentStore: compStore, + Memory: ms, + WorkerUID: workerUID, + RetentionHandler: nil, + BinaryFetcher: binaryFetcher, + ArtifactPublicServiceClient: artifactPublicServiceClient, + ArtifactPrivateServiceClient: artifactPrivateServiceClient, }, ) @@ -511,7 +513,6 @@ func main() { lw.RegisterActivity(cw.UpdatePipelineRunActivity) lw.RegisterActivity(cw.UpsertComponentRunActivity) - mw.RegisterActivity(cw.UploadInputsToMinioActivity) mw.RegisterActivity(cw.UploadOutputsToMinioActivity) mw.RegisterActivity(cw.UploadRecipeToMinioActivity) mw.RegisterActivity(cw.UploadComponentInputsActivity) diff --git a/go.mod b/go.mod index 747497615..6f69d85ae 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/influxdata/influxdb-client-go/v2 v2.12.3 github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498 github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a - github.com/instill-ai/x v0.5.0-alpha.0.20241119141833-e4a78ca87792 + github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8 github.com/itchyny/gojq v0.12.14 github.com/jackc/pgx/v5 v5.5.5 github.com/jmoiron/sqlx v1.4.0 diff --git a/go.sum b/go.sum index de16d3b14..15c7e9ffa 100644 --- a/go.sum +++ b/go.sum @@ -1285,8 +1285,8 @@ 
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498 h1: github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw= github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw= -github.com/instill-ai/x v0.5.0-alpha.0.20241119141833-e4a78ca87792 h1:b4lhXcFJ/kGGC1RErtItoI57paf9WXBCVpaPIAApldY= -github.com/instill-ai/x v0.5.0-alpha.0.20241119141833-e4a78ca87792/go.mod h1:jkVtaq9T2zAFA5N46tlV4K5EEVE7FcOVNbqY4wFWYz8= +github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8 h1:w2F6sI6VbzIXUIh6HrSrV4k43pM/brj1jv6HT994+so= +github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8/go.mod h1:jkVtaq9T2zAFA5N46tlV4K5EEVE7FcOVNbqY4wFWYz8= github.com/intel/goresctrl v0.2.0/go.mod h1:+CZdzouYFn5EsxgqAQTEzMfwKwuc0fVdMrT9FCCAVRQ= github.com/itchyny/gojq v0.12.14 h1:6k8vVtsrhQSYgSGg827AD+PVVaB1NLXEdX+dda2oZCc= github.com/itchyny/gojq v0.12.14/go.mod h1:y1G7oO7XkcR1LPZO59KyoCRy08T3j9vDYRV0GgYSS+s= diff --git a/pkg/component/internal/util/helper.go b/pkg/component/internal/util/helper.go index fad88b877..92831e976 100644 --- a/pkg/component/internal/util/helper.go +++ b/pkg/component/internal/util/helper.go @@ -138,15 +138,24 @@ func GetFileTypeByFilename(filename string) (string, error) { func GetContentTypeFromBase64(base64String string) (string, error) { // Remove the "data:" prefix and split at the first semicolon - contentType := strings.TrimPrefix(base64String, "data:") + if hasDataPrefix(base64String) { + contentType := strings.TrimPrefix(base64String, "data:") - parts := strings.SplitN(contentType, ";", 2) - if len(parts) != 2 { - return "", fmt.Errorf("invalid format") + parts := 
strings.SplitN(contentType, ";", 2) + if len(parts) != 2 { + return "", fmt.Errorf("invalid format") + } + + // The first part is the content type + return parts[0], nil } - // The first part is the content type - return parts[0], nil + b, err := base64.StdEncoding.DecodeString(base64String) + if err != nil { + return "", fmt.Errorf("decode base64 string: %w", err) + } + mimeType := strings.Split(mimetype.Detect(b).String(), ";")[0] + return mimeType, nil } func GetFileBase64Content(base64String string) string { diff --git a/pkg/service/blobstorage.go b/pkg/service/blobstorage.go new file mode 100644 index 000000000..4c2e6c229 --- /dev/null +++ b/pkg/service/blobstorage.go @@ -0,0 +1,116 @@ +package service + +import ( + "context" + "encoding/base64" + "fmt" + "mime" + "strings" + "time" + + "github.com/gabriel-vasile/mimetype" + "google.golang.org/grpc/metadata" + + "github.com/instill-ai/pipeline-backend/pkg/recipe" + "github.com/instill-ai/pipeline-backend/pkg/resource" + "github.com/instill-ai/pipeline-backend/pkg/utils" + + artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha" + resourcex "github.com/instill-ai/x/resource" +) + +func (s *service) uploadBlobAndGetDownloadURL(ctx context.Context, ns resource.Namespace, data string) (string, error) { + mimeType, err := getMimeType(data) + if err != nil { + return "", fmt.Errorf("get mime type: %w", err) + } + artifactClient := s.artifactPublicServiceClient + requesterUID, _ := resourcex.GetRequesterUIDAndUserUID(ctx) + + vars, err := recipe.GenerateSystemVariables(ctx, recipe.SystemVariables{}) + + if err != nil { + return "", fmt.Errorf("generate system variables: %w", err) + } + + ctx = metadata.NewOutgoingContext(ctx, utils.GetRequestMetadata(vars)) + + timestamp := time.Now().Format(time.RFC3339) + objectName := fmt.Sprintf("%s-%s%s", requesterUID.String(), timestamp, getFileExtension(mimeType)) + + // TODO: We will need to add the expiry days 
for the blob data. + // This will be addressed in ins-6857 + resp, err := artifactClient.GetObjectUploadURL(ctx, &artifactpb.GetObjectUploadURLRequest{ + NamespaceId: ns.NsID, + ObjectName: objectName, + ObjectExpireDays: 0, + }) + + if err != nil { + return "", fmt.Errorf("get upload url: %w", err) + } + + uploadURL := resp.GetUploadUrl() + data = removePrefix(data) + b, err := base64.StdEncoding.DecodeString(data) + if err != nil { + return "", fmt.Errorf("decode base64 string: %w", err) + } + + err = utils.UploadBlobData(ctx, uploadURL, mimeType, b, s.log) + if err != nil { + return "", fmt.Errorf("upload blob data: %w", err) + } + + respDownloadURL, err := artifactClient.GetObjectDownloadURL(ctx, &artifactpb.GetObjectDownloadURLRequest{ + NamespaceId: ns.NsID, + ObjectUid: resp.GetObject().GetUid(), + }) + if err != nil { + return "", fmt.Errorf("get object download url: %w", err) + } + + return respDownloadURL.GetDownloadUrl(), nil +} + +func getMimeType(data string) (string, error) { + var mimeType string + if strings.HasPrefix(data, "data:") { + contentType := strings.TrimPrefix(data, "data:") + parts := strings.SplitN(contentType, ";", 2) + if len(parts) != 2 { + return "", fmt.Errorf("invalid data url") + } + mimeType = parts[0] + } else { + b, err := base64.StdEncoding.DecodeString(data) + if err != nil { + return "", fmt.Errorf("decode base64 string: %w", err) + } + mimeType = strings.Split(mimetype.Detect(b).String(), ";")[0] + + } + return mimeType, nil +} + +func getFileExtension(mimeType string) string { + ext, err := mime.ExtensionsByType(mimeType) + if err != nil { + return "" + } + if len(ext) == 0 { + return "" + } + return ext[0] +} + +func removePrefix(data string) string { + if strings.HasPrefix(data, "data:") { + parts := strings.SplitN(data, ",", 2) + if len(parts) != 2 { + return "" + } + return parts[1] + } + return data +} diff --git a/pkg/service/main.go b/pkg/service/main.go index 093760b1e..aa1540930 100644 --- a/pkg/service/main.go 
+++ b/pkg/service/main.go @@ -20,6 +20,7 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/resource" componentstore "github.com/instill-ai/pipeline-backend/pkg/component/store" + artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha" mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta" pb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta" miniox "github.com/instill-ai/x/minio" @@ -98,37 +99,41 @@ type TriggerResult struct { // However, we keep it here for now because we may need it in the future. // service is the implementation of the Service interface type service struct { - repository repository.Repository - redisClient *redis.Client - temporalClient client.Client - component *componentstore.Store - mgmtPublicServiceClient mgmtpb.MgmtPublicServiceClient - mgmtPrivateServiceClient mgmtpb.MgmtPrivateServiceClient - aclClient acl.ACLClientInterface - converter Converter - minioClient miniox.MinioI - memory memory.MemoryStore - log *zap.Logger - workerUID uuid.UUID - retentionHandler MetadataRetentionHandler - binaryFetcher external.BinaryFetcher + repository repository.Repository + redisClient *redis.Client + temporalClient client.Client + component *componentstore.Store + mgmtPublicServiceClient mgmtpb.MgmtPublicServiceClient + mgmtPrivateServiceClient mgmtpb.MgmtPrivateServiceClient + aclClient acl.ACLClientInterface + converter Converter + minioClient miniox.MinioI + memory memory.MemoryStore + log *zap.Logger + workerUID uuid.UUID + retentionHandler MetadataRetentionHandler + binaryFetcher external.BinaryFetcher + artifactPublicServiceClient artifactpb.ArtifactPublicServiceClient + artifactPrivateServiceClient artifactpb.ArtifactPrivateServiceClient } // ServiceConfig is the configuration for the service type ServiceConfig struct { - Repository repository.Repository - RedisClient *redis.Client - TemporalClient client.Client - ACLClient acl.ACLClientInterface - Converter 
Converter - MgmtPublicServiceClient mgmtpb.MgmtPublicServiceClient - MgmtPrivateServiceClient mgmtpb.MgmtPrivateServiceClient - MinioClient miniox.MinioI - ComponentStore *componentstore.Store - Memory memory.MemoryStore - WorkerUID uuid.UUID - RetentionHandler MetadataRetentionHandler - BinaryFetcher external.BinaryFetcher + Repository repository.Repository + RedisClient *redis.Client + TemporalClient client.Client + ACLClient acl.ACLClientInterface + Converter Converter + MgmtPublicServiceClient mgmtpb.MgmtPublicServiceClient + MgmtPrivateServiceClient mgmtpb.MgmtPrivateServiceClient + MinioClient miniox.MinioI + ComponentStore *componentstore.Store + Memory memory.MemoryStore + WorkerUID uuid.UUID + RetentionHandler MetadataRetentionHandler + BinaryFetcher external.BinaryFetcher + ArtifactPublicServiceClient artifactpb.ArtifactPublicServiceClient + ArtifactPrivateServiceClient artifactpb.ArtifactPrivateServiceClient } // NewService initiates a service instance @@ -141,19 +146,21 @@ func NewService( } return &service{ - repository: cfg.Repository, - redisClient: cfg.RedisClient, - temporalClient: cfg.TemporalClient, - mgmtPublicServiceClient: cfg.MgmtPublicServiceClient, - mgmtPrivateServiceClient: cfg.MgmtPrivateServiceClient, - component: cfg.ComponentStore, - aclClient: cfg.ACLClient, - converter: cfg.Converter, - minioClient: cfg.MinioClient, - memory: cfg.Memory, - log: zapLogger, - workerUID: cfg.WorkerUID, - retentionHandler: cfg.RetentionHandler, - binaryFetcher: cfg.BinaryFetcher, + repository: cfg.Repository, + redisClient: cfg.RedisClient, + temporalClient: cfg.TemporalClient, + mgmtPublicServiceClient: cfg.MgmtPublicServiceClient, + mgmtPrivateServiceClient: cfg.MgmtPrivateServiceClient, + component: cfg.ComponentStore, + aclClient: cfg.ACLClient, + converter: cfg.Converter, + minioClient: cfg.MinioClient, + memory: cfg.Memory, + log: zapLogger, + workerUID: cfg.WorkerUID, + retentionHandler: cfg.RetentionHandler, + binaryFetcher: cfg.BinaryFetcher, + 
artifactPublicServiceClient: cfg.ArtifactPublicServiceClient, + artifactPrivateServiceClient: cfg.ArtifactPrivateServiceClient, } } diff --git a/pkg/service/pipeline.go b/pkg/service/pipeline.go index 96cd4f84f..93ce3c7f6 100644 --- a/pkg/service/pipeline.go +++ b/pkg/service/pipeline.go @@ -2,7 +2,6 @@ package service import ( "context" - "encoding/base64" "encoding/json" "errors" "fmt" @@ -12,7 +11,6 @@ import ( "time" "cloud.google.com/go/longrunning/autogen/longrunningpb" - "github.com/gabriel-vasile/mimetype" "github.com/gofrs/uuid" "go.einride.tech/aip/filtering" "go.einride.tech/aip/ordering" @@ -833,6 +831,17 @@ func (s *service) UpdateNamespacePipelineIDByID(ctx context.Context, ns resource return s.converter.ConvertPipelineToPB(ctx, dbPipeline, pipelinepb.Pipeline_VIEW_FULL, true, true) } +// preTriggerPipeline does the following: +// 1. Upload pipeline input data to minio if the data is blob data. +// 2. New workflow memory. +// 2-1. Set the default values for the variables for memory data and uploading pipeline data. +// 2-2. Set the data with data.Value for the memory data, which will be used for pipeline running. +// 2-3. Upload "uploading pipeline data" to minio for pipeline run logger. +// 3. Map the settings in recipe to the format in workflow memory. +// 4. Enable the streaming mode when the header contains "text/event-stream" +// +// We upload User Input Data by `uploadBlobAndGetDownloadURL`, which exposes the public URL because it will be used by `console` & external users. +// We upload Pipeline Input Data by `uploadPipelineRunInputsToMinio`, which does not expose the public URL. The URL will be used by pipeline run logger. 
func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, r *datamodel.Recipe, pipelineTriggerID string, pipelineData []*pipelinepb.TriggerData) error { batchSize := len(pipelineData) if batchSize > constant.MaxBatchSize { @@ -866,38 +875,31 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, // TODO: remove these conversions after the blob storage is fully rolled out for k := range m { - switch s := m[k].(type) { + switch str := m[k].(type) { case string: - if formatMap[k] != "string" && formatMap[k] != "number" && formatMap[k] != "boolean" && formatMap[k] != "json" { + if isUnstructuredFormat(formatMap[k]) { // Skip the base64 decoding if the string is a URL - if strings.HasPrefix(s, "http://") || strings.HasPrefix(s, "https://") { + if strings.HasPrefix(str, "http://") || strings.HasPrefix(str, "https://") { continue } - if !strings.HasPrefix(s, "data:") { - b, err := base64.StdEncoding.DecodeString(s) - if err != nil { - return fmt.Errorf("can not decode file %s, %s", formatMap[k], s) - } - mimeType := strings.Split(mimetype.Detect(b).String(), ";")[0] - vars.Fields[k] = structpb.NewStringValue(fmt.Sprintf("data:%s;base64,%s", mimeType, s)) + downloadURL, err := s.uploadBlobAndGetDownloadURL(ctx, ns, str) + if err != nil { + return fmt.Errorf("upload blob and get download url: %w", err) } - + vars.Fields[k] = structpb.NewStringValue(downloadURL) } case []string: - if formatMap[k] != "array:string" && formatMap[k] != "array:number" && formatMap[k] != "array:boolean" { - for idx := range s { + if isUnstructuredFormat(formatMap[k]) { + for idx := range str { // Skip the base64 decoding if the string is a URL - if strings.HasPrefix(s[idx], "http://") || strings.HasPrefix(s[idx], "https://") { + if strings.HasPrefix(str[idx], "http://") || strings.HasPrefix(str[idx], "https://") { continue } - if !strings.HasPrefix(s[idx], "data:") { - b, err := base64.StdEncoding.DecodeString(s[idx]) - if err != nil { - return 
fmt.Errorf("can not decode file %s, %s", formatMap[k], s) - } - mimeType := strings.Split(mimetype.Detect(b).String(), ";")[0] - vars.Fields[k].GetListValue().GetValues()[idx] = structpb.NewStringValue(fmt.Sprintf("data:%s;base64,%s", mimeType, s[idx])) + downloadURL, err := s.uploadBlobAndGetDownloadURL(ctx, ns, str[idx]) + if err != nil { + return fmt.Errorf("upload blob and get download url: %w", err) } + vars.Fields[k].GetListValue().GetValues()[idx] = structpb.NewStringValue(downloadURL) } } @@ -920,6 +922,11 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, formats[k] = []string{v} } + uploadingPipelineData := make([]map[string]any, len(pipelineData)) + for idx := range uploadingPipelineData { + uploadingPipelineData[idx] = make(map[string]any) + } + // TODO(huitang): implement a structpb to format.Value converter for idx, d := range pipelineData { @@ -941,6 +948,7 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, // is missing and should be handled as such by components. 
if d, ok := defaultValueMap[k]; !ok || d == nil { variable[k] = data.NewNull() + uploadingPipelineData[idx][k] = nil continue } } @@ -949,106 +957,138 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, case "boolean": if v == nil { variable[k] = data.NewBoolean(defaultValueMap[k].(bool)) + uploadingPipelineData[idx][k] = defaultValueMap[k].(bool) } else { if _, ok := v.Kind.(*structpb.Value_BoolValue); !ok { return fmt.Errorf("%w: invalid boolean value: %v", errdomain.ErrInvalidArgument, v) } variable[k] = data.NewBoolean(v.GetBoolValue()) + uploadingPipelineData[idx][k] = v.GetBoolValue() } case "array:boolean": if v == nil { array := make(data.Array, len(defaultValueMap[k].([]any))) - for idx, val := range defaultValueMap[k].([]any) { - array[idx] = data.NewBoolean(val.(bool)) + uploadingDataArray := make([]any, len(defaultValueMap[k].([]any))) + for i, val := range defaultValueMap[k].([]any) { + array[i] = data.NewBoolean(val.(bool)) + uploadingDataArray[i] = val.(bool) } variable[k] = array + uploadingPipelineData[idx][k] = uploadingDataArray } else { array := make(data.Array, len(v.GetListValue().Values)) - for idx, val := range v.GetListValue().Values { + uploadingDataArray := make([]any, len(v.GetListValue().Values)) + for i, val := range v.GetListValue().Values { if _, ok := val.Kind.(*structpb.Value_BoolValue); !ok { return fmt.Errorf("%w: invalid boolean value: %v", errdomain.ErrInvalidArgument, val) } - array[idx] = data.NewBoolean(val.GetBoolValue()) + array[i] = data.NewBoolean(val.GetBoolValue()) + uploadingDataArray[i] = val.GetBoolValue() } variable[k] = array + uploadingPipelineData[idx][k] = uploadingDataArray } case "string": if v == nil { variable[k] = data.NewString(defaultValueMap[k].(string)) + uploadingPipelineData[idx][k] = defaultValueMap[k].(string) } else { if _, ok := v.Kind.(*structpb.Value_StringValue); !ok { return fmt.Errorf("%w: invalid string value: %v", errdomain.ErrInvalidArgument, v) } 
variable[k] = data.NewString(v.GetStringValue()) + uploadingPipelineData[idx][k] = v.GetStringValue() } case "array:string": if v == nil { array := make(data.Array, len(defaultValueMap[k].([]any))) - for idx, val := range defaultValueMap[k].([]any) { - array[idx] = data.NewString(val.(string)) + uploadingDataArray := make([]any, len(defaultValueMap[k].([]any))) + for i, val := range defaultValueMap[k].([]any) { + array[i] = data.NewString(val.(string)) + uploadingDataArray[i] = val.(string) } variable[k] = array + uploadingPipelineData[idx][k] = uploadingDataArray } else { array := make(data.Array, len(v.GetListValue().Values)) - for idx, val := range v.GetListValue().Values { + uploadingDataArray := make([]any, len(v.GetListValue().Values)) + for i, val := range v.GetListValue().Values { if _, ok := val.Kind.(*structpb.Value_StringValue); !ok { return fmt.Errorf("%w: invalid string value: %v", errdomain.ErrInvalidArgument, val) } - array[idx] = data.NewString(val.GetStringValue()) + array[i] = data.NewString(val.GetStringValue()) + uploadingDataArray[i] = val.GetStringValue() } variable[k] = array + uploadingPipelineData[idx][k] = uploadingDataArray } case "integer": if v == nil { variable[k] = data.NewNumberFromFloat(defaultValueMap[k].(float64)) + uploadingPipelineData[idx][k] = defaultValueMap[k].(float64) } else { if _, ok := v.Kind.(*structpb.Value_NumberValue); !ok { return fmt.Errorf("%w: invalid number value: %v", errdomain.ErrInvalidArgument, v) } variable[k] = data.NewNumberFromFloat(v.GetNumberValue()) + uploadingPipelineData[idx][k] = v.GetNumberValue() } case "array:integer": if v == nil { array := make(data.Array, len(defaultValueMap[k].([]any))) - for idx, val := range defaultValueMap[k].([]any) { - array[idx] = data.NewNumberFromFloat(val.(float64)) + uploadingDataArray := make([]any, len(defaultValueMap[k].([]any))) + for i, val := range defaultValueMap[k].([]any) { + array[i] = data.NewNumberFromFloat(val.(float64)) + uploadingDataArray[i] = 
val.(float64) } variable[k] = array + uploadingPipelineData[idx][k] = uploadingDataArray } else { array := make(data.Array, len(v.GetListValue().Values)) - for idx, val := range v.GetListValue().Values { + uploadingDataArray := make([]any, len(v.GetListValue().Values)) + for i, val := range v.GetListValue().Values { if _, ok := val.Kind.(*structpb.Value_NumberValue); !ok { return fmt.Errorf("%w: invalid number value: %v", errdomain.ErrInvalidArgument, val) } - array[idx] = data.NewNumberFromFloat(val.GetNumberValue()) + array[i] = data.NewNumberFromFloat(val.GetNumberValue()) + uploadingDataArray[i] = val.GetNumberValue() } variable[k] = array + uploadingPipelineData[idx][k] = uploadingDataArray } case "number": if v == nil { variable[k] = data.NewNumberFromFloat(defaultValueMap[k].(float64)) + uploadingPipelineData[idx][k] = defaultValueMap[k].(float64) } else { if _, ok := v.Kind.(*structpb.Value_NumberValue); !ok { return fmt.Errorf("%w: invalid number value: %v", errdomain.ErrInvalidArgument, v) } variable[k] = data.NewNumberFromFloat(v.GetNumberValue()) + uploadingPipelineData[idx][k] = v.GetNumberValue() } case "array:number": if v == nil { array := make(data.Array, len(defaultValueMap[k].([]any))) - for idx, val := range defaultValueMap[k].([]any) { - array[idx] = data.NewNumberFromFloat(val.(float64)) + uploadingDataArray := make([]any, len(defaultValueMap[k].([]any))) + for i, val := range defaultValueMap[k].([]any) { + array[i] = data.NewNumberFromFloat(val.(float64)) + uploadingDataArray[i] = val.(float64) } variable[k] = array + uploadingPipelineData[idx][k] = uploadingDataArray } else { array := make(data.Array, len(v.GetListValue().Values)) - for idx, val := range v.GetListValue().Values { + uploadingDataArray := make([]any, len(v.GetListValue().Values)) + for i, val := range v.GetListValue().Values { if _, ok := val.Kind.(*structpb.Value_NumberValue); !ok { return fmt.Errorf("%w: invalid number value: %v", errdomain.ErrInvalidArgument, val) } - 
array[idx] = data.NewNumberFromFloat(val.GetNumberValue()) + array[i] = data.NewNumberFromFloat(val.GetNumberValue()) + uploadingDataArray[i] = val.GetNumberValue() } variable[k] = array + uploadingPipelineData[idx][k] = uploadingDataArray } case "image", "image/*": if v == nil { @@ -1056,6 +1096,7 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, if err != nil { return err } + uploadingPipelineData[idx][k] = defaultValueMap[k].(string) } else { if _, ok := v.Kind.(*structpb.Value_StringValue); !ok { return fmt.Errorf("%w: invalid string value: %v", errdomain.ErrInvalidArgument, v) @@ -1064,29 +1105,36 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, if err != nil { return err } + uploadingPipelineData[idx][k] = v.GetStringValue() } case "array:image", "array:image/*": if v == nil { array := make(data.Array, len(defaultValueMap[k].([]any))) - for idx, val := range defaultValueMap[k].([]any) { - array[idx], err = data.NewImageFromURL(ctx, s.binaryFetcher, val.(string)) + arrayWithURL := make([]any, len(defaultValueMap[k].([]any))) + for i, val := range defaultValueMap[k].([]any) { + array[i], err = data.NewImageFromURL(ctx, s.binaryFetcher, val.(string)) if err != nil { return err } + arrayWithURL[i] = val.(string) } variable[k] = array + uploadingPipelineData[idx][k] = arrayWithURL } else { array := make(data.Array, len(v.GetListValue().Values)) - for idx, val := range v.GetListValue().Values { + arrayWithURL := make([]any, len(v.GetListValue().Values)) + for i, val := range v.GetListValue().Values { if _, ok := val.Kind.(*structpb.Value_StringValue); !ok { return fmt.Errorf("%w: invalid string value: %v", errdomain.ErrInvalidArgument, val) } - array[idx], err = data.NewImageFromURL(ctx, s.binaryFetcher, val.GetStringValue()) + array[i], err = data.NewImageFromURL(ctx, s.binaryFetcher, val.GetStringValue()) if err != nil { return err } + arrayWithURL[i] = val.GetStringValue() } variable[k] = array 
+ uploadingPipelineData[idx][k] = arrayWithURL } case "audio", "audio/*": if v == nil { @@ -1094,6 +1142,7 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, if err != nil { return err } + uploadingPipelineData[idx][k] = defaultValueMap[k].(string) } else { if _, ok := v.Kind.(*structpb.Value_StringValue); !ok { return fmt.Errorf("%w: invalid string value: %v", errdomain.ErrInvalidArgument, v) @@ -1102,29 +1151,36 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, if err != nil { return err } + uploadingPipelineData[idx][k] = v.GetStringValue() } case "array:audio", "array:audio/*": if v == nil { array := make(data.Array, len(defaultValueMap[k].([]any))) - for idx, val := range defaultValueMap[k].([]any) { - array[idx], err = data.NewAudioFromURL(ctx, s.binaryFetcher, val.(string)) + arrayWithURL := make([]any, len(defaultValueMap[k].([]any))) + for i, val := range defaultValueMap[k].([]any) { + array[i], err = data.NewAudioFromURL(ctx, s.binaryFetcher, val.(string)) if err != nil { return err } + arrayWithURL[i] = val.(string) } variable[k] = array + uploadingPipelineData[idx][k] = arrayWithURL } else { array := make(data.Array, len(v.GetListValue().Values)) - for idx, val := range v.GetListValue().Values { + arrayWithURL := make([]any, len(v.GetListValue().Values)) + for i, val := range v.GetListValue().Values { if _, ok := val.Kind.(*structpb.Value_StringValue); !ok { return fmt.Errorf("%w: invalid string value: %v", errdomain.ErrInvalidArgument, val) } - array[idx], err = data.NewAudioFromURL(ctx, s.binaryFetcher, val.GetStringValue()) + array[i], err = data.NewAudioFromURL(ctx, s.binaryFetcher, val.GetStringValue()) if err != nil { return err } + arrayWithURL[i] = val.GetStringValue() } variable[k] = array + uploadingPipelineData[idx][k] = arrayWithURL } case "video", "video/*": if v == nil { @@ -1132,6 +1188,7 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, 
if err != nil { return err } + uploadingPipelineData[idx][k] = defaultValueMap[k].(string) } else { if _, ok := v.Kind.(*structpb.Value_StringValue); !ok { return fmt.Errorf("%w: invalid string value: %v", errdomain.ErrInvalidArgument, v) @@ -1140,29 +1197,36 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, if err != nil { return err } + uploadingPipelineData[idx][k] = v.GetStringValue() } case "array:video", "array:video/*": if v == nil { array := make(data.Array, len(defaultValueMap[k].([]any))) - for idx, val := range defaultValueMap[k].([]any) { - array[idx], err = data.NewVideoFromURL(ctx, s.binaryFetcher, val.(string)) + arrayWithURL := make([]any, len(defaultValueMap[k].([]any))) + for i, val := range defaultValueMap[k].([]any) { + array[i], err = data.NewVideoFromURL(ctx, s.binaryFetcher, val.(string)) if err != nil { return err } + arrayWithURL[i] = val.(string) } variable[k] = array + uploadingPipelineData[idx][k] = arrayWithURL } else { array := make(data.Array, len(v.GetListValue().Values)) - for idx, val := range v.GetListValue().Values { + arrayWithURL := make([]any, len(v.GetListValue().Values)) + for i, val := range v.GetListValue().Values { if _, ok := val.Kind.(*structpb.Value_StringValue); !ok { return fmt.Errorf("%w: invalid string value: %v", errdomain.ErrInvalidArgument, val) } - array[idx], err = data.NewVideoFromURL(ctx, s.binaryFetcher, val.GetStringValue()) + array[i], err = data.NewVideoFromURL(ctx, s.binaryFetcher, val.GetStringValue()) if err != nil { return err } + arrayWithURL[i] = val.GetStringValue() } variable[k] = array + uploadingPipelineData[idx][k] = arrayWithURL } case "document": @@ -1171,6 +1235,7 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, if err != nil { return err } + uploadingPipelineData[idx][k] = defaultValueMap[k].(string) } else { if _, ok := v.Kind.(*structpb.Value_StringValue); !ok { return fmt.Errorf("%w: invalid string value: %v", 
errdomain.ErrInvalidArgument, v) @@ -1179,29 +1244,36 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, if err != nil { return err } + uploadingPipelineData[idx][k] = v.GetStringValue() } case "array:document": if v == nil { array := make(data.Array, len(defaultValueMap[k].([]any))) - for idx, val := range defaultValueMap[k].([]any) { - array[idx], err = data.NewDocumentFromURL(ctx, s.binaryFetcher, val.(string)) + arrayWithURL := make([]any, len(defaultValueMap[k].([]any))) + for i, val := range defaultValueMap[k].([]any) { + array[i], err = data.NewDocumentFromURL(ctx, s.binaryFetcher, val.(string)) if err != nil { return err } + arrayWithURL[i] = val.(string) } variable[k] = array + uploadingPipelineData[idx][k] = arrayWithURL } else { array := make(data.Array, len(v.GetListValue().Values)) - for idx, val := range v.GetListValue().Values { + arrayWithURL := make([]any, len(v.GetListValue().Values)) + for i, val := range v.GetListValue().Values { if _, ok := val.Kind.(*structpb.Value_StringValue); !ok { return fmt.Errorf("%w: invalid string value: %v", errdomain.ErrInvalidArgument, val) } - array[idx], err = data.NewDocumentFromURL(ctx, s.binaryFetcher, val.GetStringValue()) + array[i], err = data.NewDocumentFromURL(ctx, s.binaryFetcher, val.GetStringValue()) if err != nil { return err } + arrayWithURL[i] = val.GetStringValue() } variable[k] = array + uploadingPipelineData[idx][k] = arrayWithURL } case "file", "*/*": if v == nil { @@ -1209,6 +1281,7 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, if err != nil { return err } + uploadingPipelineData[idx][k] = defaultValueMap[k].(string) } else { if _, ok := v.Kind.(*structpb.Value_StringValue); !ok { return fmt.Errorf("%w: invalid string value: %v", errdomain.ErrInvalidArgument, v) @@ -1217,29 +1290,36 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, if err != nil { return err } + uploadingPipelineData[idx][k] 
= v.GetStringValue() } case "array:file", "array:*/*": if v == nil { array := make(data.Array, len(defaultValueMap[k].([]any))) - for idx, val := range defaultValueMap[k].([]any) { - array[idx], err = data.NewBinaryFromURL(ctx, s.binaryFetcher, val.(string)) + arrayWithURL := make([]any, len(defaultValueMap[k].([]any))) + for i, val := range defaultValueMap[k].([]any) { + array[i], err = data.NewBinaryFromURL(ctx, s.binaryFetcher, val.(string)) if err != nil { return err } + arrayWithURL[i] = val.(string) } variable[k] = array + uploadingPipelineData[idx][k] = arrayWithURL } else { array := make(data.Array, len(v.GetListValue().Values)) - for idx, val := range v.GetListValue().Values { + arrayWithURL := make([]any, len(v.GetListValue().Values)) + for i, val := range v.GetListValue().Values { if _, ok := val.Kind.(*structpb.Value_StringValue); !ok { return fmt.Errorf("%w: invalid string value: %v", errdomain.ErrInvalidArgument, val) } - array[idx], err = data.NewBinaryFromURL(ctx, s.binaryFetcher, val.GetStringValue()) + array[i], err = data.NewBinaryFromURL(ctx, s.binaryFetcher, val.GetStringValue()) if err != nil { return err } + arrayWithURL[i] = val.GetStringValue() } variable[k] = array + uploadingPipelineData[idx][k] = arrayWithURL } case "semi-structured/*", "semi-structured/json", "json": @@ -1249,6 +1329,7 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, return err } variable[k] = jv + uploadingPipelineData[idx][k] = jv } else { switch v.Kind.(type) { case *structpb.Value_StructValue: @@ -1266,6 +1347,7 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, return err } variable[k] = jv + uploadingPipelineData[idx][k] = jv case *structpb.Value_ListValue: j := []any{} b, err := protojson.Marshal(v) @@ -1281,6 +1363,7 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, return err } variable[k] = jv + uploadingPipelineData[idx][k] = jv } } @@ -1289,6 +1372,23 @@ 
func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, return err } } + + requesterUID, _ := resourcex.GetRequesterUIDAndUserUID(ctx) + + expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, requesterUID.String()) + if err != nil { + return fmt.Errorf("get expiry rule tag: %w", err) + } + + err = s.uploadPipelineRunInputsToMinio(ctx, uploadPipelineRunInputsToMinioParam{ + pipelineTriggerID: pipelineTriggerID, + expiryRuleTag: expiryRuleTag, + pipelineData: uploadingPipelineData, + }) + if err != nil { + return fmt.Errorf("pipeline run inputs to minio: %w", err) + } + err = wfm.Set(ctx, idx, constant.SegVariable, variable) if err != nil { return err @@ -1881,8 +1981,8 @@ func (s *service) TriggerNamespacePipelineByID(ctx context.Context, ns resource. pipelineTriggerID: pipelineTriggerID, pipelineUID: pipelineUID, pipelineReleaseID: defaultPipelineReleaseID, - requesterUID: uuid.FromStringOrNil(requesterUID), - userUID: uuid.FromStringOrNil(userUID), + requesterUID: requesterUID, + userUID: userUID, }) defer func() { if err != nil { @@ -1904,8 +2004,8 @@ func (s *service) TriggerNamespacePipelineByID(ctx context.Context, ns resource. 
pipelineID: dbPipeline.ID, pipelineUID: pipelineUID, pipelineTriggerID: pipelineTriggerID, - requesterUID: uuid.FromStringOrNil(requesterUID), - userUID: uuid.FromStringOrNil(userUID), + requesterUID: requesterUID, + userUID: userUID, }, returnTraces) if err != nil { return nil, nil, err @@ -1928,8 +2028,8 @@ func (s *service) TriggerAsyncNamespacePipelineByID(ctx context.Context, ns reso pipelineTriggerID: pipelineTriggerID, pipelineUID: dbPipeline.UID, pipelineReleaseID: defaultPipelineReleaseID, - requesterUID: uuid.FromStringOrNil(requesterUID), - userUID: uuid.FromStringOrNil(userUID), + requesterUID: requesterUID, + userUID: userUID, }) defer func() { if err != nil { @@ -1950,8 +2050,8 @@ func (s *service) TriggerAsyncNamespacePipelineByID(ctx context.Context, ns reso pipelineID: dbPipeline.ID, pipelineUID: dbPipeline.UID, pipelineTriggerID: pipelineTriggerID, - requesterUID: uuid.FromStringOrNil(requesterUID), - userUID: uuid.FromStringOrNil(userUID), + requesterUID: requesterUID, + userUID: userUID, }) if err != nil { return nil, err @@ -1979,8 +2079,8 @@ func (s *service) TriggerNamespacePipelineReleaseByID(ctx context.Context, ns re pipelineTriggerID: pipelineTriggerID, pipelineUID: pipelineUID, pipelineReleaseID: dbPipelineRelease.ID, - requesterUID: uuid.FromStringOrNil(requesterUID), - userUID: uuid.FromStringOrNil(userUID), + requesterUID: requesterUID, + userUID: userUID, }) defer func() { if err != nil { @@ -2004,8 +2104,8 @@ func (s *service) TriggerNamespacePipelineReleaseByID(ctx context.Context, ns re pipelineReleaseID: dbPipelineRelease.ID, pipelineReleaseUID: dbPipelineRelease.UID, pipelineTriggerID: pipelineTriggerID, - requesterUID: uuid.FromStringOrNil(requesterUID), - userUID: uuid.FromStringOrNil(userUID), + requesterUID: requesterUID, + userUID: userUID, }, returnTraces) if err != nil { return nil, nil, err @@ -2033,8 +2133,8 @@ func (s *service) TriggerAsyncNamespacePipelineReleaseByID(ctx context.Context, pipelineTriggerID: 
pipelineTriggerID, pipelineUID: pipelineUID, pipelineReleaseID: dbPipelineRelease.ID, - requesterUID: uuid.FromStringOrNil(requesterUID), - userUID: uuid.FromStringOrNil(userUID), + requesterUID: requesterUID, + userUID: userUID, }) defer func() { if err != nil { @@ -2057,8 +2157,8 @@ func (s *service) TriggerAsyncNamespacePipelineReleaseByID(ctx context.Context, pipelineReleaseID: dbPipelineRelease.ID, pipelineReleaseUID: dbPipelineRelease.UID, pipelineTriggerID: pipelineTriggerID, - requesterUID: uuid.FromStringOrNil(requesterUID), - userUID: uuid.FromStringOrNil(userUID), + requesterUID: requesterUID, + userUID: userUID, }) if err != nil { return nil, err diff --git a/pkg/service/pipelinerun.go b/pkg/service/pipelinerun.go index fd2fb784d..a10e42c1e 100644 --- a/pkg/service/pipelinerun.go +++ b/pkg/service/pipelinerun.go @@ -14,12 +14,14 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/constant" "github.com/instill-ai/pipeline-backend/pkg/datamodel" + "github.com/instill-ai/pipeline-backend/pkg/logger" "github.com/instill-ai/pipeline-backend/pkg/repository" "github.com/instill-ai/pipeline-backend/pkg/resource" runpb "github.com/instill-ai/protogen-go/common/run/v1alpha" mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta" pb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta" + miniox "github.com/instill-ai/x/minio" resourcex "github.com/instill-ai/x/resource" ) @@ -91,9 +93,9 @@ func (s *service) ListPipelineRuns(ctx context.Context, req *pb.ListPipelineRuns return nil, err } - isOwner := dbPipeline.OwnerUID().String() == requesterUID + isOwner := dbPipeline.OwnerUID() == requesterUID - pipelineRuns, totalCount, err := s.repository.GetPaginatedPipelineRunsWithPermissions(ctx, requesterUID, dbPipeline.UID.String(), + pipelineRuns, totalCount, err := s.repository.GetPaginatedPipelineRunsWithPermissions(ctx, requesterUID.String(), dbPipeline.UID.String(), page, 
pageSize, filter, orderBy, isOwner) if err != nil { return nil, fmt.Errorf("failed to get pipeline runs: %w", err) @@ -101,7 +103,7 @@ func (s *service) ListPipelineRuns(ctx context.Context, req *pb.ListPipelineRuns var referenceIDs []string for _, pipelineRun := range pipelineRuns { - if CanViewPrivateData(pipelineRun.RequesterUID.String(), requesterUID) { + if canViewPrivateData(pipelineRun.RequesterUID, requesterUID) { for _, input := range pipelineRun.Inputs { referenceIDs = append(referenceIDs, input.Name) } @@ -152,7 +154,7 @@ func (s *service) ListPipelineRuns(ctx context.Context, req *pb.ListPipelineRuns pbRun.RequesterId = *requesterID } - if CanViewPrivateData(run.RequesterUID.String(), requesterUID) { + if canViewPrivateData(run.RequesterUID, requesterUID) { if len(run.Inputs) == 1 { key := run.Inputs[0].Name pbRun.Inputs, err = parseMetadataToStructArray(metadataMap, key) @@ -211,9 +213,9 @@ func (s *service) ListComponentRuns(ctx context.Context, req *pb.ListComponentRu return nil, fmt.Errorf("failed to get pipeline by UID: %s. error: %s", dbPipelineRun.PipelineUID.String(), err.Error()) } - isOwner := dbPipeline.OwnerUID().String() == requesterUID + isOwner := dbPipeline.OwnerUID() == requesterUID - if !isOwner && requesterUID != dbPipelineRun.RequesterUID.String() { + if !isOwner && requesterUID != dbPipelineRun.RequesterUID { return nil, fmt.Errorf("requester is not pipeline owner/credit owner. 
they are not allowed to view these component runs") } @@ -224,7 +226,7 @@ func (s *service) ListComponentRuns(ctx context.Context, req *pb.ListComponentRu var referenceIDs []string for _, pipelineRun := range componentRuns { - if CanViewPrivateData(dbPipelineRun.RequesterUID.String(), requesterUID) { + if canViewPrivateData(dbPipelineRun.RequesterUID, requesterUID) { for _, input := range pipelineRun.Inputs { referenceIDs = append(referenceIDs, input.Name) } @@ -253,7 +255,7 @@ func (s *service) ListComponentRuns(ctx context.Context, req *pb.ListComponentRu return nil, fmt.Errorf("failed to convert component run: %w", err) } - if CanViewPrivateData(dbPipelineRun.RequesterUID.String(), requesterUID) { + if canViewPrivateData(dbPipelineRun.RequesterUID, requesterUID) { if len(run.Inputs) == 1 { key := run.Inputs[0].Name pbRun.Inputs, err = parseMetadataToStructArray(metadataMap, key) @@ -380,3 +382,47 @@ func (s *service) ListPipelineRunsByRequester(ctx context.Context, req *pb.ListP PageSize: int32(pageSize), }, nil } + +type uploadPipelineRunInputsToMinioParam struct { + pipelineTriggerID string + expiryRuleTag string + pipelineData []map[string]any +} + +func (s *service) uploadPipelineRunInputsToMinio(ctx context.Context, param uploadPipelineRunInputsToMinioParam) error { + logger, _ := logger.GetZapLogger(ctx) + minioClient := s.minioClient + objectName := fmt.Sprintf("pipeline-runs/input/%s.json", param.pipelineTriggerID) + + logger.Info("uploadPipelineRunInputsToMinio started", + zap.String("objectName", objectName), + zap.Any("pipelineData", param.pipelineData), + ) + + url, objectInfo, err := minioClient.UploadFile(ctx, logger, &miniox.UploadFileParam{ + FilePath: objectName, + FileContent: param.pipelineData, + FileMimeType: constant.ContentTypeJSON, + ExpiryRuleTag: param.expiryRuleTag, + }) + if err != nil { + return fmt.Errorf("upload pipeline run inputs to minio: %w", err) + } + + inputs := datamodel.JSONB{{ + Name: objectInfo.Key, + Type: 
objectInfo.ContentType, + Size: objectInfo.Size, + URL: url, + }} + + err = s.repository.UpdatePipelineRun(ctx, param.pipelineTriggerID, &datamodel.PipelineRun{Inputs: inputs}) + if err != nil { + logger.Error("save pipeline run input data", zap.Error(err)) + return err + } + + logger.Info("uploadPipelineRunInputsToMinio finished") + + return nil +} diff --git a/pkg/service/utils.go b/pkg/service/utils.go index 8dda24704..5938b271c 100644 --- a/pkg/service/utils.go +++ b/pkg/service/utils.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "math/rand" + "strings" "time" "github.com/gofrs/uuid" @@ -179,8 +180,7 @@ func (s *service) convertComponentRunToPB(run datamodel.ComponentRun) (*pipeline return result, nil } -// CanViewPrivateData - only with requester ns could users see their input/output data -func CanViewPrivateData(namespace, requesterUID string) bool { +func canViewPrivateData(namespace, requesterUID uuid.UUID) bool { return namespace == requesterUID } @@ -228,3 +228,15 @@ func parseRecipeMetadata(ctx context.Context, metadataMap map[string][]byte, con dataSpec, _ := converter.GeneratePipelineDataSpec(dbRecipe.Variable, dbRecipe.Output, dbRecipe.Component) return pbStruct, dataSpec, nil } + +func isUnstructuredFormat(format string) bool { + if strings.HasPrefix(format, "array:") { + return format != "array:string" && + format != "array:number" && + format != "array:boolean" + } + return format != "string" && + format != "number" && + format != "boolean" && + format != "json" +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index dda62ab91..62c3e8c3c 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -1,16 +1,24 @@ package utils import ( + "context" + "fmt" + "net/url" "reflect" "strings" "time" + "github.com/gofrs/uuid" "github.com/influxdata/influxdb-client-go/v2/api/write" + "go.uber.org/zap" + "google.golang.org/grpc/metadata" "google.golang.org/protobuf/types/known/structpb" influxdb2 
"github.com/influxdata/influxdb-client-go/v2" + "github.com/instill-ai/pipeline-backend/config" "github.com/instill-ai/pipeline-backend/pkg/resource" + "github.com/instill-ai/x/blobstorage" mgmtPB "github.com/instill-ai/protogen-go/core/mgmt/v1beta" ) @@ -189,3 +197,76 @@ func StructToMap(s interface{}, tag string) map[string]interface{} { } return out } + +// They are same logic in the some components like Instill Artifact, Instill Model. +// We can extract this logic to the shared package. +// But for now, we keep it here because we want to avoid that the components depend on pipeline shared package. +func GetRequestMetadata(vars map[string]any) metadata.MD { + md := metadata.Pairs( + "Authorization", getHeaderAuthorization(vars), + "Instill-User-Uid", getInstillUserUID(vars), + "Instill-Auth-Type", "user", + ) + + if requester := getInstillRequesterUID(vars); requester != "" { + md.Set("Instill-Requester-Uid", requester) + } + return md +} + +func getHeaderAuthorization(vars map[string]any) string { + if v, ok := vars["__PIPELINE_HEADER_AUTHORIZATION"]; ok { + return v.(string) + } + return "" +} +func getInstillUserUID(vars map[string]any) string { + if v, ok := vars["__PIPELINE_USER_UID"]; ok { + switch uid := v.(type) { + case uuid.UUID: + return uid.String() + case string: + return uid + } + } + return "" +} + +func getInstillRequesterUID(vars map[string]any) string { + if v, ok := vars["__PIPELINE_REQUESTER_UID"]; ok { + switch uid := v.(type) { + case uuid.UUID: + return uid.String() + case string: + return uid + } + } + return "" +} + +// UploadBlobData uploads the blob data to the given upload URL. 
+func UploadBlobData(ctx context.Context, uploadURL string, fileContentType string, fileBytes []byte, logger *zap.Logger) error { + if uploadURL == "" { + return fmt.Errorf("empty upload URL provided") + } + + parsedURL, err := url.Parse(uploadURL) + if err != nil { + return fmt.Errorf("parsing upload URL: %w", err) + } + if config.Config.APIGateway.TLSEnabled { + parsedURL.Scheme = "https" + } else { + parsedURL.Scheme = "http" + } + parsedURL.Host = fmt.Sprintf("%s:%d", config.Config.APIGateway.Host, config.Config.APIGateway.PublicPort) + fullURL := parsedURL.String() + + err = blobstorage.UploadFile(ctx, logger, fullURL, fileBytes, fileContentType) + + if err != nil { + return fmt.Errorf("uploading blob: %w", err) + } + + return nil +} diff --git a/pkg/worker/main.go b/pkg/worker/main.go index bffb47562..0161dcbe7 100644 --- a/pkg/worker/main.go +++ b/pkg/worker/main.go @@ -44,7 +44,6 @@ type Worker interface { UpdatePipelineRunActivity(context.Context, *UpdatePipelineRunActivityParam) error UpsertComponentRunActivity(context.Context, *UpsertComponentRunActivityParam) error - UploadInputsToMinioActivity(context.Context, *UploadInputsToMinioActivityParam) error UploadOutputsToMinioActivity(context.Context, *UploadOutputsToMinioActivityParam) error UploadRecipeToMinioActivity(context.Context, *UploadRecipeToMinioActivityParam) error UploadComponentInputsActivity(context.Context, *ComponentActivityParam) error diff --git a/pkg/worker/minioactivity.go b/pkg/worker/minioactivity.go index 5309a66f5..e889501cc 100644 --- a/pkg/worker/minioactivity.go +++ b/pkg/worker/minioactivity.go @@ -15,59 +15,6 @@ import ( miniox "github.com/instill-ai/x/minio" ) -func (w *worker) UploadInputsToMinioActivity(ctx context.Context, param *UploadInputsToMinioActivityParam) error { - log := w.log.With(zap.String("PipelineTriggerID", param.PipelineTriggerID)) - log.Info("UploadInputsToMinioActivity started") - - wfm, err := w.memoryStore.GetWorkflowMemory(ctx, 
param.PipelineTriggerID) - if err != nil { - return err - } - - pipelineData := make([]*structpb.Struct, wfm.GetBatchSize()) - - for i := range wfm.GetBatchSize() { - val, err := wfm.GetPipelineData(ctx, i, memory.PipelineVariable) - if err != nil { - return err - } - varStr, err := val.ToStructValue() - if err != nil { - return err - } - pipelineData[i] = varStr.GetStructValue() - } - - objectName := fmt.Sprintf("pipeline-runs/input/%s.json", param.PipelineTriggerID) - - url, objectInfo, err := w.minioClient.UploadFile(ctx, log, &miniox.UploadFileParam{ - FilePath: objectName, - FileContent: pipelineData, - FileMimeType: constant.ContentTypeJSON, - ExpiryRuleTag: param.ExpiryRuleTag, - }) - if err != nil { - log.Error("failed to upload pipeline run inputs to minio", zap.Error(err)) - return err - } - - inputs := datamodel.JSONB{{ - Name: objectInfo.Key, - Type: objectInfo.ContentType, - Size: objectInfo.Size, - URL: url, - }} - - err = w.repository.UpdatePipelineRun(ctx, param.PipelineTriggerID, &datamodel.PipelineRun{Inputs: inputs}) - if err != nil { - log.Error("failed to save pipeline run input data", zap.Error(err)) - return err - } - - log.Info("UploadInputsToMinioActivity finished") - return nil -} - func (w *worker) UploadRecipeToMinioActivity(ctx context.Context, param *UploadRecipeToMinioActivityParam) error { log := w.log.With(zap.String("PipelineTriggerUID", param.PipelineTriggerID)) log.Info("UploadRecipeToMinioActivity started") diff --git a/pkg/worker/utils.go b/pkg/worker/utils.go index f4f31f377..bdfbe1f20 100644 --- a/pkg/worker/utils.go +++ b/pkg/worker/utils.go @@ -6,12 +6,10 @@ import ( "fmt" "strings" - "github.com/gofrs/uuid" "github.com/instill-ai/pipeline-backend/config" "github.com/instill-ai/pipeline-backend/pkg/data" "github.com/instill-ai/pipeline-backend/pkg/data/format" "github.com/instill-ai/pipeline-backend/pkg/utils" - "google.golang.org/grpc/metadata" ) func (w *worker) 
writeNewDataPoint(ctx context.Context, data utils.PipelineUsageMetricData) error { @@ -85,33 +83,3 @@ func setIteratorIndex(v format.Value, identifier string, index int) format.Value return v } } - -// They are same logic in the some components like Instill Artifact, Instill Model. -// We can extract this logic to the shared package. -// But for now, we keep it here because we want to avoid that the components depend on pipeline shared package. -func getRequestMetadata(vars map[string]any) metadata.MD { - md := metadata.Pairs( - "Authorization", getHeaderAuthorization(vars), - "Instill-User-Uid", getInstillUserUID(vars), - "Instill-Auth-Type", "user", - ) - - if requester := getInstillRequesterUID(vars); requester != "" { - md.Set("Instill-Requester-Uid", requester) - } - return md -} - -func getHeaderAuthorization(vars map[string]any) string { - if v, ok := vars["__PIPELINE_HEADER_AUTHORIZATION"]; ok { - return v.(string) - } - return "" -} -func getInstillUserUID(vars map[string]any) string { - return vars["__PIPELINE_USER_UID"].(uuid.UUID).String() -} - -func getInstillRequesterUID(vars map[string]any) string { - return vars["__PIPELINE_REQUESTER_UID"].(uuid.UUID).String() -} diff --git a/pkg/worker/workflow.go b/pkg/worker/workflow.go index 171eb7790..03c16292a 100644 --- a/pkg/worker/workflow.go +++ b/pkg/worker/workflow.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "go/parser" - "net/url" "strings" "time" @@ -33,7 +32,6 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/recipe" "github.com/instill-ai/pipeline-backend/pkg/resource" "github.com/instill-ai/pipeline-backend/pkg/utils" - "github.com/instill-ai/x/blobstorage" "github.com/instill-ai/x/errmsg" componentbase "github.com/instill-ai/pipeline-backend/pkg/component/base" @@ -248,16 +246,7 @@ func (w *worker) TriggerPipelineWorkflow(ctx workflow.Context, param *TriggerPip }).Get(ctx, nil); err != nil { return err } - - err := 
workflow.ExecuteActivity(minioCtx, w.UploadInputsToMinioActivity, &UploadInputsToMinioActivityParam{ - PipelineTriggerID: param.SystemVariables.PipelineTriggerID, - ExpiryRuleTag: param.SystemVariables.ExpiryRuleTag, - }).Get(ctx, nil) - if err != nil { - logger.Error("Failed to upload pipeline run input", zap.Error(err)) - } - - err = workflow.ExecuteActivity(minioCtx, w.UploadRecipeToMinioActivity, &UploadRecipeToMinioActivityParam{ + err := workflow.ExecuteActivity(minioCtx, w.UploadRecipeToMinioActivity, &UploadRecipeToMinioActivityParam{ PipelineTriggerID: param.SystemVariables.PipelineTriggerID, ExpiryRuleTag: param.SystemVariables.ExpiryRuleTag, }).Get(ctx, nil) @@ -692,9 +681,12 @@ func (w *worker) uploadBlobDataAndGetDownloadURL(ctx context.Context, param *Com sysVarJSON := utils.StructToMap(param.SystemVariables, "json") - ctx = metadata.NewOutgoingContext(ctx, getRequestMetadata(sysVarJSON)) + ctx = metadata.NewOutgoingContext(ctx, utils.GetRequestMetadata(sysVarJSON)) objectName := fmt.Sprintf("%s/%s", requesterID, value.Filename()) + + // TODO: We will need to add the expiry days for the blob data. 
+ // This will be addressed in ins-6857 resp, err := artifactClient.GetObjectUploadURL(ctx, &artifactpb.GetObjectUploadURLRequest{ NamespaceId: requesterID, ObjectName: objectName, @@ -707,7 +699,12 @@ func (w *worker) uploadBlobDataAndGetDownloadURL(ctx context.Context, param *Com uploadURL := resp.GetUploadUrl() - err = uploadBlobData(ctx, uploadURL, value, w.log) + fileBytes, err := value.Binary() + if err != nil { + return "", fmt.Errorf("getting file bytes: %w", err) + } + + err = utils.UploadBlobData(ctx, uploadURL, value.ContentType().String(), fileBytes.ByteArray(), w.log) if err != nil { return "", fmt.Errorf("upload blob data: %w", err) } @@ -723,38 +720,6 @@ func (w *worker) uploadBlobDataAndGetDownloadURL(ctx context.Context, param *Com return respDownloadURL.GetDownloadUrl(), nil } -func uploadBlobData(ctx context.Context, uploadURL string, value format.File, logger *zap.Logger) error { - if uploadURL == "" { - return fmt.Errorf("empty upload URL provided") - } - - parsedURL, err := url.Parse(uploadURL) - if err != nil { - return fmt.Errorf("parsing upload URL: %w", err) - } - if config.Config.APIGateway.TLSEnabled { - parsedURL.Scheme = "https" - } else { - parsedURL.Scheme = "http" - } - parsedURL.Host = fmt.Sprintf("%s:%d", config.Config.APIGateway.Host, config.Config.APIGateway.PublicPort) - fullURL := parsedURL.String() - contentType := value.ContentType().String() - fileBytes, err := value.Binary() - - if err != nil { - return fmt.Errorf("getting file bytes: %w", err) - } - - err = blobstorage.UploadFile(ctx, logger, fullURL, fileBytes.ByteArray(), contentType) - - if err != nil { - return fmt.Errorf("uploading blob: %w", err) - } - - return nil -} - // TODO: complete iterator // PreIteratorActivity generate the trigger memory for each iteration. 
func (w *worker) PreIteratorActivity(ctx context.Context, param *PreIteratorActivityParam) (*PreIteratorActivityResult, error) { diff --git a/pkg/worker/workflowmodel.go b/pkg/worker/workflowmodel.go index b2d48a6d7..4b15895aa 100644 --- a/pkg/worker/workflowmodel.go +++ b/pkg/worker/workflowmodel.go @@ -15,11 +15,6 @@ type UploadToMinioActivityResponse struct { ObjectInfo *minio.ObjectInfo } -type UploadInputsToMinioActivityParam struct { - PipelineTriggerID string - ExpiryRuleTag string -} - type UploadOutputsToMinioActivityParam struct { PipelineTriggerID string ExpiryRuleTag string