From 70c9eb6a3f49a4c854b32199a813950380f9af2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Vall=C3=A9s?= Date: Fri, 13 Dec 2024 15:58:26 +0100 Subject: [PATCH] feat(pipeline): add blob expiration time to run logs --- config/config.yaml | 2 +- go.mod | 2 +- go.sum | 4 +- pkg/datamodel/runlogging.go | 53 ++++++++++--------- ..._add_pipeline_run_expiration_time.down.sql | 6 +++ ...39_add_pipeline_run_expiration_time.up.sql | 6 +++ pkg/service/pipelinerun.go | 11 +++- pkg/service/utils.go | 6 +++ pkg/worker/minioactivity.go | 12 ++++- pkg/worker/workflow.go | 4 -- 10 files changed, 71 insertions(+), 35 deletions(-) create mode 100644 pkg/db/migration/000039_add_pipeline_run_expiration_time.down.sql create mode 100644 pkg/db/migration/000039_add_pipeline_run_expiration_time.up.sql diff --git a/config/config.yaml b/config/config.yaml index bdeca5876..d98987169 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -26,7 +26,7 @@ database: host: pg-sql port: 5432 name: pipeline - version: 38 + version: 39 timezone: Etc/UTC pool: idleconnections: 5 diff --git a/go.mod b/go.mod index 1cafb71a1..e89912df5 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require ( github.com/h2non/filetype v1.1.3 github.com/iancoleman/strcase v0.3.0 github.com/influxdata/influxdb-client-go/v2 v2.12.3 - github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498 + github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241213145904-c3d8111872b5 github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8 github.com/itchyny/gojq v0.12.14 diff --git a/go.sum b/go.sum index ed447c78b..10f38902b 100644 --- a/go.sum +++ b/go.sum @@ -1285,8 +1285,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0 github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0= github.com/influxdata/line-protocol 
v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498 h1:JYRfk/m+960jEScE1NPQM+xh+ScR4cHNMg/tCFwQbpI= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241213145904-c3d8111872b5 h1:5338ZeuB/C50P8aUOUKstjBSAQaWqmrdrAK2i9AbWk8= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241213145904-c3d8111872b5/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw= github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw= github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8 h1:w2F6sI6VbzIXUIh6HrSrV4k43pM/brj1jv6HT994+so= diff --git a/pkg/datamodel/runlogging.go b/pkg/datamodel/runlogging.go index 7ff0a59d8..bddfe4342 100644 --- a/pkg/datamodel/runlogging.go +++ b/pkg/datamodel/runlogging.go @@ -64,33 +64,36 @@ func (j *JSONB) Scan(value interface{}) error { // PipelineRun represents the metadata and execution details for each pipeline run. 
type PipelineRun struct { - PipelineTriggerUID uuid.UUID `gorm:"primaryKey" json:"pipeline-trigger-uid"` // Unique identifier for each run - PipelineUID uuid.UUID `gorm:"type:uuid;index" json:"pipeline-uid"` // Pipeline unique ID used in the run - Pipeline Pipeline `gorm:"foreignKey:PipelineUID;references:UID"` // Pipeline instance referenced in the run - PipelineVersion string `gorm:"type:varchar(255)" json:"pipeline-version"` // Pipeline version used in the run - Status RunStatus `gorm:"type:valid_trigger_status;index" json:"status"` // Current status of the run (e.g., Running, Completed, Failed) - Source RunSource `gorm:"type:valid_trigger_source" json:"source"` // Origin of the run (e.g., Web click, API) - TotalDuration null.Int `gorm:"type:bigint" json:"total-duration"` // Time taken to complete the run in nanoseconds - RunnerUID uuid.UUID `gorm:"type:uuid" json:"runner-uid"` // Identity of the user who initiated the run - RequesterUID uuid.UUID `gorm:"type:uuid" json:"requester-uid"` // Namespace used for the run, which is the requester - Inputs JSONB `gorm:"type:jsonb" json:"inputs"` // Input files for the run - Outputs JSONB `gorm:"type:jsonb" json:"outputs"` // Output files from the run - RecipeSnapshot JSONB `gorm:"type:jsonb" json:"recipe-snapshot"` // Snapshot of the pipeline recipe used for this run - StartedTime time.Time `gorm:"type:timestamp with time zone;index" json:"started-time,omitempty"` // Time when the run started execution - CompletedTime null.Time `gorm:"type:timestamp with time zone;index" json:"completed-time,omitempty"` // Time when the run completed - Error null.String `gorm:"type:text" json:"error-msg"` // Error message if the run failed - Components []ComponentRun `gorm:"foreignKey:PipelineTriggerUID;references:PipelineTriggerUID" json:"components"` // Execution details for each component in the pipeline + PipelineTriggerUID uuid.UUID `gorm:"primaryKey" json:"pipeline-trigger-uid"` // Unique identifier for each run + PipelineUID 
uuid.UUID `gorm:"type:uuid;index" json:"pipeline-uid"` // Pipeline unique ID used in the run + Pipeline Pipeline `gorm:"foreignKey:PipelineUID;references:UID"` // Pipeline instance referenced in the run + PipelineVersion string `gorm:"type:varchar(255)" json:"pipeline-version"` // Pipeline version used in the run + Status RunStatus `gorm:"type:valid_trigger_status;index" json:"status"` // Current status of the run (e.g., Running, Completed, Failed) + Source RunSource `gorm:"type:valid_trigger_source" json:"source"` // Origin of the run (e.g., Web click, API) + TotalDuration null.Int `gorm:"type:bigint" json:"total-duration"` // Time taken to complete the run in nanoseconds + RunnerUID uuid.UUID `gorm:"type:uuid" json:"runner-uid"` // Identity of the user who initiated the run + RequesterUID uuid.UUID `gorm:"type:uuid" json:"requester-uid"` // Namespace used for the run, which is the requester + Inputs JSONB `gorm:"type:jsonb" json:"inputs"` // Input files for the run + Outputs JSONB `gorm:"type:jsonb" json:"outputs"` // Output files from the run + RecipeSnapshot JSONB `gorm:"type:jsonb" json:"recipe-snapshot"` // Snapshot of the pipeline recipe used for this run + StartedTime time.Time `gorm:"type:timestamp with time zone;index" json:"started-time,omitempty"` // Time when the run started execution + CompletedTime null.Time `gorm:"type:timestamp with time zone;index" json:"completed-time,omitempty"` // Time when the run completed + BlobDataExpirationTime null.Time `gorm:"type:timestamp with time zone" json:"blob-data-expiration-time,omitempty"` // Time when the blob data (e.g. input or recipe) will expire. + + Error null.String `gorm:"type:text" json:"error-msg"` // Error message if the run failed + Components []ComponentRun `gorm:"foreignKey:PipelineTriggerUID;references:PipelineTriggerUID" json:"components"` // Execution details for each component in the pipeline } // ComponentRun represents the execution details of a single component within a pipeline run. 
type ComponentRun struct { - PipelineTriggerUID uuid.UUID `gorm:"type:uuid;primaryKey;index" json:"pipeline-trigger-uid"` // Links to the parent PipelineRun - ComponentID string `gorm:"type:varchar(255);primaryKey" json:"component-id"` // Unique identifier for each pipeline component - Status RunStatus `gorm:"type:varchar(50);index" json:"status"` // Completion status of the component (e.g., Completed, Errored) - TotalDuration null.Int `gorm:"type:bigint" json:"total-duration"` // Time taken to execute the component in nanoseconds - StartedTime time.Time `gorm:"type:timestamp with time zone;index" json:"started-time"` // Time when the component started execution - CompletedTime null.Time `gorm:"type:timestamp with time zone;index" json:"completed-time"` // Time when the component finished execution - Error null.String `gorm:"type:text" json:"error-msg"` // Error message if the component failed - Inputs JSONB `gorm:"type:jsonb" json:"inputs"` // Input files for the component - Outputs JSONB `gorm:"type:jsonb" json:"outputs"` // Output files from the component + PipelineTriggerUID uuid.UUID `gorm:"type:uuid;primaryKey;index" json:"pipeline-trigger-uid"` // Links to the parent PipelineRun + ComponentID string `gorm:"type:varchar(255);primaryKey" json:"component-id"` // Unique identifier for each pipeline component + Status RunStatus `gorm:"type:varchar(50);index" json:"status"` // Completion status of the component (e.g., Completed, Errored) + TotalDuration null.Int `gorm:"type:bigint" json:"total-duration"` // Time taken to execute the component in nanoseconds + StartedTime time.Time `gorm:"type:timestamp with time zone;index" json:"started-time"` // Time when the component started execution + CompletedTime null.Time `gorm:"type:timestamp with time zone;index" json:"completed-time"` // Time when the component finished execution + BlobDataExpirationTime null.Time `gorm:"type:timestamp with time zone" json:"blob-data-expiration-time,omitempty"` // Time when the blob 
data (e.g. input or output) will expire. + Error null.String `gorm:"type:text" json:"error-msg"` // Error message if the component failed + Inputs JSONB `gorm:"type:jsonb" json:"inputs"` // Input files for the component + Outputs JSONB `gorm:"type:jsonb" json:"outputs"` // Output files from the component } diff --git a/pkg/db/migration/000039_add_pipeline_run_expiration_time.down.sql b/pkg/db/migration/000039_add_pipeline_run_expiration_time.down.sql new file mode 100644 index 000000000..f2c4594fb --- /dev/null +++ b/pkg/db/migration/000039_add_pipeline_run_expiration_time.down.sql @@ -0,0 +1,6 @@ +BEGIN; + +ALTER TABLE component_run DROP COLUMN IF EXISTS blob_data_expiration_time; +ALTER TABLE pipeline_run DROP COLUMN IF EXISTS blob_data_expiration_time; + +COMMIT; diff --git a/pkg/db/migration/000039_add_pipeline_run_expiration_time.up.sql b/pkg/db/migration/000039_add_pipeline_run_expiration_time.up.sql new file mode 100644 index 000000000..dc93c07ee --- /dev/null +++ b/pkg/db/migration/000039_add_pipeline_run_expiration_time.up.sql @@ -0,0 +1,6 @@ +BEGIN; + +ALTER TABLE pipeline_run ADD COLUMN IF NOT EXISTS blob_data_expiration_time TIMESTAMPTZ; +ALTER TABLE component_run ADD COLUMN IF NOT EXISTS blob_data_expiration_time TIMESTAMPTZ; + +COMMIT; diff --git a/pkg/service/pipelinerun.go b/pkg/service/pipelinerun.go index a10e42c1e..ac823dfee 100644 --- a/pkg/service/pipelinerun.go +++ b/pkg/service/pipelinerun.go @@ -416,7 +416,16 @@ func (s *service) uploadPipelineRunInputsToMinio(ctx context.Context, param uplo URL: url, }} - err = s.repository.UpdatePipelineRun(ctx, param.pipelineTriggerID, &datamodel.PipelineRun{Inputs: inputs}) + pipelineRunUpdate := &datamodel.PipelineRun{ + Inputs: inputs, + } + + if param.expiryRule.ExpirationDays > 0 { + blobExpiration := time.Now().UTC().AddDate(0, 0, param.expiryRule.ExpirationDays) + pipelineRunUpdate.BlobDataExpirationTime = null.TimeFrom(blobExpiration) + } + + err = s.repository.UpdatePipelineRun(ctx, 
param.pipelineTriggerID, pipelineRunUpdate) if err != nil { logger.Error("save pipeline run input data", zap.Error(err)) return err diff --git a/pkg/service/utils.go b/pkg/service/utils.go index 5938b271c..278339303 100644 --- a/pkg/service/utils.go +++ b/pkg/service/utils.go @@ -140,6 +140,9 @@ func (s *service) convertPipelineRunToPB(run datamodel.PipelineRun) (*pipelinepb if run.CompletedTime.Valid { result.CompleteTime = timestamppb.New(run.CompletedTime.Time) } + if run.BlobDataExpirationTime.Valid { + result.BlobDataExpirationTime = timestamppb.New(run.BlobDataExpirationTime.Time) + } return result, nil } @@ -160,6 +163,9 @@ func (s *service) convertComponentRunToPB(run datamodel.ComponentRun) (*pipeline if run.CompletedTime.Valid { result.CompleteTime = timestamppb.New(run.CompletedTime.Time) } + if run.BlobDataExpirationTime.Valid { + result.BlobDataExpirationTime = timestamppb.New(run.BlobDataExpirationTime.Time) + } for _, fileReference := range run.Inputs { result.InputsReference = append(result.InputsReference, &pipelinepb.FileReference{ diff --git a/pkg/worker/minioactivity.go b/pkg/worker/minioactivity.go index 3f6ecc82b..10590bfcc 100644 --- a/pkg/worker/minioactivity.go +++ b/pkg/worker/minioactivity.go @@ -4,10 +4,12 @@ import ( "context" "encoding/json" "fmt" + "time" "go.uber.org/zap" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/types/known/structpb" + "gopkg.in/guregu/null.v4" "github.com/instill-ai/pipeline-backend/pkg/constant" "github.com/instill-ai/pipeline-backend/pkg/datamodel" @@ -187,7 +189,15 @@ func (w *worker) UploadComponentInputsActivity(ctx context.Context, param *Compo URL: url, }} - err = w.repository.UpdateComponentRun(ctx, pipelineTriggerID, param.ID, &datamodel.ComponentRun{Inputs: inputs}) + componentRunUpdate := &datamodel.ComponentRun{ + Inputs: inputs, + } + + if param.SystemVariables.ExpiryRule.ExpirationDays > 0 { + blobExpiration := time.Now().UTC().AddDate(0, 0, 
param.SystemVariables.ExpiryRule.ExpirationDays) + componentRunUpdate.BlobDataExpirationTime = null.TimeFrom(blobExpiration) + } + err = w.repository.UpdateComponentRun(ctx, pipelineTriggerID, param.ID, componentRunUpdate) if err != nil { log.Error("failed to save pipeline run input data", zap.Error(err)) return err diff --git a/pkg/worker/workflow.go b/pkg/worker/workflow.go index ca3b050d8..9b1e7b7b9 100644 --- a/pkg/worker/workflow.go +++ b/pkg/worker/workflow.go @@ -119,10 +119,6 @@ type PostTriggerActivityParam struct { SystemVariables recipe.SystemVariables } -type UpsertPipelineRunActivityParam struct { - PipelineRun *datamodel.PipelineRun -} - type UpdatePipelineRunActivityParam struct { PipelineTriggerID string PipelineRun *datamodel.PipelineRun