provider: comment services
boz committed May 29, 2018
1 parent bd288af commit 1ab104d
Showing 7 changed files with 80 additions and 3 deletions.
28 changes: 27 additions & 1 deletion provider/bidengine/order.go
@@ -15,6 +15,7 @@ import (
"github.com/tendermint/tmlibs/log"
)

// order manages bidding and general lifecycle handling of an order.
type order struct {
order types.OrderID

@@ -29,6 +30,7 @@ type order struct {

func newOrder(e *service, ev *event.TxCreateOrder) (*order, error) {

// Create a subscription that will see all events that have not been read from e.sub.Events()
sub, err := e.sub.Clone()
if err != nil {
return nil, err
@@ -49,9 +51,13 @@ func newOrder(e *service, ev *event.TxCreateOrder) (*order, error) {
lc: lifecycle.New(),
}

// Shut down when parent begins shutting down
go order.lc.WatchChannel(e.lc.ShuttingDown())

// Run main loop in separate thread.
go order.run()

// Notify parent of completion (allows drain).
go func() {
<-order.lc.Done()
e.drainch <- order
@@ -66,6 +72,17 @@ func (o *order) run() {
ctx, cancel := context.WithCancel(context.Background())

var (
// channels for async calculations.

// NOTE: these can/should all be done in a single operation such as
// go func(){
// group, err := getGroup()
// reservation, err := getReservation()
// bid, err := createBid()
// }()
// But we'd want to be able to cancel in the middle of operations
// and short-circuit if necessary.

groupch <-chan runner.Result
clusterch <-chan runner.Result
bidch <-chan runner.Result
@@ -75,7 +92,7 @@ loop:
price uint32
)

// fetch group details
// Begin fetching group details immediately.
groupch = runner.Do(func() runner.Result {
return runner.NewResult(
o.session.Query().DeploymentGroup(ctx, o.order.GroupID()))
@@ -124,6 +141,8 @@ loop:
}

case result := <-groupch:
// Group details fetched.

groupch = nil

if result.Error() != nil {
@@ -136,6 +155,7 @@ loop:
// TODO: match requirements
// TODO: check if price is too low

// Begin reserving resources from cluster.
clusterch = runner.Do(func() runner.Result {
return runner.NewResult(o.cluster.Reserve(o.order, group))
})
@@ -148,10 +168,13 @@ loop:
break loop
}

// Resources reserved. Calculate price and bid.

reservation = result.Value().(cluster.Reservation)

price := o.calculatePrice(reservation.Group())

// Begin submitting fulfillment.
bidch = runner.Do(func() runner.Result {
return runner.NewResult(o.session.TX().BroadcastTxCommit(&types.TxCreateFulfillment{
FulfillmentID: types.FulfillmentID{
@@ -170,6 +193,8 @@ loop:
o.log.Error("submitting fulfillment", "err", result.Error())
break loop
}

// Fulfillment placed. All done.
}
}

@@ -179,6 +204,7 @@ loop:
o.lc.ShutdownInitiated(nil)
o.sub.Close()

// Wait for all runners to complete.
if groupch != nil {
<-groupch
}
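
The NOTE in order.run above describes why the group/reservation/bid steps run as separate channels: each step can be cancelled or short-circuited between operations. A minimal, self-contained sketch of that pattern follows, assuming a runner.Do-style helper; the names do, result, and the step closures are illustrative stand-ins, not APIs from this repository. The key trick is that receiving from a nil channel blocks forever, so a select case stays disabled until its step is armed.

package main

import (
	"context"
	"fmt"
)

type result struct {
	value interface{}
	err   error
}

// do mimics a runner.Do-style helper: run fn in its own goroutine and
// deliver its result on a buffered channel so the goroutine never leaks.
func do(fn func() result) <-chan result {
	ch := make(chan result, 1)
	go func() { ch <- fn() }()
	return ch
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var groupch, reservech <-chan result

	// Begin the first step immediately; reservech stays nil, so its
	// case below is disabled until the group fetch completes.
	groupch = do(func() result { return result{value: "group"} })

loop:
	for {
		select {
		case <-ctx.Done():
			break loop
		case res := <-groupch:
			groupch = nil // disarm this case
			if res.err != nil {
				break loop // short-circuit on failure
			}
			// Arm the next step only after the previous one succeeds.
			reservech = do(func() result { return result{value: "reservation"} })
		case res := <-reservech:
			reservech = nil
			fmt.Println("pipeline done:", res.value)
			break loop
		}
	}
}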
4 changes: 3 additions & 1 deletion provider/bidengine/service.go
@@ -15,6 +15,7 @@ type Service interface {
Done() <-chan struct{}
}

// NewService creates a Service that handles bidding on orders.
func NewService(ctx context.Context, session session.Session, cluster cluster.Cluster, bus event.Bus) (Service, error) {

sub, err := bus.Subscribe()
@@ -77,6 +78,7 @@ loop:
// new order
opath := keys.OrderID(ev.OrderID).Path()

// create an order object for managing the bid process and order lifecycle
order, err := newOrder(s, ev)

if err != nil {
@@ -94,7 +96,7 @@ loop:
}
}

// drain
// drain: wait for all order monitors to complete.
for len(s.orders) > 0 {
delete(s.orders, <-s.drainch)
}
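
The drain loop above is a small but important lifecycle pattern. Here is a runnable sketch of just that mechanism; the order type, the monitor bodies, and the ids are invented for illustration, while the map-plus-drainch shape mirrors service.go. Each monitor goroutine sends itself on drainch when it finishes, and the parent blocks until its map of live orders is empty.

package main

import "fmt"

type order struct{ id string }

func main() {
	orders := make(map[*order]struct{})
	drainch := make(chan *order)

	for _, id := range []string{"a", "b", "c"} {
		o := &order{id: id}
		orders[o] = struct{}{}
		go func(o *order) {
			// ... bid on the order and watch its lifecycle ...
			drainch <- o // notify parent of completion (allows drain)
		}(o)
	}

	// drain: wait for all order monitors to complete.
	for len(orders) > 0 {
		delete(orders, <-drainch)
	}
	fmt.Println("all order monitors drained")
}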
1 change: 1 addition & 0 deletions provider/cluster/service.go
@@ -16,6 +16,7 @@ type Cluster interface {
Reserve(types.OrderID, *types.DeploymentGroup) (Reservation, error)
}

// Service manages the provider's compute cluster. It will eventually integrate with Kubernetes, etc.
type Service interface {
Cluster
Close() error
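
Service embedding Cluster is standard Go interface composition: any value that satisfies Service also satisfies Cluster, so callers that only need reservations (such as bidengine) can accept the narrower type. A self-contained sketch with invented method bodies:

package main

import "fmt"

type Cluster interface {
	Reserve(order string) (string, error)
}

// Service composes Cluster with a lifecycle method, mirroring the shape above.
type Service interface {
	Cluster
	Close() error
}

type svc struct{}

func (svc) Reserve(order string) (string, error) { return "reservation:" + order, nil }
func (svc) Close() error                         { return nil }

func main() {
	var c Cluster = svc{} // a Service implementation satisfies Cluster too
	r, _ := c.Reserve("order-1")
	fmt.Println(r)
}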
18 changes: 18 additions & 0 deletions provider/event/bus.go
@@ -10,13 +10,22 @@ var ErrNotRunning = errors.New("not running")

type Event interface{}

// Bus is an async event bus that allows subscribers to behave as buses themselves.
// When an event is published, it is sent to all subscribers asynchronously; a subscriber
// cannot block other subscribers.
//
// NOTE: this should probably be in util/event or something (not in provider/event)
type Bus interface {
Publish(Event) error
Subscribe() (Subscriber, error)
Close()
Done() <-chan struct{}
}

// Subscriber emits events it sees on the channel returned by Events().
// A Clone() of a subscriber will emit all events that have not yet been emitted
// by the subscriber it was cloned from. This is important so that events are not
// missed when adding subscribers for sub-components (see `provider/bidengine/{service,order}.go`)
type Subscriber interface {
Events() <-chan Event
Clone() (Subscriber, error)
@@ -99,9 +108,13 @@ loop:
for {

if b.eventch != nil && len(b.evbuf) > 0 {
// If we're emitting events (Subscriber mode) and there
// are events to emit, set up the output channel and output
// event accordingly.
outch = b.eventch
curev = b.evbuf[0]
} else {
// otherwise block the output (sending to a nil channel always blocks)
outch = nil
}

@@ -111,15 +124,18 @@ loop:
break loop

case outch <- curev:
// Event was emitted. Shrink current event buffer.
b.evbuf = b.evbuf[1:]

case ev := <-b.pubch:
// publish event

// Buffer event.
if b.eventch != nil {
b.evbuf = append(b.evbuf, ev)
}

// Publish to children.
for sub := range b.subscriptions {
sub.Publish(ev)
}
@@ -153,6 +169,8 @@ loop:
}

func newSubscriber(parent *bus) *bus {
// Re-use bus struct, but populate output channel (eventch)
// to enable subscriber mode.

evbuf := make([]Event, len(parent.evbuf))
copy(evbuf, parent.evbuf)
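
The Clone() guarantee documented above (a clone inherits the parent's unread buffer, so no events are missed while wiring up sub-components) comes down to the evbuf copy in newSubscriber. A toy, self-contained model of just that behavior; the toySub type is invented for illustration and carries none of the real bus's concurrency:

package main

import "fmt"

// toySub models only the buffering: published events queue in buf until
// read, and clone() copies the unread buffer.
type toySub struct{ buf []string }

func (s *toySub) publish(ev string) { s.buf = append(s.buf, ev) }

func (s *toySub) next() string {
	ev := s.buf[0]
	s.buf = s.buf[1:]
	return ev
}

func (s *toySub) clone() *toySub {
	buf := make([]string, len(s.buf))
	copy(buf, s.buf) // the same copy newSubscriber performs on parent.evbuf
	return &toySub{buf: buf}
}

func main() {
	parent := &toySub{}
	parent.publish("order-created") // published before the clone exists

	child := parent.clone()
	fmt.Println(child.next()) // "order-created": the clone did not miss it
}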
4 changes: 4 additions & 0 deletions provider/event/tx.go
@@ -10,13 +10,17 @@ import (
)

type (
// Transactions needed for provider services. These aliases may not be
// necessary; they originally had more data/functionality, but it was removed for simplicity.

TxCreateOrder = types.TxCreateOrder
TxCreateFulfillment = types.TxCreateFulfillment
TxCreateLease = types.TxCreateLease
TxCloseDeployment = types.TxCloseDeployment
TxCloseFulfillment = types.TxCloseFulfillment
)

// MarketplaceTxPublisher wraps the tendermint event bus: it publishes events from the tendermint bus to our bus implementation.
func MarketplaceTxPublisher(ctx context.Context, log log.Logger, tmbus tmtmtypes.EventBusSubscriber, bus Bus) (marketplace.Monitor, error) {
handler := MarketplaceTxHandler(bus)
return marketplace.NewMonitor(ctx, log, tmbus, "tx-publisher", handler, marketplace.TxQuery())
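
The declarations above are Go type aliases (the "=" form), not new named types: an alias is identical to the type it names, so values flow between packages without conversion. A small self-contained illustration using a standard-library type:

package main

import (
	"fmt"
	"time"
)

// Duration is an alias: it IS time.Duration, not a distinct type.
type Duration = time.Duration

func main() {
	var d Duration = 2 * time.Second
	var td time.Duration = d // no conversion needed; same type
	fmt.Println(td)
}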
24 changes: 23 additions & 1 deletion provider/manifest/handler.go
@@ -24,6 +24,7 @@ type Service interface {
Done() <-chan struct{}
}

// NewHandler manages incoming leases and manifests, pairing the two together to construct and emit a ManifestReceived event.
func NewHandler(ctx context.Context, session session.Session, bus event.Bus) (Service, error) {
sub, err := bus.Subscribe()
if err != nil {
@@ -61,6 +62,11 @@ type handler struct {
lc lifecycle.Lifecycle
}

// manifestState tracks the state of a single manifest.
// We may receive a lease first or a manifest first. In either case we must
// wait for the other and also download the deployment to perform proper
// validation. Once all three components are received and validated, a
// ManifestReceived event is emitted for other components to consume.
type manifestState struct {
request *types.ManifestRequest
leaseID *types.LeaseID
@@ -80,13 +86,18 @@ type manifestRequest struct {
ch chan<- error
}

// HandleManifest sends an incoming manifest request to the service.
// TODO: This method should only return once validation has completed (or a timeout occurs).
// TODO: Add a context.Context argument and/or timeout.
func (h *handler) HandleManifest(mreq *types.ManifestRequest) error {
ch := make(chan error, 1)
req := manifestRequest{mreq, ch}
select {
case h.mreqch <- req:
// Request received by service. Read and return response.
return <-ch
case <-h.lc.ShuttingDown():
// Service is shutting down; return error.
return ErrNotRunning
}
}
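
HandleManifest uses the request/reply-channel idiom: the request carries its own buffered response channel, and selecting against lc.ShuttingDown() keeps a send to a stopped service from blocking forever. A self-contained sketch of the idiom; errNotRunning, the request type, and the loop body are illustrative:

package main

import (
	"errors"
	"fmt"
)

var errNotRunning = errors.New("not running")

type request struct {
	value string
	ch    chan<- error // reply channel, buffered by the caller
}

func main() {
	reqch := make(chan request)
	shutdown := make(chan struct{}) // closed when the service stops

	go func() { // service loop
		for req := range reqch {
			// ... validate req.value ...
			req.ch <- nil // reply to the caller
		}
	}()

	// Caller side, mirroring HandleManifest.
	ch := make(chan error, 1)
	select {
	case reqch <- request{value: "manifest", ch: ch}:
		fmt.Println("reply:", <-ch)
	case <-shutdown:
		fmt.Println(errNotRunning)
	}
}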
@@ -112,6 +123,9 @@ loop:
case ev := <-h.sub.Events():
switch ev := ev.(type) {
case event.LeaseWon:

// We won a lease. Look up state, add LeaseID, check state for completion.

did := ev.LeaseID.Deployment
mstate := h.getManifestState(did)

@@ -130,7 +144,7 @@ loop:
}

case req := <-h.mreqch:
// new manifest received
// Manifest received. Look up state, add ManifestRequest, check state for completion.

did := req.value.Deployment
mstate := h.getManifestState(did)
@@ -146,6 +160,7 @@ loop:
req.ch <- nil

case req := <-h.deploymentch:
// Deployment fetched. This should only happen if a lease was won or a manifest was received.

if err := req.Error(); err != nil {
h.session.Log().Error("fetching deployment", "err", err)
@@ -173,6 +188,8 @@ loop:
}
}
cancel()

// Wait for all deployment fetches to complete.
h.wg.Wait()
}

Expand All @@ -191,6 +208,8 @@ func (h *handler) getManifestState(did base.Bytes) *manifestState {
func (h *handler) checkManifestState(ctx context.Context, mstate *manifestState, did base.Bytes) {
if mstate.complete() {

// If all information has been received, emit ManifestReceived event.

// TODO: validate manifest

// publish complete manifest
@@ -203,6 +222,7 @@ func (h *handler) checkManifestState(ctx context.Context, mstate *manifestState,
return
}

// If deployment has not begun to be fetched, begin fetching it now.
if mstate.deployment == nil && !mstate.deploymentPending {
mstate.deploymentPending = true
h.fetchDeployment(ctx, did)
@@ -218,7 +238,9 @@ func (h *handler) fetchDeployment(ctx context.Context, key base.Bytes) {
res, err := h.session.Query().Deployment(ctx, key)
select {
case h.deploymentch <- runner.NewResult(res, err):
// Result sent to service; do nothing else.
case <-h.lc.ShuttingDown():
// Service is shutting down; do nothing else.
}
}()
}
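
The lease/manifest pairing described in the comments above reduces to a three-field state record that may be filled in any order. Below is a sketch of what the completion check plausibly looks like; the field names follow manifestState in this diff, but the stub types and the complete() body are assumptions:

package main

import "fmt"

type (
	ManifestRequest struct{}
	LeaseID         struct{}
	Deployment      struct{}
)

// manifestState mirrors the struct in handler.go: lease and manifest may
// arrive in either order, and the deployment is fetched on demand.
type manifestState struct {
	request           *ManifestRequest
	leaseID           *LeaseID
	deployment        *Deployment
	deploymentPending bool // a deployment fetch is in flight
}

// complete reports whether all three components have been received.
func (s *manifestState) complete() bool {
	return s.request != nil && s.leaseID != nil && s.deployment != nil
}

func main() {
	st := &manifestState{}

	st.leaseID = &LeaseID{}    // lease won first
	fmt.Println(st.complete()) // false: still waiting on manifest and deployment

	st.request = &ManifestRequest{} // manifest arrives
	st.deploymentPending = true     // deployment fetch begins
	st.deployment = &Deployment{}   // deployment fetch returns
	fmt.Println(st.complete())      // true: time to emit ManifestReceived
}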
4 changes: 4 additions & 0 deletions provider/service.go
@@ -17,6 +17,7 @@ type Service interface {
Done() <-chan struct{}
}

// NewService creates a simple wrapper around the various services needed to run a provider.
func NewService(ctx context.Context, session session.Session, bus event.Bus) (Service, error) {

ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -92,16 +93,19 @@ func (s *service) ManifestHandler() manifest.Handler {
func (s *service) run() {
defer s.lc.ShutdownCompleted()

// Wait for any service to finish
select {
case <-s.lc.ShutdownRequest():
case <-s.cluster.Done():
case <-s.bidengine.Done():
case <-s.manifest.Done():
}

// Shut down all services
s.lc.ShutdownInitiated(nil)
s.cancel()

// Wait for all services to finish
<-s.cluster.Done()
<-s.bidengine.Done()
<-s.manifest.Done()
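
The run loop above is a fan-in shutdown pattern: block until any child service stops (or shutdown is requested), then cancel the shared context and wait for every child. A self-contained sketch, with fakeService standing in for the real cluster/bidengine/manifest services:

package main

import (
	"context"
	"fmt"
	"time"
)

// fakeService models the Done() contract: the returned channel closes
// once the service has fully stopped.
func fakeService(ctx context.Context, name string) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		<-ctx.Done()
		fmt.Println(name, "stopped")
		close(done)
	}()
	return done
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	cluster := fakeService(ctx, "cluster")
	bidengine := fakeService(ctx, "bidengine")
	manifest := fakeService(ctx, "manifest")

	// Simulate one service exiting first.
	go func() {
		time.Sleep(10 * time.Millisecond)
		cancel()
	}()

	// Wait for ANY service to finish.
	select {
	case <-cluster:
	case <-bidengine:
	case <-manifest:
	}

	// Shut down all services and wait for each of them.
	cancel()
	<-cluster
	<-bidengine
	<-manifest
	fmt.Println("provider shut down cleanly")
}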
