Skip to content

Commit

Permalink
Add fake metadata store and fix tests. (#958)
Browse files Browse the repository at this point in the history
* Add fake metadata store and fix tests.

Also, add instructions on how to build/run the backend with Bazel.

Note that the fake metadata store works, but I need to add proper tests
that exercise it. That'll be done in a separate PR.

One thing I'm missing here is how to make Bazel run well in Travis. I
will send a follow up PR for doing this.

* move select for update to the db interface
  • Loading branch information
neuromage authored and k8s-ci-robot committed Mar 21, 2019
1 parent 4839350 commit 02de9c5
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 11 deletions.
13 changes: 11 additions & 2 deletions backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,23 @@ Pipelines backend.
All components can be built using [Bazel](https://bazel.build/). To build
everything under backend, run:
```
bazel build //backend/...
bazel build --action_env=PATH --define=grpc_no_ares=true //backend/...
```

To run all tests:
```
bazel test //backend/...
bazel test --action_env=PATH --define=grpc_no_ares=true //backend/...
```

The API server itself can only be built/tested using Bazel. The following commands target building and testing just the API server.
```
bazel build --action_env=PATH --define=grpc_no_ares=true backend/src/apiserver/...
```
```
bazel test --action_env=PATH --define=grpc_no_ares=true backend/src/apiserver/...
```


## Building Go client library and swagger files
After making changes to proto files, the Go client libraries and swagger
files need to be regenerated and checked-in. The backend/api/generate_api.sh
Expand Down
3 changes: 3 additions & 0 deletions backend/src/apiserver/resource/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//backend/api:go_default_library",
"//backend/src/apiserver/common:go_default_library",
"//backend/src/apiserver/list:go_default_library",
"//backend/src/apiserver/metadata:go_default_library",
"//backend/src/apiserver/model:go_default_library",
"//backend/src/apiserver/storage:go_default_library",
"//backend/src/common/util:go_default_library",
Expand All @@ -25,6 +26,8 @@ go_library(
"@com_github_argoproj_argo//pkg/client/clientset/versioned/typed/workflow/v1alpha1:go_default_library",
"@com_github_golang_glog//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@google_ml_metadata//ml_metadata/metadata_store:metadata_store_go",
"@google_ml_metadata//ml_metadata/proto:metadata_store_go_proto",
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
"@io_k8s_apimachinery//pkg/types:go_default_library",
"@io_k8s_apimachinery//pkg/watch:go_default_library",
Expand Down
18 changes: 17 additions & 1 deletion backend/src/apiserver/resource/client_manager_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
package resource

import (
"ml_metadata/metadata_store/mlmetadata"
mlpb "ml_metadata/proto/metadata_store_go_proto"

workflowclient "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/golang/glog"
"github.com/kubeflow/pipelines/backend/src/apiserver/metadata"
"github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/common/util"
scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1"
Expand Down Expand Up @@ -64,7 +68,7 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf
experimentStore: storage.NewExperimentStore(db, time, uuid),
pipelineStore: storage.NewPipelineStore(db, time, uuid),
jobStore: storage.NewJobStore(db, time),
runStore: storage.NewRunStore(db, time, nil),
runStore: storage.NewRunStore(db, time, initFakeMetadataStore()),
workflowClientFake: NewWorkflowClientFake(),
resourceReferenceStore: storage.NewResourceReferenceStore(db),
dBStatusStore: storage.NewDBStatusStore(db),
Expand All @@ -75,6 +79,18 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf
}, nil
}

func initFakeMetadataStore() *metadata.Store {
cfg := &mlpb.ConnectionConfig{
Config: &mlpb.ConnectionConfig_FakeDatabase{&mlpb.FakeDatabaseConfig{}},
}

mlmdStore, err := mlmetadata.NewStore(cfg)
if err != nil {
glog.Fatalf("Failed to create ML Metadata store: %v", err)
}
return metadata.NewStore(mlmdStore)
}

func NewFakeClientManagerOrFatal(time util.TimeInterface) *FakeClientManager {
uuid := util.NewFakeUUIDGeneratorOrFatal(DefaultFakeUUID, nil)
fakeStore, err := NewFakeClientManager(time, uuid)
Expand Down
12 changes: 12 additions & 0 deletions backend/src/apiserver/storage/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type SQLDialect interface {

// Check whether the error is a SQL duplicate entry error or not
IsDuplicateError(err error) bool

// Modifies the SELECT clause in query to return one that locks the selected
// row for update.
SelectForUpdate(query string) string
}

// MySQLDialect implements SQLDialect with mysql dialect implementation.
Expand Down Expand Up @@ -103,6 +107,14 @@ func (d SQLiteDialect) Concat(exprs []string, separator string) string {
return strings.Join(exprs, separatorSQL)
}

func (d MySQLDialect) SelectForUpdate(query string) string {
return query + " FOR UPDATE"
}

func (d SQLiteDialect) SelectForUpdate(query string) string {
return query
}

func (d SQLiteDialect) IsDuplicateError(err error) bool {
sqlError, ok := err.(sqlite3.Error)
return ok && sqlError.Code == sqlite3.ErrConstraint
Expand Down
17 changes: 9 additions & 8 deletions backend/src/apiserver/storage/run_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,19 +365,20 @@ func (s *RunStore) UpdateRun(runID string, condition string, workflowRuntimeMani
// new in the status of an Argo manifest. This means we need to keep track
// manually here on what the previously updated state of the run is, to ensure
// we do not add duplicate metadata. Hence the locking below.
row := tx.QueryRow("SELECT WorkflowRuntimeManifest FROM run_details WHERE UUID = ? FOR UPDATE", runID)
query := "SELECT WorkflowRuntimeManifest FROM run_details WHERE UUID = ?"
query = s.db.SelectForUpdate(query)

row := tx.QueryRow(query, runID)
var storedManifest string
if err := row.Scan(&storedManifest); err != nil {
tx.Rollback()
return util.NewInternalServerError(err, "failed to find row with run id %q", runID)
return util.NewInvalidInputError("Failed to update run %s. Row not found.", runID)
}

if s.metadataStore != nil {
if err := s.metadataStore.RecordOutputArtifacts(runID, storedManifest, workflowRuntimeManifest); err != nil {
// Metadata storage failed. Log the error here, but continue to allow the run
// to be updated as per usual.
glog.Errorf("Failed to record output artifacts: %+v", err)
}
if err := s.metadataStore.RecordOutputArtifacts(runID, storedManifest, workflowRuntimeManifest); err != nil {
// Metadata storage failed. Log the error here, but continue to allow the run
// to be updated as per usual.
glog.Errorf("Failed to record output artifacts: %+v", err)
}

sql, args, err := sq.
Expand Down

0 comments on commit 02de9c5

Please sign in to comment.