Skip to content

Commit

Permalink
Implement distributed scheduler building block (#562)
Browse files Browse the repository at this point in the history
* feat: add jobs/scheduling api (with validation override)

Signed-off-by: mikeee <[email protected]>

* chore: fix deps

Signed-off-by: mikeee <[email protected]>

* fix: use cli fix

Signed-off-by: mikeee <[email protected]>

* fix: ci artifact path set for cli build

Signed-off-by: mikeee <[email protected]>

* chore: remove sidecar step

Signed-off-by: mikeee <[email protected]>

* chore: revert changes to other examples

Signed-off-by: mikeee <[email protected]>

---------

Signed-off-by: mikeee <[email protected]>
Signed-off-by: Mike Nguyen <[email protected]>
  • Loading branch information
mikeee authored Jul 17, 2024
1 parent 01c0f31 commit 9675705
Show file tree
Hide file tree
Showing 19 changed files with 488 additions and 33 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test-on-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: recursive

- name: Setup
uses: actions/setup-go@v5
Expand Down
40 changes: 26 additions & 14 deletions .github/workflows/validate_examples.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ jobs:
outputs:
DAPR_INSTALL_URL: ${{ env.DAPR_INSTALL_URL }}
DAPR_CLI_VER: ${{ steps.outputs.outputs.DAPR_CLI_VER }}
DAPR_RUNTIME_VER: ${{ steps.outputs.outputs.DAPR_RUNTIME_VER }}
DAPR_CLI_REF: ${{ steps.outputs.outputs.DAPR_CLI_REF }}
DAPR_RUNTIME_VER: 1.14.0-rc.2
CHECKOUT_REPO: ${{ steps.outputs.outputs.CHECKOUT_REPO }}
CHECKOUT_REF: ${{ steps.outputs.outputs.CHECKOUT_REF }}
DAPR_REF: ${{ steps.outputs.outputs.DAPR_REF }}
GITHUB_SHA: ${{ steps.outputs.outputs.GITHUB_SHA }}
steps:
- name: Parse repository_dispatch payload
if: github.event_name == 'repository_dispatch'
Expand Down Expand Up @@ -79,9 +81,6 @@ jobs:
echo "DAPR_CLI_VER=$CLI_VERSION" >> $GITHUB_ENV
echo "Found $CLI_VERSION"
- name: Set up Dapr CLI
run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash -s ${{ env.DAPR_CLI_VER }}

- name: Checkout Dapr CLI repo to override dapr command.
uses: actions/checkout@v4
if: env.DAPR_CLI_REF != ''
Expand All @@ -105,17 +104,19 @@ jobs:
cd cli
make
mkdir -p $HOME/artifacts/$GITHUB_SHA/
sudo cp dist/linux_amd64/release/dapr $HOME/artifacts/$GITHUB_SHA/dapr
sudo cp dist/linux_amd64/release/dapr ~/artifacts/$GITHUB_SHA/dapr
echo "artifactPath=~/artifacts/$GITHUB_SHA/" >> $GITHUB_ENV
echo "DAPR_CLI_REF=$DAPR_CLI_REF" >> $GITHUB_ENV
- name: Build daprd and placement with referenced commit.
- name: Build dapr
if: env.DAPR_REF != ''
run: |
echo "artifactPath=~/artifacts/$GITHUB_SHA/" >> $GITHUB_ENV
cd dapr_runtime
make
mkdir -p $HOME/artifacts/$GITHUB_SHA/
cp dist/linux_amd64/release/daprd $HOME/artifacts/$GITHUB_SHA/daprd
cp dist/linux_amd64/release/placement $HOME/artifacts/$GITHUB_SHA/placement
echo "artifactPath=~/artifacts/$GITHUB_SHA/" >> $GITHUB_ENV
cp ./dist/linux_amd64/release/* ~/artifacts/$GITHUB_SHA/
- name: Upload dapr-artifacts
uses: actions/upload-artifact@v4
Expand All @@ -132,10 +133,12 @@ jobs:
run: |
echo "DAPR_INSTALL_URL=$DAPR_INSTALL_URL"
echo "DAPR_CLI_VER=$DAPR_CLI_VER" >> "$GITHUB_OUTPUT"
echo "DAPR_CLI_REF=$DAPR_CLI_REF" >> "$GITHUB_OUTPUT"
echo "DAPR_RUNTIME_VER=$DAPR_RUNTIME_VER" >> "$GITHUB_OUTPUT"
echo "CHECKOUT_REPO=$CHECKOUT_REPO" >> "$GITHUB_OUTPUT"
echo "CHECKOUT_REF=$CHECKOUT_REF" >> "$GITHUB_OUTPUT"
echo "DAPR_REF=$DAPR_REF" >> "$GITHUB_OUTPUT"
echo "GITHUB_SHA=$GITHUB_SHA" >> "$GITHUB_OUTPUT"
validate-example:
needs: setup
Expand All @@ -148,10 +151,11 @@ jobs:
DAPR_INSTALL_URL: ${{ needs.setup.outputs.DAPR_INSTALL_URL }}
DAPR_CLI_VER: ${{ needs.setup.outputs.DAPR_CLI_VER }}
DAPR_RUNTIME_VER: 1.14.0-rc.2
DAPR_CLI_REF: ${{ github.event.inputs.daprcli_commit }}
DAPR_REF: ${{ github.event.inputs.daprdapr_commit }}
DAPR_CLI_REF: ${{ needs.setup.outputs.DAPR_CLI_REF }}
DAPR_REF: ${{ needs.setup.outputs.DAPR_REF }}
CHECKOUT_REPO: ${{ needs.setup.outputs.CHECKOUT_REPO }}
CHECKOUT_REF: ${{ needs.setup.outputs.CHECKOUT_REF }}
GITHUB_SHA: ${{ needs.setup.outputs.GITHUB_SHA }}

strategy:
fail-fast: false
Expand All @@ -161,6 +165,7 @@ jobs:
"actor",
"configuration",
"crypto",
"dist-scheduler",
"grpc-service",
"hello-world",
"pubsub",
Expand All @@ -187,7 +192,11 @@ jobs:
uses: actions/download-artifact@v4
with:
name: dapr-artifacts
path: $HOME/artifacts/$GITHUB_SHA/
path: ~/artifacts/${{ env.GITHUB_SHA }}/

- name: Display artifacts downloaded
if: env.DAPR_CLI_REF != '' || env.DAPR_REF != ''
run: ls ~/artifacts/$GITHUB_SHA/

- name: Set up Go
id: setup-go
Expand All @@ -196,6 +205,7 @@ jobs:
go-version-file: "go.mod"

- name: Set up Dapr CLI
if: env.DAPR_CLI_VER != ''
run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash -s ${{ env.DAPR_CLI_VER }}

- name: Override dapr cli with referenced commit.
Expand All @@ -208,13 +218,15 @@ jobs:
dapr uninstall --all
dapr init --runtime-version ${{ env.DAPR_RUNTIME_VER }}
- name: Override daprd and placement service with referenced commit.
- name: Print scheduler logs
run: |
docker logs dapr_scheduler
- name: Override daprd with referenced commit.
if: env.DAPR_REF != ''
run: |
mkdir -p $HOME/.dapr/bin/
cp $HOME/artifacts/$GITHUB_SHA/daprd $HOME/.dapr/bin/daprd
docker stop dapr_placement
$HOME/artifacts/$GITHUB_SHA/placement --healthz-port 9091 &
- name: Set up Python ${{ env.PYTHON_VER }}
uses: actions/setup-python@v5
Expand Down
9 changes: 9 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,15 @@ type Client interface {
// RaiseEventWorkflowBeta1 raises an event for a workflow.
RaiseEventWorkflowBeta1(ctx context.Context, req *RaiseEventWorkflowRequest) error

// ScheduleJobAlpha1 creates and schedules a job.
ScheduleJobAlpha1(ctx context.Context, req *Job) error

// GetJobAlpha1 returns a scheduled job.
GetJobAlpha1(ctx context.Context, name string) (*Job, error)

// DeleteJobAlpha1 deletes a scheduled job.
DeleteJobAlpha1(ctx context.Context, name string) error

// GrpcClient returns the base grpc client if grpc is used and nil otherwise
GrpcClient() pb.DaprClient

Expand Down
27 changes: 27 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,33 @@ func (s *testDaprServer) RaiseEventWorkflowBeta1(ctx context.Context, in *pb.Rai
return &emptypb.Empty{}, nil
}

func (s *testDaprServer) ScheduleJobAlpha1(ctx context.Context, in *pb.ScheduleJobRequest) (*pb.ScheduleJobResponse, error) {
return &pb.ScheduleJobResponse{}, nil
}

func (s *testDaprServer) GetJobAlpha1(ctx context.Context, in *pb.GetJobRequest) (*pb.GetJobResponse, error) {
var (
schedule = "@every 10s"
dueTime = "10s"
repeats uint32 = 4
ttl = "10s"
)
return &pb.GetJobResponse{
Job: &pb.Job{
Name: "name",
Schedule: &schedule,
Repeats: &repeats,
DueTime: &dueTime,
Ttl: &ttl,
Data: nil,
},
}, nil
}

func (s *testDaprServer) DeleteJobAlpha1(ctx context.Context, in *pb.DeleteJobRequest) (*pb.DeleteJobResponse, error) {
return &pb.DeleteJobResponse{}, nil
}

func TestGrpcClient(t *testing.T) {
protoClient := pb.NewDaprClient(nil)
client := &GRPCClient{protoClient: protoClient}
Expand Down
91 changes: 91 additions & 0 deletions client/scheduling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"context"
"log"

"google.golang.org/protobuf/types/known/anypb"

pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
)

type Job struct {
Name string
Schedule string
Repeats uint32 // Optional
DueTime string // Optional
TTL string // Optional
Data *anypb.Any
}

// ScheduleJobAlpha1 raises and schedules a job.
func (c *GRPCClient) ScheduleJobAlpha1(ctx context.Context, job *Job) error {
// TODO: Assert job fields are defined: Name, Schedule, Data
jobRequest := &pb.Job{
Name: job.Name,
Schedule: &job.Schedule,
Data: job.Data,
}

if job.Schedule != "" {
jobRequest.Schedule = &job.Schedule
}

if job.Repeats != 0 {
jobRequest.Repeats = &job.Repeats
}

if job.DueTime != "" {
jobRequest.DueTime = &job.DueTime
}

if job.TTL != "" {
jobRequest.Ttl = &job.TTL
}
_, err := c.protoClient.ScheduleJobAlpha1(ctx, &pb.ScheduleJobRequest{
Job: jobRequest,
})
return err
}

// GetJobAlpha1 retrieves a scheduled job.
func (c *GRPCClient) GetJobAlpha1(ctx context.Context, name string) (*Job, error) {
// TODO: Name validation
resp, err := c.protoClient.GetJobAlpha1(ctx, &pb.GetJobRequest{
Name: name,
})
log.Println(resp)
if err != nil {
return nil, err
}
return &Job{
Name: resp.GetJob().GetName(),
Schedule: resp.GetJob().GetSchedule(),
Repeats: resp.GetJob().GetRepeats(),
DueTime: resp.GetJob().GetDueTime(),
TTL: resp.GetJob().GetTtl(),
Data: resp.GetJob().GetData(),
}, nil
}

// DeleteJobAlpha1 deletes a scheduled job.
func (c *GRPCClient) DeleteJobAlpha1(ctx context.Context, name string) error {
// TODO: Name validation
_, err := c.protoClient.DeleteJobAlpha1(ctx, &pb.DeleteJobRequest{
Name: name,
})
return err
}
58 changes: 58 additions & 0 deletions client/scheduling_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/anypb"
)

func TestSchedulingAlpha1(t *testing.T) {
ctx := context.Background()

t.Run("schedule job - valid", func(t *testing.T) {
err := testClient.ScheduleJobAlpha1(ctx, &Job{
Name: "test",
Schedule: "test",
Data: &anypb.Any{},
})

require.NoError(t, err)
})

t.Run("get job - valid", func(t *testing.T) {
expected := &Job{
Name: "name",
Schedule: "@every 10s",
Repeats: 4,
DueTime: "10s",
TTL: "10s",
Data: nil,
}

resp, err := testClient.GetJobAlpha1(ctx, "name")
require.NoError(t, err)
assert.Equal(t, expected, resp)
})

t.Run("delete job - valid", func(t *testing.T) {
err := testClient.DeleteJobAlpha1(ctx, "name")

require.NoError(t, err)
})
}
43 changes: 43 additions & 0 deletions examples/dist-scheduler/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Dapr Distributed Scheduler Example with go-sdk

## Steps

### Prepare

- Dapr installed (v1.14 or higher)

### Run Distributed Scheduling Example

<!-- STEP
name: Run Distributed Scheduling Example
output_match_mode: substring
expected_stdout_lines:
- 'Scheduler stream connected'
- 'schedulejob - success'
- 'job 0 received'
- 'extracted payload: {db-backup {my-prod-db /backup-dir}}'
- 'job 1 received'
- 'extracted payload: {db-backup {my-prod-db /backup-dir}}'
- 'job 2 received'
- 'extracted payload: {db-backup {my-prod-db /backup-dir}}'
- 'getjob - resp: &{prod-db-backup @every 1s 10 value:"{\"task\":\"db-backup\",\"metadata\":{\"db_name\":\"my-prod-db\",\"backup_location\":\"/backup-dir\"}}"}'
- 'deletejob - success'
background: true
sleep: 30
-->

```bash
dapr run --app-id=distributed-scheduler \
--metrics-port=9091 \
--scheduler-host-address=localhost:50006 \
--dapr-grpc-port 50001 \
--app-port 50070 \
--app-protocol grpc \
--log-level debug \
go run ./main.go

```

<!-- END_STEP -->
11 changes: 11 additions & 0 deletions examples/dist-scheduler/api/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package api

type Metadata struct {
DBName string `json:"db_name"`
BackupLocation string `json:"backup_location"`
}

type DBBackup struct {
Task string `json:"task"`
Metadata Metadata `json:"metadata"`
}
Loading

0 comments on commit 9675705

Please sign in to comment.