Skip to content

Commit

Permalink
bidengine: bid parameters
Browse files Browse the repository at this point in the history
* skip orders greater than one cpu
* skip orders with max price too low
* price according to memory usage

fixes #291
fixes #295
  • Loading branch information
boz committed Jul 25, 2018
1 parent 538f565 commit f8b95ef
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 20 deletions.
2 changes: 1 addition & 1 deletion cmd/common/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func WatchSignals(ctx context.Context, cancel context.CancelFunc) <-chan struct{} {
donech := make(chan struct{})
sigch := make(chan os.Signal, 1)
signal.Notify(sigch, syscall.SIGINT, syscall.SIGHUP)
signal.Notify(sigch, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM)
go func() {
defer close(donech)
defer signal.Stop(sigch)
Expand Down
14 changes: 12 additions & 2 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,10 @@ import:
- trees/avltree
- package: github.com/boz/go-lifecycle
- package: k8s.io/client-go
# version: ^8.0.0
subpackages:
- '...'
- package: k8s.io/apiextensions-apiserver
- package: k8s.io/apimachinery
# version: 'kubernetes-1.11.1'
subpackages:
- '...'
- package: github.com/googleapis/gnostic
Expand All @@ -64,3 +62,5 @@ import:
- package: github.com/grpc-ecosystem/grpc-gateway
version: ^1.4.1
- package: k8s.io/metrics
- package: github.com/caarlos0/env
version: ^3.3.0
8 changes: 8 additions & 0 deletions provider/bidengine/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package bidengine

type config struct {
FulfillmentCPUMax uint `env:"AKASH_PROVIDER_MAX_FULFILLMENT_CPU" envDefault:"1000"`

FulfillmentMemPriceMin uint `env:"AKASH_PROVIDER_MEM_PRICE_MIN" envDefault:"50"`
FulfillmentMemPriceMax uint `env:"AKASH_PROVIDER_MEM_PRICE_MAX" envDefault:"150"`
}
74 changes: 61 additions & 13 deletions provider/bidengine/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
"github.com/ovrclk/akash/provider/event"
"github.com/ovrclk/akash/provider/session"
"github.com/ovrclk/akash/types"
"github.com/ovrclk/akash/types/unit"
"github.com/ovrclk/akash/util/runner"
"github.com/tendermint/tmlibs/log"
)

// order manages bidding and general lifecycle handling of an order.
type order struct {
config config
order types.OrderID
fulfillment *types.Fulfillment

Expand All @@ -41,6 +43,7 @@ func newOrder(e *service, oid types.OrderID, fulfillment *types.Fulfillment) (*o
log := session.Log().With("order", oid)

order := &order{
config: e.config,
order: oid,
fulfillment: fulfillment,
session: session,
Expand Down Expand Up @@ -148,13 +151,10 @@ loop:

group = result.Value().(*types.DeploymentGroup)

if !matchProviderAttributes(o.session.Provider().Attributes, group.Requirements) {
o.log.Debug("unable to fulfill: incompatible attributes")
break loop
if !o.shouldBid(group) {
break
}

// TODO: check if price is too low

// Begin reserving resources from cluster.
clusterch = runner.Do(func() runner.Result {
return runner.NewResult(o.cluster.Reserve(o.order, group))
Expand Down Expand Up @@ -233,17 +233,65 @@ loop:
}
}

func (o *order) calculatePrice(resources types.ResourceList) uint64 {
max := o.resourcesMaxPrice(resources)
return uint64(rand.Int63n(int64(max)) + 1)
func (o *order) shouldBid(group *types.DeploymentGroup) bool {

// does provider have required attributes?
if !matchProviderAttributes(o.session.Provider().Attributes, group.Requirements) {
o.log.Debug("unable to fulfill: incompatible attributes")
return false
}

// TODO: catch overflow
var (
cpu int64
mem int64
price int64
)
for _, rg := range group.GetResources() {
cpu += int64(rg.Unit.CPU * rg.Count)
mem += int64(rg.Unit.Memory * uint64(rg.Count))
price += int64(rg.Price)
}

// requesting too much cpu?
if cpu > int64(o.config.FulfillmentCPUMax) || cpu <= 0 {
o.log.Info("unable to fulfill: cpu request too high",
"cpu-requested", cpu)
return false
}

// price max too low?
if price*unit.Gi < mem*int64(o.config.FulfillmentMemPriceMin) {
o.log.Info("unable to fulfill: price too low",
"max-price", price,
"min-price", mem*int64(o.config.FulfillmentMemPriceMin)/unit.Gi)
return false
}

return true
}

func (o *order) resourcesMaxPrice(resources types.ResourceList) uint64 {
func (o *order) calculatePrice(resources types.ResourceList) uint64 {
// TODO: catch overflow
price := uint64(0)
var (
mem int64
rmax int64
)

for _, group := range resources.GetResources() {
price += group.Price
rmax += int64(group.Price)
mem += int64(group.Unit.Memory * uint64(group.Count))
}

cmin := uint64(float64(mem) * float64(o.config.FulfillmentMemPriceMin) / float64(unit.Gi))
cmax := uint64(float64(mem) * float64(o.config.FulfillmentMemPriceMax) / float64(unit.Gi))

if cmax > uint64(rmax) {
cmax = uint64(rmax)
}
if cmax == 0 {
cmax = 1
}
o.log.Debug("group max price", "price", price)
return price

return uint64(rand.Int63n(int64(cmax-cmin)) + int64(cmin))
}
8 changes: 8 additions & 0 deletions provider/bidengine/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

lifecycle "github.com/boz/go-lifecycle"
"github.com/caarlos0/env"
"github.com/ovrclk/akash/provider/cluster"
"github.com/ovrclk/akash/provider/event"
"github.com/ovrclk/akash/provider/session"
Expand All @@ -18,6 +19,11 @@ type Service interface {
// Service handles bidding on orders.
func NewService(ctx context.Context, session session.Session, cluster cluster.Cluster, bus event.Bus) (Service, error) {

config := config{}
if err := env.Parse(&config); err != nil {
return nil, err
}

sub, err := bus.Subscribe()
if err != nil {
return nil, err
Expand All @@ -34,6 +40,7 @@ func NewService(ctx context.Context, session session.Session, cluster cluster.Cl
session.Log().Info("found orders", "count", len(existingOrders))

s := &service{
config: config,
session: session,
cluster: cluster,
bus: bus,
Expand All @@ -50,6 +57,7 @@ func NewService(ctx context.Context, session session.Session, cluster cluster.Cl
}

type service struct {
config config
session session.Session
cluster cluster.Cluster

Expand Down
35 changes: 33 additions & 2 deletions provider/bidengine/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bidengine_test

import (
"context"
"strconv"
"testing"

"github.com/ovrclk/akash/provider/bidengine"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/ovrclk/akash/testutil"
txmocks "github.com/ovrclk/akash/txutil/mocks"
"github.com/ovrclk/akash/types"
"github.com/ovrclk/akash/types/unit"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand All @@ -25,7 +27,7 @@ func TestService(t *testing.T) {
defer bus.Close()

deployment := testutil.Deployment(testutil.Address(t), 1)
group := testutil.DeploymentGroups(deployment.Address, 2).Items[0]
group := makeDeploymentGroup(deployment.Address, 2)
order := testutil.Order(deployment.Address, group.Seq, 3)
provider := testutil.Provider(testutil.Address(t), 4)

Expand Down Expand Up @@ -97,7 +99,7 @@ func TestService_Catchup(t *testing.T) {
defer bus.Close()

deployment := testutil.Deployment(testutil.Address(t), 1)
group := testutil.DeploymentGroups(deployment.Address, 2).Items[0]
group := makeDeploymentGroup(deployment.Address, 2)
order := testutil.Order(deployment.Address, group.Seq, 3)
provider := testutil.Provider(testutil.Address(t), 4)

Expand Down Expand Up @@ -156,3 +158,32 @@ func TestService_Catchup(t *testing.T) {

mock.AssertExpectationsForObjects(t, qclient, txclient, creso, cluster)
}

func makeDeploymentGroup(daddr []byte, nonce uint64) *types.DeploymentGroup {
runit := types.ResourceUnit{
CPU: 500,
Memory: 256 * unit.Mi,
Disk: 5 * unit.Gi,
}

rgroup := types.ResourceGroup{
Unit: runit,
Count: 2,
Price: 35,
}

pattr := types.ProviderAttribute{
Name: "region",
Value: "us-west",
}

return &types.DeploymentGroup{
Name: strconv.FormatUint(nonce, 10),
DeploymentGroupID: types.DeploymentGroupID{
Deployment: daddr,
Seq: nonce,
},
Resources: []types.ResourceGroup{rgroup},
Requirements: []types.ProviderAttribute{pattr},
}
}

0 comments on commit f8b95ef

Please sign in to comment.