Skip to content

Commit

Permalink
Keep ChannelBrokerState in the database (#2297)
Browse files Browse the repository at this point in the history
* Keep ChannelBrokerState in the database (broken test suite)

* Replace Ecto.Adapters.SQL.Sandbox with Database Cleaner

This solves the issue of having long lived transactions in
Ask.Runtime.Session and Ask.Runtime.SurveyBroker that call into the
ChannelBroker process that then fails to checkout a database connection
for its own usages.

* Fix: avoid duplicate in channel broker queue

Some tests will queue a new contact while there is already a contact for
the same channel and respondent in the channel broker queue.

* Fix: don't pin respondent in tests for Ask.Runtime.Session

* Fix: missing leeway to activate contacts just before it's meant to be sent

* Fix: invalid channel broker test

* Fix: workaround for flaky tests (concurrency race conditions)

* fixup! Keep ChannelBrokerState in the database (broken test suite)

* fixup! Replace Ecto.Adapters.SQL.Sandbox with Database Cleaner

* fixup! Keep ChannelBrokerState in the database (broken test suite)

* fixup! Fix: workaround for flaky tests (concurrency race conditions)

* Apply suggestions from code review

Co-authored-by: Matías García Isaía <[email protected]>

---------

Co-authored-by: Matías García Isaía <[email protected]>
  • Loading branch information
ysbaddaden and matiasgarciaisaia authored Sep 12, 2023
1 parent c5fa0c9 commit 29c3e94
Show file tree
Hide file tree
Showing 23 changed files with 1,117 additions and 612 deletions.
2 changes: 1 addition & 1 deletion config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ config :phoenix, :stacktrace_depth, 20

# Configure your database
config :ask, Ask.Repo,
adapter: Ecto.Adapters.MySQL,
adapter: Ecto.Adapters.MyXQL,
username: "root",
password: "",
database: "ask_dev",
Expand Down
2 changes: 1 addition & 1 deletion config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ config :ask, AskWeb.Endpoint, secret_key_base: System.get_env("SECRET_KEY_BASE")

# Configure your database
config :ask, Ask.Repo,
adapter: Ecto.Adapters.MySQL,
adapter: Ecto.Adapters.MyXQL,
username: System.get_env("DATABASE_USER") || "root",
password: System.get_env("DATABASE_PASS") || "",
database: System.get_env("DATABASE_NAME") || "ask",
Expand Down
4 changes: 2 additions & 2 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ config :logger, level: :warn

# Configure your database
config :ask, Ask.Repo,
adapter: Ecto.Adapters.MySQL,
adapter: Ecto.Adapters.MyXQL,
username: "root",
password: "",
database: "ask_test",
hostname: System.get_env("DATABASE_HOST") || "localhost",
pool: Ecto.Adapters.SQL.Sandbox
pool_size: 10

config :ask, Ask.Runtime.SurveyBroker, batch_size: 10

Expand Down
36 changes: 22 additions & 14 deletions lib/ask.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,28 @@ defmodule Ask do
]

children =
if Mix.env() != :test && !IEx.started?() do
[
worker(Ask.OAuthTokenServer, []),
worker(Ask.Runtime.SurveyLogger, []),
worker(Ask.Runtime.SurveyBroker, []),
worker(Ask.FloipPusher, []),
worker(Ask.JsonSchema, []),
worker(Ask.Runtime.ChannelStatusServer, []),
worker(Ask.Config, []),
worker(Ask.Runtime.QuestionnaireSimulatorStore, [])
| children
]
else
children
cond do
Mix.env() == :test ->
[
worker(Ask.DatabaseCleaner, [])
| children
]

!IEx.started?() ->
[
worker(Ask.OAuthTokenServer, []),
worker(Ask.Runtime.SurveyLogger, []),
worker(Ask.Runtime.SurveyBroker, []),
worker(Ask.FloipPusher, []),
worker(Ask.JsonSchema, []),
worker(Ask.Runtime.ChannelStatusServer, []),
worker(Ask.Config, []),
worker(Ask.Runtime.QuestionnaireSimulatorStore, [])
| children
]

true ->
children
end

# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
Expand Down
105 changes: 105 additions & 0 deletions lib/ask/channel_broker_queue.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
defmodule Ask.ChannelBrokerQueue do
use Ask.Model

alias Ask.ChannelBrokerQueue, as: Queue
alias Ask.Repo

@primary_key false

schema "channel_broker_queue" do
belongs_to :channel, Ask.Channel, primary_key: true
belongs_to :respondent, Ask.Respondent, primary_key: true

# queued (pending):
field :queued_at, :utc_datetime
field :priority, Ecto.Enum, values: [low: 2, normal: 1, high: 0]
# number of outgoing messages (ivr=1, sms=1+)
field :size, :integer
field :token, :string
field :not_before, :utc_datetime
field :not_after, :utc_datetime
field :reply, Ask.Ecto.Type.ErlangTerm

# sent (active):
field :last_contact, :utc_datetime
# number of active messages left (pending confirmation)
field :contacts, :integer
field :channel_state, Ask.Ecto.Type.ErlangTerm
end

def upsert!(params) do
upsert_params =
params
|> Map.drop([:channel_id, :respondent_id])
|> Map.put_new(:last_contact, nil)
|> Map.put_new(:contacts, nil)
|> Map.put_new(:channel_state, nil)
|> Map.to_list()

%Queue{}
|> changeset(params)
|> Repo.insert!(on_conflict: [set: upsert_params])
end

def changeset(struct, params \\ %{}) do
struct
|> cast(params, [
:channel_id,
:respondent_id,
:queued_at,
:priority,
:size,
:token,
:not_before,
:not_after,
:reply,
:last_contact,
:contacts,
:channel_state
])
|> validate_required([
:channel_id,
:respondent_id,
:queued_at,
:priority,
:size,
:token
])

# |> assoc_constraint(:channel, :respondent)
end

def activable_contacts?(channel_id) do
# add leeway to activate contacts to be scheduled soon
not_before = Ask.SystemTime.time().now |> DateTime.add(60, :second)

Repo.exists?(
from q in Queue,
where:
q.channel_id == ^channel_id and is_nil(q.last_contact) and
(is_nil(q.not_before) or q.not_before <= ^not_before)
)
end

def count_active_contacts(channel_id) do
Repo.one(
from q in Queue,
select: type(coalesce(sum(coalesce(q.contacts, 0)), 0), :integer),
where: q.channel_id == ^channel_id and not is_nil(q.last_contact)
)
end

def queued_contacts(channel_id) do
Repo.all(
from q in Ask.ChannelBrokerQueue,
where: q.channel_id == ^channel_id and is_nil(q.last_contact)
)
end

def active_contacts(channel_id) do
Repo.all(
from q in Ask.ChannelBrokerQueue,
where: q.channel_id == ^channel_id and not is_nil(q.last_contact)
)
end
end
20 changes: 20 additions & 0 deletions lib/ask/ecto_types/erlang_term.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Ask.Ecto.Type.ErlangTerm do
@moduledoc """
A custom Ecto type for handling the serialization of arbitrary
data types stored as binary data in the database. Requires the
underlying DB field to be a binary.
"""
use Ecto.Type
def type, do: :binary

def cast(:any, term), do: {:ok, term}
def cast(term), do: {:ok, term}

def load(binary) when is_binary(binary) do
{:ok, :erlang.binary_to_term(binary)}
end

def dump(term) do
{:ok, :erlang.term_to_binary(term)}
end
end
93 changes: 0 additions & 93 deletions lib/ask/pqueue.ex

This file was deleted.

Loading

0 comments on commit 29c3e94

Please sign in to comment.