Skip to content

Commit

Permalink
feat(pipeline): add blob expiration time to run logs
Browse files Browse the repository at this point in the history
  • Loading branch information
jvallesm committed Dec 13, 2024
1 parent 06ce84b commit 70c9eb6
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 35 deletions.
2 changes: 1 addition & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ database:
host: pg-sql
port: 5432
name: pipeline
version: 38
version: 39
timezone: Etc/UTC
pool:
idleconnections: 5
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ require (
github.com/h2non/filetype v1.1.3
github.com/iancoleman/strcase v0.3.0
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241213145904-c3d8111872b5
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a
github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8
github.com/itchyny/gojq v0.12.14
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1285,8 +1285,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498 h1:JYRfk/m+960jEScE1NPQM+xh+ScR4cHNMg/tCFwQbpI=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241213145904-c3d8111872b5 h1:5338ZeuB/C50P8aUOUKstjBSAQaWqmrdrAK2i9AbWk8=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241213145904-c3d8111872b5/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw=
github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8 h1:w2F6sI6VbzIXUIh6HrSrV4k43pM/brj1jv6HT994+so=
Expand Down
53 changes: 28 additions & 25 deletions pkg/datamodel/runlogging.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,33 +64,36 @@ func (j *JSONB) Scan(value interface{}) error {

// PipelineRun represents the metadata and execution details for each pipeline run.
// It is a GORM model keyed by PipelineTriggerUID and owns the ComponentRun
// records that share the same trigger UID.
type PipelineRun struct {
PipelineTriggerUID uuid.UUID `gorm:"primaryKey" json:"pipeline-trigger-uid"` // Unique identifier for each run
PipelineUID uuid.UUID `gorm:"type:uuid;index" json:"pipeline-uid"` // Pipeline unique ID used in the run
Pipeline Pipeline `gorm:"foreignKey:PipelineUID;references:UID"` // Pipeline instance referenced in the run
PipelineVersion string `gorm:"type:varchar(255)" json:"pipeline-version"` // Pipeline version used in the run
Status RunStatus `gorm:"type:valid_trigger_status;index" json:"status"` // Current status of the run (e.g., Running, Completed, Failed)
Source RunSource `gorm:"type:valid_trigger_source" json:"source"` // Origin of the run (e.g., Web click, API)
TotalDuration null.Int `gorm:"type:bigint" json:"total-duration"` // Time taken to complete the run in nanoseconds
RunnerUID uuid.UUID `gorm:"type:uuid" json:"runner-uid"` // Identity of the user who initiated the run
RequesterUID uuid.UUID `gorm:"type:uuid" json:"requester-uid"` // Namespace used for the run, which is the requester
Inputs JSONB `gorm:"type:jsonb" json:"inputs"` // Input files for the run
Outputs JSONB `gorm:"type:jsonb" json:"outputs"` // Output files from the run
RecipeSnapshot JSONB `gorm:"type:jsonb" json:"recipe-snapshot"` // Snapshot of the pipeline recipe used for this run
StartedTime time.Time `gorm:"type:timestamp with time zone;index" json:"started-time,omitempty"` // Time when the run started execution
CompletedTime null.Time `gorm:"type:timestamp with time zone;index" json:"completed-time,omitempty"` // Time when the run completed
Error null.String `gorm:"type:text" json:"error-msg"` // Error message if the run failed
Components []ComponentRun `gorm:"foreignKey:PipelineTriggerUID;references:PipelineTriggerUID" json:"components"` // Execution details for each component in the pipeline
PipelineTriggerUID uuid.UUID `gorm:"primaryKey" json:"pipeline-trigger-uid"` // Unique identifier for each run
PipelineUID uuid.UUID `gorm:"type:uuid;index" json:"pipeline-uid"` // Pipeline unique ID used in the run
Pipeline Pipeline `gorm:"foreignKey:PipelineUID;references:UID"` // Pipeline instance referenced in the run
PipelineVersion string `gorm:"type:varchar(255)" json:"pipeline-version"` // Pipeline version used in the run
Status RunStatus `gorm:"type:valid_trigger_status;index" json:"status"` // Current status of the run (e.g., Running, Completed, Failed)
Source RunSource `gorm:"type:valid_trigger_source" json:"source"` // Origin of the run (e.g., Web click, API)
TotalDuration null.Int `gorm:"type:bigint" json:"total-duration"` // Time taken to complete the run in nanoseconds
RunnerUID uuid.UUID `gorm:"type:uuid" json:"runner-uid"` // Identity of the user who initiated the run
RequesterUID uuid.UUID `gorm:"type:uuid" json:"requester-uid"` // Namespace used for the run, which is the requester
Inputs JSONB `gorm:"type:jsonb" json:"inputs"` // Input files for the run
Outputs JSONB `gorm:"type:jsonb" json:"outputs"` // Output files from the run
RecipeSnapshot JSONB `gorm:"type:jsonb" json:"recipe-snapshot"` // Snapshot of the pipeline recipe used for this run
StartedTime time.Time `gorm:"type:timestamp with time zone;index" json:"started-time,omitempty"` // Time when the run started execution
CompletedTime null.Time `gorm:"type:timestamp with time zone;index" json:"completed-time,omitempty"` // Time when the run completed
BlobDataExpirationTime null.Time `gorm:"type:timestamp with time zone" json:"blob-data-expiration-time,omitempty"` // Time when the blob data (e.g. input or recipe) will expire; null when no expiration was recorded for the run.

Error null.String `gorm:"type:text" json:"error-msg"` // Error message if the run failed
Components []ComponentRun `gorm:"foreignKey:PipelineTriggerUID;references:PipelineTriggerUID" json:"components"` // Execution details for each component in the pipeline
}

// ComponentRun represents the execution details of a single component within a pipeline run.
// It is a GORM model keyed by the pair (PipelineTriggerUID, ComponentID).
type ComponentRun struct {
PipelineTriggerUID uuid.UUID `gorm:"type:uuid;primaryKey;index" json:"pipeline-trigger-uid"` // Links to the parent PipelineRun
ComponentID string `gorm:"type:varchar(255);primaryKey" json:"component-id"` // Unique identifier for each pipeline component
Status RunStatus `gorm:"type:varchar(50);index" json:"status"` // Completion status of the component (e.g., Completed, Errored)
TotalDuration null.Int `gorm:"type:bigint" json:"total-duration"` // Time taken to execute the component in nanoseconds
StartedTime time.Time `gorm:"type:timestamp with time zone;index" json:"started-time"` // Time when the component started execution
CompletedTime null.Time `gorm:"type:timestamp with time zone;index" json:"completed-time"` // Time when the component finished execution
Error null.String `gorm:"type:text" json:"error-msg"` // Error message if the component failed
Inputs JSONB `gorm:"type:jsonb" json:"inputs"` // Input files for the component
Outputs JSONB `gorm:"type:jsonb" json:"outputs"` // Output files from the component
PipelineTriggerUID uuid.UUID `gorm:"type:uuid;primaryKey;index" json:"pipeline-trigger-uid"` // Links to the parent PipelineRun
ComponentID string `gorm:"type:varchar(255);primaryKey" json:"component-id"` // Unique identifier for each pipeline component
Status RunStatus `gorm:"type:varchar(50);index" json:"status"` // Completion status of the component (e.g., Completed, Errored)
TotalDuration null.Int `gorm:"type:bigint" json:"total-duration"` // Time taken to execute the component in nanoseconds
StartedTime time.Time `gorm:"type:timestamp with time zone;index" json:"started-time"` // Time when the component started execution
CompletedTime null.Time `gorm:"type:timestamp with time zone;index" json:"completed-time"` // Time when the component finished execution
BlobDataExpirationTime null.Time `gorm:"type:timestamp with time zone" json:"blob-data-expiration-time,omitempty"` // Time when the blob data of this component run (e.g. its uploaded inputs) will expire; null when no expiration was recorded.
Error null.String `gorm:"type:text" json:"error-msg"` // Error message if the component failed
Inputs JSONB `gorm:"type:jsonb" json:"inputs"` // Input files for the component
Outputs JSONB `gorm:"type:jsonb" json:"outputs"` // Output files from the component
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Removes the blob_data_expiration_time column from the component_run and
-- pipeline_run tables. Both statements run in a single transaction, and
-- IF EXISTS makes each one a no-op when the column is already absent.
BEGIN;

ALTER TABLE component_run DROP COLUMN IF EXISTS blob_data_expiration_time;
ALTER TABLE pipeline_run DROP COLUMN IF EXISTS blob_data_expiration_time;

COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Adds a blob_data_expiration_time column (TIMESTAMPTZ) to the pipeline_run
-- and component_run tables. The column is declared without NOT NULL or a
-- default, so it is nullable. Both statements run in a single transaction, and
-- IF NOT EXISTS makes each one a no-op when the column is already present.
BEGIN;

ALTER TABLE pipeline_run ADD COLUMN IF NOT EXISTS blob_data_expiration_time TIMESTAMPTZ;
ALTER TABLE component_run ADD COLUMN IF NOT EXISTS blob_data_expiration_time TIMESTAMPTZ;

COMMIT;
11 changes: 10 additions & 1 deletion pkg/service/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,16 @@ func (s *service) uploadPipelineRunInputsToMinio(ctx context.Context, param uplo
URL: url,
}}

err = s.repository.UpdatePipelineRun(ctx, param.pipelineTriggerID, &datamodel.PipelineRun{Inputs: inputs})
pipelineRunUpdate := &datamodel.PipelineRun{
Inputs: inputs,
}

if param.expiryRule.ExpirationDays > 0 {
blobExpiration := time.Now().UTC().AddDate(0, 0, param.expiryRule.ExpirationDays)
pipelineRunUpdate.BlobDataExpirationTime = null.TimeFrom(blobExpiration)
}

err = s.repository.UpdatePipelineRun(ctx, param.pipelineTriggerID, pipelineRunUpdate)
if err != nil {
logger.Error("save pipeline run input data", zap.Error(err))
return err
Expand Down
6 changes: 6 additions & 0 deletions pkg/service/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func (s *service) convertPipelineRunToPB(run datamodel.PipelineRun) (*pipelinepb
if run.CompletedTime.Valid {
result.CompleteTime = timestamppb.New(run.CompletedTime.Time)
}
if run.BlobDataExpirationTime.Valid {
result.BlobDataExpirationTime = timestamppb.New(run.BlobDataExpirationTime.Time)
}

return result, nil
}
Expand All @@ -160,6 +163,9 @@ func (s *service) convertComponentRunToPB(run datamodel.ComponentRun) (*pipeline
if run.CompletedTime.Valid {
result.CompleteTime = timestamppb.New(run.CompletedTime.Time)
}
if run.BlobDataExpirationTime.Valid {
result.BlobDataExpirationTime = timestamppb.New(run.BlobDataExpirationTime.Time)
}

for _, fileReference := range run.Inputs {
result.InputsReference = append(result.InputsReference, &pipelinepb.FileReference{
Expand Down
12 changes: 11 additions & 1 deletion pkg/worker/minioactivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"go.uber.org/zap"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/structpb"
"gopkg.in/guregu/null.v4"

"github.com/instill-ai/pipeline-backend/pkg/constant"
"github.com/instill-ai/pipeline-backend/pkg/datamodel"
Expand Down Expand Up @@ -187,7 +189,15 @@ func (w *worker) UploadComponentInputsActivity(ctx context.Context, param *Compo
URL: url,
}}

err = w.repository.UpdateComponentRun(ctx, pipelineTriggerID, param.ID, &datamodel.ComponentRun{Inputs: inputs})
componentRunUpdate := &datamodel.ComponentRun{
Inputs: inputs,
}

if param.SystemVariables.ExpiryRule.ExpirationDays > 0 {

Check failure on line 196 in pkg/worker/minioactivity.go

View workflow job for this annotation

GitHub Actions / lint

param.SystemVariables.ExpiryRule undefined (type recipe.SystemVariables has no field or method ExpiryRule)

Check failure on line 196 in pkg/worker/minioactivity.go

View workflow job for this annotation

GitHub Actions / lint

param.SystemVariables.ExpiryRule undefined (type recipe.SystemVariables has no field or method ExpiryRule)

Check failure on line 196 in pkg/worker/minioactivity.go

View workflow job for this annotation

GitHub Actions / lint

param.SystemVariables.ExpiryRule undefined (type recipe.SystemVariables has no field or method ExpiryRule)

Check failure on line 196 in pkg/worker/minioactivity.go

View workflow job for this annotation

GitHub Actions / lint

param.SystemVariables.ExpiryRule undefined (type recipe.SystemVariables has no field or method ExpiryRule)

Check failure on line 196 in pkg/worker/minioactivity.go

View workflow job for this annotation

GitHub Actions / codecov

param.SystemVariables.ExpiryRule undefined (type recipe.SystemVariables has no field or method ExpiryRule)

Check failure on line 196 in pkg/worker/minioactivity.go

View workflow job for this annotation

GitHub Actions / codecov

param.SystemVariables.ExpiryRule undefined (type recipe.SystemVariables has no field or method ExpiryRule)
blobExpiration := time.Now().UTC().AddDate(0, 0, param.SystemVariables.ExpiryRule.ExpirationDays)

Check failure on line 197 in pkg/worker/minioactivity.go

View workflow job for this annotation

GitHub Actions / lint

param.SystemVariables.ExpiryRule undefined (type recipe.SystemVariables has no field or method ExpiryRule) (typecheck)

Check failure on line 197 in pkg/worker/minioactivity.go

View workflow job for this annotation

GitHub Actions / lint

param.SystemVariables.ExpiryRule undefined (type recipe.SystemVariables has no field or method ExpiryRule)) (typecheck)

Check failure on line 197 in pkg/worker/minioactivity.go

View workflow job for this annotation

GitHub Actions / lint

param.SystemVariables.ExpiryRule undefined (type recipe.SystemVariables has no field or method ExpiryRule) (typecheck)

Check failure on line 197 in pkg/worker/minioactivity.go

View workflow job for this annotation

GitHub Actions / lint

param.SystemVariables.ExpiryRule undefined (type recipe.SystemVariables has no field or method ExpiryRule)) (typecheck)

Check failure on line 197 in pkg/worker/minioactivity.go

View workflow job for this annotation

GitHub Actions / codecov

param.SystemVariables.ExpiryRule undefined (type recipe.SystemVariables has no field or method ExpiryRule)

Check failure on line 197 in pkg/worker/minioactivity.go

View workflow job for this annotation

GitHub Actions / codecov

param.SystemVariables.ExpiryRule undefined (type recipe.SystemVariables has no field or method ExpiryRule)
componentRunUpdate.BlobDataExpirationTime = null.TimeFrom(blobExpiration)
}
err = w.repository.UpdateComponentRun(ctx, pipelineTriggerID, param.ID, componentRunUpdate)
if err != nil {
log.Error("failed to save pipeline run input data", zap.Error(err))
return err
Expand Down
4 changes: 0 additions & 4 deletions pkg/worker/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,6 @@ type PostTriggerActivityParam struct {
SystemVariables recipe.SystemVariables
}

type UpsertPipelineRunActivityParam struct {
PipelineRun *datamodel.PipelineRun
}

type UpdatePipelineRunActivityParam struct {
PipelineTriggerID string
PipelineRun *datamodel.PipelineRun
Expand Down

0 comments on commit 70c9eb6

Please sign in to comment.