feat: introduce initial snapshot chunking #2119

Merged
16 commits, merged on Dec 10, 2024
5 changes: 5 additions & 0 deletions .changeset/polite-frogs-yell.md
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

feat: introduce chunked snapshot generation
10 changes: 8 additions & 2 deletions packages/elixir-client/lib/electric/client/offset.ex
@@ -67,6 +67,9 @@ defmodule Electric.Client.Offset do
iex> from_string("1378734_3")
{:ok, %#{__MODULE__}{tx: 1378734, op: 3}}

iex> from_string("0_inf")
{:ok, %#{__MODULE__}{tx: 0, op: :infinity}}

iex> from_string("not a real offset")
{:error, "has invalid format"}

@@ -78,14 +81,17 @@ defmodule Electric.Client.Offset do
else
with [tx_offset_str, op_offset_str] <- :binary.split(str, "_"),
{tx_offset, ""} <- Integer.parse(tx_offset_str),
{op_offset, ""} <- Integer.parse(op_offset_str) do
{op_offset, ""} <- parse_int_or_inf(op_offset_str) do
{:ok, %__MODULE__{tx: tx_offset, op: op_offset}}
else
_ -> {:error, "has invalid format"}
end
end
end

defp parse_int_or_inf("inf"), do: {:infinity, ""}
defp parse_int_or_inf(int), do: Integer.parse(int)

@doc """
Create a new #{__MODULE__} struct from the given LSN and operation
offsets.
@@ -115,7 +121,7 @@ defmodule Electric.Client.Offset do
end

def to_string(%__MODULE__{tx: tx, op: op}) do
"#{Integer.to_string(tx)}_#{Integer.to_string(op)}"
"#{Integer.to_string(tx)}_#{if op == :infinity, do: "inf", else: Integer.to_string(op)}"
end

@spec to_tuple(t()) :: {tx_offset(), op_offset()}
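
With this change, an offset whose operation part is `:infinity` round-trips through the wire format as `0_inf`. A quick illustration in the same doctest style as above, written outside the module so the struct name is spelled out:

iex> {:ok, offset} = Electric.Client.Offset.from_string("0_inf")
{:ok, %Electric.Client.Offset{tx: 0, op: :infinity}}
iex> Electric.Client.Offset.to_string(offset)
"0_inf"
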
4 changes: 2 additions & 2 deletions packages/elixir-client/test/electric/client/mock_test.exs
@@ -42,7 +42,7 @@ defmodule Electric.Client.MockTest do
Client.Mock.response(client,
status: 200,
schema: %{id: %{type: "int8"}},
last_offset: Offset.new(0, 0),
last_offset: Offset.new(0, 1),
shape_handle: "my-shape",
body: [
Client.Mock.change(value: %{id: "4444"}),
@@ -60,7 +60,7 @@ defmodule Electric.Client.MockTest do
%ChangeMessage{value: %{"id" => 2222}},
%ChangeMessage{value: %{"id" => 3333}},
%ChangeMessage{value: %{"id" => 4444}},
up_to_date0()
up_to_date()
] = events
end
end
4 changes: 2 additions & 2 deletions packages/elixir-client/test/support/client_helpers.ex
@@ -4,7 +4,7 @@ defmodule Support.ClientHelpers do

defmacro offset(tx, op), do: quote(do: %Offset{tx: unquote(tx), op: unquote(op)})

defmacro offset0, do: quote(do: offset(0, 0))
defmacro offset0, do: quote(do: offset(0, :infinity))

defmacro up_to_date() do
quote(do: %ControlMessage{control: :up_to_date, offset: %Offset{tx: _, op: _}})
@@ -19,5 +19,5 @@ defmodule Support.ClientHelpers do
)
end

defmacro up_to_date0(), do: quote(do: up_to_date(0, 0))
defmacro up_to_date0(), do: quote(do: up_to_date(0, :infinity))
end
20 changes: 13 additions & 7 deletions packages/react-hooks/test/react-hooks.test.tsx
@@ -3,7 +3,7 @@ import { describe, expect, inject, it as bareIt } from 'vitest'
import { setTimeout as sleep } from 'node:timers/promises'
import { testWithIssuesTable as it } from './support/test-context'
import { useShape, sortedOptionsHash, UseShapeResult } from '../src/react-hooks'
import { Shape, Message } from '@electric-sql/client'
import { Shape, ShapeStream } from '@electric-sql/client'

const BASE_URL = inject(`baseUrl`)

@@ -351,14 +351,20 @@ describe(`useShape`, () => {
unmount()

// Add another row to shape
const [newId] = await insertIssues({ title: `other row` })
const [_] = await insertIssues({ title: `other row` })

const parallelWaiterStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
params: {
table: issuesTableUrl,
},
signal: aborter.signal,
subscribe: true,
})

// And wait until it's definitely seen
await waitFor(async () => {
const res = await fetch(
`${BASE_URL}/v1/shape?table=${issuesTableUrl}&offset=-1`
)
const body = (await res.json()) as Message[]
expect(body).toMatchObject([{}, { value: { id: newId } }])
return parallelWaiterStream.isUpToDate || (await sleep(50))
})

await sleep(50)
80 changes: 17 additions & 63 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
@@ -173,7 +173,7 @@ defmodule Electric.Plug.ServeShapePlug do
plug :put_resp_cache_headers
plug :generate_etag
plug :validate_and_put_etag
plug :serve_log_or_snapshot
plug :serve_shape_log

# end_telemetry_span needs to always be the last plug here.
plug :end_telemetry_span
@@ -337,7 +337,8 @@ defmodule Electric.Plug.ServeShapePlug do
# The log can't be up to date if the last_offset is not the actual end.
# Also, if the client is requesting the start of the log, we don't set `up-to-date`
# here either, as we want to set a long max-age on the cache-control.
if LogOffset.compare(chunk_end_offset, last_offset) == :lt or offset == @before_all_offset do
if LogOffset.compare(chunk_end_offset, last_offset) == :lt or
offset == @before_all_offset do
conn
|> assign(:up_to_date, [])
# header might have been added on first pass but no longer valid
@@ -420,66 +421,13 @@ defmodule Electric.Plug.ServeShapePlug do
"public, max-age=#{config[:max_age]}, stale-while-revalidate=#{config[:stale_age]}"
)

# If offset is -1, we're serving a snapshot
defp serve_log_or_snapshot(
%Conn{assigns: %{offset: @before_all_offset, config: config}} = conn,
_
) do
OpenTelemetry.with_span("shape_get.plug.serve_snapshot", [], config[:stack_id], fn ->
serve_snapshot(conn)
end)
end

# Otherwise, serve log since that offset
defp serve_log_or_snapshot(%Conn{assigns: %{config: config}} = conn, _) do
defp serve_shape_log(%Conn{assigns: %{config: config}} = conn, _) do
OpenTelemetry.with_span("shape_get.plug.serve_shape_log", [], config[:stack_id], fn ->
serve_shape_log(conn)
do_serve_shape_log(conn)
end)
end

defp serve_snapshot(
%Conn{
assigns: %{
chunk_end_offset: chunk_end_offset,
active_shape_handle: shape_handle,
up_to_date: maybe_up_to_date
}
} = conn
) do
case Shapes.get_snapshot(conn.assigns.config, shape_handle) do
{:ok, {offset, snapshot}} ->
log =
Shapes.get_log_stream(conn.assigns.config, shape_handle,
since: offset,
up_to: chunk_end_offset
)

[snapshot, log, maybe_up_to_date]
|> Stream.concat()
|> to_json_stream()
|> Stream.chunk_every(500)
|> send_stream(conn, 200)

{:error, reason} ->
error_msg = "Could not serve a snapshot because of #{inspect(reason)}"

Logger.warning(error_msg)
OpenTelemetry.record_exception(error_msg)

{status_code, message} =
if match?(%DBConnection.ConnectionError{reason: :queue_timeout}, reason),
do: {429, "Could not establish connection to database - try again later"},
else: {500, "Failed creating or fetching the snapshot"}

send_resp(
conn,
status_code,
Jason.encode_to_iodata!(%{error: message})
)
end
end

defp serve_shape_log(
defp do_serve_shape_log(
%Conn{
assigns: %{
offset: offset,
Expand All @@ -490,7 +438,7 @@ defmodule Electric.Plug.ServeShapePlug do
} = conn
) do
log =
Shapes.get_log_stream(conn.assigns.config, shape_handle,
Shapes.get_merged_log_stream(conn.assigns.config, shape_handle,
since: offset,
up_to: chunk_end_offset
)
@@ -558,7 +506,10 @@ defmodule Electric.Plug.ServeShapePlug do

defp listen_for_new_changes(%Conn{assigns: assigns} = conn, _) do
# Only start listening when we know there is a possibility that nothing is going to be returned
if LogOffset.compare(assigns.offset, assigns.last_offset) != :lt do
# There is an edge case where the snapshot is served in chunks but `last_offset` is not updated
# by that process. In that case, we'll start listening for changes but not receive any updates.
if LogOffset.compare(assigns.offset, assigns.last_offset) != :lt or
assigns.last_offset == LogOffset.last_before_real_offsets() do
shape_handle = assigns.handle

ref = make_ref()
@@ -587,7 +538,7 @@ defmodule Electric.Plug.ServeShapePlug do
# update last offset header
|> put_resp_header("electric-offset", "#{latest_log_offset}")
|> determine_up_to_date([])
|> serve_shape_log()
|> do_serve_shape_log()

{^ref, :shape_rotation} ->
# We may want to notify the client better that the shape handle had changed, but just closing the response
@@ -612,7 +563,9 @@ defmodule Electric.Plug.ServeShapePlug do

maybe_up_to_date = if up_to_date = assigns[:up_to_date], do: up_to_date != []

Electric.Telemetry.OpenTelemetry.get_stack_span_attrs(assigns.config[:stack_id])
Electric.Telemetry.OpenTelemetry.get_stack_span_attrs(
get_in(conn.assigns, [:config, :stack_id])
)
|> Map.merge(Electric.Plug.Utils.common_open_telemetry_attrs(conn))
|> Map.merge(%{
"shape.handle" => shape_handle,
@@ -668,7 +621,7 @@ defmodule Electric.Plug.ServeShapePlug do
conn.query_params["handle"] || assigns[:active_shape_handle] || assigns[:handle],
client_ip: conn.remote_ip,
status: conn.status,
stack_id: assigns.config[:stack_id]
stack_id: get_in(conn.assigns, [:config, :stack_id])
}
)

@@ -699,6 +652,7 @@ defmodule Electric.Plug.ServeShapePlug do
error_str = Exception.format(error.kind, error.reason)

conn
|> fetch_query_params()
|> assign(:error_str, error_str)
|> end_telemetry_span()

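
The net effect of these plug changes is a single serving path: the dedicated snapshot branch is gone, and every request goes through `do_serve_shape_log/1` backed by `Shapes.get_merged_log_stream/3`, where snapshot chunks occupy "virtual" offsets up to `0_inf` and real transaction offsets follow. The sketch below is only a conceptual illustration of that ordering, not the actual `Shapes` implementation (which is outside this diff), and it assumes `LogOffset.compare/2` follows the usual `:lt`/`:eq`/`:gt` convention (only `:lt` appears in the diff):

defmodule MergedLogSketch do
  alias Electric.Replication.LogOffset

  # Made-up offsets: snapshot chunks sit in the virtual range 0_1 .. 0_inf,
  # real transactions start at tx_offset >= 1.
  @snapshot_chunks [LogOffset.new(0, 1), LogOffset.new(0, 2), LogOffset.last_before_real_offsets()]
  @log_entries [LogOffset.new(10, 0), LogOffset.new(11, 0)]

  # Everything strictly after `since`: remaining snapshot chunks first, then the log.
  def merged_stream(since) do
    Stream.concat(@snapshot_chunks, @log_entries)
    |> Stream.filter(&(LogOffset.compare(&1, since) == :gt))
  end
end

# A client that has consumed the first two snapshot chunks resumes with the final
# chunk boundary and then the real log:
MergedLogSketch.merged_stream(Electric.Replication.LogOffset.new(0, 2)) |> Enum.to_list()
#=> [LogOffset.last_before_real_offsets(), LogOffset.new(10, 0), LogOffset.new(11, 0)]

This is also why `listen_for_new_changes/2` gains its second clause: while the snapshot is still being chunked out, `last_offset` can stay pinned at `LogOffset.last_before_real_offsets()`, so the plug registers a listener in that state as well, which is exactly the edge case the new comment calls out.
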
33 changes: 31 additions & 2 deletions packages/sync-service/lib/electric/replication/log_offset.ex
@@ -47,7 +47,8 @@ defmodule Electric.Replication.LogOffset do
** (FunctionClauseError) no function clause matching in Electric.Replication.LogOffset.new/2
"""
def new(tx_offset, op_offset)
when is_integer(tx_offset) and tx_offset >= 0 and is_integer(op_offset) and op_offset >= 0 do
when is_integer(tx_offset) and tx_offset >= 0 and is_integer(op_offset) and op_offset >= 0
when is_integer(tx_offset) and tx_offset >= 0 and op_offset == :infinity do
%LogOffset{tx_offset: tx_offset, op_offset: op_offset}
end

@@ -99,6 +100,10 @@ defmodule Electric.Replication.LogOffset do
(offset1.tx_offset == offset2.tx_offset and
offset1.op_offset < offset2.op_offset)

defguard is_min_offset(offset) when offset.tx_offset == -1

defguard is_virtual_offset(offset) when offset.tx_offset == 0

@doc """
An offset that is smaller than all offsets in the log.

@@ -130,6 +135,12 @@
@spec last() :: t
def last(), do: %LogOffset{tx_offset: 0xFFFFFFFFFFFFFFFF, op_offset: :infinity}

@doc """
The last possible offset for the "virtual" part of the log - i.e. snapshots.
"""
@spec last_before_real_offsets() :: t()
def last_before_real_offsets(), do: %LogOffset{tx_offset: 0, op_offset: :infinity}

@doc """
Increments the offset of the change inside the transaction.

Expand Down Expand Up @@ -184,6 +195,10 @@ defmodule Electric.Replication.LogOffset do
[Integer.to_string(-1)]
end

def to_iolist(%LogOffset{tx_offset: tx_offset, op_offset: :infinity}) do
[Integer.to_string(tx_offset), ?_, "inf"]
end

def to_iolist(%LogOffset{tx_offset: tx_offset, op_offset: op_offset}) do
[Integer.to_string(tx_offset), ?_, Integer.to_string(op_offset)]
end
@@ -205,6 +220,9 @@
iex> from_string("0_02")
{:ok, %LogOffset{tx_offset: 0, op_offset: 2}}

iex> from_string("0_inf")
{:ok, %LogOffset{tx_offset: 0, op_offset: :infinity}}

iex> from_string("1_2_3")
{:error, "has invalid format"}

@@ -224,7 +242,7 @@
else
with [tx_offset_str, op_offset_str] <- String.split(str, "_"),
{tx_offset, ""} <- Integer.parse(tx_offset_str),
{op_offset, ""} <- Integer.parse(op_offset_str),
{op_offset, ""} <- parse_int_or_inf(op_offset_str),
offset <- new(tx_offset, op_offset) do
{:ok, offset}
else
@@ -233,11 +251,22 @@
end
end

defp parse_int_or_inf("inf"), do: {:infinity, ""}
defp parse_int_or_inf(int), do: Integer.parse(int)

defimpl Inspect do
def inspect(%LogOffset{tx_offset: -1, op_offset: 0}, _opts) do
"LogOffset.before_all()"
end

def inspect(%LogOffset{tx_offset: 0xFFFFFFFFFFFFFFFF, op_offset: :infinity}, _opts) do
"LogOffset.last()"
end

def inspect(%LogOffset{tx_offset: 0, op_offset: :infinity}, _opts) do
"LogOffset.last_before_real_offsets()"
end

def inspect(%LogOffset{tx_offset: tx, op_offset: op}, _opts) do
"LogOffset.new(#{tx}, #{op})"
end
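
Taken together, these additions carve out a virtual offset range for snapshot chunks that sorts below every real transaction offset. A small check of the ordering, the new `Inspect` clause, and the `inf` serialisation, in the module's own doctest style (values are the ones implied by the code above; `compare/2` is assumed to return the conventional `:lt`/`:eq`/`:gt`):

iex> LogOffset.compare(LogOffset.new(0, 5), LogOffset.last_before_real_offsets())
:lt
iex> LogOffset.compare(LogOffset.last_before_real_offsets(), LogOffset.new(1, 0))
:lt
iex> inspect(LogOffset.last_before_real_offsets())
"LogOffset.last_before_real_offsets()"
iex> LogOffset.to_iolist(LogOffset.last_before_real_offsets()) |> IO.iodata_to_binary()
"0_inf"
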
4 changes: 2 additions & 2 deletions packages/sync-service/lib/electric/shape_cache.ex
@@ -60,8 +60,8 @@ defmodule Electric.ShapeCache do
],
prepare_tables_fn: [type: {:or, [:mfa, {:fun, 2}]}, required: true],
create_snapshot_fn: [
type: {:fun, 6},
default: &Shapes.Consumer.Snapshotter.query_in_readonly_txn/6
type: {:fun, 7},
default: &Shapes.Consumer.Snapshotter.query_in_readonly_txn/7
],
purge_all_shapes?: [type: :boolean, required: false]
)
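
Note that the snapshot function option moves from arity 6 to arity 7; the extra argument itself is not visible in this diff. Assuming this schema is validated with NimbleOptions (the library is not named here), `{:fun, 7}` simply enforces the new arity on any custom `create_snapshot_fn`, along these lines:

# Hedged sketch, assuming NimbleOptions validates the option schema above.
schema = [create_snapshot_fn: [type: {:fun, 7}]]

NimbleOptions.validate!([create_snapshot_fn: fn _, _, _, _, _, _, _ -> :ok end], schema)
# returns the validated options

NimbleOptions.validate!([create_snapshot_fn: fn _, _, _, _, _, _ -> :ok end], schema)
# raises NimbleOptions.ValidationError, since a 6-arity snapshot function is no longer accepted
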
@@ -17,7 +17,6 @@ defmodule Electric.ShapeCache.CrashingFileStorage do
defdelegate get_current_position(opts), to: FileStorage
defdelegate set_snapshot_xmin(xmin, opts), to: FileStorage
defdelegate snapshot_started?(opts), to: FileStorage
defdelegate get_snapshot(opts), to: FileStorage
defdelegate make_new_snapshot!(data_stream, opts), to: FileStorage
defdelegate mark_snapshot_as_started(opts), to: FileStorage
defdelegate get_log_stream(offset, max_offset, opts), to: FileStorage