From 2f85cad745892021ec8463f07cdedf99fc0e5d28 Mon Sep 17 00:00:00 2001 From: Adam Shannon Date: Wed, 21 Feb 2024 15:59:42 -0600 Subject: [PATCH] pipeline: fallback to shard name during lookup Fixes: https://github.com/moov-io/achgateway/issues/223 --- internal/pipeline/file_receiver.go | 12 +++++++-- internal/pipeline/file_receiver_test.go | 34 +++++++++++++++++++++++++ internal/shards/inmemory_repository.go | 3 ++- internal/shards/repository.go | 7 ++--- 4 files changed, 48 insertions(+), 8 deletions(-) diff --git a/internal/pipeline/file_receiver.go b/internal/pipeline/file_receiver.go index 93365ca..0b1ba2f 100644 --- a/internal/pipeline/file_receiver.go +++ b/internal/pipeline/file_receiver.go @@ -19,6 +19,7 @@ package pipeline import ( "context" + "database/sql" "errors" "fmt" "os" @@ -401,8 +402,15 @@ func (fr *FileReceiver) produceInvalidQueueFile(ctx context.Context, logger log. func (fr *FileReceiver) getAggregator(ctx context.Context, shardKey string) *aggregator { shardName, err := fr.shardRepository.Lookup(shardKey) if err != nil { - fr.logger.Error().LogErrorf("problem looking up shardKey=%s: %v", shardKey, err) - return nil + if !errors.Is(err, sql.ErrNoRows) { + fr.logger.Error().LogErrorf("problem looking up shardKey=%s: %v", shardKey, err) + return nil + } + } + if shardName == "" { + // Often we have deployments with "SD-live-odfi" that have shard keys of "SD-$uuid". + // We want "SD-live-odfi" (as a shard key) to represent "SD-live-odfi" (as a shard name). + shardName = shardKey } agg, exists := fr.shardAggregators[shardName] diff --git a/internal/pipeline/file_receiver_test.go b/internal/pipeline/file_receiver_test.go index 05c0399..65add93 100644 --- a/internal/pipeline/file_receiver_test.go +++ b/internal/pipeline/file_receiver_test.go @@ -39,6 +39,7 @@ import ( "github.com/moov-io/base/database" "github.com/moov-io/base/log" + "github.com/google/uuid" "github.com/stretchr/testify/require" "gocloud.dev/pubsub" ) @@ -49,6 +50,8 @@ type TestFileReceiver struct { MergingDir string Publisher stream.Publisher Events *events.MockEmitter + + shardRepo shards.Repository } func (fr *TestFileReceiver) TriggerCutoff(t *testing.T) { @@ -92,6 +95,15 @@ func testFileReceiver(t *testing.T) *TestFileReceiver { OutboundFilenameTemplate: `{{ .ShardName }}-{{ date "150405.00000" }}.ach`, UploadAgent: "mock", }, + { + Name: "SD-live-odfi", + Cutoffs: service.Cutoffs{ + Timezone: "America/Chicago", + Windows: []string{"5:00"}, + }, + OutboundFilenameTemplate: `{{ .ShardName }}-{{ date "150405.00000" }}.ach`, + UploadAgent: "mock", + }, }, Default: "testing", }, @@ -132,6 +144,7 @@ func testFileReceiver(t *testing.T) *TestFileReceiver { MergingDir: dir, Publisher: filesTopic, Events: eventEmitter, + shardRepo: shardRepo, } } @@ -168,6 +181,27 @@ func TestFileReceiver__InvalidQueueFile(t *testing.T) { require.Contains(t, iqf.Error, "reading QueueACHFile failed: ImmediateDestination") } +func TestFileReceiver__getAggregator(t *testing.T) { + fr := testFileReceiver(t) + + ctx := context.Background() + agg := fr.getAggregator(ctx, "testing") + require.NotNil(t, agg) + + mapping := service.ShardMapping{ + ShardKey: "SD-" + uuid.NewString(), + ShardName: "SD-live-odfi", + } + err := fr.shardRepo.Add(mapping, database.NopInTx) + require.NoError(t, err) + + foundKey := fr.getAggregator(ctx, mapping.ShardKey) + require.Equal(t, "SD-live-odfi", foundKey.shard.Name) + + foundName := fr.getAggregator(ctx, mapping.ShardName) + require.Equal(t, foundKey, foundName) +} + func TestFileReceiver__shouldAutocommit(t *testing.T) { fr := testFileReceiver(t) diff --git a/internal/shards/inmemory_repository.go b/internal/shards/inmemory_repository.go index a42369e..e9c23fa 100644 --- a/internal/shards/inmemory_repository.go +++ b/internal/shards/inmemory_repository.go @@ -18,6 +18,7 @@ package shards import ( + "database/sql" "fmt" "github.com/moov-io/achgateway/internal/service" @@ -40,7 +41,7 @@ func (r *InMemoryRepository) Lookup(shardKey string) (string, error) { return r.Shards[i].ShardName, nil } } - return "", fmt.Errorf("unknown shardKey=%s", shardKey) + return "", fmt.Errorf("unknown shardKey %s: %w", shardKey, sql.ErrNoRows) } func (r *InMemoryRepository) List() ([]service.ShardMapping, error) { diff --git a/internal/shards/repository.go b/internal/shards/repository.go index 28dc4a3..f7334a8 100644 --- a/internal/shards/repository.go +++ b/internal/shards/repository.go @@ -57,22 +57,19 @@ func (r *sqlRepository) Lookup(shardKey string) (string, error) { if err != nil { return "", err } - if rows.Err() != nil { - return "", rows.Err() - } defer rows.Close() var shardName string for rows.Next() { err := rows.Scan(&shardName) if err != nil { - if err == sql.ErrNoRows { + if errors.Is(err, sql.ErrNoRows) { return "", nil } return "", err } } - return shardName, nil + return shardName, rows.Err() } func (r *sqlRepository) List() ([]service.ShardMapping, error) {