Skip to content

Commit

Permalink
Merge b0f0d6a into 5c659aa
Browse files Browse the repository at this point in the history
  • Loading branch information
wild-endeavor authored Oct 18, 2023
2 parents 5c659aa + b0f0d6a commit 5e8af6e
Show file tree
Hide file tree
Showing 18 changed files with 253 additions and 162 deletions.
16 changes: 8 additions & 8 deletions flyteartifacts/artifact_config.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
artifactsServer:
myTestValue: "test from file"
database:
postgres:
dbname: artifacts
logger:
level: 5
show-source: true
# This is an (incomplete) configure file for the artifact service, here just as an example.
#artifactsServer:
# database:
# postgres:
# dbname: your pg db
#logger:
# level: 5
# show-source: true
3 changes: 2 additions & 1 deletion flyteartifacts/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ func main() {
rootCmd := sharedCmd.NewRootCmd("artifacts", server.GrpcRegistrationHook, server.HttpRegistrationHook)
migs := server.GetMigrations(ctx)
initializationSql := "create extension if not exists hstore;"
rootCmd.AddCommand(sharedCmd.NewMigrateCmd(migs, initializationSql))
dbConfig := server.GetDbConfig()
rootCmd.AddCommand(sharedCmd.NewMigrateCmd(migs, dbConfig, initializationSql))
err := rootCmd.ExecuteContext(ctx)
if err != nil {
panic(err)
Expand Down
4 changes: 2 additions & 2 deletions flyteartifacts/cmd/shared/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
)

// NewMigrateCmd represents the migrate command
func NewMigrateCmd(migs []*gormigrate.Migration, initializationSql string) *cobra.Command {
func NewMigrateCmd(migs []*gormigrate.Migration, databaseConfig *database.DbConfig, initializationSql string) *cobra.Command {
return &cobra.Command{
Use: "migrate",
Short: "This command will run all the migrations for the database",
RunE: func(cmd *cobra.Command, args []string) error {
return database.Migrate(context.Background(), migs, initializationSql)
return database.Migrate(context.Background(), databaseConfig, migs, initializationSql)
},
}
}
51 changes: 51 additions & 0 deletions flyteartifacts/pkg/blob/artifact_blob_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package blob

import (
"context"
"fmt"
"github.com/flyteorg/flyte/flyteartifacts/pkg/configuration"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"
"github.com/golang/protobuf/ptypes/any"
)

type ArtifactBlobStore struct {
store *storage.DataStore
}

// OffloadArtifactCard stores the artifact card in the blob store
func (a *ArtifactBlobStore) OffloadArtifactCard(ctx context.Context, name, version string, card *any.Any) (storage.DataReference, error) {
uri, err := a.store.ConstructReference(ctx, a.store.GetBaseContainerFQN(ctx), name, version)
if err != nil {
return "", fmt.Errorf("failed to construct data reference for [%s/%s] with err: %v", name, version, err)
}
err = a.store.WriteProtobuf(ctx, uri, storage.Options{}, card)
if err != nil {
return "", fmt.Errorf("failed to write protobuf to %s with err: %v", uri, err)
}
return uri, nil
}

func (a *ArtifactBlobStore) RetrieveArtifactCard(ctx context.Context, uri storage.DataReference) (*any.Any, error) {
card := &any.Any{}
err := a.store.ReadProtobuf(ctx, uri, card)
if err != nil {
return nil, fmt.Errorf("failed to read protobuf from %s with err: %v", uri, err)
}
return nil, nil
}

func NewArtifactBlobStore(ctx context.Context, scope promutils.Scope) ArtifactBlobStore {
storageCfg := configuration.ApplicationConfig.GetConfig().(*configuration.ApplicationConfiguration).ArtifactBlobStoreConfig
logger.Infof(ctx, "Initializing storage client with config [%+v]", storageCfg)

dataStorageClient, err := storage.NewDataStore(&storageCfg, scope)
if err != nil {
logger.Error(ctx, "Failed to initialize storage config")
panic(err)
}
return ArtifactBlobStore{
store: dataStorageClient,
}
}
7 changes: 6 additions & 1 deletion flyteartifacts/pkg/configuration/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package configuration
import (
"github.com/flyteorg/flyte/flytestdlib/config"
stdLibDb "github.com/flyteorg/flyte/flytestdlib/database"
stdLibStorage "github.com/flyteorg/flyte/flytestdlib/storage"
"time"
)

const artifactsServer = "artifactsServer"

type ApplicationConfiguration struct {
ArtifactDatabaseConfig stdLibDb.DbConfig `json:"artifactDatabaseConfig" pflag:",Database configuration"`
ArtifactDatabaseConfig stdLibDb.DbConfig `json:"artifactDatabaseConfig" pflag:",Database configuration"`
ArtifactBlobStoreConfig stdLibStorage.Config `json:"artifactBlobStoreConfig" pflag:",Blob store configuration"`
}

var defaultApplicationConfiguration = ApplicationConfiguration{
Expand All @@ -28,6 +30,9 @@ var defaultApplicationConfiguration = ApplicationConfiguration{
ExtraOptions: "sslmode=disable",
},
},
ArtifactBlobStoreConfig: stdLibStorage.Config{
InitContainer: "flyte-artifacts",
},
}

var ApplicationConfig = config.MustRegisterSection(artifactsServer, &defaultApplicationConfiguration)
1 change: 0 additions & 1 deletion flyteartifacts/pkg/db/gorm_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ type ArtifactKey struct {

type Artifact struct {
gorm.Model
// gatepr: this doesn't actually create a foreign key...
ArtifactKeyID uint
ArtifactKey ArtifactKey `gorm:"foreignKey:ArtifactKeyID;references:ID"`
Version string `gorm:"not null;type:varchar(255);index:idx_artifact_version"`
Expand Down
52 changes: 52 additions & 0 deletions flyteartifacts/pkg/db/gorm_transformers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package db

import (
"github.com/flyteorg/flyte/flyteartifacts/pkg/models"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/jackc/pgx/v5/pgtype"
)

func PartitionsIdlToHstore(idlPartitions *core.Partitions) pgtype.Hstore {
if idlPartitions == nil || idlPartitions.GetValue() == nil {
return nil
}
var hstore = make(pgtype.Hstore)

for k, v := range idlPartitions.GetValue() {
sv := v.GetStaticValue()
hstore[k] = &sv
}
return hstore
}

func ServiceToGormModel(serviceModel models.Artifact) (Artifact, error) {
partitions := PartitionsIdlToHstore(serviceModel.Artifact.GetArtifactId().GetPartitions())

ga := Artifact{
ArtifactKey: ArtifactKey{
Project: serviceModel.Artifact.ArtifactId.ArtifactKey.Project,
Domain: serviceModel.Artifact.ArtifactId.ArtifactKey.Domain,
Name: serviceModel.Artifact.ArtifactId.ArtifactKey.Name,
},
Version: serviceModel.Artifact.ArtifactId.Version,
Partitions: partitions,

LiteralType: serviceModel.LiteralTypeBytes,
LiteralValue: serviceModel.LiteralValueBytes,
Description: serviceModel.Artifact.Spec.ShortDescription,
MetadataType: serviceModel.Artifact.Spec.MetadataType,
OffloadedUserMetadata: serviceModel.OffloadedMetadata,

ExecutionName: serviceModel.Artifact.Spec.Execution.Name,
}

if serviceModel.Artifact.Spec.TaskExecution != nil {
ga.TaskProject = serviceModel.Artifact.Spec.TaskExecution.TaskId.Project
ga.TaskDomain = serviceModel.Artifact.Spec.TaskExecution.TaskId.Domain
ga.TaskName = serviceModel.Artifact.Spec.TaskExecution.TaskId.Name
ga.TaskVersion = serviceModel.Artifact.Spec.TaskExecution.TaskId.Version
ga.RetryAttempt = &serviceModel.Artifact.Spec.TaskExecution.RetryAttempt
}

return ga, nil
}
19 changes: 11 additions & 8 deletions flyteartifacts/pkg/db/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/flyteorg/flyte/flyteartifacts/pkg/configuration"
"github.com/flyteorg/flyte/flyteartifacts/pkg/models"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/database"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
Expand All @@ -17,25 +18,27 @@ type RDSStorage struct {
metrics gormMetrics
}

// WriteOne is a test function
func (r *RDSStorage) WriteOne(ctx context.Context, gormModel Artifact) (models.Artifact, error) {
// CreateArtifact helps implement StorageInterface
func (r *RDSStorage) CreateArtifact(ctx context.Context, serviceModel models.Artifact) (models.Artifact, error) {
timer := r.metrics.CreateDuration.Start()
logger.Debugf(ctx, "Attempt create artifact %s", gormModel.Version)
logger.Debugf(ctx, "Attempt create artifact [%s:%s]",
serviceModel.Artifact.ArtifactId.ArtifactKey.Name, serviceModel.Artifact.ArtifactId.Version)
gormModel, err := ServiceToGormModel(serviceModel)
if err != nil {
logger.Errorf(ctx, "Failed to convert service model to gorm model: %+v", err)
return models.Artifact{}, err
}
tx := r.db.Create(&gormModel)
timer.Stop()
if tx.Error != nil {
logger.Errorf(ctx, "Failed to create artifact %+v", tx.Error)
return models.Artifact{}, tx.Error
}
return models.Artifact{}, nil
}

// CreateArtifact helps implement StorageInterface
func (r *RDSStorage) CreateArtifact(context.Context, *models.Artifact) (models.Artifact, error) {
return models.Artifact{}, nil
}

func (r *RDSStorage) GetArtifact(ctx context.Context) (models.Artifact, error) {
func (r *RDSStorage) GetArtifact(ctx context.Context, query core.ArtifactQuery, details bool) (models.Artifact, error) {
return models.Artifact{}, nil
}

Expand Down
5 changes: 2 additions & 3 deletions flyteartifacts/pkg/db/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func TestWriteOne(t *testing.T) {
p := pgtype.Hstore{
"area": &pval1,
}
//p := postgres.Hstore{"area": &pval1}

lt := &core.LiteralType{
Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER},
Expand Down Expand Up @@ -69,6 +68,6 @@ func TestWriteOne(t *testing.T) {
RetryAttempt: &one,
}

a, err := rds.WriteOne(ctx, gormA)
fmt.Println(a, err)
//a, err := rds.WriteOne(ctx, gormA)
//fmt.Println(a, err)
}
39 changes: 0 additions & 39 deletions flyteartifacts/pkg/models/artifact.go

This file was deleted.

11 changes: 11 additions & 0 deletions flyteartifacts/pkg/models/service_models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package models

import "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"

// Artifact is a wrapper object for easier handling of additional fields
type Artifact struct {
artifact.Artifact
OffloadedMetadata string
LiteralTypeBytes []byte
LiteralValueBytes []byte
}
9 changes: 8 additions & 1 deletion flyteartifacts/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package server
import (
"context"
"fmt"
"github.com/flyteorg/flyte/flyteartifacts/pkg/blob"
"github.com/flyteorg/flyte/flyteartifacts/pkg/configuration"
"github.com/flyteorg/flyte/flyteartifacts/pkg/db"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyte/flytestdlib/database"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/go-gormigrate/gormigrate/v2"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
Expand All @@ -27,7 +29,8 @@ func NewArtifactService(ctx context.Context, scope promutils.Scope) *ArtifactSer
fmt.Println(cfg)

storage := db.NewStorage(ctx, scope.NewSubScope("storage:rds"))
coreService := NewCoreService(storage, scope.NewSubScope("server"))
blobStore := blob.NewArtifactBlobStore(ctx, scope.NewSubScope("storage:s3"))
coreService := NewCoreService(storage, &blobStore, scope.NewSubScope("server"))

return &ArtifactService{
Metrics: InitMetrics(scope),
Expand All @@ -54,3 +57,7 @@ func GrpcRegistrationHook(ctx context.Context, server *grpc.Server, scope promut
func GetMigrations(ctx context.Context) []*gormigrate.Migration {
return db.Migrations
}
func GetDbConfig() *database.DbConfig {
cfg := configuration.ApplicationConfig.GetConfig().(*configuration.ApplicationConfiguration)
return &cfg.ArtifactDatabaseConfig
}
29 changes: 21 additions & 8 deletions flyteartifacts/pkg/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
)

type CoreService struct {
Storage StorageInterface
Storage StorageInterface
BlobStore BlobStoreInterface
// TriggerHandler TriggerHandlerInterface
// SearchHandler SearchHandlerInterface
}
Expand All @@ -17,20 +18,31 @@ func (c *CoreService) CreateArtifact(ctx context.Context, request *artifact.Crea
if request == nil {
return nil, nil
}
model, err := CreateArtifactModelFrom(request.ArtifactKey, request.Spec, request.Version, request.Partitions, request.Tag, request.Spec.Principal)

artifactObj, err := CreateArtifactModelFromRequest(ctx, request.ArtifactKey, request.Spec, request.Version, request.Partitions, request.Tag, request.Spec.Principal)
if err != nil {
logger.Errorf(ctx, "Failed to create artifact model from request: %v", err)
logger.Errorf(ctx, "Failed to validate Create request: %v", err)
return nil, err
}

created, err := c.Storage.CreateArtifact(ctx, &model)
// Offload the metadata object before storing and add the offload location instead.
if artifactObj.Spec.UserMetadata != nil {
offloadLocation, err := c.BlobStore.OffloadArtifactCard(ctx,
artifactObj.ArtifactId.ArtifactKey.Name, artifactObj.ArtifactId.Version, artifactObj.Spec.UserMetadata)
if err != nil {
logger.Errorf(ctx, "Failed to offload metadata: %v", err)
return nil, err
}
artifactObj.OffloadedMetadata = offloadLocation.String()
}

created, err := c.Storage.CreateArtifact(ctx, artifactObj)
if err != nil {
logger.Errorf(ctx, "Failed to create artifact: %v", err)
return nil, err
}
idl := FromModelToIdl(created)

return &artifact.CreateArtifactResponse{Artifact: &idl}, nil
return &artifact.CreateArtifactResponse{Artifact: &created.Artifact}, nil
}

func (c *CoreService) GetArtifact(ctx context.Context, request *artifact.GetArtifactRequest) (*artifact.GetArtifactResponse, error) {
Expand Down Expand Up @@ -67,8 +79,9 @@ func (c *CoreService) HandleCloudEvent(ctx context.Context, request *artifact.Cl
return &artifact.CloudEventResponse{}, nil
}

func NewCoreService(storage StorageInterface, _ promutils.Scope) CoreService {
func NewCoreService(storage StorageInterface, blobStore BlobStoreInterface, _ promutils.Scope) CoreService {
return CoreService{
Storage: storage,
Storage: storage,
BlobStore: blobStore,
}
}
Loading

0 comments on commit 5e8af6e

Please sign in to comment.