Skip to content

Commit

Permalink
Implement method to get a default example for each SDKs
Browse files Browse the repository at this point in the history
  • Loading branch information
pavel-avilov committed Jan 12, 2022
1 parent 5f3c17b commit 509b7ff
Show file tree
Hide file tree
Showing 10 changed files with 1,007 additions and 1,002 deletions.
15 changes: 14 additions & 1 deletion playground/api/v1/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,12 @@ message GetPrecompiledObjectLogsRequest{
string cloud_path = 1;
}

// GetListOfPrecompiledObjectsResponse represent the map between sdk and categories for the sdk.
// GetDefaultPrecompiledObjectRequest contains information of the needed PrecompiledObject sdk.
message GetDefaultPrecompiledObjectRequest {
Sdk sdk = 1;
}

// GetPrecompiledObjectsResponse represent the map between sdk and categories for the sdk.
message GetPrecompiledObjectsResponse{
repeated Categories sdk_categories = 1;
}
Expand All @@ -204,6 +209,11 @@ message GetPrecompiledObjectLogsResponse {
string output = 1;
}

// GetDefaultPrecompiledObjectResponse represents the default PrecompiledObject and his category for the sdk.
message GetDefaultPrecompiledObjectResponse {
PrecompiledObject precompiled_object = 1;
}

