Skip to content
This repository was archived by the owner on Aug 26, 2022. It is now read-only.

Commit

Permalink
Merge pull request #278 from adamdecaf/allow-waiting-for-flush-routes
Browse files Browse the repository at this point in the history
internal/filetransfer: allow synchronous waiting for flush routes
  • Loading branch information
adamdecaf authored Oct 2, 2019
2 parents c88519a + 99f38c0 commit 2d6f192
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 34 deletions.
2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func setupFileTransferController(logger log.Logger, controller *filetransfer.Con
return cancelFileSync
}

flushIncoming, flushOutgoing := make(chan struct{}, 1), make(chan struct{}, 1) // buffered channels to allow only one concurrent operation
flushIncoming, flushOutgoing := make(filetransfer.FlushChan, 1), make(filetransfer.FlushChan, 1) // buffered channels to allow only one concurrent operation

// start our controller's operations in an anon goroutine
go controller.StartPeriodicFileOperations(ctx, flushIncoming, flushOutgoing, depRepo, transferRepo)
Expand Down
2 changes: 2 additions & 0 deletions docs/ach.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ There are endpoints for just inbound or outbound file processing:
- `POST /files/flush/incoming`
- `POST /files/flush/outgoing`

Note: The query parameter `?wait` can be added onto any endpoint to hold the HTTP response until file operations are done. This has the potential of returning a timeout however, and the file operations will continue.

Note: These endpoints currently return no information in the HTTP response and instead inspect paygate's logs for details.

### Returned ACH Files
Expand Down
60 changes: 49 additions & 11 deletions internal/filetransfer/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,57 +7,95 @@ package filetransfer
import (
"fmt"
"net/http"
"time"

"github.com/moov-io/base/admin"
moovhttp "github.com/moov-io/base/http"
"github.com/moov-io/paygate/internal/util"

"github.com/go-kit/kit/log"
)

func AddFileTransferSyncRoute(logger log.Logger, svc *admin.Server, flushIncoming chan struct{}, flushOutgoing chan struct{}) {
func AddFileTransferSyncRoute(logger log.Logger, svc *admin.Server, flushIncoming FlushChan, flushOutgoing FlushChan) {
svc.AddHandler("/files/flush/incoming", flushIncomingFiles(logger, flushIncoming))
svc.AddHandler("/files/flush/outgoing", flushOutgoingFiles(logger, flushOutgoing))
svc.AddHandler("/files/flush", flushFiles(logger, flushIncoming, flushOutgoing))
}

// flushIncomingFiles will download inbound and return files and then process them
func flushIncomingFiles(logger log.Logger, flushIncoming chan struct{}) http.HandlerFunc {
func flushIncomingFiles(logger log.Logger, flushIncoming FlushChan) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
moovhttp.Problem(w, fmt.Errorf("unsupported HTTP verb %s", r.Method))
return
}

flushIncoming <- struct{}{} // send a message on the channel to trigger async routine

req := maybeWaiter(r)
flushIncoming <- req
if err := maybeWait(w, req); err == util.ErrTimeout {
return
}
w.WriteHeader(http.StatusOK)
}
}

// flushOutgoingFiles will merge and upload outbound files
func flushOutgoingFiles(logger log.Logger, flushOutgoing chan struct{}) http.HandlerFunc {
func flushOutgoingFiles(logger log.Logger, flushOutgoing FlushChan) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
moovhttp.Problem(w, fmt.Errorf("unsupported HTTP verb %s", r.Method))
return
}

flushOutgoing <- struct{}{} // send a message on the channel to trigger async routine

req := maybeWaiter(r)
flushOutgoing <- req
if err := maybeWait(w, req); err == util.ErrTimeout {
return
}
w.WriteHeader(http.StatusOK)
}
}

func flushFiles(logger log.Logger, flushIncoming chan struct{}, flushOutgoing chan struct{}) http.HandlerFunc {
func flushFiles(logger log.Logger, flushIncoming FlushChan, flushOutgoing FlushChan) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
moovhttp.Problem(w, fmt.Errorf("unsupported HTTP verb %s", r.Method))
return
}

flushIncoming <- struct{}{}
flushOutgoing <- struct{}{}

