Skip to content

Commit

Permalink
feat(pipeline): add blob expiration time to run logs
Browse files Browse the repository at this point in the history
  • Loading branch information
jvallesm committed Dec 13, 2024
1 parent 17ece56 commit 7e0dc1d
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 32 deletions.
2 changes: 1 addition & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ database:
host: pg-sql
port: 5432
name: pipeline
version: 38
version: 39
timezone: Etc/UTC
pool:
idleconnections: 5
Expand Down
53 changes: 28 additions & 25 deletions pkg/datamodel/runlogging.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,33 +64,36 @@ func (j *JSONB) Scan(value interface{}) error {

// PipelineRun represents the metadata and execution details for each pipeline run.
type PipelineRun struct {
PipelineTriggerUID uuid.UUID `gorm:"primaryKey" json:"pipeline-trigger-uid"` // Unique identifier for each run
PipelineUID uuid.UUID `gorm:"type:uuid;index" json:"pipeline-uid"` // Pipeline unique ID used in the run
Pipeline Pipeline `gorm:"foreignKey:PipelineUID;references:UID"` // Pipeline instance referenced in the run
PipelineVersion string `gorm:"type:varchar(255)" json:"pipeline-version"` // Pipeline version used in the run
Status RunStatus `gorm:"type:valid_trigger_status;index" json:"status"` // Current status of the run (e.g., Running, Completed, Failed)
Source RunSource `gorm:"type:valid_trigger_source" json:"source"` // Origin of the run (e.g., Web click, API)
TotalDuration null.Int `gorm:"type:bigint" json:"total-duration"` // Time taken to complete the run in nanoseconds
RunnerUID uuid.UUID `gorm:"type:uuid" json:"runner-uid"` // Identity of the user who initiated the run
RequesterUID uuid.UUID `gorm:"type:uuid" json:"requester-uid"` // Namespace used for the run, which is the requester
Inputs JSONB `gorm:"type:jsonb" json:"inputs"` // Input files for the run
Outputs JSONB `gorm:"type:jsonb" json:"outputs"` // Output files from the run
RecipeSnapshot JSONB `gorm:"type:jsonb" json:"recipe-snapshot"` // Snapshot of the pipeline recipe used for this run
StartedTime time.Time `gorm:"type:timestamp with time zone;index" json:"started-time,omitempty"` // Time when the run started execution
CompletedTime null.Time `gorm:"type:timestamp with time zone;index" json:"completed-time,omitempty"` // Time when the run completed
Error null.String `gorm:"type:text" json:"error-msg"` // Error message if the run failed
Components []ComponentRun `gorm:"foreignKey:PipelineTriggerUID;references:PipelineTriggerUID" json:"components"` // Execution details for each component in the pipeline
PipelineTriggerUID uuid.UUID `gorm:"primaryKey" json:"pipeline-trigger-uid"` // Unique identifier for each run
PipelineUID uuid.UUID `gorm:"type:uuid;index" json:"pipeline-uid"` // Pipeline unique ID used in the run
Pipeline Pipeline `gorm:"foreignKey:PipelineUID;references:UID"` // Pipeline instance referenced in the run
PipelineVersion string `gorm:"type:varchar(255)" json:"pipeline-version"` // Pipeline version used in the run
Status RunStatus `gorm:"type:valid_trigger_status;index" json:"status"` // Current status of the run (e.g., Running, Completed, Failed)
Source RunSource `gorm:"type:valid_trigger_source" json:"source"` // Origin of the run (e.g., Web click, API)
TotalDuration null.Int `gorm:"type:bigint" json:"total-duration"` // Time taken to complete the run in nanoseconds
RunnerUID uuid.UUID `gorm:"type:uuid" json:"runner-uid"` // Identity of the user who initiated the run
RequesterUID uuid.UUID `gorm:"type:uuid" json:"requester-uid"` // Namespace used for the run, which is the requester
Inputs JSONB `gorm:"type:jsonb" json:"inputs"` // Input files for the run
Outputs JSONB `gorm:"type:jsonb" json:"outputs"` // Output files from the run
RecipeSnapshot JSONB `gorm:"type:jsonb" json:"recipe-snapshot"` // Snapshot of the pipeline recipe used for this run
StartedTime time.Time `gorm:"type:timestamp with time zone;index" json:"started-time,omitempty"` // Time when the run started execution
CompletedTime null.Time `gorm:"type:timestamp with time zone;index" json:"completed-time,omitempty"` // Time when the run completed
BlobDataExpirationTime null.Time `gorm:"type:timestamp with time zone" json:"blob-data-expiration-time,omitempty"` // Time when the blob data (e.g. input or recipe) will expire.

Error null.String `gorm:"type:text" json:"error-msg"` // Error message if the run failed
Components []ComponentRun `gorm:"foreignKey:PipelineTriggerUID;references:PipelineTriggerUID" json:"components"` // Execution details for each component in the pipeline
}

// ComponentRun represents the execution details of a single component within a pipeline run.
type ComponentRun struct {
PipelineTriggerUID uuid.UUID `gorm:"type:uuid;primaryKey;index" json:"pipeline-trigger-uid"` // Links to the parent PipelineRun
ComponentID string `gorm:"type:varchar(255);primaryKey" json:"component-id"` // Unique identifier for each pipeline component
Status RunStatus `gorm:"type:varchar(50);index" json:"status"` // Completion status of the component (e.g., Completed, Errored)
TotalDuration null.Int `gorm:"type:bigint" json:"total-duration"` // Time taken to execute the component in nanoseconds
StartedTime time.Time `gorm:"type:timestamp with time zone;index" json:"started-time"` // Time when the component started execution
CompletedTime null.Time `gorm:"type:timestamp with time zone;index" json:"completed-time"` // Time when the component finished execution
Error null.String `gorm:"type:text" json:"error-msg"` // Error message if the component failed
Inputs JSONB `gorm:"type:jsonb" json:"inputs"` // Input files for the component
Outputs JSONB `gorm:"type:jsonb" json:"outputs"` // Output files from the component
PipelineTriggerUID uuid.UUID `gorm:"type:uuid;primaryKey;index" json:"pipeline-trigger-uid"` // Links to the parent PipelineRun
ComponentID string `gorm:"type:varchar(255);primaryKey" json:"component-id"` // Unique identifier for each pipeline component
Status RunStatus `gorm:"type:varchar(50);index" json:"status"` // Completion status of the component (e.g., Completed, Errored)
TotalDuration null.Int `gorm:"type:bigint" json:"total-duration"` // Time taken to execute the component in nanoseconds
StartedTime time.Time `gorm:"type:timestamp with time zone;index" json:"started-time"` // Time when the component started execution
CompletedTime null.Time `gorm:"type:timestamp with time zone;index" json:"completed-time"` // Time when the component finished execution
BlobDataExpirationTime null.Time `gorm:"type:timestamp with time zone" json:"blob-data-expiration-time,omitempty"` // Time when the blob data (e.g. input or recipe) will expire.
Error null.String `gorm:"type:text" json:"error-msg"` // Error message if the component failed
Inputs JSONB `gorm:"type:jsonb" json:"inputs"` // Input files for the component
Outputs JSONB `gorm:"type:jsonb" json:"outputs"` // Output files from the component
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
BEGIN;

ALTER TABLE component_run DROP COLUMN IF EXISTS blob_data_expiration_time;
ALTER TABLE pipeline_run DROP COLUMN IF EXISTS blob_data_expiration_time;

COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
BEGIN;

ALTER TABLE pipeline_run ADD COLUMN IF NOT EXISTS blob_data_expiration_time TIMESTAMPTZ;
ALTER TABLE component_run ADD COLUMN IF NOT EXISTS blob_data_expiration_time TIMESTAMPTZ;

COMMIT;
11 changes: 10 additions & 1 deletion pkg/service/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,16 @@ func (s *service) uploadPipelineRunInputsToMinio(ctx context.Context, param uplo
URL: url,
}}

err = s.repository.UpdatePipelineRun(ctx, param.pipelineTriggerID, &datamodel.PipelineRun{Inputs: inputs})
pipelineRunUpdate := &datamodel.PipelineRun{
Inputs: inputs,
}

if param.expiryRule.ExpirationDays > 0 {
blobExpiration := time.Now().UTC().AddDate(0, 0, param.expiryRule.ExpirationDays)
pipelineRunUpdate.BlobDataExpirationTime = null.TimeFrom(blobExpiration)
}

err = s.repository.UpdatePipelineRun(ctx, param.pipelineTriggerID, pipelineRunUpdate)
if err != nil {
logger.Error("save pipeline run input data", zap.Error(err))
return err
Expand Down
6 changes: 6 additions & 0 deletions pkg/service/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func (s *service) convertPipelineRunToPB(run datamodel.PipelineRun) (*pipelinepb
if run.CompletedTime.Valid {
result.CompleteTime = timestamppb.New(run.CompletedTime.Time)
}
if run.BlobDataExpirationTime.Valid {
result.BlobDataExpirationTime = timestamppb.New(run.BlobDataExpirationTime.Time)

Check failure on line 144 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / codecov

result.BlobDataExpirationTime undefined (type *pipelinev1beta.PipelineRun has no field or method BlobDataExpirationTime)

Check failure on line 144 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / lint

result.BlobDataExpirationTime undefined (type *pipelinev1beta.PipelineRun has no field or method BlobDataExpirationTime)

Check failure on line 144 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / lint

result.BlobDataExpirationTime undefined (type *pipelinev1beta.PipelineRun has no field or method BlobDataExpirationTime)

Check failure on line 144 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / lint

result.BlobDataExpirationTime undefined (type *pipelinev1beta.PipelineRun has no field or method BlobDataExpirationTime)

Check failure on line 144 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / lint

result.BlobDataExpirationTime undefined (type *pipelinev1beta.PipelineRun has no field or method BlobDataExpirationTime)

Check failure on line 144 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / lint

result.BlobDataExpirationTime undefined (type *pipelinev1beta.PipelineRun has no field or method BlobDataExpirationTime)

Check failure on line 144 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / lint

result.BlobDataExpirationTime undefined (type *pipelinev1beta.PipelineRun has no field or method BlobDataExpirationTime)

Check failure on line 144 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / lint

result.BlobDataExpirationTime undefined (type *pipelinev1beta.PipelineRun has no field or method BlobDataExpirationTime)

Check failure on line 144 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / lint

result.BlobDataExpirationTime undefined (type *pipelinev1beta.PipelineRun has no field or method BlobDataExpirationTime)

Check failure on line 144 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / codecov

result.BlobDataExpirationTime undefined (type *pipelinev1beta.PipelineRun has no field or method BlobDataExpirationTime)
}

return result, nil
}
Expand All @@ -160,6 +163,9 @@ func (s *service) convertComponentRunToPB(run datamodel.ComponentRun) (*pipeline
if run.CompletedTime.Valid {
result.CompleteTime = timestamppb.New(run.CompletedTime.Time)
}
if run.BlobDataExpirationTime.Valid {
result.BlobDataExpirationTime = timestamppb.New(run.BlobDataExpirationTime.Time)

Check failure on line 167 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / codecov

result.BlobDataExpirationTime undefined (type *pipelinev1beta.ComponentRun has no field or method BlobDataExpirationTime)

Check failure on line 167 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / lint

result.BlobDataExpirationTime undefined (type *pipelinev1beta.ComponentRun has no field or method BlobDataExpirationTime)) (typecheck)

Check failure on line 167 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / lint

result.BlobDataExpirationTime undefined (type *pipelinev1beta.ComponentRun has no field or method BlobDataExpirationTime)) (typecheck)

Check failure on line 167 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / lint

result.BlobDataExpirationTime undefined (type *pipelinev1beta.ComponentRun has no field or method BlobDataExpirationTime)) (typecheck)

Check failure on line 167 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / lint

result.BlobDataExpirationTime undefined (type *pipelinev1beta.ComponentRun has no field or method BlobDataExpirationTime)) (typecheck)