service PlaygroundService {

// Submit the job for an execution and get the pipeline uuid.
Expand Down Expand Up @@ -244,4 +254,7 @@ service PlaygroundService {

// Get the logs of an PrecompiledObject.
rpc GetPrecompiledObjectLogs(GetPrecompiledObjectLogsRequest) returns (GetPrecompiledObjectLogsResponse);

// Get the default precompile object for the sdk.
rpc GetDefaultPrecompiledObject(GetDefaultPrecompiledObjectRequest) returns (GetDefaultPrecompiledObjectResponse);
}
27 changes: 26 additions & 1 deletion playground/backend/cmd/server/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (controller *playgroundController) GetPrecompiledObjects(ctx context.Contex
// GetPrecompiledObjectCode returns the code of the specific example
func (controller *playgroundController) GetPrecompiledObjectCode(ctx context.Context, info *pb.GetPrecompiledObjectCodeRequest) (*pb.GetPrecompiledObjectCodeResponse, error) {
cd := cloud_bucket.New()
codeString, err := cd.GetPrecompiledObject(ctx, info.GetCloudPath())
codeString, err := cd.GetPrecompiledObjectCode(ctx, info.GetCloudPath())
if err != nil {
logger.Errorf("GetPrecompiledObjectCode(): cloud storage error: %s", err.Error())
return nil, errors.InternalError("Error during getting Precompiled Object's code", "Error with cloud connection")
Expand Down Expand Up @@ -294,3 +294,28 @@ func (controller *playgroundController) GetPrecompiledObjectLogs(ctx context.Con
response := pb.GetPrecompiledObjectLogsResponse{Output: logs}
return &response, nil
}

// GetDefaultPrecompiledObject returns the default precompile object for sdk.
func (controller *playgroundController) GetDefaultPrecompiledObject(ctx context.Context, info *pb.GetDefaultPrecompiledObjectRequest) (*pb.GetDefaultPrecompiledObjectResponse, error) {
switch info.Sdk {
case pb.Sdk_SDK_UNSPECIFIED, pb.Sdk_SDK_SCIO:
logger.Errorf("GetDefaultPrecompiledObject(): unimplemented sdk: %s\n", info.Sdk)
return nil, errors.InvalidArgumentError("Error during preparing", "Sdk is not implemented yet: %s", info.Sdk.String())
}

bucket := cloud_bucket.New()
precompiledObject, err := bucket.GetDefaultPrecompileObject(ctx, info.Sdk, controller.env.ApplicationEnvs.WorkingDir())
if err != nil {
logger.Errorf("GetDefaultPrecompileObject(): cloud storage error: %s", err.Error())
return nil, errors.InternalError("Error during getting default Precompiled Object", "Error with cloud connection")
}

response := pb.GetDefaultPrecompiledObjectResponse{PrecompiledObject: &pb.PrecompiledObject{
CloudPath: precompiledObject.CloudPath,
Name: precompiledObject.Name,
Description: precompiledObject.Description,
Type: precompiledObject.Type,
PipelineOptions: precompiledObject.PipelineOptions,
}}
return &response, nil
}
5 changes: 5 additions & 0 deletions playground/backend/configs/DEFAULT_EXAMPLES.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"SDK_JAVA": "SDK_JAVA/MinimalWordCount",
"SDK_GO": "SDK_GO/MinimalWordCount",
"SDK_PYTHON": "SDK_PYTHON/WordCountWithMetrics"
}
571 changes: 358 additions & 213 deletions playground/backend/internal/api/v1/api.pb.go

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions playground/backend/internal/api/v1/api_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 54 additions & 12 deletions playground/backend/internal/cloud_bucket/precompiled_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@ import (
)

const (
BucketName = "playground-precompiled-objects"
OutputExtension = "output"
LogsExtension = "log"
MetaInfoName = "meta.info"
Timeout = time.Second * 10
javaExtension = "java"
goExtension = "go"
pyExtension = "py"
scioExtension = "scala"
separatorsNumber = 2
BucketName = "playground-precompiled-objects"
OutputExtension = "output"
LogsExtension = "log"
MetaInfoName = "meta.info"
Timeout = time.Second * 10
javaExtension = "java"
goExtension = "go"
pyExtension = "py"
scioExtension = "scala"
separatorsNumber = 2
defaultExamplesConfigName = "DEFAULT_EXAMPLES.json"
configFolderName = "configs"
)

type ObjectInfo struct {
Expand Down Expand Up @@ -93,8 +95,8 @@ func New() *CloudStorage {
return &CloudStorage{}
}

// GetPrecompiledObject returns the source code of the example
func (cd *CloudStorage) GetPrecompiledObject(ctx context.Context, precompiledObjectPath string) (string, error) {
// GetPrecompiledObjectCode returns the source code of the example
func (cd *CloudStorage) GetPrecompiledObjectCode(ctx context.Context, precompiledObjectPath string) (string, error) {
extension, err := getFileExtensionBySdk(precompiledObjectPath)
if err != nil {
return "", err
Expand Down Expand Up @@ -173,6 +175,28 @@ func (cd *CloudStorage) GetPrecompiledObjects(ctx context.Context, targetSdk pb.
return &precompiledObjects, nil
}

// GetDefaultPrecompileObject returns the default precompiled object for the sdk
func (cd *CloudStorage) GetDefaultPrecompileObject(ctx context.Context, targetSdk pb.Sdk, workingDir string) (*ObjectInfo, error) {
defaultExampleToSdk, err := getDefaultExamplesFromJson(workingDir)

infoPath := filepath.Join(defaultExampleToSdk[targetSdk.String()], MetaInfoName)
metaInfo, err := cd.getFileFromBucket(ctx, infoPath, "")
if err != nil {
return nil, err
}

precompiledObject := ObjectInfo{}
err = json.Unmarshal(metaInfo, &precompiledObject)
if err != nil {
logger.Errorf("json.Unmarshal: %v", err.Error())
return nil, err
}

precompiledObject.CloudPath = filepath.Dir(infoPath)

return &precompiledObject, nil
}

// getPrecompiledObjectsDirs finds directories with precompiled objects
// Since there is no notion of directory at cloud storage, then
// to avoid duplicates of a base path (directory) need to store it in a set/map.
Expand Down Expand Up @@ -267,6 +291,9 @@ func getFileExtensionBySdk(precompiledObjectPath string) (string, error) {

// getFullFilePath get full path to the precompiled object file
func getFullFilePath(objectDir string, extension string) string {
if extension == "" {
return objectDir
}
precompiledObjectName := filepath.Base(objectDir) //the base of the object's directory matches the name of the file
fileName := strings.Join([]string{precompiledObjectName, extension}, ".")
filePath := filepath.Join(objectDir, fileName)
Expand All @@ -288,3 +315,18 @@ func getSdkName(path string) string {
sdkName := strings.Split(path, string(os.PathSeparator))[0] // the path of the form "sdkName/example/", where the first part is sdkName
return sdkName
}

// getDefaultExamplesFromJson reads a json file that contains information about default examples for sdk and converts him to map
func getDefaultExamplesFromJson(workingDir string) (map[string]string, error) {
defaultExampleToSdk := map[string]string{}
configPath := filepath.Join(workingDir, configFolderName, defaultExamplesConfigName)
file, err := ioutil.ReadFile(configPath)
if err != nil {
return nil, err
}
err = json.Unmarshal(file, &defaultExampleToSdk)
if err != nil {
return nil, err
}
return defaultExampleToSdk, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ package cloud_bucket
import (
pb "beam.apache.org/playground/backend/internal/api/v1"
"context"
"fmt"
"io/fs"
"os"
"path/filepath"
"reflect"
"testing"
)

const (
precompiledObjectPath = "SDK_JAVA/MinimalWordCount"
targetSdk = pb.Sdk_SDK_UNSPECIFIED
defaultExamplesConfig = "{\n \"SDK_JAVA\": \"1\",\n \"SDK_GO\": \"2\",\n \"SDK_PYTHON\": \"3\"\n}"
)

var bucket *CloudStorage
Expand All @@ -35,6 +40,35 @@ func init() {
ctx = context.Background()
}

func TestMain(m *testing.M) {
err := setup()
if err != nil {
panic(fmt.Errorf("error during test setup: %s", err.Error()))
}
defer teardown()
m.Run()
}

func setup() error {
err := os.Mkdir(configFolderName, fs.ModePerm)
if err != nil {
return err
}
filePath := filepath.Join(configFolderName, defaultExamplesConfigName)
err = os.WriteFile(filePath, []byte(defaultExamplesConfig), 0600)
if err != nil {
return err
}
return nil
}

func teardown() {
err := os.RemoveAll(configFolderName)
if err != nil {
panic(fmt.Errorf("error during test setup: %s", err.Error()))
}
}

func Test_getFullFilePath(t *testing.T) {
type args struct {
examplePath string
Expand Down Expand Up @@ -250,6 +284,50 @@ func Benchmark_GetPrecompiledObjectOutput(b *testing.B) {

func Benchmark_GetPrecompiledObject(b *testing.B) {
for i := 0; i < b.N; i++ {
_, _ = bucket.GetPrecompiledObject(ctx, precompiledObjectPath)
_, _ = bucket.GetPrecompiledObjectCode(ctx, precompiledObjectPath)
}
}

func Benchmark_GetDefaultPrecompileObject(b *testing.B) {
for i := 0; i < b.N; i++ {
_, _ = bucket.GetDefaultPrecompileObject(ctx, targetSdk, "")
}
}

func Test_getDefaultExamplesFromJson(t *testing.T) {
expectedMap := map[string]string{"SDK_JAVA": "1", "SDK_GO": "2", "SDK_PYTHON": "3"}
type args struct {
workingDir string
}
tests := []struct {
name string
args args
want map[string]string
wantErr bool
}{
{
name: "get object from json",
args: args{workingDir: ""},
want: expectedMap,
wantErr: false,
},
{
name: "error if wrong json path",
args: args{workingDir: "Wrong_path"},
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := getDefaultExamplesFromJson(tt.args.workingDir)
if (err != nil) != tt.wantErr {
t.Errorf("getDefaultExamplesFromJson() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("getDefaultExamplesFromJson() got = %v, want %v", got, tt.want)
}
})
}
}
Loading

0 comments on commit 509b7ff

Please sign in to comment.