reqIncoming, reqOutgoing := maybeWaiter(r), maybeWaiter(r)
flushIncoming <- reqIncoming
flushOutgoing <- reqOutgoing
if err := maybeWait(w, reqIncoming); err == util.ErrTimeout {
return
}
if err := maybeWait(w, reqOutgoing); err == util.ErrTimeout {
return
}
w.WriteHeader(http.StatusOK)
}
}

func maybeWaiter(r *http.Request) *periodicFileOperationsRequest {
if _, exists := r.URL.Query()["wait"]; exists {
return &periodicFileOperationsRequest{
waiter: make(chan struct{}, 1),
}
}
return &periodicFileOperationsRequest{}
}

func maybeWait(w http.ResponseWriter, req *periodicFileOperationsRequest) error {
if req.waiter != nil {
err := util.Timeout(func() error {
<-req.waiter // wait for a response from StartPeriodicFileOperations
return nil
}, 30*time.Second)

if err == util.ErrTimeout {
moovhttp.Problem(w, err)
return err
}
}
return nil
}
43 changes: 40 additions & 3 deletions internal/filetransfer/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package filetransfer

import (
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/moov-io/base/admin"
Expand All @@ -22,7 +24,7 @@ func TestFlushIncomingFiles(t *testing.T) {
}(t)
defer svc.Shutdown()

flushIncoming := make(chan struct{}, 1)
flushIncoming := make(FlushChan, 1)
AddFileTransferSyncRoute(log.NewNopLogger(), svc, flushIncoming, nil)

// invalid request, wrong HTTP verb
Expand Down Expand Up @@ -67,7 +69,7 @@ func TestFlushOutgoingFiles(t *testing.T) {
}(t)
defer svc.Shutdown()

flushOutgoing := make(chan struct{}, 1)
flushOutgoing := make(FlushChan, 1)
AddFileTransferSyncRoute(log.NewNopLogger(), svc, nil, flushOutgoing)

// invalid request, wrong HTTP verb
Expand Down Expand Up @@ -112,7 +114,7 @@ func TestFlushFilesUpload(t *testing.T) {
}(t)
defer svc.Shutdown()

flushIncoming, flushOutgoing := make(chan struct{}, 1), make(chan struct{}, 1) // buffered channel
flushIncoming, flushOutgoing := make(FlushChan, 1), make(FlushChan, 1) // buffered channel
AddFileTransferSyncRoute(log.NewNopLogger(), svc, flushIncoming, flushOutgoing)

req, err := http.NewRequest("POST", "http://"+svc.BindAddr()+"/files/flush", nil)
Expand Down Expand Up @@ -149,3 +151,38 @@ func TestFlushFilesUpload(t *testing.T) {
t.Errorf("bogus HTTP status: %d", resp.StatusCode)
}
}

func TestFlush__maybeWaiter(t *testing.T) {
u, _ := url.Parse("http://localhost/files/flush?wait")

req := maybeWaiter(&http.Request{URL: u})
if req == nil {
t.Fatal("nil periodicFileOperationsRequest")
}
if req.waiter == nil {
t.Fatal("nil waiter")
}

// expect a nil waiter now
u, _ = url.Parse("http://localhost/files/flush")
req = maybeWaiter(&http.Request{URL: u})
if req == nil {
t.Fatal("nil periodicFileOperationsRequest")
}
if req.waiter != nil {
t.Fatal("expected nil waiter")
}
}

func TestFlush__maybeWait(t *testing.T) {
req := &periodicFileOperationsRequest{
waiter: make(chan struct{}, 1),
}
w := httptest.NewRecorder()
go func() {
req.waiter <- struct{}{} // signal completion
}()
if err := maybeWait(w, req); err != nil {
t.Error(err)
}
}
45 changes: 29 additions & 16 deletions internal/filetransfer/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,38 +171,61 @@ func (c *Controller) findTransferType(routingNumber string) string {
return "unknown"
}

type FlushChan chan *periodicFileOperationsRequest

type periodicFileOperationsRequest struct {
// waiter is an optional channel to signal when the file operations
// are completed. This is used to hold HTTP responses (for the admin
// endpoints).
waiter chan struct{}
}

