From 29c3e94f621651e42e41ef57866fcfdca2f0e01d Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Tue, 12 Sep 2023 20:50:27 +0200 Subject: [PATCH] Keep ChannelBrokerState in the database (#2297) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Keep ChannelBrokerState in the database (broken test suite) * Replace Ecto.Adapters.SQL.Sandbox with Database Cleaner This solves the issue of having long lived transactions in Ask.Runtime.Session and Ask.Runtime.SurveyBroker that call into the ChannelBroker process that then fails to checkout a database connection for its own usages. * Fix: avoid duplicate in channel broker queue Some tests will queue a new contact while there is already a contact for the same channel and respondent in the channel broker queue. * Fix: don't pin respondent in tests for Ask.Runtime.Session * Fix: missing leeway to activate contacts just before it's meant to be sent * Fix: invalid channel broker test * Fix: workaround for flaky tests (concurrency race conditions) * fixup! Keep ChannelBrokerState in the database (broken test suite) * fixup! Replace Ecto.Adapters.SQL.Sandbox with Database Cleaner * fixup! Keep ChannelBrokerState in the database (broken test suite) * fixup! Fix: workaround for flaky tests (concurrency race conditions) * Apply suggestions from code review Co-authored-by: Matías García Isaía --------- Co-authored-by: Matías García Isaía --- config/dev.exs | 2 +- config/prod.exs | 2 +- config/test.exs | 4 +- lib/ask.ex | 36 +- lib/ask/channel_broker_queue.ex | 105 ++++ lib/ask/ecto_types/erlang_term.ex | 20 + lib/ask/pqueue.ex | 93 ---- lib/ask/runtime/channel_broker.ex | 82 +--- lib/ask/runtime/channel_broker_state.ex | 454 ++++++++++-------- ...0821100203_create_channel_broker_queue.exs | 32 ++ priv/repo/structure.sql | 32 +- test/ask/pqueue_test.exs | 107 ----- .../ask/runtime/channel_broker_state_test.exs | 441 +++++++++++++++-- test/ask/runtime/channel_broker_test.exs | 122 ++--- .../runtime/channel_status_server_test.exs | 8 + test/ask/runtime/session_test.exs | 55 ++- .../controllers/survey_controller_test.exs | 8 + test/support/channel_case.ex | 9 +- test/support/conn_case.ex | 9 +- test/support/data_case.ex | 9 +- test/support/database_cleaner.ex | 87 ++++ test/support/test_helpers.ex | 10 +- test/test_helper.exs | 2 - 23 files changed, 1117 insertions(+), 612 deletions(-) create mode 100644 lib/ask/channel_broker_queue.ex create mode 100644 lib/ask/ecto_types/erlang_term.ex delete mode 100644 lib/ask/pqueue.ex create mode 100644 priv/repo/migrations/20230821100203_create_channel_broker_queue.exs delete mode 100644 test/ask/pqueue_test.exs create mode 100644 test/support/database_cleaner.ex diff --git a/config/dev.exs b/config/dev.exs index 10e467358..30e218ddf 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -32,7 +32,7 @@ config :phoenix, :stacktrace_depth, 20 # Configure your database config :ask, Ask.Repo, - adapter: Ecto.Adapters.MySQL, + adapter: Ecto.Adapters.MyXQL, username: "root", password: "", database: "ask_dev", diff --git a/config/prod.exs b/config/prod.exs index 9ec9b4d92..e5c96ce5c 100644 --- a/config/prod.exs +++ b/config/prod.exs @@ -65,7 +65,7 @@ config :ask, AskWeb.Endpoint, secret_key_base: System.get_env("SECRET_KEY_BASE") # Configure your database config :ask, Ask.Repo, - adapter: Ecto.Adapters.MySQL, + adapter: Ecto.Adapters.MyXQL, username: System.get_env("DATABASE_USER") || "root", password: System.get_env("DATABASE_PASS") || "", database: System.get_env("DATABASE_NAME") || "ask", diff --git a/config/test.exs b/config/test.exs index 883b40141..6f894161e 100644 --- a/config/test.exs +++ b/config/test.exs @@ -12,12 +12,12 @@ config :logger, level: :warn # Configure your database config :ask, Ask.Repo, - adapter: Ecto.Adapters.MySQL, + adapter: Ecto.Adapters.MyXQL, username: "root", password: "", database: "ask_test", hostname: System.get_env("DATABASE_HOST") || "localhost", - pool: Ecto.Adapters.SQL.Sandbox + pool_size: 10 config :ask, Ask.Runtime.SurveyBroker, batch_size: 10 diff --git a/lib/ask.ex b/lib/ask.ex index 9c2ddd8b9..676d987e0 100644 --- a/lib/ask.ex +++ b/lib/ask.ex @@ -27,20 +27,28 @@ defmodule Ask do ] children = - if Mix.env() != :test && !IEx.started?() do - [ - worker(Ask.OAuthTokenServer, []), - worker(Ask.Runtime.SurveyLogger, []), - worker(Ask.Runtime.SurveyBroker, []), - worker(Ask.FloipPusher, []), - worker(Ask.JsonSchema, []), - worker(Ask.Runtime.ChannelStatusServer, []), - worker(Ask.Config, []), - worker(Ask.Runtime.QuestionnaireSimulatorStore, []) - | children - ] - else - children + cond do + Mix.env() == :test -> + [ + worker(Ask.DatabaseCleaner, []) + | children + ] + + !IEx.started?() -> + [ + worker(Ask.OAuthTokenServer, []), + worker(Ask.Runtime.SurveyLogger, []), + worker(Ask.Runtime.SurveyBroker, []), + worker(Ask.FloipPusher, []), + worker(Ask.JsonSchema, []), + worker(Ask.Runtime.ChannelStatusServer, []), + worker(Ask.Config, []), + worker(Ask.Runtime.QuestionnaireSimulatorStore, []) + | children + ] + + true -> + children end # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html diff --git a/lib/ask/channel_broker_queue.ex b/lib/ask/channel_broker_queue.ex new file mode 100644 index 000000000..2cc695ec4 --- /dev/null +++ b/lib/ask/channel_broker_queue.ex @@ -0,0 +1,105 @@ +defmodule Ask.ChannelBrokerQueue do + use Ask.Model + + alias Ask.ChannelBrokerQueue, as: Queue + alias Ask.Repo + + @primary_key false + + schema "channel_broker_queue" do + belongs_to :channel, Ask.Channel, primary_key: true + belongs_to :respondent, Ask.Respondent, primary_key: true + + # queued (pending): + field :queued_at, :utc_datetime + field :priority, Ecto.Enum, values: [low: 2, normal: 1, high: 0] + # number of outgoing messages (ivr=1, sms=1+) + field :size, :integer + field :token, :string + field :not_before, :utc_datetime + field :not_after, :utc_datetime + field :reply, Ask.Ecto.Type.ErlangTerm + + # sent (active): + field :last_contact, :utc_datetime + # number of active messages left (pending confirmation) + field :contacts, :integer + field :channel_state, Ask.Ecto.Type.ErlangTerm + end + + def upsert!(params) do + upsert_params = + params + |> Map.drop([:channel_id, :respondent_id]) + |> Map.put_new(:last_contact, nil) + |> Map.put_new(:contacts, nil) + |> Map.put_new(:channel_state, nil) + |> Map.to_list() + + %Queue{} + |> changeset(params) + |> Repo.insert!(on_conflict: [set: upsert_params]) + end + + def changeset(struct, params \\ %{}) do + struct + |> cast(params, [ + :channel_id, + :respondent_id, + :queued_at, + :priority, + :size, + :token, + :not_before, + :not_after, + :reply, + :last_contact, + :contacts, + :channel_state + ]) + |> validate_required([ + :channel_id, + :respondent_id, + :queued_at, + :priority, + :size, + :token + ]) + + # |> assoc_constraint(:channel, :respondent) + end + + def activable_contacts?(channel_id) do + # add leeway to activate contacts to be scheduled soon + not_before = Ask.SystemTime.time().now |> DateTime.add(60, :second) + + Repo.exists?( + from q in Queue, + where: + q.channel_id == ^channel_id and is_nil(q.last_contact) and + (is_nil(q.not_before) or q.not_before <= ^not_before) + ) + end + + def count_active_contacts(channel_id) do + Repo.one( + from q in Queue, + select: type(coalesce(sum(coalesce(q.contacts, 0)), 0), :integer), + where: q.channel_id == ^channel_id and not is_nil(q.last_contact) + ) + end + + def queued_contacts(channel_id) do + Repo.all( + from q in Ask.ChannelBrokerQueue, + where: q.channel_id == ^channel_id and is_nil(q.last_contact) + ) + end + + def active_contacts(channel_id) do + Repo.all( + from q in Ask.ChannelBrokerQueue, + where: q.channel_id == ^channel_id and not is_nil(q.last_contact) + ) + end +end diff --git a/lib/ask/ecto_types/erlang_term.ex b/lib/ask/ecto_types/erlang_term.ex new file mode 100644 index 000000000..b554205a0 --- /dev/null +++ b/lib/ask/ecto_types/erlang_term.ex @@ -0,0 +1,20 @@ +defmodule Ask.Ecto.Type.ErlangTerm do + @moduledoc """ + A custom Ecto type for handling the serialization of arbitrary + data types stored as binary data in the database. Requires the + underlying DB field to be a binary. + """ + use Ecto.Type + def type, do: :binary + + def cast(:any, term), do: {:ok, term} + def cast(term), do: {:ok, term} + + def load(binary) when is_binary(binary) do + {:ok, :erlang.binary_to_term(binary)} + end + + def dump(term) do + {:ok, :erlang.term_to_binary(term)} + end +end diff --git a/lib/ask/pqueue.ex b/lib/ask/pqueue.ex deleted file mode 100644 index 29c6ec133..000000000 --- a/lib/ask/pqueue.ex +++ /dev/null @@ -1,93 +0,0 @@ -# A basic priority queue list. Items are inserted with a priority (high, normal -# or low). -defmodule Ask.PQueue do - @type t :: map() - - @enforce_keys [:high, :normal, :low] - defstruct [:high, :normal, :low] - - @spec new() :: t - def new() do - %Ask.PQueue{high: [], normal: [], low: []} - end - - @spec push(t, any, :high | :normal | :low) :: t - def push(queue, item, priority) do - case priority do - :high -> - Map.put(queue, :high, [item | queue.high]) - :normal -> - Map.put(queue, :normal, [item | queue.normal]) - :low -> - Map.put(queue, :low, [item | queue.low]) - end - end - - @spec pop(t) :: {t, any | nil} - def pop(queue) do - cond do - length(queue.high) > 0 -> - [item | tail] = Enum.reverse(queue.high) - {Map.put(queue, :high, Enum.reverse(tail)), item} - - length(queue.normal) > 0 -> - [item | tail] = Enum.reverse(queue.normal) - {Map.put(queue, :normal, Enum.reverse(tail)), item} - - length(queue.low) > 0 -> - [item | tail] = Enum.reverse(queue.low) - {Map.put(queue, :low, Enum.reverse(tail)), item} - - true -> - {queue, nil} - end - end - - @spec each(t, (any -> any)) :: :ok - def each(queue, callback) do - Enum.each(queue.high, callback) - Enum.each(queue.normal, callback) - Enum.each(queue.low, callback) - end - - @spec delete(t, (any -> boolean)) :: t - def delete(queue, callback) do - cond do - index = Enum.find_index(queue.high, callback) -> - Map.put(queue, :high, List.delete_at(queue.high, index)) - - index = Enum.find_index(queue.normal, callback) -> - Map.put(queue, :normal, List.delete_at(queue.normal, index)) - - index = Enum.find_index(queue.low, callback) -> - Map.put(queue, :low, List.delete_at(queue.low, index)) - - true -> - queue - end - end - - @spec any?(t, (any -> boolean)) :: boolean - def any?(queue, callback) do - cond do - Enum.any?(queue.high, callback) -> true - Enum.any?(queue.normal, callback) -> true - Enum.any?(queue.low, callback) -> true - true -> false - end - end - - @spec empty?(t) :: boolean - def empty?(%{high: [], low: [], normal: []}), do: true - def empty?(_), do: false - - @spec len(t) :: non_neg_integer - def len(queue) do - length(queue.high) + length(queue.normal) + length(queue.low) - end - - @spec len(t, :high | :normal | :low) :: non_neg_integer - def len(queue, :high), do: length(queue.high) - def len(queue, :normal), do: length(queue.normal) - def len(queue, :low), do: length(queue.low) -end diff --git a/lib/ask/runtime/channel_broker.ex b/lib/ask/runtime/channel_broker.ex index 2044e55c4..84e84df1b 100644 --- a/lib/ask/runtime/channel_broker.ex +++ b/lib/ask/runtime/channel_broker.ex @@ -3,7 +3,7 @@ defmodule Ask.Runtime.ChannelBroker do alias Ask.Runtime.ChannelBrokerSupervisor alias Ask.Runtime.ChannelBrokerAgent, as: Agent alias Ask.Runtime.ChannelBrokerState, as: State - alias Ask.{Channel, Logger, PQueue} + alias Ask.{Channel, Logger} import Ecto.Query alias Ask.Repo use GenServer @@ -137,10 +137,11 @@ defmodule Ask.Runtime.ChannelBroker do info("init", channel_id: channel_id, channel_type: channel_type, settings: settings) state = - (Agent.recover_state(channel_id) || State.new(channel_id, settings)) + (Agent.recover_state(channel_id) || State.new(channel_id, channel_type, settings)) |> Map.put(:runtime_channel, runtime_channel) + |> info() - schedule_GC(channel_type, state) + schedule_GC(state) {:ok, state, State.process_timeout(state)} end @@ -163,7 +164,6 @@ defmodule Ask.Runtime.ChannelBroker do |> State.queue_contact(contact, size) |> try_activate_next_queued_contact() |> Agent.save_state() - |> debug() {:noreply, new_state, State.process_timeout(new_state)} end @@ -187,7 +187,6 @@ defmodule Ask.Runtime.ChannelBroker do |> State.queue_contact(contact, 1) |> try_activate_next_queued_contact() |> Agent.save_state() - |> debug() {:noreply, new_state, State.process_timeout(new_state)} end @@ -213,9 +212,7 @@ defmodule Ask.Runtime.ChannelBroker do |> refresh_runtime_channel() |> do_cancel_message(channel_state) |> State.deactivate_contact(respondent_id) - |> State.remove_from_queue(respondent_id) |> Agent.save_state() - |> debug() {:noreply, new_state, State.process_timeout(new_state)} end @@ -239,7 +236,6 @@ defmodule Ask.Runtime.ChannelBroker do |> State.touch_last_contact(respondent.id) end |> Agent.save_state() - |> debug() {:noreply, new_state, State.process_timeout(new_state)} end @@ -258,7 +254,6 @@ defmodule Ask.Runtime.ChannelBroker do |> State.decrement_respondents_contacts(respondent.id, 1) |> try_activate_next_queued_contact() |> Agent.save_state() - |> debug() {:noreply, new_state, State.process_timeout(new_state)} end @@ -271,7 +266,6 @@ defmodule Ask.Runtime.ChannelBroker do state |> State.increment_respondents_contacts(respondent_id, size) |> Agent.save_state() - |> debug() {:noreply, new_state, State.process_timeout(new_state)} end @@ -279,7 +273,7 @@ defmodule Ask.Runtime.ChannelBroker do @impl true def handle_cast({:on_channel_settings_change, settings}, state) do debug("handle_cast[on_channel_settings_change]", settings: settings) - new_state = State.put_capacity(state, Map.get(settings, "capacity")) |> debug() + new_state = State.put_capacity(state, Map.get(settings, "capacity")) {:noreply, new_state, State.process_timeout(new_state)} end @@ -315,7 +309,7 @@ defmodule Ask.Runtime.ChannelBroker do respondent_id: respondent_id ) - reply = State.is_active(state, respondent_id) || State.is_queued(state, respondent_id) + reply = State.queued_or_active?(state, respondent_id) {:reply, reply, state, State.process_timeout(state)} end end @@ -355,19 +349,19 @@ defmodule Ask.Runtime.ChannelBroker do ChannelBrokerSupervisor.terminate_child(channel_id) end - def handle_info({:collect_garbage, channel_type}, state) do - info("handle_info[collect_garbage]", channel_type: channel_type, config: state.config) + def handle_info({:collect_garbage}, state) do + info("handle_info[collect_garbage]", channel_type: state.channel_type, config: state.config) - active_respondents = - from(r in "respondents", - where: r.id in ^Map.keys(state.active_contacts) and r.state == "active", - select: r.id + active_respondent_ids = + Repo.all( + from r in "respondents", + select: r.id, + where: r.id in ^State.active_respondent_ids(state) and r.state == "active" ) - |> Repo.all() new_state = state - |> State.clean_inactive_respondents(active_respondents) + |> State.clean_inactive_respondents(active_respondent_ids) |> refresh_runtime_channel() |> State.clean_outdated_respondents() |> activate_contacts() @@ -375,7 +369,7 @@ defmodule Ask.Runtime.ChannelBroker do |> info() # schedule next run - schedule_GC(channel_type, state) + schedule_GC(state) {:noreply, new_state, State.process_timeout(new_state)} end @@ -430,9 +424,7 @@ defmodule Ask.Runtime.ChannelBroker do State.deactivate_contact(new_state, respondent.id) future_call?(not_before) -> - new_state - |> State.deactivate_contact(respondent.id) - |> State.queue_contact({respondent, token, not_before, not_after}, 1, :low) + State.reenqueue_contact(new_state, respondent.id, :low) true -> ivr_call(new_state, respondent, token, not_before, not_after) @@ -467,10 +459,7 @@ defmodule Ask.Runtime.ChannelBroker do {:error, reason} -> Logger.warn("ChannelBroker: IVR call to Verboice failed with #{inspect(reason)}") - - state - |> State.deactivate_contact(respondent.id) - |> State.queue_contact({respondent, token, not_before, not_after}, 1) + State.reenqueue_contact(state, respondent.id) end end @@ -490,12 +479,12 @@ defmodule Ask.Runtime.ChannelBroker do # Don't schedule automatic GC runs in tests. if Mix.env() == :test do - defp schedule_GC(_, _), do: nil + defp schedule_GC(_), do: nil else - defp schedule_GC(channel_type, state) do + defp schedule_GC(state) do interval = State.gc_interval(state) - debug("schedule_GC", channel_type: channel_type, interval: interval) - Process.send_after(self(), {:collect_garbage, channel_type}, interval) + debug("schedule_GC", channel_type: state.channel_type, interval: interval) + Process.send_after(self(), {:collect_garbage}, interval) end end @@ -516,41 +505,20 @@ defmodule Ask.Runtime.ChannelBroker do # Log helpers - defp info(name, options) do - Logger.info( - "ChannelBroker.#{name}:#{Enum.map(options, fn {k, v} -> " #{k}=#{inspect(v)}" end)}" - ) - end - defp debug(name, options) do Logger.debug( "ChannelBroker.#{name}:#{Enum.map(options, fn {k, v} -> " #{k}=#{inspect(v)}" end)}" ) end - defp debug(%State{} = state) do - debug("State", - channel: state.channel_id, - active: map_size(state.active_contacts), - queued: PQueue.len(state.contacts_queue), - queued_high: PQueue.len(state.contacts_queue, :high), - queued_normal: PQueue.len(state.contacts_queue, :normal), - queued_low: PQueue.len(state.contacts_queue, :low) + defp info(name, options) do + Logger.info( + "ChannelBroker.#{name}:#{Enum.map(options, fn {k, v} -> " #{k}=#{inspect(v)}" end)}" ) - - state end defp info(%State{} = state) do - info("State", - channel: state.channel_id, - active: map_size(state.active_contacts), - queued: PQueue.len(state.contacts_queue), - queued_high: PQueue.len(state.contacts_queue, :high), - queued_normal: PQueue.len(state.contacts_queue, :normal), - queued_low: PQueue.len(state.contacts_queue, :low) - ) - + info("State", State.statistics(state)) state end end diff --git a/lib/ask/runtime/channel_broker_state.ex b/lib/ask/runtime/channel_broker_state.ex index cc7072501..d41081a01 100644 --- a/lib/ask/runtime/channel_broker_state.ex +++ b/lib/ask/runtime/channel_broker_state.ex @@ -1,5 +1,8 @@ defmodule Ask.Runtime.ChannelBrokerState do - alias Ask.{Config, PQueue, SystemTime} + import Ecto.Query + + alias Ask.{Config, Repo, SystemTime} + alias Ask.ChannelBrokerQueue, as: Queue @enforce_keys [ :channel_id, @@ -10,6 +13,7 @@ defmodule Ask.Runtime.ChannelBrokerState do defstruct [ # Each ChannelBroker process manages a single channel. :channel_id, + :channel_type, # Each ChannelBroker have the sole responsibility of interacting with their # Ask.Runtime.Channel @@ -19,41 +23,15 @@ defmodule Ask.Runtime.ChannelBrokerState do :capacity, # See `Config.channel_broker_config/0` - :config, - - # A dictionary of active contacts with the following shape: - # ``` - # %{respondent_id => %{ - # contacts: Integer, - # last_contact: DateTime, - # channel_state: %{String.t => any()} - # }} - # ``` - # - # Where: - # - contacts: number of contacts being currently managed by the channel (ivr=1, sms=1+) - # - last_contact: timestamp of the last sent contact or received callback - # - channel_state: identifier(s) for the contact on the channel - active_contacts: %{}, - - # A priority queue. - # - # When a contact is queued, the received params are stored to be used when the time - # to make the contact comes. - # - # Elements are tuples whose shape depend on the channel provider: - # - Verboice: `{respondent, token, not_before, not_after}` - # - Nuntium: `{respondent, token, reply}` - contacts_queue: PQueue.new() + :config ] - def new(channel_id, settings) do - config = Config.channel_broker_config() - + def new(channel_id, channel_type, settings) do %Ask.Runtime.ChannelBrokerState{ channel_id: channel_id, + channel_type: channel_type, capacity: Map.get(settings, "capacity", Config.default_channel_capacity()), - config: config + config: Config.channel_broker_config() } end @@ -80,201 +58,234 @@ defmodule Ask.Runtime.ChannelBrokerState do state.config.gc_active_idle_minutes * 60 end - def is_active(state, respondent_id) do - state.active_contacts - |> Map.has_key?(respondent_id) - end - - def put_channel_state(state, respondent_id, channel_state) do - update_active_contact(state, respondent_id, fn active_contact -> - Map.put(active_contact, :channel_state, channel_state) - end) + # Returns true when there are neither active nor queued contacts (idle state). + def inactive?(state) do + !Repo.exists?( + from q in Queue, + where: q.channel_id == ^state.channel_id + ) end - def get_channel_state(state, respondent_id) do - case Map.get(state.active_contacts, respondent_id) do - %{channel_state: channel_state} -> channel_state - # TODO: shall we raise instead? - _ -> %{} - end + # Returns true if a respondent is currently in queue (active or not). + def queued_or_active?(state, respondent_id) do + Repo.exists?( + from q in Queue, + where: q.channel_id == ^state.channel_id and q.respondent_id == ^respondent_id + ) end # Adds a contact to the queue. The priority is set from the respondent's # disposition. def queue_contact(state, contact, size) do respondent = elem(contact, 0) - priority = if respondent.disposition == :queued, do: :normal, else: :high - queue_contact(state, contact, size, priority) + + if respondent.disposition == :queued do + queue_contact(state, contact, size, :normal) + else + queue_contact(state, contact, size, :high) + end end - # Adds a contact to the queue with given priority (`:high`, `:normal`, `:low`). - def queue_contact(state, contact, size, priority) do - new_contacts_queue = PQueue.push(state.contacts_queue, [size, contact], priority) - Map.put(state, :contacts_queue, new_contacts_queue) + # Adds an IVR contact to the queue with given priority (`:high`, `:normal`, `:low`). + def queue_contact(state, {respondent, token, not_before, not_after}, size, priority) do + Queue.upsert!(%{ + channel_id: state.channel_id, + respondent_id: respondent.id, + queued_at: SystemTime.time().now, + priority: priority, + size: size, + token: token, + not_before: not_before, + not_after: not_after, + reply: nil + }) + + state end - # Updates the active contact for the respondent. Does nothing if there are - # no active contact for this respondent. - defp update_active_contact(%{active_contacts: active_contacts} = state, respondent_id, cb) do - if active_contact = Map.get(active_contacts, respondent_id) do - new_active_contacts = Map.put(active_contacts, respondent_id, cb.(active_contact)) - Map.put(state, :active_contacts, new_active_contacts) - else - state - end + # Adds an SMS contact to the queue with given priority (`:high`, `:normal`, `:low`). + def queue_contact(state, {respondent, token, reply}, size, priority) do + Queue.upsert!(%{ + channel_id: state.channel_id, + respondent_id: respondent.id, + queued_at: SystemTime.time().now, + priority: priority, + size: size, + token: token, + not_before: nil, + not_after: nil, + reply: reply + }) + + state + end + + def put_channel_state(state, respondent_id, channel_state) do + query = + from q in Queue, + where: q.channel_id == ^state.channel_id and q.respondent_id == ^respondent_id + + Repo.update_all(query, set: [channel_state: channel_state]) + state end - # Touches the :last_contact attribute for a respondent. Does nothing if the - # respondent can't be found. + def get_channel_state(state, respondent_id) do + channel_state = + Repo.one( + from q in Queue, + select: q.channel_state, + where: q.channel_id == ^state.channel_id and q.respondent_id == ^respondent_id + ) + + channel_state || %{} + end + + # Touches the `last_contact` attribute for a respondent. Assumes that the + # respondent has already been contacted. Does nothing if the respondent can't + # be found. def touch_last_contact(state, respondent_id) do - update_active_contact(state, respondent_id, fn active_contact -> - Map.put(active_contact, :last_contact, SystemTime.time().now) - end) + query = + from q in Queue, + where: q.channel_id == ^state.channel_id and q.respondent_id == ^respondent_id + + Repo.update_all(query, set: [last_contact: SystemTime.time().now]) + state end # Activates the next contact from the queue. There must be at least one # contact currently waiting in queue! - def activate_next_in_queue(%{active_contacts: active_contacts} = state) do - {new_contacts_queue, [size, unqueued_item]} = PQueue.pop(state.contacts_queue) - respondent_id = queued_respondent_id(unqueued_item) - - active_contact = - active_contacts - |> Map.get(respondent_id, %{contacts: 0}) + def activate_next_in_queue(state) do + # add leeway to activate contacts to be scheduled soon: + not_before = SystemTime.time().now |> DateTime.add(60, :second) + + contact = + Repo.one!( + from q in Queue, + where: + q.channel_id == ^state.channel_id and is_nil(q.last_contact) and + (q.not_before <= ^not_before or is_nil(q.not_before)), + order_by: [q.priority, q.queued_at], + preload: [:respondent], + limit: 1 + ) - new_active_contact = - active_contact - |> Map.put(:contacts, active_contact.contacts + size) - |> Map.put(:last_contact, SystemTime.time().now) + contact + |> Queue.changeset(%{ + contacts: contact.size, + last_contact: SystemTime.time().now + }) + |> Repo.update() - new_active_contacts = - active_contacts - |> Map.put(respondent_id, new_active_contact) + {state, to_item(state.channel_type, contact)} + end - new_state = - state - |> Map.put(:contacts_queue, new_contacts_queue) - |> Map.put(:active_contacts, new_active_contacts) + defp to_item("ivr", contact) do + {contact.respondent, contact.token, contact.not_before, contact.not_after} + end - {new_state, unqueued_item} + defp to_item("sms", contact) do + {contact.respondent, contact.token, contact.reply} end # Increments the number of contacts for the respondent. Activates the contact # if it wasn't already. - def increment_respondents_contacts( - %{active_contacts: active_contacts} = state, - respondent_id, - size - ) do - active_contact = - active_contacts - |> Map.get(respondent_id, %{contacts: 0}) - - new_active_contact = - active_contact - |> Map.put(:contacts, active_contact.contacts + size) - |> Map.put(:last_contact, SystemTime.time().now) - - new_active_contacts = - active_contacts - |> Map.put(respondent_id, new_active_contact) - - Map.put(state, :active_contacts, new_active_contacts) + def increment_respondents_contacts(state, respondent_id, size) do + query = + from q in Queue, + update: [ + set: [ + contacts: coalesce(q.contacts, 0) + ^size, + last_contact: ^SystemTime.time().now + ] + ], + where: q.channel_id == ^state.channel_id and q.respondent_id == ^respondent_id + + Repo.update_all(query, []) + + state end # Decrements the number of contacts for the respondent. Does nothing if the # respondent isn't an active contact. Deactivates the respondent if the number # of contacts falls down to zero. - def decrement_respondents_contacts( - %{active_contacts: active_contacts} = state, - respondent_id, - size - ) do - active_contact = - active_contacts - |> Map.get(respondent_id) - - if active_contact do - new_value = active_contact.contacts - size - - if new_value <= 0 do - deactivate_contact(state, respondent_id) - else - new_active_contact = - active_contact - |> Map.put(:contacts, new_value) - |> Map.put(:last_contact, SystemTime.time().now) - - new_active_contacts = - active_contacts - |> Map.put(respondent_id, new_active_contact) - - Map.put(state, :active_contacts, new_active_contacts) - end - else - state - end + def decrement_respondents_contacts(state, respondent_id, size) do + query = + from q in Queue, + update: [ + set: [ + contacts: coalesce(q.contacts, 0) - ^size, + last_contact: ^SystemTime.time().now + ] + ], + where: q.channel_id == ^state.channel_id and q.respondent_id == ^respondent_id + + Repo.update_all(query, []) + + Repo.delete_all( + from q in Queue, + where: + q.channel_id == ^state.channel_id and q.respondent_id == ^respondent_id and + not is_nil(q.contacts) and q.contacts <= 0 + ) + + state end + # Deactivates a contact and removes them from the queue. def deactivate_contact(state, respondent_id) do - respondent_contacts = contacts_for(state, respondent_id) - - if respondent_contacts > 1 do - update_active_contact(state, respondent_id, fn active_contact -> - active_contact - |> Map.put(:contacts, respondent_contacts - 1) - |> Map.put(:last_contact, SystemTime.time().now) - end) - else - state - |> Map.put(:active_contacts, Map.delete(state.active_contacts, respondent_id)) - end + Repo.delete_all( + from q in Queue, + where: q.channel_id == ^state.channel_id and q.respondent_id == ^respondent_id + ) + + state end - # Returns the current number of active contacts for the respondent. - defp contacts_for(state, respondent_id) do - case Map.get(state.active_contacts, respondent_id) do - %{contacts: contacts} -> contacts - _ -> 0 - end + # Deactivates a contact and puts them back into the queue. + def reenqueue_contact(state, respondent_id, priority \\ :normal) do + query = + from q in Queue, + where: q.channel_id == ^state.channel_id and q.respondent_id == ^respondent_id + + Repo.update_all(query, + set: [ + queued_at: SystemTime.time().now, + priority: priority, + contacts: nil, + last_contact: nil, + channel_state: nil + ] + ) + + state end + # Returns true if we can active a new contact: there are pending contacts in + # queue that we can activate, and the channel is under capacity. def can_unqueue(state) do - cond do - PQueue.empty?(state.contacts_queue) -> false - activable_contacts?(state.contacts_queue) -> under_capacity?(state) - true -> false + if Queue.activable_contacts?(state.channel_id) do + under_capacity?(state) + else + false end end - defp activable_contacts?(queue) do - PQueue.any?(queue, fn [_, item] -> - case item do - {_, _, not_before, _} -> Date.compare(not_before, SystemTime.time().now) != :gt - _ -> true - end - end) - end - + # OPTIMIZE: Cache the number of active contacts into the state to heavily + # reduce the number of COUNT(*) queries that tend to be slow. + # + # on load: count from the database & cache the value + # on activate: increment the number (by size) + # on increment/decrement/deactivate/reenqueue: update the count defp under_capacity?(state) do - count_active_contacts(state) < state.capacity + Queue.count_active_contacts(state.channel_id) < state.capacity end - defp count_active_contacts(state) do - state.active_contacts - |> Enum.reduce(0, fn {_, %{contacts: contacts}}, acc -> contacts + acc end) - end - - def is_queued(state, respondent_id) do - state.contacts_queue - |> PQueue.any?(fn [_, item] -> queued_respondent_id(item) == respondent_id end) - end - - def remove_from_queue(state, respondent_id) do - new_contacts_queue = - state.contacts_queue - |> PQueue.delete(fn [_, item] -> queued_respondent_id(item) == respondent_id end) - Map.put(state, :contacts_queue, new_contacts_queue) + def active_respondent_ids(state) do + Repo.all( + from q in Queue, + select: q.respondent_id, + where: q.channel_id == ^state.channel_id and not is_nil(q.last_contact) + ) end # Keep only the contacts for active respondents. @@ -284,45 +295,64 @@ defmodule Ask.Runtime.ChannelBrokerState do # # FIXME: understand why Surveda would know about a contact having failed but # the channel broker wouldn't have been notified?! - def clean_inactive_respondents(state, active_respondents) do - # TODO: Elixir 1.13 has Map.filter/2 - new_active_contacts = - :maps.filter( - fn respondent_id, _ -> respondent_id in active_respondents end, - state.active_contacts - ) - - Map.put(state, :active_contacts, new_active_contacts) + def clean_inactive_respondents(state, active_respondent_ids) do + Repo.delete_all( + from q in Queue, + where: + q.channel_id == ^state.channel_id and not (q.respondent_id in ^active_respondent_ids) and + not is_nil(q.last_contact) + ) + + state end - # For leftover active contacts, we ask the remote channel for the actual IVR - # call or SMS message state. Keep only the contacts that are active or queued. + # For idle contacts, we ask the remote channel for the actual IVR call or SMS + # message state. Keep only the contacts that are active or queued. + # + # FIXME: this may take a while, and during that time the channel broker + # won't process its mailbox, if it ever becomes a problem, we might + # consider: + # + # - only process a random N number of idle contacts on each call + # - run the task in its own concurrent process def clean_outdated_respondents(state) do idle_time = gc_allowed_idle_time(state) - now = SystemTime.time().now - - # TODO: Elixir 1.13 has Map.filter/2 - new_active_contacts = - :maps.filter( - fn _, %{last_contact: last_contact} = active_contact -> - if DateTime.diff(now, last_contact, :second) < idle_time do - true - else - !Ask.Runtime.Channel.message_inactive?(state.runtime_channel, active_contact.channel_state) - end - end, - state.active_contacts - ) + last_contact = SystemTime.time().now |> DateTime.add(-idle_time, :second) + + query = + from q in Queue, + select: [:channel_id, :respondent_id, :channel_state], + where: q.channel_id == ^state.channel_id and q.last_contact < ^last_contact + + Repo.all(query) + |> Enum.each(fn active_contact -> + if Ask.Runtime.Channel.message_inactive?( + state.runtime_channel, + active_contact.channel_state + ) do + Repo.delete(active_contact) + end + end) - Map.put(state, :active_contacts, new_active_contacts) + state end - defp queued_respondent_id(queued_item) do - elem(queued_item, 0).id - end + def statistics(state) do + queued = + Repo.all( + from q in Queue, + select: {q.priority, count()}, + where: q.channel_id == ^state.channel_id and is_nil(q.last_contact), + group_by: q.priority + ) - # Returns true when there are neither active nor queued contacts (idle state). - def inactive?(state) do - map_size(state.active_contacts) == 0 && PQueue.empty?(state.contacts_queue) + [ + channel: state.channel_id, + active: Queue.count_active_contacts(state.channel_id), + queued: Enum.reduce(queued, 0, fn {_, count}, a -> a + count end), + queued_low: queued[:low] || 0, + queued_normal: queued[:normal] || 0, + queued_high: queued[:high] || 0 + ] end end diff --git a/priv/repo/migrations/20230821100203_create_channel_broker_queue.exs b/priv/repo/migrations/20230821100203_create_channel_broker_queue.exs new file mode 100644 index 000000000..5fa2d4fe3 --- /dev/null +++ b/priv/repo/migrations/20230821100203_create_channel_broker_queue.exs @@ -0,0 +1,32 @@ +defmodule Ask.Repo.Migrations.CreateChannelBrokerQueue do + use Ecto.Migration + + def up do + create table(:channel_broker_queue, primary_key: false) do + add :channel_id, :"bigint unsigned", null: false, primary_key: true + add :respondent_id, :"bigint unsigned", null: false, primary_key: true + + # queued (pending): + add :queued_at, :utc_datetime, null: false + add :priority, :tinyint, null: false + add :size, :integer, null: false + add :token, :string, null: false + add :not_before, :utc_datetime + add :not_after, :utc_datetime + add :reply, :binary + + # sent (active): + add :last_contact, :utc_datetime + add :contacts, :integer + add :channel_state, :binary + end + + index(:channel_broker_queue, [:priority, :queued_at]) + index(:channel_broker_queue, [:not_before]) + index(:channel_broker_queue, [:last_contact]) + end + + def down do + drop table(:channel_broker_queue) + end +end diff --git a/priv/repo/structure.sql b/priv/repo/structure.sql index e51f51a08..1b6cb5e7e 100644 --- a/priv/repo/structure.sql +++ b/priv/repo/structure.sql @@ -1,8 +1,8 @@ --- MySQL dump 10.19 Distrib 10.3.34-MariaDB, for debian-linux-gnu (x86_64) +-- MySQL dump 10.19 Distrib 10.3.38-MariaDB, for debian-linux-gnu (x86_64) -- -- Host: db Database: ask_dev -- ------------------------------------------------------ --- Server version 5.7.37 +-- Server version 5.7.41 /*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */; /*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */; @@ -63,6 +63,30 @@ CREATE TABLE `audios` ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*!40101 SET character_set_client = @saved_cs_client */; +-- +-- Table structure for table `channel_broker_queue` +-- + +DROP TABLE IF EXISTS `channel_broker_queue`; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE `channel_broker_queue` ( + `channel_id` bigint(20) unsigned NOT NULL, + `respondent_id` bigint(20) unsigned NOT NULL, + `queued_at` datetime NOT NULL, + `priority` tinyint(4) NOT NULL, + `size` int(11) NOT NULL, + `token` varchar(255) NOT NULL, + `not_before` datetime DEFAULT NULL, + `not_after` datetime DEFAULT NULL, + `reply` blob, + `last_contact` datetime DEFAULT NULL, + `contacts` int(11) DEFAULT NULL, + `channel_state` blob, + PRIMARY KEY (`channel_id`,`respondent_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +/*!40101 SET character_set_client = @saved_cs_client */; + -- -- Table structure for table `channels` -- @@ -988,7 +1012,7 @@ CREATE TABLE `users` ( /*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */; /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */; --- Dump completed on 2023-04-13 8:17:16 +-- Dump completed on 2023-08-28 11:00:34 INSERT INTO `schema_migrations` (version) VALUES (20160812145257); INSERT INTO `schema_migrations` (version) VALUES (20160816183915); INSERT INTO `schema_migrations` (version) VALUES (20160830200454); @@ -1204,4 +1228,6 @@ INSERT INTO `schema_migrations` (version) VALUES (20220131103226); INSERT INTO `schema_migrations` (version) VALUES (20230217143550); INSERT INTO `schema_migrations` (version) VALUES (20230317094712); INSERT INTO `schema_migrations` (version) VALUES (20230402091100); +INSERT INTO `schema_migrations` (version) VALUES (20230405111657); INSERT INTO `schema_migrations` (version) VALUES (20230413101342); +INSERT INTO `schema_migrations` (version) VALUES (20230821100203); diff --git a/test/ask/pqueue_test.exs b/test/ask/pqueue_test.exs deleted file mode 100644 index 8e2d88822..000000000 --- a/test/ask/pqueue_test.exs +++ /dev/null @@ -1,107 +0,0 @@ -defmodule Ask.PQueueTest do - use ExUnit.Case - alias Ask.PQueue - - test "push and pop" do - queue = - PQueue.new() - |> PQueue.push(1, :low) - |> PQueue.push(2, :high) - |> PQueue.push(3, :normal) - refute PQueue.empty?(queue) - - assert {queue, 2} = PQueue.pop(queue) - assert {queue, 3} = PQueue.pop(queue) - assert {queue, 1} = PQueue.pop(queue) - assert {^queue, nil} = PQueue.pop(queue) - assert PQueue.empty?(queue) - end - - test "delete" do - queue = PQueue.new() - assert queue == queue |> PQueue.delete(fn _ -> raise "unreachable" end) - - queue = - PQueue.new() - |> PQueue.push(1, :high) - |> PQueue.push(2, :low) - |> PQueue.push(3, :normal) - - q1 = PQueue.delete(queue, fn item -> item == 1 end) - refute PQueue.any?(q1, fn item -> item == 1 end) - assert PQueue.any?(q1, fn item -> item == 2 end) - assert PQueue.any?(q1, fn item -> item == 3 end) - - q2 = PQueue.delete(queue, fn item -> item == 2 end) - assert PQueue.any?(q2, fn item -> item == 1 end) - refute PQueue.any?(q2, fn item -> item == 2 end) - assert PQueue.any?(q2, fn item -> item == 3 end) - - q3 = PQueue.delete(queue, fn item -> item == 3 end) - assert PQueue.any?(q3, fn item -> item == 1 end) - assert PQueue.any?(q3, fn item -> item == 2 end) - refute PQueue.any?(q3, fn item -> item == 3 end) - end - - test "each" do - assert :ok == PQueue.new() |> PQueue.each(fn _ -> raise "unreachable" end) - - queue = - PQueue.new() - |> PQueue.push(1, :high) - |> PQueue.push(2, :low) - |> PQueue.push(3, :normal) - - # TODO: how to verify that the callback is really called? - PQueue.each(queue, fn item -> - assert item == 1 || item == 2 || item == 3 - end) - end - - test "any?" do - refute PQueue.new() |> PQueue.any?(fn _ -> raise "unreachable" end) - - queue = - PQueue.new() - |> PQueue.push(1, :high) - |> PQueue.push(2, :low) - |> PQueue.push(3, :normal) - - assert queue |> PQueue.any?(fn item -> item == 1 end) - assert queue |> PQueue.any?(fn item -> item == 2 end) - assert queue |> PQueue.any?(fn item -> item == 3 end) - refute queue |> PQueue.any?(fn item -> item == 4 end) - end - - test "empty?" do - assert PQueue.new() |> PQueue.empty?() - refute PQueue.new() |> PQueue.push(1, :high) |> PQueue.empty?() - refute PQueue.new() |> PQueue.push(1, :normal) |> PQueue.empty?() - refute PQueue.new() |> PQueue.push(1, :low) |> PQueue.empty?() - end - - test "len" do - queue = PQueue.new() - assert 0 == PQueue.len(queue) - - assert 1 == PQueue.len(queue = PQueue.push(queue, 1, :high)) - assert 1 == PQueue.len(queue, :high) - assert 0 == PQueue.len(queue, :normal) - assert 0 == PQueue.len(queue, :low) - - assert 2 == PQueue.len(queue = PQueue.push(queue, 1, :high)) - assert 2 == PQueue.len(queue, :high) - assert 0 == PQueue.len(queue, :normal) - assert 0 == PQueue.len(queue, :low) - - assert 3 == PQueue.len(queue = PQueue.push(queue, 1, :normal)) - assert 2 == PQueue.len(queue, :high) - assert 1 == PQueue.len(queue, :normal) - assert 0 == PQueue.len(queue, :low) - - assert 4 == PQueue.len(queue = PQueue.push(queue, 1, :low)) - assert 2 == PQueue.len(queue, :high) - assert 1 == PQueue.len(queue, :normal) - assert 1 == PQueue.len(queue, :low) - end -end diff --git a/test/ask/runtime/channel_broker_state_test.exs b/test/ask/runtime/channel_broker_state_test.exs index c250b8d42..5518fb1e8 100644 --- a/test/ask/runtime/channel_broker_state_test.exs +++ b/test/ask/runtime/channel_broker_state_test.exs @@ -1,49 +1,436 @@ defmodule Ask.Runtime.ChannelBrokerStateTest do use AskWeb.ConnCase use Ask.TestHelpers + use Ask.MockTime + + alias Ask.ChannelBrokerQueue, as: Queue alias Ask.Runtime.ChannelBrokerState, as: State setup do - mock_queued_contact = fn respondent_id, params, disposition -> - {%{id: respondent_id, disposition: disposition}, params} + {:ok, state: State.new(0, "ivr", %{"capacity" => 4})} + end + + describe ".inactive?" do + test "returns true when empty", %{state: state} do + assert state + |> State.inactive?() + end + + test "returns false if any pending contact", %{state: state} do + refute state + |> State.queue_contact(new_contact(2), 1) + |> State.inactive?() end - {:ok, state: State.new(0, %{}), mock_queued_contact: mock_queued_contact} + test "returns false if any active contact", %{state: state} do + refute state + |> State.queue_contact(new_contact(2), 1) + |> State.activate_next_in_queue() + |> elem(0) + |> State.inactive?() + end end - test "queues contact", %{state: state, mock_queued_contact: mqc} do - respondent_id = 2 - params = 3 - disposition = 4 - contact = mqc.(respondent_id, params, disposition) - size = 5 + describe ".queued_or_active?" do + test "returns true if respondent in queue", %{state: state} do + assert state + |> State.queue_contact(new_contact(2), 1) + |> State.queued_or_active?(2) + end - %{contacts_queue: q} = State.queue_contact(state, contact, size) + test "returns true if respondent is active", %{state: state} do + assert state + |> State.queue_contact(new_contact(2), 1) + |> State.activate_next_in_queue() + |> elem(0) + |> State.queued_or_active?(2) + end - {_, [queud_size, queued_contact]} = Ask.PQueue.pop(q) - assert queued_contact == {%{id: respondent_id, disposition: disposition}, params} - assert queud_size == size + test "returns false when empty", %{state: state} do + refute state + |> State.queued_or_active?(2) + end end - test "removes the respondent", %{state: state, mock_queued_contact: mqc} do - respondent_id = 2 - contact = mqc.(respondent_id, 3, 4) - size = 5 - state = State.queue_contact(state, contact, size) + describe ".queue_contact" do + @tag :time_mock + test "queues ivr contact", %{state: state} do + now = DateTime.utc_now() + mock_time(now) + State.queue_contact(state, {%{id: 2, disposition: :queued}, "secret", nil, nil}, 5) + + assert [%{respondent_id: 2, size: 5, queued_at: now}] = Queue.queued_contacts(0) + assert [] = Queue.active_contacts(0) + end + + @tag :time_mock + test "queues sms contact" do + now = DateTime.utc_now() + mock_time(now) + + State.new(0, "sms", %{}) + |> State.queue_contact({%{id: 2, disposition: :queued}, "secret", []}, 5) + + assert [%{respondent_id: 2, size: 5, queued_at: now, reply: []}] = Queue.queued_contacts(0) + assert [] = Queue.active_contacts(0) + end - %{contacts_queue: q} = State.remove_from_queue(state, respondent_id) + test "sets priority from respondent's disposition", %{state: state} do + state + |> State.queue_contact(new_contact(1, :queued), 1) + |> State.queue_contact(new_contact(2, :started), 1) - assert Ask.PQueue.empty?(q) + assert [ + %{respondent_id: 1, priority: :normal}, + %{respondent_id: 2, priority: :high} + ] = Queue.queued_contacts(0) + + assert [] = Queue.active_contacts(0) + end + end + + test "channel state", %{state: state} do + channel_state = + state + |> State.queue_contact(new_contact(1), 1) + |> State.activate_next_in_queue() + |> elem(0) + |> State.put_channel_state(1, %{"verboice_id" => 123}) + |> State.get_channel_state(1) + + assert channel_state == %{"verboice_id" => 123} + + channel_state = + state + |> State.queue_contact(new_contact(2), 1) + |> State.activate_next_in_queue() + |> elem(0) + |> State.get_channel_state(2) + + assert channel_state == %{} end - test "doesn't removes other respondent", %{state: state, mock_queued_contact: mqc} do - respondent_id = 2 - contact = mqc.(respondent_id, 3, 4) - size = 5 - state = State.queue_contact(state, contact, size) + describe ".touch_last_contact" do + test "", %{state: state} do + state + |> State.queue_contact(new_contact(1), 1) + |> State.activate_next_in_queue() + |> elem(0) + |> State.touch_last_contact(1) + + assert [%{respondent_id: 1}] = Queue.active_contacts(0) + end + end + + describe ".can_unqueue" do + test "returns false when no pending contacts", %{state: state} do + refute state + |> State.can_unqueue() + end + + test "returns false when non activable contacts", %{state: state} do + refute state + |> State.queue_contact( + new_contact(1, :queued, DateTime.utc_now() |> DateTime.add(7200, :second)), + 1 + ) + |> State.can_unqueue() + end + + test "returns true when activable contacts (not_before <= now)", %{state: state} do + assert state + |> State.queue_contact( + new_contact(1, :queued, DateTime.utc_now() |> DateTime.add(-1, :second)), + 1 + ) + |> State.can_unqueue() + end + + test "returns true when activable contacts (not_before <= now + leeway)", %{state: state} do + assert state + |> State.queue_contact( + new_contact(1, :queued, DateTime.utc_now() |> DateTime.add(50, :second)), + 1 + ) + |> State.can_unqueue() + end + + test "returns true when activable contacts (not_before is null)", %{state: state} do + assert state + |> State.queue_contact(new_contact(1, nil), 1) + |> State.can_unqueue() + end + + test "returns false when capacity is reached", %{state: state} do + refute state + |> State.queue_contact(new_contact(1), 1) + |> State.activate_next_in_queue() + |> elem(0) + |> State.queue_contact(new_contact(2), 2) + |> State.activate_next_in_queue() + |> elem(0) + |> State.queue_contact(new_contact(3), 3) + |> State.activate_next_in_queue() + |> elem(0) + |> State.queue_contact(new_contact(4), 1) + |> State.can_unqueue() + end + end + + describe ".activate_next_in_queue" do + @tag :time_mock + test "activates contact", %{state: state} do + now = DateTime.utc_now() + mock_time(now) + + state + |> State.queue_contact(new_contact(1), 2) + |> State.activate_next_in_queue() + + assert [ + %{ + respondent_id: 1, + contacts: 2, + last_contact: now + } + ] = Queue.active_contacts(0) + end + + test "returns unqueued ivr contact" do + respondent = insert(:respondent) + respondent_id = respondent.id + + {_, contact} = + State.new(0, "ivr", %{}) + |> State.queue_contact({respondent, "secret", nil, nil}, 2) + |> State.activate_next_in_queue() + + assert {%Ask.Respondent{id: ^respondent_id}, "secret", nil, nil} = contact + end + + test "returns unqueued sms contact" do + respondent = insert(:respondent) + respondent_id = respondent.id + + {_, contact} = + State.new(0, "sms", %{}) + |> State.queue_contact({respondent, "secret", []}, 2) + |> State.activate_next_in_queue() + + assert {%Ask.Respondent{id: ^respondent_id}, "secret", []} = contact + end + + @tag :time_mock + test "activates contacts by priority then queued time", %{state: state} do + now = DateTime.utc_now() + + # queue a new contact every second + mock_time(now) + state = State.queue_contact(state, new_contact(1), 1, :normal) + + mock_time(DateTime.add(now, 1, :second)) + state = State.queue_contact(state, new_contact(2), 1, :high) + + mock_time(DateTime.add(now, 2, :second)) + state = State.queue_contact(state, new_contact(3), 1, :low) + + mock_time(DateTime.add(now, 3, :second)) + state = State.queue_contact(state, new_contact(4), 1, :normal) + + mock_time(DateTime.add(now, 5, :second)) + state = State.queue_contact(state, new_contact(5), 1, :low) + + # 1. high priority + {state, _} = State.activate_next_in_queue(state) - new_state = State.remove_from_queue(state, 6) + assert [2] = + Queue.active_contacts(0) |> Enum.map(fn c -> c.respondent_id end) |> Enum.sort() + + # 2. normal priority queued 1st + {state, _} = State.activate_next_in_queue(state) + + assert [1, 2] = + Queue.active_contacts(0) |> Enum.map(fn c -> c.respondent_id end) |> Enum.sort() + + # 3. normal priority queued 2nd + {state, _} = State.activate_next_in_queue(state) + + assert [1, 2, 4] = + Queue.active_contacts(0) |> Enum.map(fn c -> c.respondent_id end) |> Enum.sort() + + # 4. low priority queued 1st + {_, _} = State.activate_next_in_queue(state) + + assert [1, 2, 3, 4] = + Queue.active_contacts(0) |> Enum.map(fn c -> c.respondent_id end) |> Enum.sort() + + assert [%{respondent_id: 5}] = Queue.queued_contacts(0) + end + + @tag :time_mock + test "skips respondent until not_before <= now + leeway", %{state: state} do + now = DateTime.utc_now() + mock_time(now) + + state + |> State.queue_contact(new_contact(1, :queued, DateTime.add(now, 90, :second)), 1) + |> State.queue_contact(new_contact(2, :queued, DateTime.add(now, 80, :second)), 1) + |> State.queue_contact(new_contact(3), 1) + + mock_time(DateTime.add(now, 15, :second)) + {state, _} = State.activate_next_in_queue(state) + + assert [3] = + Queue.active_contacts(0) |> Enum.map(fn c -> c.respondent_id end) |> Enum.sort() + + mock_time(DateTime.add(now, 25, :second)) + {state, _} = State.activate_next_in_queue(state) + + assert [2, 3] = + Queue.active_contacts(0) |> Enum.map(fn c -> c.respondent_id end) |> Enum.sort() + + mock_time(DateTime.add(now, 35, :second)) + {_, _} = State.activate_next_in_queue(state) + + assert [1, 2, 3] = + Queue.active_contacts(0) |> Enum.map(fn c -> c.respondent_id end) |> Enum.sort() + end + end + + describe ".deactivate_contact" do + test "removes from the queue", %{state: state} do + state + |> State.queue_contact(new_contact(2), 5) + |> State.deactivate_contact(2) + + assert [] = Queue.queued_contacts(0) + assert [] = Queue.active_contacts(0) + end + + test "removes from the active queue", %{state: state} do + state + |> State.queue_contact(new_contact(2), 5) + |> State.activate_next_in_queue() + |> elem(0) + |> State.deactivate_contact(2) + + assert [] = Queue.queued_contacts(0) + assert [] = Queue.active_contacts(0) + end + + test "silently fails for unknown respondent", %{state: state} do + state + |> State.queue_contact(new_contact(2), 5) + |> State.deactivate_contact(6) + + assert [%{respondent_id: 2}] = Queue.queued_contacts(0) + assert [] = Queue.active_contacts(0) + end + end + + describe ".increment_respondents_contacts" do + test "increments the number of contacts for a respondent", %{state: state} do + state + |> State.queue_contact(new_contact(2), 5) + |> State.activate_next_in_queue() + |> elem(0) + |> State.increment_respondents_contacts(2, 3) + + assert [] = Queue.queued_contacts(0) + assert [%{respondent_id: 2, contacts: 8}] = Queue.active_contacts(0) + end + + test "activates the contact", %{state: state} do + state + |> State.queue_contact(new_contact(2), 3) + |> State.increment_respondents_contacts(2, 1) + + assert [] = Queue.queued_contacts(0) + assert [%{respondent_id: 2, contacts: 1}] = Queue.active_contacts(0) + end + + test "silently fails for unknown respondent", %{state: state} do + state + |> State.queue_contact(new_contact(2), 5) + |> State.activate_next_in_queue() + |> elem(0) + |> State.increment_respondents_contacts(6, 1) + + assert [] = Queue.queued_contacts(0) + assert [%{respondent_id: 2, contacts: 5}] = Queue.active_contacts(0) + end + end + + describe ".decrement_respondents_contacts" do + test "decrements the number of contacts for a respondent", %{state: state} do + state + |> State.queue_contact(new_contact(2), 5) + |> State.activate_next_in_queue() + |> elem(0) + |> State.decrement_respondents_contacts(2, 3) + + assert [] = Queue.queued_contacts(0) + assert [%{respondent_id: 2, contacts: 2}] = Queue.active_contacts(0) + end + + test "deletes the contact when number of contacts reaches zero (or less)", %{state: state} do + state + |> State.queue_contact(new_contact(2), 5) + |> State.queue_contact(new_contact(4), 3) + |> State.activate_next_in_queue() + |> elem(0) + |> State.activate_next_in_queue() + |> elem(0) + |> State.decrement_respondents_contacts(2, 5) + |> State.decrement_respondents_contacts(4, 7) + + assert [] = Queue.queued_contacts(0) + assert [] = Queue.active_contacts(0) + end + + test "silently fails for unknown respondent", %{state: state} do + state + |> State.queue_contact(new_contact(2), 5) + |> State.activate_next_in_queue() + |> elem(0) + |> State.decrement_respondents_contacts(6, 1) + + assert [] = Queue.queued_contacts(0) + assert [%{respondent_id: 2, contacts: 5}] = Queue.active_contacts(0) + end + end + + describe ".reenqueue_contact" do + @tag :time_mock + test "deactivates the contact and puts it back into queue", %{state: state} do + now = DateTime.utc_now() |> DateTime.truncate(:second) + mock_time(now) + + {new_state, _} = + state + |> State.queue_contact(new_contact(2), 1) + |> State.activate_next_in_queue() + + later = DateTime.add(now, 600, :second) + mock_time(later) + + State.reenqueue_contact(new_state, 2) + + assert [ + %{ + respondent_id: 2, + contacts: nil, + last_contact: nil, + channel_state: nil, + queued_at: later + } + ] = Queue.queued_contacts(0) + + assert [] = Queue.active_contacts(0) + end + end - assert new_state == state + defp new_contact(respondent_id, disposition \\ :queued, not_before \\ nil) do + {%{id: respondent_id, disposition: disposition}, "secret", not_before, nil} end end diff --git a/test/ask/runtime/channel_broker_test.exs b/test/ask/runtime/channel_broker_test.exs index 11a941b05..5880e6778 100644 --- a/test/ask/runtime/channel_broker_test.exs +++ b/test/ask/runtime/channel_broker_test.exs @@ -80,10 +80,10 @@ defmodule Ask.Runtime.ChannelBrokerTest do |> activate_respondent("ivr", Enum.at(respondents, 4), not_before, not_after) assert [ - Enum.at(respondents, 0).id, - Enum.at(respondents, 3).id, - Enum.at(respondents, 4).id, - ] == Enum.map(state.active_contacts, fn {r_id, _} -> r_id end) + Enum.at(respondents, 0).id, + Enum.at(respondents, 3).id, + Enum.at(respondents, 4).id + ] == active_respondent_ids(state) assert_not_called(Ask.Runtime.Survey.contact_attempt_expired(%{id: Enum.at(respondents, 0).id})) assert_called_exactly(Ask.Runtime.Survey.contact_attempt_expired(%{id: Enum.at(respondents, 1).id}), 1) @@ -112,31 +112,38 @@ defmodule Ask.Runtime.ChannelBrokerTest do |> activate_respondent("ivr", Enum.at(respondents, 3), the_future, not_after) |> activate_respondent("ivr", Enum.at(respondents, 4), not_before, not_after) + # activated soon to be contacted respondents: assert [ - Enum.at(respondents, 0).id, - Enum.at(respondents, 2).id, - Enum.at(respondents, 4).id, - ] == Enum.map(state.active_contacts, fn {r_id, _} -> r_id end) + Enum.at(respondents, 0).id, + Enum.at(respondents, 2).id, + Enum.at(respondents, 4).id + ] == active_respondent_ids(state) # skip to the future time_passes(minutes: 5) + # trigger more enqueues (by pushing more contacts): state = state - |> activate_respondent("ivr", Enum.at(respondents, 1), the_future, not_after) - |> activate_respondent("ivr", Enum.at(respondents, 3), the_future, not_after) + |> activate_respondent("ivr", Enum.at(respondents, 5), not_after, not_after) + |> activate_respondent("ivr", Enum.at(respondents, 7), not_after, not_after) assert [ - Enum.at(respondents, 0).id, - Enum.at(respondents, 1).id, - Enum.at(respondents, 2).id, - Enum.at(respondents, 3).id, - Enum.at(respondents, 4).id, - ] == Enum.map(state.active_contacts, fn {r_id, _} -> r_id end) + Enum.at(respondents, 0).id, + Enum.at(respondents, 1).id, + Enum.at(respondents, 2).id, + Enum.at(respondents, 3).id, + Enum.at(respondents, 4).id + ] == active_respondent_ids(state) end defp activate_respondent(state, "ivr", respondent, not_before, not_after) do - {_, state, _} = ChannelBroker.handle_cast({:setup, "ivr", respondent, "token", not_before, not_after}, state) + {_, state, _} = + ChannelBroker.handle_cast( + {:setup, "ivr", respondent, "token", not_before, not_after}, + state + ) + state end end @@ -195,7 +202,7 @@ defmodule Ask.Runtime.ChannelBrokerTest do Enum.at(respondents, 4) |> Respondent.changeset(%{state: :failed}) |> Repo.update!() # run: - {:noreply, new_state, _} = ChannelBroker.handle_info({:collect_garbage, "ivr"}, state) + {:noreply, new_state, _} = ChannelBroker.handle_info({:collect_garbage}, state) # it removed failed respondents (1, 3, 4) and activated queued ones (5, 6, 7): assert [ @@ -204,16 +211,17 @@ defmodule Ask.Runtime.ChannelBrokerTest do Enum.at(respondents, 5).id, Enum.at(respondents, 6).id, Enum.at(respondents, 7).id - ] == Map.keys(new_state.active_contacts) + ] == active_respondent_ids(new_state) end test "asks verboice for actual state of long idle contacts" do %{state: state, respondents: respondents} = start_survey("ivr") + initial_active = active_respondent_ids(state) # travel to the future (within allowed contact idle time): time_passes(minutes: trunc(state.config.gc_active_idle_minutes / 2)) - {:noreply, new_state, _} = ChannelBroker.handle_info({:collect_garbage, "ivr"}, state) - assert new_state.active_contacts == state.active_contacts + {:noreply, new_state, _} = ChannelBroker.handle_info({:collect_garbage}, state) + assert active_respondent_ids(new_state) == initial_active # travel to the future again (after allowed contact idle time): time_passes(minutes: state.config.gc_active_idle_minutes * 2) @@ -235,7 +243,7 @@ defmodule Ask.Runtime.ChannelBrokerTest do ] with_mock Ask.Runtime.Channel, mocks do - {:noreply, new_state, _} = ChannelBroker.handle_info({:collect_garbage, "ivr"}, state) + {:noreply, new_state, _} = ChannelBroker.handle_info({:collect_garbage}, state) # it asked verboice for call state (all calls are long idle in this test case): assert_called_exactly(Ask.Runtime.Channel.message_inactive?(:_, :_), @channel_capacity) @@ -247,17 +255,18 @@ defmodule Ask.Runtime.ChannelBrokerTest do Enum.at(respondents, 5).id, Enum.at(respondents, 6).id, Enum.at(respondents, 7).id - ] == Map.keys(new_state.active_contacts) + ] == active_respondent_ids(new_state) end end test "asks nuntium for actual state of long idle contacts" do %{state: state, respondents: respondents} = start_survey("sms") + initial_active = active_respondent_ids(state) # travel to the future (within allowed contact idle time): time_passes(minutes: trunc(state.config.gc_active_idle_minutes / 2)) - {:noreply, new_state, _} = ChannelBroker.handle_info({:collect_garbage, "sms"}, state) - assert new_state.active_contacts == state.active_contacts + {:noreply, new_state, _} = ChannelBroker.handle_info({:collect_garbage}, state) + assert active_respondent_ids(new_state) == initial_active # travel to the future again (after allowed contact idle time): time_passes(minutes: state.config.gc_active_idle_minutes * 2) @@ -280,7 +289,7 @@ defmodule Ask.Runtime.ChannelBrokerTest do ] with_mock Ask.Runtime.Channel, mocks do - {:noreply, new_state, _} = ChannelBroker.handle_info({:collect_garbage, "sms"}, state) + {:noreply, new_state, _} = ChannelBroker.handle_info({:collect_garbage}, state) # it asked nuntium for call state (all messages are long idle in this test case): assert_called_exactly(Ask.Runtime.Channel.message_inactive?(:_, :_), @channel_capacity) @@ -292,7 +301,7 @@ defmodule Ask.Runtime.ChannelBrokerTest do Enum.at(respondents, 5).id, Enum.at(respondents, 6).id, Enum.at(respondents, 7).id - ] == Map.keys(new_state.active_contacts) + ] == active_respondent_ids(new_state) end end end @@ -353,10 +362,9 @@ defmodule Ask.Runtime.ChannelBrokerTest do # build state state = %Ask.Runtime.ChannelBrokerState{ channel_id: channel.id, + channel_type: channel.type, runtime_channel: test_channel, capacity: @channel_capacity, - active_contacts: %{}, - contacts_queue: Ask.PQueue.new(), config: Config.channel_broker_config() } @@ -366,40 +374,39 @@ defmodule Ask.Runtime.ChannelBrokerTest do defp start_survey(channel_type) do %{state: state, respondents: respondents, channel: channel} = build_survey(channel_type) - # activate respondents (up to capacity): - active_contacts = - respondents - |> Enum.slice(0..4) - |> Enum.reduce(%{}, fn e, a -> - Map.put(a, e.id, %{ - contacts: 1, - last_contact: SystemTime.time().now, - channel_state: channel_state(channel_type, e) - }) + # queue respondents: + state = + Enum.reduce(respondents, state, fn respondent, state -> + contact = respondent_to_contact(channel_type, respondent) + state |> Ask.Runtime.ChannelBrokerState.queue_contact(contact, 1, :normal) end) - # queue the other respondents: - contacts_queue = + # force activate respondents (up to capacity): + state = respondents - |> Enum.slice(5..9) - |> Enum.reduce(Ask.PQueue.new(), fn e, a -> - not_before = SystemTime.time().now |> DateTime.add(-3600, :second) - not_after = SystemTime.time().now |> DateTime.add(3600, :second) - - case channel_type do - "ivr" -> Ask.PQueue.push(a, [1, {e, "secret", not_before, not_after}], :normal) - "sms" -> Ask.PQueue.push(a, [1, {e, "secret", []}], :normal) - end + |> Enum.slice(0..4) + |> Enum.reduce(state, fn respondent, state -> + state + |> Ask.Runtime.ChannelBrokerState.increment_respondents_contacts(respondent.id, 1) + |> Ask.Runtime.ChannelBrokerState.put_channel_state( + respondent.id, + channel_state(channel.type, respondent) + ) end) - state = - state - |> Map.put(:active_contacts, active_contacts) - |> Map.put(:contacts_queue, contacts_queue) - %{state: state, respondents: respondents, channel: channel} end + defp respondent_to_contact("ivr", respondent) do + not_before = SystemTime.time().now |> DateTime.add(-3600, :second) + not_after = SystemTime.time().now |> DateTime.add(3600, :second) + {respondent, "secret", not_before, not_after} + end + + defp respondent_to_contact("sms", respondent) do + {respondent, "secret", []} + end + defp channel_state("ivr", respondent) do %{"verboice_call_id" => respondent.id} end @@ -435,4 +442,9 @@ defmodule Ask.Runtime.ChannelBrokerTest do assert Repo.get(Respondent, id).state == state end) end + + defp active_respondent_ids(state) do + Ask.ChannelBrokerQueue.active_contacts(state.channel_id) + |> Enum.map(fn c -> c.respondent_id end) + end end diff --git a/test/ask/runtime/channel_status_server_test.exs b/test/ask/runtime/channel_status_server_test.exs index 98b31f606..a6068dcbf 100644 --- a/test/ask/runtime/channel_status_server_test.exs +++ b/test/ask/runtime/channel_status_server_test.exs @@ -115,6 +115,10 @@ defmodule Ask.Runtime.ChannelStatusServerTest do setup_surveys_with_channels([survey], [channel]) ChannelStatusServer.poll(pid) assert_receive [:email, ^email] + + # FIXME: flaky test: need time between polls for async task to update the channel status server state + Process.sleep(100) + ChannelStatusServer.poll(pid) refute_receive [:email, ^email] end @@ -133,6 +137,10 @@ defmodule Ask.Runtime.ChannelStatusServerTest do setup_surveys_with_channels([survey], [channel]) ChannelStatusServer.poll(pid) assert_receive [:email, ^email] + + # FIXME: flaky test: need time between polls for async task to update the channel status server state + Process.sleep(100) + ChannelStatusServer.poll(pid) refute_receive [:email, ^email] end diff --git a/test/ask/runtime/session_test.exs b/test/ask/runtime/session_test.exs index b9a6b0852..b1cf8bbbc 100644 --- a/test/ask/runtime/session_test.exs +++ b/test/ask/runtime/session_test.exs @@ -112,10 +112,12 @@ defmodule Ask.Runtime.SessionTest do assert_receive [:setup, ^test_channel, respondent, ^token] assert session_respondent.id == respondent.id + respondent_id = respondent.id + assert_receive [ :ask, ^test_channel, - ^respondent, + %Respondent{id: ^respondent_id}, ^token, ReplyHelper.simple("Contact", message), _channel_id @@ -235,10 +237,12 @@ defmodule Ask.Runtime.SessionTest do assert_receive [:setup, ^test_channel, respondent, ^token] assert session_respondent.id == respondent.id + respondent_id = respondent.id + assert_receive [ :ask, ^test_channel, - ^respondent, + %Respondent{id: ^respondent_id}, ^token, ReplyHelper.simple("Contact", message), _channel_id @@ -361,7 +365,8 @@ defmodule Ask.Runtime.SessionTest do assert 120 = timeout assert token != nil - assert_receive [:setup, ^test_channel, ^respondent, ^token] + respondent_id = respondent.id + assert_receive [:setup, ^test_channel, %Respondent{id: ^respondent_id}, ^token] refute_receive _ end @@ -382,10 +387,12 @@ defmodule Ask.Runtime.SessionTest do assert_receive [:setup, ^test_channel, respondent_received, ^token] assert respondent.id == respondent_received.id + respondent_received_id = respondent_received.id + assert_receive [ :ask, ^test_channel, - ^respondent_received, + %Respondent{id: ^respondent_received_id}, ^token, ReplyHelper.simple("Do you smoke?", "Do you smoke? Reply 1 for YES, 2 for NO"), _channel_id @@ -582,10 +589,12 @@ defmodule Ask.Runtime.SessionTest do assert_receive [:setup, ^test_channel, respondent_received, ^token] assert respondent.id == respondent_received.id + respondent_received_id = respondent_received.id + assert_receive [ :ask, ^test_channel, - ^respondent_received, + %Respondent{id: ^respondent_received_id}, ^token, ReplyHelper.simple("Do you smoke?", "Do you smoke? Reply 1 for YES, 2 for NO"), _channel_id @@ -648,10 +657,12 @@ defmodule Ask.Runtime.SessionTest do assert_receive [:setup, ^test_channel, respondent_received, ^token] assert respondent_received.id == respondent.id + respondent_received_id = respondent_received.id + assert_receive [ :ask, ^test_channel, - ^respondent_received, + %Respondent{id: ^respondent_received_id}, ^token, ReplyHelper.simple("Do you smoke?", "Do you smoke? Reply 1 for YES, 2 for NO"), _channel_id @@ -716,10 +727,12 @@ defmodule Ask.Runtime.SessionTest do assert_receive [:setup, ^test_channel, respondent_received, ^token] assert respondent.id == respondent_received.id + respondent_received_id = respondent_received.id + assert_receive [ :ask, ^test_channel, - ^respondent_received, + %Respondent{id: ^respondent_received_id}, ^token, ReplyHelper.simple("Do you smoke?", "Do you smoke? Reply 1 for YES, 2 for NO"), _channel_id @@ -805,10 +818,12 @@ defmodule Ask.Runtime.SessionTest do assert_receive [:setup, ^test_channel, respondent_received, ^token] assert respondent.id == respondent_received.id + respondent_received_id = respondent_received.id + assert_receive [ :ask, ^test_channel, - ^respondent_received, + %Respondent{id: ^respondent_received_id}, ^token, ReplyHelper.simple("Do you smoke?", "Do you smoke? Reply 1 for YES, 2 for NO"), _channel_id @@ -839,10 +854,12 @@ defmodule Ask.Runtime.SessionTest do assert_receive [:setup, ^test_channel, respondent_received, ^token] assert respondent.id == respondent_received.id + respondent_received_id = respondent_received.id + assert_receive [ :ask, ^test_channel, - ^respondent_received, + %Respondent{id: ^respondent_received_id}, ^token, ReplyHelper.simple("Do you smoke?", "Do you smoke? Reply 1 for YES, 2 for NO"), _channel_id @@ -948,10 +965,12 @@ defmodule Ask.Runtime.SessionTest do assert_receive [:setup, ^test_channel, respondent_received, ^token] assert respondent.id == respondent_received.id + respondent_received_id = respondent_received.id + assert_receive [ :ask, ^test_channel, - ^respondent_received, + %Respondent{id: ^respondent_received_id}, ^token, ReplyHelper.simple("Do you smoke?", "Do you smoke? Reply 1 for YES, 2 for NO"), _channel_id @@ -1200,35 +1219,37 @@ defmodule Ask.Runtime.SessionTest do test "doesn't consume a retry if it has an expired message" do quiz = insert(:questionnaire, steps: @dummy_steps) respondent = insert(:respondent) + respondent_id = respondent.id test_channel = TestChannel.new(:expired) channel = insert(:channel, type: "ivr", settings: test_channel |> TestChannel.settings()) assert {:ok, session = %Session{token: token, respondent: respondent}, _, 5} = Session.start(quiz, respondent, channel, "ivr", Schedule.always(), [5]) - assert_receive [:setup, ^test_channel, ^respondent, ^token] + assert_receive [:setup, ^test_channel, %Respondent{id: ^respondent_id}, ^token] assert {:ok, %Session{token: token, respondent: respondent}, %Reply{}, 5} = Session.timeout(session) - assert_receive [:setup, ^test_channel, ^respondent, ^token] + assert_receive [:setup, ^test_channel, %Respondent{id: ^respondent_id}, ^token] end test "doesn't fail if it has an expired message" do quiz = insert(:questionnaire, steps: @dummy_steps) respondent = insert(:respondent) + respondent_id = respondent.id test_channel = TestChannel.new(:expired) channel = insert(:channel, settings: test_channel |> TestChannel.settings(), type: "ivr") assert {:ok, session = %Session{token: token, respondent: respondent}, _, 120} = Session.start(quiz, respondent, channel, "ivr", Schedule.always()) - assert_receive [:setup, ^test_channel, ^respondent, ^token] + assert_receive [:setup, ^test_channel, %Respondent{id: ^respondent_id}, ^token] assert {:ok, %Session{token: token, respondent: respondent}, %Reply{}, 120} = Session.timeout(session) - assert_receive [:setup, ^test_channel, ^respondent, ^token] + assert_receive [:setup, ^test_channel, %Respondent{id: ^respondent_id}, ^token] end test "doesn't switch to fallback if it has an expired message", %{ @@ -1258,12 +1279,14 @@ defmodule Ask.Runtime.SessionTest do fallback_retries ) - assert_receive [:setup, ^test_channel, ^respondent, ^token] + respondent_id = respondent.id + + assert_receive [:setup, ^test_channel, %Respondent{id: ^respondent_id}, ^token] assert {:ok, %Session{token: token, respondent: respondent}, %Reply{}, 120} = Session.timeout(session) - assert_receive [:setup, ^test_channel, ^respondent, ^token] + assert_receive [:setup, ^test_channel, %Respondent{id: ^respondent_id}, ^token] end test "uses retry configuration", %{quiz: quiz, respondent: respondent, channel: channel} do diff --git a/test/ask_web/controllers/survey_controller_test.exs b/test/ask_web/controllers/survey_controller_test.exs index c628eca90..56a1ef9fa 100644 --- a/test/ask_web/controllers/survey_controller_test.exs +++ b/test/ask_web/controllers/survey_controller_test.exs @@ -345,6 +345,9 @@ defmodule AskWeb.SurveyControllerTest do ChannelStatusServer.poll(pid) + # FIXME: flaky test: need time for async task to update the channel status server state + Process.sleep(100) + conn = get(conn, project_survey_path(conn, :index, project.id)) [survey_1, survey_2, survey_3] = json_response(conn, 200)["data"] @@ -808,6 +811,9 @@ defmodule AskWeb.SurveyControllerTest do ChannelStatusServer.poll(pid) + # FIXME: flaky test: need time for async task to update the channel status server state + Process.sleep(100) + conn = get(conn, project_survey_path(conn, :show, project, survey)) data = json_response(conn, 200)["data"] @@ -2884,6 +2890,8 @@ defmodule AskWeb.SurveyControllerTest do post(conn, project_survey_survey_path(conn, :stop, survey.project, survey)) + # FIXME: flaky test: sometimes Repo.all will only return 1 row + Process.sleep(100) logs = Repo.all(ActivityLog) assert_survey_log(%{ diff --git a/test/support/channel_case.ex b/test/support/channel_case.ex index a5729818d..c486f2459 100644 --- a/test/support/channel_case.ex +++ b/test/support/channel_case.ex @@ -30,13 +30,8 @@ defmodule AskWeb.ChannelCase do end end - setup tags do - :ok = Ecto.Adapters.SQL.Sandbox.checkout(Ask.Repo) - - unless tags[:async] do - Ecto.Adapters.SQL.Sandbox.mode(Ask.Repo, {:shared, self()}) - end - + setup do + on_exit(&Ask.DatabaseCleaner.delete/0) :ok end end diff --git a/test/support/conn_case.ex b/test/support/conn_case.ex index 639af2057..f3073a393 100644 --- a/test/support/conn_case.ex +++ b/test/support/conn_case.ex @@ -33,13 +33,8 @@ defmodule AskWeb.ConnCase do end end - setup tags do - :ok = Ecto.Adapters.SQL.Sandbox.checkout(Ask.Repo) - - unless tags[:async] do - Ecto.Adapters.SQL.Sandbox.mode(Ask.Repo, {:shared, self()}) - end - + setup do + on_exit(&Ask.DatabaseCleaner.delete/0) {:ok, conn: Phoenix.ConnTest.build_conn()} end end diff --git a/test/support/data_case.ex b/test/support/data_case.ex index d71c888fb..ef430fd6b 100644 --- a/test/support/data_case.ex +++ b/test/support/data_case.ex @@ -26,13 +26,8 @@ defmodule Ask.DataCase do end end - setup tags do - :ok = Ecto.Adapters.SQL.Sandbox.checkout(Ask.Repo) - - unless tags[:async] do - Ecto.Adapters.SQL.Sandbox.mode(Ask.Repo, {:shared, self()}) - end - + setup do + on_exit(&Ask.DatabaseCleaner.delete/0) :ok end diff --git a/test/support/database_cleaner.ex b/test/support/database_cleaner.ex new file mode 100644 index 000000000..f5f47681c --- /dev/null +++ b/test/support/database_cleaner.ex @@ -0,0 +1,87 @@ +defmodule Ask.DatabaseCleaner do + use GenServer + + @server_ref {:global, __MODULE__} + + def start_link do + GenServer.start_link(__MODULE__, [], name: @server_ref) + end + + def init([]) do + {:ok, %{}} + end + + def truncate() do + GenServer.call(@server_ref, :truncate) + end + + def delete() do + GenServer.call(@server_ref, :delete) + end + + def handle_call(:truncate, _, state) do + {new_state, tables} = get_tables(state) + tables |> Enum.filter(&changed(&1)) |> truncate() + {:reply, :ok, new_state} + end + + def handle_call(:delete, _, state) do + {new_state, tables} = get_tables(state) + tables |> Enum.filter(&changed(&1)) |> delete() + {:reply, :ok, new_state} + end + + defp changed(table) do + case sql_query("SELECT EXISTS (SELECT 1 FROM #{table} LIMIT 1)") do + {:ok, %{rows: [[0]]}} -> false + {:ok, %{rows: [[_]]}} -> true + end + end + + defp truncate([]) do + end + + defp truncate(tables) do + disable_integrity(fn -> + tables |> Enum.each(fn table -> sql_query("TRUNCATE #{table}") end) + end) + end + + defp delete([]) do + end + + defp delete(tables) do + disable_integrity(fn -> + tables |> Enum.each(fn table -> sql_query("DELETE FROM #{table}") end) + end) + end + + defp disable_integrity(callback) do + Ask.Repo.checkout(fn -> + sql_query("SET foreign_key_checks = 0") + callback.() + sql_query("SET foreign_key_checks = 1") + end) + end + + defp get_tables(state) do + case state do + %{tables: tables} -> + {state, tables} + + _ -> + {:ok, %{rows: tables}} = sql_query("SHOW tables") + + tables = + tables + |> List.flatten() + |> Enum.reject(fn table -> table == "schema_migrations" end) + + {Map.put(state, :tables, tables), tables} + end + end + + defp sql_query(sql) do + Ask.Repo |> Ecto.Adapters.SQL.query(sql, [], telemetry: false, log: false, query_type: :text) + end +end diff --git a/test/support/test_helpers.ex b/test/support/test_helpers.ex index 742bc512c..8c99031cf 100644 --- a/test/support/test_helpers.ex +++ b/test/support/test_helpers.ex @@ -2,7 +2,15 @@ defmodule Ask.TestHelpers do defmacro __using__(_) do quote do use Ask.DummySteps - alias Ask.Runtime.{SurveyBroker, Flow, RespondentGroupAction, ChannelBrokerSupervisor, ChannelBrokerAgent} + + alias Ask.Runtime.{ + SurveyBroker, + Flow, + RespondentGroupAction, + ChannelBrokerSupervisor, + ChannelBrokerAgent + } + alias Ask.{PanelSurvey, Repo, Respondent, Survey, TestChannel} @foo_string "foo" diff --git a/test/test_helper.exs b/test/test_helper.exs index 7a1ac10c2..cf3076333 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,7 +1,5 @@ ExUnit.start(exclude: [:skip]) -Ecto.Adapters.SQL.Sandbox.mode(Ask.Repo, :manual) - {:ok, _} = Application.ensure_all_started(:ex_machina) {:ok, _} = Application.ensure_all_started(:bypass) {:ok, _} = Application.ensure_all_started(:mox)