Skip to content

Commit

Permalink
Merge pull request #224 from moov-io/shard-fallback-to-name
Browse files Browse the repository at this point in the history
pipeline: fallback to shard name during lookup
  • Loading branch information
adamdecaf authored Feb 21, 2024
2 parents f72b2e7 + 2f85cad commit 8415bd9
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 8 deletions.
12 changes: 10 additions & 2 deletions internal/pipeline/file_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pipeline

import (
"context"
"database/sql"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -401,8 +402,15 @@ func (fr *FileReceiver) produceInvalidQueueFile(ctx context.Context, logger log.
func (fr *FileReceiver) getAggregator(ctx context.Context, shardKey string) *aggregator {
shardName, err := fr.shardRepository.Lookup(shardKey)
if err != nil {
fr.logger.Error().LogErrorf("problem looking up shardKey=%s: %v", shardKey, err)
return nil
if !errors.Is(err, sql.ErrNoRows) {
fr.logger.Error().LogErrorf("problem looking up shardKey=%s: %v", shardKey, err)
return nil
}
}
if shardName == "" {
// Often we have deployments with "SD-live-odfi" that have shard keys of "SD-$uuid".
// We want "SD-live-odfi" (as a shard key) to represent "SD-live-odfi" (as a shard name).
shardName = shardKey
}

agg, exists := fr.shardAggregators[shardName]
Expand Down
34 changes: 34 additions & 0 deletions internal/pipeline/file_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/moov-io/base/database"
"github.com/moov-io/base/log"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
"gocloud.dev/pubsub"
)
Expand All @@ -49,6 +50,8 @@ type TestFileReceiver struct {
MergingDir string
Publisher stream.Publisher
Events *events.MockEmitter

shardRepo shards.Repository
}

func (fr *TestFileReceiver) TriggerCutoff(t *testing.T) {
Expand Down Expand Up @@ -92,6 +95,15 @@ func testFileReceiver(t *testing.T) *TestFileReceiver {
OutboundFilenameTemplate: `{{ .ShardName }}-{{ date "150405.00000" }}.ach`,
UploadAgent: "mock",
},
{
Name: "SD-live-odfi",
Cutoffs: service.Cutoffs{
Timezone: "America/Chicago",
Windows: []string{"5:00"},
},
OutboundFilenameTemplate: `{{ .ShardName }}-{{ date "150405.00000" }}.ach`,
UploadAgent: "mock",
},
},
Default: "testing",
},
Expand Down Expand Up @@ -132,6 +144,7 @@ func testFileReceiver(t *testing.T) *TestFileReceiver {
MergingDir: dir,
Publisher: filesTopic,
Events: eventEmitter,
shardRepo: shardRepo,
}
}

Expand Down Expand Up @@ -168,6 +181,27 @@ func TestFileReceiver__InvalidQueueFile(t *testing.T) {
require.Contains(t, iqf.Error, "reading QueueACHFile failed: ImmediateDestination")
}

func TestFileReceiver__getAggregator(t *testing.T) {
fr := testFileReceiver(t)

ctx := context.Background()
agg := fr.getAggregator(ctx, "testing")
require.NotNil(t, agg)

mapping := service.ShardMapping{
ShardKey: "SD-" + uuid.NewString(),
ShardName: "SD-live-odfi",
}
err := fr.shardRepo.Add(mapping, database.NopInTx)
require.NoError(t, err)

foundKey := fr.getAggregator(ctx, mapping.ShardKey)
require.Equal(t, "SD-live-odfi", foundKey.shard.Name)

foundName := fr.getAggregator(ctx, mapping.ShardName)
require.Equal(t, foundKey, foundName)
}

func TestFileReceiver__shouldAutocommit(t *testing.T) {
fr := testFileReceiver(t)

Expand Down
3 changes: 2 additions & 1 deletion internal/shards/inmemory_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package shards

import (
"database/sql"
"fmt"

"github.com/moov-io/achgateway/internal/service"
Expand All @@ -40,7 +41,7 @@ func (r *InMemoryRepository) Lookup(shardKey string) (string, error) {
return r.Shards[i].ShardName, nil
}
}
return "", fmt.Errorf("unknown shardKey=%s", shardKey)
return "", fmt.Errorf("unknown shardKey %s: %w", shardKey, sql.ErrNoRows)
}

func (r *InMemoryRepository) List() ([]service.ShardMapping, error) {
Expand Down
7 changes: 2 additions & 5 deletions internal/shards/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,19 @@ func (r *sqlRepository) Lookup(shardKey string) (string, error) {
if err != nil {
return "", err
}
if rows.Err() != nil {
return "", rows.Err()
}
defer rows.Close()

var shardName string
for rows.Next() {
err := rows.Scan(&shardName)
if err != nil {
if err == sql.ErrNoRows {
if errors.Is(err, sql.ErrNoRows) {
return "", nil
}
return "", err
}
}
return shardName, nil
return shardName, rows.Err()
}

func (r *sqlRepository) List() ([]service.ShardMapping, error) {
Expand Down

0 comments on commit 8415bd9

Please sign in to comment.