From 99b8111cb6c86e9e36f9e7b6d469152bddec7411 Mon Sep 17 00:00:00 2001 From: Benjamin Moss Date: Wed, 11 Sep 2024 16:23:05 -0400 Subject: [PATCH 1/5] Implement stream trimming --- lib/event_store.ex | 6 ++++ lib/event_store/sql/statements.ex | 1 + .../sql/statements/trim_stream.sql.eex | 16 ++++++++++ lib/event_store/storage.ex | 8 ++++- lib/event_store/storage/trim_stream.ex | 29 +++++++++++++++++++ lib/event_store/streams/stream.ex | 29 +++++++++++++++++++ 6 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 lib/event_store/sql/statements/trim_stream.sql.eex create mode 100644 lib/event_store/storage/trim_stream.ex diff --git a/lib/event_store.ex b/lib/event_store.ex index a02560a8..5f6807fc 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -409,6 +409,12 @@ defmodule EventStore do def stream_all_backward(start_version, opts), do: stream_backward(@all_stream, start_version, opts) + def trim_stream(stream_uuid, cutoff_version, expected_version \\ :any_version, opts \\ []) do + {conn, opts} = parse_opts(opts) + + Stream.trim_stream(conn, stream_uuid, cutoff_version, expected_version, opts) + end + def delete_stream(stream_uuid, expected_version, type \\ :soft, opts \\ []) def delete_stream(@all_stream, _expected_version, _type, _opts), diff --git a/lib/event_store/sql/statements.ex b/lib/event_store/sql/statements.ex index c9801731..b61cf9e3 100644 --- a/lib/event_store/sql/statements.ex +++ b/lib/event_store/sql/statements.ex @@ -23,6 +23,7 @@ defmodule EventStore.Sql.Statements do {:subscription_ack, [:schema]}, {:insert_snapshot, [:schema]}, {:delete_snapshot, [:schema]}, + {:trim_stream, [:schema]}, {:query_all_subscriptions, [:schema]}, {:query_snapshot, [:schema]}, {:query_stream_info, [:schema]}, diff --git a/lib/event_store/sql/statements/trim_stream.sql.eex b/lib/event_store/sql/statements/trim_stream.sql.eex new file mode 100644 index 00000000..bd8ae1d7 --- /dev/null +++ b/lib/event_store/sql/statements/trim_stream.sql.eex @@ -0,0 +1,16 @@ +WITH deleted_stream_events AS ( + DELETE FROM "<%= schema %>".stream_events + WHERE stream_id = $1 + AND stream_version < $2 + RETURNING event_id +), +linked_events AS ( + DELETE FROM "<%= schema %>".stream_events + WHERE event_id IN (SELECT event_id FROM deleted_stream_events) +), +events AS ( + DELETE FROM "<%= schema %>".events + WHERE event_id IN (SELECT event_id FROM deleted_stream_events) +) + +select count(*) from deleted_stream_events; diff --git a/lib/event_store/storage.ex b/lib/event_store/storage.ex index 0f2af5ac..a4d36e12 100644 --- a/lib/event_store/storage.ex +++ b/lib/event_store/storage.ex @@ -8,7 +8,8 @@ defmodule EventStore.Storage do Reader, Snapshot, Stream, - Subscription + Subscription, + TrimStream } @doc """ @@ -101,4 +102,9 @@ defmodule EventStore.Storage do Delete an existing snapshot for a given source. """ defdelegate delete_snapshot(conn, source_uuid, opts), to: Snapshot + + @doc """ + Trim an existing stream up to the cutoff event + """ + defdelegate trim_stream(conn, stream_id, cutoff_version, otps), to: TrimStream, as: :trim end diff --git a/lib/event_store/storage/trim_stream.ex b/lib/event_store/storage/trim_stream.ex new file mode 100644 index 00000000..6c693f40 --- /dev/null +++ b/lib/event_store/storage/trim_stream.ex @@ -0,0 +1,29 @@ +defmodule EventStore.Storage.TrimStream do + @moduledoc false + + require Logger + + alias EventStore.Sql.Statements + + def trim(conn, stream_id, cutoff_version, opts) do + {schema, opts} = Keyword.pop(opts, :schema) + + query = Statements.trim_stream(schema) + + case Postgrex.query(conn, query, [stream_id, cutoff_version], opts) do + {:ok, %Postgrex.Result{num_rows: 1, rows: [[num_events]]}} -> + Logger.debug("Trimmed #{num_events} events from stream #{inspect(stream_id)}") + :ok + + {:ok, %Postgrex.Result{num_rows: 0}} -> + Logger.warning("Failed to trim stream #{inspect(stream_id)} due to: stream not found") + + {:error, :stream_not_found} + + {:error, error} = reply -> + Logger.warning("Failed to trim stream #{inspect(stream_id)} due to: " <> inspect(error)) + + reply + end + end +end diff --git a/lib/event_store/streams/stream.ex b/lib/event_store/streams/stream.ex index 28cb554d..a6a18326 100644 --- a/lib/event_store/streams/stream.ex +++ b/lib/event_store/streams/stream.ex @@ -4,6 +4,35 @@ defmodule EventStore.Streams.Stream do alias EventStore.{EventData, RecordedEvent, Storage, UUID} alias EventStore.Streams.StreamInfo + def trim_stream(conn, stream_uuid, cutoff_version, expected_version, opts) do + with {:ok, %StreamInfo{} = stream} <- stream_info(conn, stream_uuid, expected_version, opts) do + do_trim_stream(conn, stream, cutoff_version, opts) + end + end + + defp do_trim_stream(conn, stream, cutoff_version, opts) do + %StreamInfo{stream_id: stream_id} = stream + + if Keyword.fetch!(opts, :enable_hard_deletes) do + opts = query_opts(opts) + + transaction( + conn, + fn transaction -> + with :ok <- set_enable_hard_deletes(transaction), + :ok <- Storage.trim_stream(transaction, stream_id, cutoff_version, opts) do + :ok + else + {:error, error} -> Postgrex.rollback(transaction, error) + end + end, + opts + ) + else + {:error, :not_supported} + end + end + def append_to_stream(conn, stream_uuid, expected_version, events, opts) when length(events) < 1000 do {serializer, new_opts} = Keyword.pop(opts, :serializer) From 35593fb76a0f3418d4de19a5c7a872914bb8b08a Mon Sep 17 00:00:00 2001 From: Benjamin Moss Date: Thu, 12 Sep 2024 14:17:52 -0400 Subject: [PATCH 2/5] Support trim_stream_to_version --- lib/event_store.ex | 4 +- lib/event_store/streams/stream.ex | 106 +++++++++++++++++------------- 2 files changed, 62 insertions(+), 48 deletions(-) diff --git a/lib/event_store.ex b/lib/event_store.ex index 5f6807fc..74599c0b 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -294,7 +294,7 @@ defmodule EventStore do Supervisor.stop(supervisor, :normal, timeout) end - @accepted_overrides_append_to_stream [:created_at_override] + @append_to_stream_overrides [:created_at_override, :trim_stream_to_version] def append_to_stream(stream_uuid, expected_version, events, opts \\ []) @@ -302,7 +302,7 @@ defmodule EventStore do do: {:error, :cannot_append_to_all_stream} def append_to_stream(stream_uuid, expected_version, events, opts) do - overrides = Keyword.take(opts, @accepted_overrides_append_to_stream) + overrides = Keyword.take(opts, @append_to_stream_overrides) {conn, opts} = parse_opts(opts) opts = Keyword.merge(opts, overrides) diff --git a/lib/event_store/streams/stream.ex b/lib/event_store/streams/stream.ex index a6a18326..2f467c4a 100644 --- a/lib/event_store/streams/stream.ex +++ b/lib/event_store/streams/stream.ex @@ -5,22 +5,13 @@ defmodule EventStore.Streams.Stream do alias EventStore.Streams.StreamInfo def trim_stream(conn, stream_uuid, cutoff_version, expected_version, opts) do - with {:ok, %StreamInfo{} = stream} <- stream_info(conn, stream_uuid, expected_version, opts) do - do_trim_stream(conn, stream, cutoff_version, opts) - end - end - - defp do_trim_stream(conn, stream, cutoff_version, opts) do - %StreamInfo{stream_id: stream_id} = stream - if Keyword.fetch!(opts, :enable_hard_deletes) do - opts = query_opts(opts) - transaction( conn, fn transaction -> - with :ok <- set_enable_hard_deletes(transaction), - :ok <- Storage.trim_stream(transaction, stream_id, cutoff_version, opts) do + with {:ok, %StreamInfo{} = stream} <- + stream_info(transaction, stream_uuid, expected_version, opts), + :ok <- do_trim_stream(transaction, stream.stream_id, cutoff_version, opts) do :ok else {:error, error} -> Postgrex.rollback(transaction, error) @@ -33,41 +24,33 @@ defmodule EventStore.Streams.Stream do end end - def append_to_stream(conn, stream_uuid, expected_version, events, opts) - when length(events) < 1000 do - {serializer, new_opts} = Keyword.pop(opts, :serializer) - - with {:ok, stream} <- stream_info(conn, stream_uuid, expected_version, new_opts), - :ok <- do_append_to_storage(conn, stream, events, expected_version, serializer, new_opts) do - :ok - end - |> maybe_retry_once(conn, stream_uuid, expected_version, events, opts) - end - def append_to_stream(conn, stream_uuid, expected_version, events, opts) do - {serializer, new_opts} = Keyword.pop(opts, :serializer) + with :ok <- validate_append_opts(opts, expected_version) do + {serializer, new_opts} = Keyword.pop(opts, :serializer) - transaction( - conn, - fn transaction -> - with {:ok, stream} <- stream_info(transaction, stream_uuid, expected_version, new_opts), - :ok <- - do_append_to_storage( - transaction, - stream, - events, - expected_version, - serializer, - new_opts - ) do - :ok - else - {:error, error} -> Postgrex.rollback(transaction, error) - end - end, - new_opts - ) - |> maybe_retry_once(conn, stream_uuid, expected_version, events, opts) + transaction( + conn, + fn transaction -> + with {:ok, stream} <- stream_info(transaction, stream_uuid, expected_version, new_opts), + :ok <- + do_append_to_storage( + transaction, + stream, + events, + expected_version, + serializer, + new_opts + ), + :ok <- maybe_trim_stream(transaction, stream, new_opts) do + :ok + else + {:error, error} -> Postgrex.rollback(transaction, error) + end + end, + new_opts + ) + |> maybe_retry_once(conn, stream_uuid, expected_version, events, opts) + end end def link_to_stream(conn, stream_uuid, expected_version, events_or_event_ids, opts) do @@ -352,6 +335,36 @@ defmodule EventStore.Streams.Stream do end end + defp validate_append_opts(opts, expected_version) do + trim_version = Keyword.get(opts, :trim_stream_to_version, :no_trim) + hard_deletes_allowed? = Keyword.fetch!(opts, :enable_hard_deletes) + + case {trim_version, expected_version, hard_deletes_allowed?} do + {:no_trim, _, _} -> :ok + {_, :any_version, _} -> {:error, :cannot_trim_stream_with_any_version} + {_, _version, false} -> {:error, :cannot_trim_hard_deletes_not_allowed} + {_, _, _} -> :ok + end + end + + defp maybe_trim_stream(transaction, %StreamInfo{stream_id: stream_id}, opts) do + case Keyword.get(opts, :trim_stream_to_version) do + nil -> + :ok + + cutoff_version -> + do_trim_stream(transaction, stream_id, cutoff_version, opts) + end + end + + defp do_trim_stream(transaction, stream_id, cutoff_version, opts) do + opts = query_opts(opts) + + with :ok <- set_enable_hard_deletes(transaction) do + Storage.trim_stream(transaction, stream_id, cutoff_version, opts) + end + end + defp set_enable_hard_deletes(conn) do query = "SET SESSION eventstore.enable_hard_deletes TO 'on';" @@ -377,7 +390,8 @@ defmodule EventStore.Streams.Stream do end end - defp maybe_retry_once(error, _conn, _stream_uuid, _expected_version, _events, _opts), do: error + defp maybe_retry_once(ok_or_error, _conn, _stream_uuid, _expected_version, _events, _opts), + do: ok_or_error defp transaction(conn, transaction_fun, opts) do case Postgrex.transaction(conn, transaction_fun, opts) do From c106dd6d5fdb413cf45b669fc8551fa79de26d3e Mon Sep 17 00:00:00 2001 From: Benjamin Moss Date: Mon, 13 Jan 2025 20:44:36 -0500 Subject: [PATCH 3/5] Add tests and documentation --- README.md | 1 + guides/Usage.md | 22 +++++ lib/event_store/sql/statements.ex | 2 +- .../sql/statements/trim_stream.sql.eex | 16 +++- lib/event_store/storage.ex | 2 +- lib/event_store/storage/trim_stream.ex | 7 +- lib/event_store/streams/stream.ex | 12 +-- test/event_store_test.exs | 81 +++++++++++++++++++ 8 files changed, 129 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index f7eb1756..efcb65db 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,7 @@ MIT License - [Reading from all streams](guides/Usage.md#reading-from-all-streams) - [Stream from all streams](guides/Usage.md#stream-from-all-streams) - [Linking events between streams](guides/Usage.md#linking-events-between-streams) + - [Trimming events from streams](guides/Usage.md#trimming-events-from-streams) - [Subscriptions](guides/Subscriptions.md) - [Transient subscriptions](guides/Subscriptions.md#transient-subscriptions) - [Persistent subscriptions](guides/Subscriptions.md#persistent-subscriptions) diff --git a/guides/Usage.md b/guides/Usage.md index 6f718f01..cf41110c 100644 --- a/guides/Usage.md +++ b/guides/Usage.md @@ -156,3 +156,25 @@ alias MyApp.EventStore ``` You can also pass a list of `event_ids` instead of recorded event structs to link events. + +## Trimming events from streams + +Stream trimming allows you to permanently delete events from a stream up to a given version. This allows one form of 'Tombstoning' or 'Closing the books' where an event can be a rollup of the state so far. + +Trim an existing stream up to the 50th event: + +```elixir +alias MyApp.EventStore + +:ok = EventStore.trim_stream(stream_uuid, 50) +``` + +Given a stream with 50 events, append 3 new events, and trim up to the last one: + +```elixir +alias MyApp.EventStore + +events = [withdrawn, deposited, closed_for_the_day] +expected_version = 50 +:ok = EventStore.append_to_stream(stream_uuid, expected_version, events, trim_stream_to_version: 53) +``` diff --git a/lib/event_store/sql/statements.ex b/lib/event_store/sql/statements.ex index b61cf9e3..e586a43c 100644 --- a/lib/event_store/sql/statements.ex +++ b/lib/event_store/sql/statements.ex @@ -23,7 +23,7 @@ defmodule EventStore.Sql.Statements do {:subscription_ack, [:schema]}, {:insert_snapshot, [:schema]}, {:delete_snapshot, [:schema]}, - {:trim_stream, [:schema]}, + {:trim_stream, [:schema, :stream_id]}, {:query_all_subscriptions, [:schema]}, {:query_snapshot, [:schema]}, {:query_stream_info, [:schema]}, diff --git a/lib/event_store/sql/statements/trim_stream.sql.eex b/lib/event_store/sql/statements/trim_stream.sql.eex index bd8ae1d7..b334bcb9 100644 --- a/lib/event_store/sql/statements/trim_stream.sql.eex +++ b/lib/event_store/sql/statements/trim_stream.sql.eex @@ -1,6 +1,16 @@ -WITH deleted_stream_events AS ( - DELETE FROM "<%= schema %>".stream_events - WHERE stream_id = $1 +WITH stream_info AS ( + <%= if stream_id do %> + SELECT $1::bigint AS stream_id + <% else %> + SELECT stream_id + FROM "<%= schema %>".streams + WHERE stream_uuid = $1 + <% end %> +), +deleted_stream_events AS ( + DELETE FROM "<%= schema %>".stream_events AS stream_events + USING stream_info as s + WHERE stream_events.stream_id = s.stream_id AND stream_version < $2 RETURNING event_id ), diff --git a/lib/event_store/storage.ex b/lib/event_store/storage.ex index a4d36e12..7d93f73e 100644 --- a/lib/event_store/storage.ex +++ b/lib/event_store/storage.ex @@ -106,5 +106,5 @@ defmodule EventStore.Storage do @doc """ Trim an existing stream up to the cutoff event """ - defdelegate trim_stream(conn, stream_id, cutoff_version, otps), to: TrimStream, as: :trim + defdelegate trim_stream(conn, stream_id, stream_uuid, cutoff_version, otps), to: TrimStream, as: :trim end diff --git a/lib/event_store/storage/trim_stream.ex b/lib/event_store/storage/trim_stream.ex index 6c693f40..3cc15a0e 100644 --- a/lib/event_store/storage/trim_stream.ex +++ b/lib/event_store/storage/trim_stream.ex @@ -5,12 +5,13 @@ defmodule EventStore.Storage.TrimStream do alias EventStore.Sql.Statements - def trim(conn, stream_id, cutoff_version, opts) do + def trim(conn, stream_id, stream_uuid, cutoff_version, opts) do {schema, opts} = Keyword.pop(opts, :schema) - query = Statements.trim_stream(schema) + query = Statements.trim_stream(schema, stream_id) - case Postgrex.query(conn, query, [stream_id, cutoff_version], opts) do + stream_id_or_uuid = stream_id || stream_uuid + case Postgrex.query(conn, query, [stream_id_or_uuid, cutoff_version], opts) do {:ok, %Postgrex.Result{num_rows: 1, rows: [[num_events]]}} -> Logger.debug("Trimmed #{num_events} events from stream #{inspect(stream_id)}") :ok diff --git a/lib/event_store/streams/stream.ex b/lib/event_store/streams/stream.ex index 2f467c4a..b816daa8 100644 --- a/lib/event_store/streams/stream.ex +++ b/lib/event_store/streams/stream.ex @@ -11,7 +11,7 @@ defmodule EventStore.Streams.Stream do fn transaction -> with {:ok, %StreamInfo{} = stream} <- stream_info(transaction, stream_uuid, expected_version, opts), - :ok <- do_trim_stream(transaction, stream.stream_id, cutoff_version, opts) do + :ok <- do_trim_stream(transaction, stream, cutoff_version, opts) do :ok else {:error, error} -> Postgrex.rollback(transaction, error) @@ -342,26 +342,26 @@ defmodule EventStore.Streams.Stream do case {trim_version, expected_version, hard_deletes_allowed?} do {:no_trim, _, _} -> :ok {_, :any_version, _} -> {:error, :cannot_trim_stream_with_any_version} - {_, _version, false} -> {:error, :cannot_trim_hard_deletes_not_allowed} + {_, _version, false} -> {:error, :cannot_trim_when_hard_deletes_not_enabled} {_, _, _} -> :ok end end - defp maybe_trim_stream(transaction, %StreamInfo{stream_id: stream_id}, opts) do + defp maybe_trim_stream(transaction, %StreamInfo{} = stream, opts) do case Keyword.get(opts, :trim_stream_to_version) do nil -> :ok cutoff_version -> - do_trim_stream(transaction, stream_id, cutoff_version, opts) + do_trim_stream(transaction, stream, cutoff_version, opts) end end - defp do_trim_stream(transaction, stream_id, cutoff_version, opts) do + defp do_trim_stream(transaction, %StreamInfo{} = stream, cutoff_version, opts) do opts = query_opts(opts) with :ok <- set_enable_hard_deletes(transaction) do - Storage.trim_stream(transaction, stream_id, cutoff_version, opts) + Storage.trim_stream(transaction, stream.stream_id, stream.stream_uuid, cutoff_version, opts) end end diff --git a/test/event_store_test.exs b/test/event_store_test.exs index 630c0775..553ffeed 100644 --- a/test/event_store_test.exs +++ b/test/event_store_test.exs @@ -65,6 +65,87 @@ defmodule EventStore.EventStoreTest do end end + describe "trimming the event stream" do + setup(tags) do + hard_deletes? = Map.get(tags, :enable_hard_deletes, true) + stop_supervised!(TestEventStore) + start_supervised!({TestEventStore, enable_hard_deletes: hard_deletes?}) + :ok + end + + test "should not allow trimming with :any_version" do + stream_uuid = UUID.uuid4() + events = EventFactory.create_events(2) + + assert {:error, :cannot_trim_stream_with_any_version} = + EventStore.append_to_stream(stream_uuid, :any_version, events, + trim_stream_to_version: 2 + ) + end + + @tag enable_hard_deletes: false + test "should not allow trimming when hard_deletes are disabled" do + stream_uuid = UUID.uuid4() + events = EventFactory.create_events(2) + + assert {:error, :cannot_trim_when_hard_deletes_not_enabled} = + EventStore.append_to_stream(stream_uuid, 0, events, trim_stream_to_version: 2) + end + + test "should trim up to the given version" do + # When a stream exists with 2 events + stream_uuid = UUID.uuid4() + events = EventFactory.create_events(2) + assert :ok = EventStore.append_to_stream(stream_uuid, 0, events) + + # When we trim to stream to the 2nd event + assert :ok = EventStore.trim_stream(stream_uuid, 2) + + # Then the stream has a single event in it, at version 2 + assert {:ok, [event]} = EventStore.read_stream_forward(stream_uuid) + assert event.stream_version == 2 + + # And so does the $all stream + assert {:ok, [event]} = EventStore.read_stream_forward("$all") + assert event.stream_version == 2 + end + + test "should trim up to the event given when the stream exists" do + # Given an existing stream with an event + stream_uuid = UUID.uuid4() + events = EventFactory.create_events(1) + assert :ok = EventStore.append_to_stream(stream_uuid, 0, events) + + # When we append 2 events and ask the stream to be trimmed up to the 3rd event + events = EventFactory.create_events(2) + assert :ok = EventStore.append_to_stream(stream_uuid, 1, events, trim_stream_to_version: 3) + + # Then the stream has a single event in it, at version 3 + assert {:ok, [event]} = EventStore.read_stream_forward(stream_uuid) + assert event.stream_version == 3 + + # And so does the $all stream + assert {:ok, [event]} = EventStore.read_stream_forward("$all") + assert event.stream_version == 3 + end + + test "should trim up to the event given even when the stream doesn't exist" do + # When we append 2 events and ask the stream to be trimmed up to the 2nd event + stream_uuid = UUID.uuid4() + events = EventFactory.create_events(2) + + assert :ok = EventStore.append_to_stream(stream_uuid, 0, events, trim_stream_to_version: 2) + + # Then the stream has a single event in it, at version 2 + assert {:ok, [event]} = EventStore.read_stream_forward(stream_uuid) + assert event.stream_version == 2 + + # And so does the $all stream + assert {:ok, [event]} = EventStore.read_stream_forward("$all") + assert event.stream_version == 2 + end + end + describe "link to event store" do setup do source_stream_uuid = UUID.uuid4() From d8e44de46708329a25b06e93098ce7e189241bf9 Mon Sep 17 00:00:00 2001 From: Benjamin Moss Date: Mon, 13 Jan 2025 22:32:47 -0500 Subject: [PATCH 4/5] Formatting --- lib/event_store/sql/statements/insert_events.sql.eex | 4 ++-- lib/event_store/storage.ex | 4 +++- lib/event_store/storage/trim_stream.ex | 1 + lib/event_store/streams/stream.ex | 2 +- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/event_store/sql/statements/insert_events.sql.eex b/lib/event_store/sql/statements/insert_events.sql.eex index 27141d76..ec198542 100644 --- a/lib/event_store/sql/statements/insert_events.sql.eex +++ b/lib/event_store/sql/statements/insert_events.sql.eex @@ -1,4 +1,4 @@ -<%# +<% # # Elixir template variables: # schema - string # stream_id - integer @@ -21,7 +21,7 @@ %> WITH - <%# + <% # # create a table variable with: # event_id - uuid - the id for the new event # index - integer - the increase in the stream version for any stream it is linked to diff --git a/lib/event_store/storage.ex b/lib/event_store/storage.ex index 7d93f73e..a2055b17 100644 --- a/lib/event_store/storage.ex +++ b/lib/event_store/storage.ex @@ -106,5 +106,7 @@ defmodule EventStore.Storage do @doc """ Trim an existing stream up to the cutoff event """ - defdelegate trim_stream(conn, stream_id, stream_uuid, cutoff_version, otps), to: TrimStream, as: :trim + defdelegate trim_stream(conn, stream_id, stream_uuid, cutoff_version, otps), + to: TrimStream, + as: :trim end diff --git a/lib/event_store/storage/trim_stream.ex b/lib/event_store/storage/trim_stream.ex index 3cc15a0e..be41ab7a 100644 --- a/lib/event_store/storage/trim_stream.ex +++ b/lib/event_store/storage/trim_stream.ex @@ -11,6 +11,7 @@ defmodule EventStore.Storage.TrimStream do query = Statements.trim_stream(schema, stream_id) stream_id_or_uuid = stream_id || stream_uuid + case Postgrex.query(conn, query, [stream_id_or_uuid, cutoff_version], opts) do {:ok, %Postgrex.Result{num_rows: 1, rows: [[num_events]]}} -> Logger.debug("Trimmed #{num_events} events from stream #{inspect(stream_id)}") diff --git a/lib/event_store/streams/stream.ex b/lib/event_store/streams/stream.ex index b816daa8..0b7cbf6c 100644 --- a/lib/event_store/streams/stream.ex +++ b/lib/event_store/streams/stream.ex @@ -347,7 +347,7 @@ defmodule EventStore.Streams.Stream do end end - defp maybe_trim_stream(transaction, %StreamInfo{} = stream, opts) do + defp maybe_trim_stream(transaction, %StreamInfo{} = stream, opts) do case Keyword.get(opts, :trim_stream_to_version) do nil -> :ok From ad1d9d87838aa405f6112d5d9b5c8ad2fdd30300 Mon Sep 17 00:00:00 2001 From: Benjamin Moss Date: Mon, 13 Jan 2025 22:50:52 -0500 Subject: [PATCH 5/5] Don't expect hard_deletes to be present --- lib/event_store/streams/stream.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/event_store/streams/stream.ex b/lib/event_store/streams/stream.ex index 0b7cbf6c..65310a4a 100644 --- a/lib/event_store/streams/stream.ex +++ b/lib/event_store/streams/stream.ex @@ -337,7 +337,7 @@ defmodule EventStore.Streams.Stream do defp validate_append_opts(opts, expected_version) do trim_version = Keyword.get(opts, :trim_stream_to_version, :no_trim) - hard_deletes_allowed? = Keyword.fetch!(opts, :enable_hard_deletes) + hard_deletes_allowed? = Keyword.get(opts, :enable_hard_deletes, false) case {trim_version, expected_version, hard_deletes_allowed?} do {:no_trim, _, _} -> :ok