// StartPeriodicFileOperations will block forever to periodically download incoming and returned ACH files while also merging
// and uploading ACH files to their remote SFTP server. forceUpload is a channel for manually triggering the "merge and upload"
// portion of this pooling loop, which is used by admin endpoints and to make testing easier.
//
// Uploads will be completed before their cutoff time which is set for a given ABA routing number.
func (c *Controller) StartPeriodicFileOperations(ctx context.Context, flushIncoming chan struct{}, flushOutgoing chan struct{}, depRepo internal.DepositoryRepository, transferRepo internal.TransferRepository) {
func (c *Controller) StartPeriodicFileOperations(ctx context.Context, flushIncoming FlushChan, flushOutgoing FlushChan, depRepo internal.DepositoryRepository, transferRepo internal.TransferRepository) {
tick := time.NewTicker(c.interval)
defer tick.Stop()

// Grab shared transfer cursor for new transfers to merge into local files
transferCursor := transferRepo.GetTransferCursor(c.batchSize, depRepo)
microDepositCursor := depRepo.GetMicroDepositCursor(c.batchSize)

finish := func(req *periodicFileOperationsRequest, wg *sync.WaitGroup, errs chan error) {
// Wait for all operations to complete
wg.Wait()
errs <- nil // send so channel read doesn't block
if err := <-errs; err != nil {
c.logger.Log("StartPeriodicFileOperations", fmt.Sprintf("ERROR: periodic file operation"), "error", err)
} else {
c.logger.Log("StartPeriodicFileOperations", fmt.Sprintf("files sync'd, waiting %v", c.interval))
}
if req != nil && req.waiter != nil {
req.waiter <- struct{}{} // signal to our waiter the request is finished
}
}

for {
// Setup our concurrnet waiting
var wg sync.WaitGroup
errs := make(chan error, 10)

select {
case <-flushIncoming:
case req := <-flushIncoming:
c.logger.Log("StartPeriodicFileOperations", "flushing inbound ACH files")
if err := c.downloadAndProcessIncomingFiles(depRepo, transferRepo); err != nil {
errs <- fmt.Errorf("downloadAndProcessIncomingFiles: %v", err)
}
goto finish
finish(req, &wg, errs)

case <-flushOutgoing:
case req := <-flushOutgoing:
c.logger.Log("StartPeriodicFileOperations", "flushing ACH files to their outbound destination")
if err := c.mergeAndUploadFiles(transferCursor, microDepositCursor, transferRepo, &mergeUploadOpts{force: true}); err != nil {
errs <- fmt.Errorf("mergeAndUploadFiles: %v", err)
}
goto finish
finish(req, &wg, errs)

case <-tick.C:
// This is triggered by the time.Ticker (which accounts for delays) so let's download and upload files.
Expand All @@ -222,22 +245,12 @@ func (c *Controller) StartPeriodicFileOperations(ctx context.Context, flushIncom
}
wg.Done()
}()
goto finish
finish(nil, &wg, errs)

case <-ctx.Done():
c.logger.Log("StartPeriodicFileOperations", "Shutting down due to context.Done()")
return
}

finish:
// Wait for all operations to complete
wg.Wait()
errs <- nil // send so channel read doesn't block
if err := <-errs; err != nil {
c.logger.Log("StartPeriodicFileOperations", fmt.Sprintf("ERROR: periodic file operation"), "error", err)
} else {
c.logger.Log("StartPeriodicFileOperations", fmt.Sprintf("files sync'd, waiting %v", c.interval))
}
}
}

Expand Down
7 changes: 4 additions & 3 deletions internal/filetransfer/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,13 @@ func TestController__startPeriodicFileOperations(t *testing.T) {
t.Fatal(err)
}

flushIncoming, flushOutgoing := make(chan struct{}, 1), make(chan struct{}, 1)
flushIncoming, flushOutgoing := make(FlushChan, 1), make(FlushChan, 1)
ctx, cancelFileSync := context.WithCancel(context.Background())

go controller.StartPeriodicFileOperations(ctx, flushIncoming, flushOutgoing, depRepo, transferRepo) // async call to register the polling loop
flushIncoming <- struct{}{} // trigger the calls
flushOutgoing <- struct{}{}
// trigger the calls
flushIncoming <- &periodicFileOperationsRequest{}
flushOutgoing <- &periodicFileOperationsRequest{}

time.Sleep(250 * time.Millisecond)

Expand Down

0 comments on commit 2d6f192

Please sign in to comment.