Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep ChannelBrokerState in the database #2297

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ config :phoenix, :stacktrace_depth, 20

# Configure your database
config :ask, Ask.Repo,
adapter: Ecto.Adapters.MySQL,
adapter: Ecto.Adapters.MyXQL,
username: "root",
password: "",
database: "ask_dev",
Expand Down
2 changes: 1 addition & 1 deletion config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ config :ask, AskWeb.Endpoint, secret_key_base: System.get_env("SECRET_KEY_BASE")

# Configure your database
config :ask, Ask.Repo,
adapter: Ecto.Adapters.MySQL,
adapter: Ecto.Adapters.MyXQL,
username: System.get_env("DATABASE_USER") || "root",
password: System.get_env("DATABASE_PASS") || "",
database: System.get_env("DATABASE_NAME") || "ask",
Expand Down
4 changes: 2 additions & 2 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ config :logger, level: :warn

# Configure your database
config :ask, Ask.Repo,
adapter: Ecto.Adapters.MySQL,
adapter: Ecto.Adapters.MyXQL,
username: "root",
password: "",
database: "ask_test",
hostname: System.get_env("DATABASE_HOST") || "localhost",
pool: Ecto.Adapters.SQL.Sandbox
pool_size: 10

config :ask, Ask.Runtime.SurveyBroker, batch_size: 10

Expand Down
36 changes: 22 additions & 14 deletions lib/ask.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,28 @@ defmodule Ask do
]

children =
if Mix.env() != :test && !IEx.started?() do
[
worker(Ask.OAuthTokenServer, []),
worker(Ask.Runtime.SurveyLogger, []),
worker(Ask.Runtime.SurveyBroker, []),
worker(Ask.FloipPusher, []),
worker(Ask.JsonSchema, []),
worker(Ask.Runtime.ChannelStatusServer, []),
worker(Ask.Config, []),
worker(Ask.Runtime.QuestionnaireSimulatorStore, [])
| children
]
else
children
cond do
Mix.env() == :test ->
[
worker(Ask.DatabaseCleaner, [])
| children
]

!IEx.started?() ->
[
worker(Ask.OAuthTokenServer, []),
worker(Ask.Runtime.SurveyLogger, []),
worker(Ask.Runtime.SurveyBroker, []),
worker(Ask.FloipPusher, []),
worker(Ask.JsonSchema, []),
worker(Ask.Runtime.ChannelStatusServer, []),
worker(Ask.Config, []),
worker(Ask.Runtime.QuestionnaireSimulatorStore, [])
| children
]

true ->
children
end

# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
Expand Down
105 changes: 105 additions & 0 deletions lib/ask/channel_broker_queue.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
defmodule Ask.ChannelBrokerQueue do
use Ask.Model

alias Ask.ChannelBrokerQueue, as: Queue
alias Ask.Repo

@primary_key false

schema "channel_broker_queue" do
belongs_to :channel, Ask.Channel, primary_key: true
belongs_to :respondent, Ask.Respondent, primary_key: true

# queued (pending):
field :queued_at, :utc_datetime
field :priority, Ecto.Enum, values: [low: 2, normal: 1, high: 0]
# number of outgoing messages (ivr=1, sms=1+)
field :size, :integer
field :token, :string
field :not_before, :utc_datetime
field :not_after, :utc_datetime
field :reply, Ask.Ecto.Type.ErlangTerm

# sent (active):
field :last_contact, :utc_datetime
# number of active messages left (pending confirmation)
field :contacts, :integer
field :channel_state, Ask.Ecto.Type.ErlangTerm
end

def upsert!(params) do
upsert_params =
params
|> Map.drop([:channel_id, :respondent_id])
|> Map.put_new(:last_contact, nil)
|> Map.put_new(:contacts, nil)
|> Map.put_new(:channel_state, nil)
|> Map.to_list()

%Queue{}
|> changeset(params)
|> Repo.insert!(on_conflict: [set: upsert_params])
end

def changeset(struct, params \\ %{}) do
struct
|> cast(params, [
:channel_id,
:respondent_id,
:queued_at,
:priority,
:size,
:token,
:not_before,
:not_after,
:reply,
:last_contact,
:contacts,
:channel_state
])
|> validate_required([
:channel_id,
:respondent_id,
:queued_at,
:priority,
:size,
:token
])

# |> assoc_constraint(:channel, :respondent)
ysbaddaden marked this conversation as resolved.
Show resolved Hide resolved
end

def activable_contacts?(channel_id) do
# add leeway to activate contacts to be scheduled soon
not_before = Ask.SystemTime.time().now |> DateTime.add(60, :second)

Repo.exists?(
from q in Queue,
where:
q.channel_id == ^channel_id and is_nil(q.last_contact) and
(is_nil(q.not_before) or q.not_before <= ^not_before)
)
end

def count_active_contacts(channel_id) do
Repo.one(
from q in Queue,
select: type(coalesce(sum(coalesce(q.contacts, 0)), 0), :integer),
where: q.channel_id == ^channel_id and not is_nil(q.last_contact)
)
end

def queued_contacts(channel_id) do
Repo.all(
from q in Ask.ChannelBrokerQueue,
where: q.channel_id == ^channel_id and is_nil(q.last_contact)
)
end

def active_contacts(channel_id) do
Repo.all(
from q in Ask.ChannelBrokerQueue,
where: q.channel_id == ^channel_id and not is_nil(q.last_contact)
)
end
end
20 changes: 20 additions & 0 deletions lib/ask/ecto_types/erlang_term.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Ask.Ecto.Type.ErlangTerm do
@moduledoc """
A custom Ecto type for handling the serialization of arbitrary
data types stored as binary data in the database. Requires the
underlying DB field to be a binary.
"""
use Ecto.Type
def type, do: :binary

def cast(:any, term), do: {:ok, term}
def cast(term), do: {:ok, term}

def load(binary) when is_binary(binary) do
{:ok, :erlang.binary_to_term(binary)}
end

def dump(term) do
{:ok, :erlang.term_to_binary(term)}
end
end
93 changes: 0 additions & 93 deletions lib/ask/pqueue.ex

This file was deleted.

Loading