Check failure on line 167 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / lint

result.BlobDataExpirationTime undefined (type *pipelinev1beta.ComponentRun has no field or method BlobDataExpirationTime)) (typecheck)

Check failure on line 167 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / lint

result.BlobDataExpirationTime undefined (type *pipelinev1beta.ComponentRun has no field or method BlobDataExpirationTime)) (typecheck)

Check failure on line 167 in pkg/service/utils.go

View workflow job for this annotation

GitHub Actions / codecov

result.BlobDataExpirationTime undefined (type *pipelinev1beta.ComponentRun has no field or method BlobDataExpirationTime)
}

for _, fileReference := range run.Inputs {
result.InputsReference = append(result.InputsReference, &pipelinepb.FileReference{
Expand Down
12 changes: 11 additions & 1 deletion pkg/worker/minioactivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"go.uber.org/zap"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/structpb"
"gopkg.in/guregu/null.v4"

"github.com/instill-ai/pipeline-backend/pkg/constant"
"github.com/instill-ai/pipeline-backend/pkg/datamodel"
Expand Down Expand Up @@ -187,7 +189,15 @@ func (w *worker) UploadComponentInputsActivity(ctx context.Context, param *Compo
URL: url,
}}

err = w.repository.UpdateComponentRun(ctx, pipelineTriggerID, param.ID, &datamodel.ComponentRun{Inputs: inputs})
componentRunUpdate := &datamodel.ComponentRun{
Inputs: inputs,
}

if param.SystemVariables.ExpiryRule.ExpirationDays > 0 {
blobExpiration := time.Now().UTC().AddDate(0, 0, param.SystemVariables.ExpiryRule.ExpirationDays)
componentRunUpdate.BlobDataExpirationTime = null.TimeFrom(blobExpiration)
}
err = w.repository.UpdateComponentRun(ctx, pipelineTriggerID, param.ID, componentRunUpdate)
if err != nil {
log.Error("failed to save pipeline run input data", zap.Error(err))
return err
Expand Down
4 changes: 0 additions & 4 deletions pkg/worker/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,6 @@ type PostTriggerActivityParam struct {
SystemVariables recipe.SystemVariables
}

type UpsertPipelineRunActivityParam struct {
PipelineRun *datamodel.PipelineRun
}

type UpdatePipelineRunActivityParam struct {
PipelineTriggerID string
PipelineRun *datamodel.PipelineRun
Expand Down

0 comments on commit 7e0dc1d

Please sign in to comment.