Skip to content

Commit

Permalink
wip: Postgres support
Browse files Browse the repository at this point in the history
  • Loading branch information
maxekman committed Oct 15, 2021
1 parent a155912 commit 0cb9c8f
Show file tree
Hide file tree
Showing 12 changed files with 1,633 additions and 1 deletion.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@ upload_coverage:

.PHONY: run
run:
docker-compose up -d mongodb gpubsub kafka redis nats
docker-compose up -d mongodb postgres gpubsub kafka redis nats

.PHONY: run_mongodb
run_mongodb:
docker-compose up -d mongodb

.PHONY: run_postgres
run_postgres:
docker-compose up -d postgres

.PHONY: run_gpubsub
run_gpubsub:
docker-compose up -d gpubsub
Expand Down
10 changes: 10 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ services:
volumes:
- mongodb:/data/db

postgres:
image: postgres:14
ports:
- "5432:5432"
environment:
POSTGRES_PASSWORD: "password"
# PGDATA: "/data/postgres"
# volumes:
# - postgres:/data/postgres

gpubsub:
image: gcr.io/google.com/cloudsdktool/cloud-sdk:355.0.0-emulators
ports:
Expand Down
133 changes: 133 additions & 0 deletions eventstore/postgres/eventmaintenance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright (c) 2021 - The Event Horizon authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package postgres

import (
"context"
"fmt"

eh "github.com/looplab/eventhorizon"
)

// Replace implements the Replace method of the eventhorizon.EventStore interface.
// func (s *EventStore) Replace(ctx context.Context, event eh.Event) error {
// sess, err := s.client.StartSession(nil)
// if err != nil {
// return eh.EventStoreError{
// Err: eh.ErrCouldNotSaveEvents,
// BaseErr: err,
// }
// }
// defer sess.EndSession(ctx)

// if _, err := sess.WithTransaction(ctx, func(txCtx mongo.SessionContext) (interface{}, error) {
// // First check if the aggregate exists, the not found error in the update
// // query can mean both that the aggregate or the event is not found.
// if n, err := s.events.CountDocuments(ctx,
// bson.M{"aggregate_id": event.AggregateID()}); n == 0 {
// return nil, eh.ErrAggregateNotFound
// } else if err != nil {
// return nil, err
// }

// // Create the event record for the Database.
// e, err := newEvt(ctx, event)
// if err != nil {
// return nil, err
// }

// // Copy the event position from the old event (and set in metadata).
// res := s.events.FindOne(ctx, bson.M{
// "aggregate_id": event.AggregateID(),
// "version": event.Version(),
// })
// if res.Err() != nil {
// if res.Err() == mongo.ErrNoDocuments {
// return nil, eh.ErrInvalidEvent
// }
// return nil, fmt.Errorf("could not find event to replace: %w", res.Err())
// }
// var eventToReplace evt
// if err := res.Decode(&eventToReplace); err != nil {
// return nil, fmt.Errorf("could not decode event to replace: %w", err)
// }
// e.Position = eventToReplace.Position
// e.Metadata["position"] = eventToReplace.Position

// // Find and replace the event.
// if r, err := s.events.ReplaceOne(ctx, bson.M{
// "aggregate_id": event.AggregateID(),
// "version": event.Version(),
// }, e); err != nil {
// return nil, err
// } else if r.MatchedCount == 0 {
// return nil, eh.ErrInvalidEvent
// }
// return nil, nil
// }); err != nil {
// // Return some errors intact.
// if err == eh.ErrAggregateNotFound || err == eh.ErrInvalidEvent {
// return err
// }
// return eh.EventStoreError{
// Err: eh.ErrCouldNotSaveEvents,
// BaseErr: err,
// }
// }

// return nil
// }

// RenameEvent implements the RenameEvent method of the eventhorizon.EventStore interface.
// func (s *EventStore) RenameEvent(ctx context.Context, from, to eh.EventType) error {
// // Find and rename all events.
// // TODO: Maybe use change info.
// if _, err := s.events.UpdateMany(ctx,
// bson.M{
// "event_type": from.String(),
// },
// bson.M{
// "$set": bson.M{"event_type": to.String()},
// },
// ); err != nil {
// return eh.EventStoreError{
// Err: eh.ErrCouldNotSaveEvents,
// BaseErr: err,
// }
// }

// return nil
// }

// Clear clears the event storage.
func (s *EventStore) Clear(ctx context.Context) error {
if _, err := s.db.NewTruncateTable().
Model((*stream)(nil)).
Exec(ctx); err != nil {
return eh.EventStoreError{
Err: fmt.Errorf("could not clear stream table: %w", err),
}
}

if _, err := s.db.NewTruncateTable().
Model((*evt)(nil)).
Exec(ctx); err != nil {
return eh.EventStoreError{
Err: fmt.Errorf("could not clear event table: %w", err),
}
}

return nil
}
57 changes: 57 additions & 0 deletions eventstore/postgres/eventmaintenance_test.go.off
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2021 - The Event Horizon authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package postgres

import (
"context"
"crypto/rand"
"encoding/hex"
"os"
"testing"

"github.com/looplab/eventhorizon/eventstore"
)

func TestEventStoreMaintenanceIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

// Use MongoDB in Docker with fallback to localhost.
addr := os.Getenv("MONGODB_ADDR")
if addr == "" {
addr = "localhost:27017"
}
url := "mongodb://" + addr

// Get a random DB name.
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
t.Fatal(err)
}
db := "test-" + hex.EncodeToString(b)
t.Log("using DB:", db)

store, err := NewEventStore(url, db)
if err != nil {
t.Fatal("there should be no error:", err)
}
if store == nil {
t.Fatal("there should be a store")
}
defer store.Close(context.Background())

eventstore.MaintenanceAcceptanceTest(t, store, store, context.Background())
}
Loading

0 comments on commit 0cb9c8f

Please sign in to comment.