diff --git a/cmd/common/signal.go b/cmd/common/signal.go index 25c3ffe937..b1cce879fb 100644 --- a/cmd/common/signal.go +++ b/cmd/common/signal.go @@ -10,7 +10,7 @@ import ( func WatchSignals(ctx context.Context, cancel context.CancelFunc) <-chan struct{} { donech := make(chan struct{}) sigch := make(chan os.Signal, 1) - signal.Notify(sigch, syscall.SIGINT, syscall.SIGHUP) + signal.Notify(sigch, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM) go func() { defer close(donech) defer signal.Stop(sigch) diff --git a/glide.lock b/glide.lock index 0d3e7010b1..dad435d8c7 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: e0f1feb407bf2e04acf3c07f8eec028b143c5e22af83f84c9f0100b0b73c773c -updated: 2018-07-25T13:45:41.041433089-07:00 +hash: e9cbd88ff82355128afbcb6822163395b3e4441a86f0dd3e3ee25e7195d55734 +updated: 2018-07-25T14:52:08.920853867-07:00 imports: - name: github.com/boz/go-lifecycle version: c39961a5a0ce6b046f15d62bcbed79701666a9e0 @@ -7,6 +7,8 @@ imports: version: 9a2f9524024889e129a5422aca2cff73cb3eabf6 subpackages: - btcec +- name: github.com/caarlos0/env + version: 1cddc31c48c56ecd700d873edb9fd5b6f5df922a - name: github.com/davecgh/go-spew version: 8991bc29aa16c548c550c7ff78260e27b9ab7c73 subpackages: @@ -485,4 +487,12 @@ imports: - pkg/util/proto - name: k8s.io/metrics version: 22022401af62d9a22b30b5b36a4227e60f404b17 + subpackages: + - pkg/apis/metrics + - pkg/apis/metrics/v1alpha1 + - pkg/apis/metrics/v1beta1 + - pkg/client/clientset/versioned + - pkg/client/clientset/versioned/scheme + - pkg/client/clientset/versioned/typed/metrics/v1alpha1 + - pkg/client/clientset/versioned/typed/metrics/v1beta1 testImports: [] diff --git a/glide.yaml b/glide.yaml index f0b2f989ae..5d11156a95 100644 --- a/glide.yaml +++ b/glide.yaml @@ -43,12 +43,10 @@ import: - trees/avltree - package: github.com/boz/go-lifecycle - package: k8s.io/client-go - # version: ^8.0.0 subpackages: - '...' - package: k8s.io/apiextensions-apiserver - package: k8s.io/apimachinery - # version: 'kubernetes-1.11.1' subpackages: - '...' - package: github.com/googleapis/gnostic @@ -64,3 +62,5 @@ import: - package: github.com/grpc-ecosystem/grpc-gateway version: ^1.4.1 - package: k8s.io/metrics +- package: github.com/caarlos0/env + version: ^3.3.0 diff --git a/provider/bidengine/config.go b/provider/bidengine/config.go new file mode 100644 index 0000000000..3880f98dda --- /dev/null +++ b/provider/bidengine/config.go @@ -0,0 +1,8 @@ +package bidengine + +type config struct { + FulfillmentCPUMax uint `env:"AKASH_PROVIDER_MAX_FULFILLMENT_CPU" envDefault:"1000"` + + FulfillmentMemPriceMin uint `env:"AKASH_PROVIDER_MEM_PRICE_MIN" envDefault:"50"` + FulfillmentMemPriceMax uint `env:"AKASH_PROVIDER_MEM_PRICE_MAX" envDefault:"150"` +} diff --git a/provider/bidengine/order.go b/provider/bidengine/order.go index d874d02808..41ae3cdb77 100644 --- a/provider/bidengine/order.go +++ b/provider/bidengine/order.go @@ -10,12 +10,14 @@ import ( "github.com/ovrclk/akash/provider/event" "github.com/ovrclk/akash/provider/session" "github.com/ovrclk/akash/types" + "github.com/ovrclk/akash/types/unit" "github.com/ovrclk/akash/util/runner" "github.com/tendermint/tmlibs/log" ) // order manages bidding and general lifecycle handling of an order. type order struct { + config config order types.OrderID fulfillment *types.Fulfillment @@ -41,6 +43,7 @@ func newOrder(e *service, oid types.OrderID, fulfillment *types.Fulfillment) (*o log := session.Log().With("order", oid) order := &order{ + config: e.config, order: oid, fulfillment: fulfillment, session: session, @@ -148,13 +151,10 @@ loop: group = result.Value().(*types.DeploymentGroup) - if !matchProviderAttributes(o.session.Provider().Attributes, group.Requirements) { - o.log.Debug("unable to fulfill: incompatible attributes") - break loop + if !o.shouldBid(group) { + break } - // TODO: check if price is too low - // Begin reserving resources from cluster. clusterch = runner.Do(func() runner.Result { return runner.NewResult(o.cluster.Reserve(o.order, group)) @@ -233,17 +233,65 @@ loop: } } -func (o *order) calculatePrice(resources types.ResourceList) uint64 { - max := o.resourcesMaxPrice(resources) - return uint64(rand.Int63n(int64(max)) + 1) +func (o *order) shouldBid(group *types.DeploymentGroup) bool { + + // does provider have required attributes? + if !matchProviderAttributes(o.session.Provider().Attributes, group.Requirements) { + o.log.Debug("unable to fulfill: incompatible attributes") + return false + } + + // TODO: catch overflow + var ( + cpu int64 + mem int64 + price int64 + ) + for _, rg := range group.GetResources() { + cpu += int64(rg.Unit.CPU * rg.Count) + mem += int64(rg.Unit.Memory * uint64(rg.Count)) + price += int64(rg.Price) + } + + // requesting too much cpu? + if cpu > int64(o.config.FulfillmentCPUMax) || cpu <= 0 { + o.log.Info("unable to fulfill: cpu request too high", + "cpu-requested", cpu) + return false + } + + // price max too low? + if price*unit.Gi < mem*int64(o.config.FulfillmentMemPriceMin) { + o.log.Info("unable to fulfill: price too low", + "max-price", price, + "min-price", mem*int64(o.config.FulfillmentMemPriceMin)/unit.Gi) + return false + } + + return true } -func (o *order) resourcesMaxPrice(resources types.ResourceList) uint64 { +func (o *order) calculatePrice(resources types.ResourceList) uint64 { // TODO: catch overflow - price := uint64(0) + var ( + mem int64 + rmax int64 + ) + for _, group := range resources.GetResources() { - price += group.Price + rmax += int64(group.Price) + mem += int64(group.Unit.Memory * uint64(group.Count)) + } + + cmin := uint64(float64(mem) * float64(o.config.FulfillmentMemPriceMin) / float64(unit.Gi)) + cmax := uint64(float64(mem) * float64(o.config.FulfillmentMemPriceMax) / float64(unit.Gi)) + + if cmax > uint64(rmax) { + cmax = uint64(rmax) + } + if cmax == 0 { + cmax = 1 } - o.log.Debug("group max price", "price", price) - return price + + return uint64(rand.Int63n(int64(cmax-cmin)) + int64(cmin)) } diff --git a/provider/bidengine/service.go b/provider/bidengine/service.go index c3f02bcfde..bdd19c3660 100644 --- a/provider/bidengine/service.go +++ b/provider/bidengine/service.go @@ -4,6 +4,7 @@ import ( "context" lifecycle "github.com/boz/go-lifecycle" + "github.com/caarlos0/env" "github.com/ovrclk/akash/provider/cluster" "github.com/ovrclk/akash/provider/event" "github.com/ovrclk/akash/provider/session" @@ -18,6 +19,11 @@ type Service interface { // Service handles bidding on orders. func NewService(ctx context.Context, session session.Session, cluster cluster.Cluster, bus event.Bus) (Service, error) { + config := config{} + if err := env.Parse(&config); err != nil { + return nil, err + } + sub, err := bus.Subscribe() if err != nil { return nil, err @@ -34,6 +40,7 @@ func NewService(ctx context.Context, session session.Session, cluster cluster.Cl session.Log().Info("found orders", "count", len(existingOrders)) s := &service{ + config: config, session: session, cluster: cluster, bus: bus, @@ -50,6 +57,7 @@ func NewService(ctx context.Context, session session.Session, cluster cluster.Cl } type service struct { + config config session session.Session cluster cluster.Cluster diff --git a/provider/bidengine/service_test.go b/provider/bidengine/service_test.go index 917fddca86..61610972be 100644 --- a/provider/bidengine/service_test.go +++ b/provider/bidengine/service_test.go @@ -2,6 +2,7 @@ package bidengine_test import ( "context" + "strconv" "testing" "github.com/ovrclk/akash/provider/bidengine" @@ -12,6 +13,7 @@ import ( "github.com/ovrclk/akash/testutil" txmocks "github.com/ovrclk/akash/txutil/mocks" "github.com/ovrclk/akash/types" + "github.com/ovrclk/akash/types/unit" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -25,7 +27,7 @@ func TestService(t *testing.T) { defer bus.Close() deployment := testutil.Deployment(testutil.Address(t), 1) - group := testutil.DeploymentGroups(deployment.Address, 2).Items[0] + group := makeDeploymentGroup(deployment.Address, 2) order := testutil.Order(deployment.Address, group.Seq, 3) provider := testutil.Provider(testutil.Address(t), 4) @@ -97,7 +99,7 @@ func TestService_Catchup(t *testing.T) { defer bus.Close() deployment := testutil.Deployment(testutil.Address(t), 1) - group := testutil.DeploymentGroups(deployment.Address, 2).Items[0] + group := makeDeploymentGroup(deployment.Address, 2) order := testutil.Order(deployment.Address, group.Seq, 3) provider := testutil.Provider(testutil.Address(t), 4) @@ -156,3 +158,32 @@ func TestService_Catchup(t *testing.T) { mock.AssertExpectationsForObjects(t, qclient, txclient, creso, cluster) } + +func makeDeploymentGroup(daddr []byte, nonce uint64) *types.DeploymentGroup { + runit := types.ResourceUnit{ + CPU: 500, + Memory: 256 * unit.Mi, + Disk: 5 * unit.Gi, + } + + rgroup := types.ResourceGroup{ + Unit: runit, + Count: 2, + Price: 35, + } + + pattr := types.ProviderAttribute{ + Name: "region", + Value: "us-west", + } + + return &types.DeploymentGroup{ + Name: strconv.FormatUint(nonce, 10), + DeploymentGroupID: types.DeploymentGroupID{ + Deployment: daddr, + Seq: nonce, + }, + Resources: []types.ResourceGroup{rgroup}, + Requirements: []types.ProviderAttribute{pattr}, + } +}