diff --git a/lib/pigeon.ex b/lib/pigeon.ex index fadc940c..6f5b9cd7 100644 --- a/lib/pigeon.ex +++ b/lib/pigeon.ex @@ -9,11 +9,12 @@ defmodule Pigeon do def start(_type, _args) do opts = [strategy: :one_for_one, name: :pigeon] + Application.put_env(:chatterbox, :ssl_options, []) Supervisor.start_link(workers(), opts) end defp workers do - adm_worker() ++ apns_workers() + adm_worker() ++ apns_workers() ++ gcm_workers() end def adm_worker do @@ -38,4 +39,13 @@ defmodule Pigeon do true -> [] end end + + defp gcm_workers do + name = Pigeon.GCMWorker.default_name() + cond do + config = Application.get_env(:pigeon, :gcm) -> + [worker(Pigeon.GCMWorker, [name, config], id: name)] + true -> [] + end + end end diff --git a/lib/pigeon/apns.ex b/lib/pigeon/apns.ex index 6089b8b8..2520bb4d 100644 --- a/lib/pigeon/apns.ex +++ b/lib/pigeon/apns.ex @@ -12,11 +12,12 @@ defmodule Pigeon.APNS do def push(notification, opts \\ []) def push(notification, opts) when is_list(notification) do + timeout = opts[:timeout] || @default_timeout case opts[:on_response] do nil -> tasks = for n <- notification, do: Task.async(fn -> do_sync_push(n, opts) end) tasks - |> Task.yield_many(@default_timeout + 500) + |> Task.yield_many(timeout + 500) |> Enum.map(fn {task, response} -> response || Task.shutdown(task, :brutal_kill) end) |> group_responses on_response -> push(notification, on_response, opts) @@ -31,15 +32,17 @@ defmodule Pigeon.APNS do defp do_sync_push(notification, opts) do pid = self() - on_response = fn(x) -> send pid, {:ok, x} end + ref = :erlang.make_ref + on_response = fn(x) -> send pid, {ref, x} end + timeout = opts[:timeout] || @default_timeout worker_name = opts[:name] || Config.default_name GenServer.cast(worker_name, {:push, :apns, notification, on_response}) receive do - {:ok, x} -> x + {^ref, x} -> x after - @default_timeout -> {:error, :timeout, notification} + timeout -> {:error, :timeout, notification} end end diff --git a/lib/pigeon/apns_config.ex b/lib/pigeon/apns_config.ex index 589a9fe7..1f32c777 100644 --- a/lib/pigeon/apns_config.ex +++ b/lib/pigeon/apns_config.ex @@ -3,11 +3,18 @@ defmodule Pigeon.APNS.Config do Validates configuration settings that initialize APNSWorkers. """ - def default_name, do: :default + def default_name, do: :apns_default def config(name) do config = Application.get_env(:pigeon, :apns)[name] + config(name, config) + end + + def config(name, config) do %{ + production_endpoint: config[:production_endpoint] || "api.push.apple.com", + development_endpoint: config[:development_endpoint] || "api.development.push.apple.com", + port: config[:port] || 443, name: name, mode: config[:mode], cert: cert(config[:cert]), diff --git a/lib/pigeon/apns_worker.ex b/lib/pigeon/apns_worker.ex index 0e7bc221..b930f88f 100644 --- a/lib/pigeon/apns_worker.ex +++ b/lib/pigeon/apns_worker.ex @@ -2,69 +2,28 @@ defmodule Pigeon.APNSWorker do @moduledoc """ Handles all APNS request and response parsing over an HTTP2 connection. """ - use GenServer + use Pigeon.GenericH2Worker, ping_interval: 600_000 require Logger - @ping_period 600_000 # 10 minutes - - defp apns_production_api_uri, do: "api.push.apple.com" - defp apns_development_api_uri, do: "api.development.push.apple.com" - - def push_uri(mode) do - case mode do - :dev -> apns_development_api_uri() - :prod -> apns_production_api_uri() + def host(config) do + case config[:mode] do + :dev -> config[:development_endpoint] + :prod -> config[:production_endpoint] end end - def start_link(config) do - GenServer.start_link(__MODULE__, {:ok, config}, name: config[:name]) - end - - def stop, do: :gen_server.cast(self(), :stop) - - def init({:ok, config}), do: initialize_worker(config) - - def initialize_worker(config) do - mode = config[:mode] - case connect_socket(config, 0) do - {:ok, socket} -> - Process.send_after(self(), :ping, @ping_period) - {:ok, %{ - apns_socket: socket, - mode: mode, - config: config, - stream_id: 1, - queue: %{} - }} - {:closed, _socket} -> - Logger.error """ - Socket closed unexpectedly. - """ - {:stop, {:error, :bad_connection}} - {:error, :timeout} -> - Logger.error """ - Failed to establish SSL connection. Is the certificate signed for :#{mode} mode? - """ - {:stop, {:error, :timeout}} - {:error, :invalid_config} -> - Logger.error """ - Invalid configuration. - """ - {:stop, {:error, :invalid_config}} + def port(config) do + case config[:use_2197] do + true -> 2197 + _ -> config[:port] end end - def connect_socket(_config, 3), do: {:error, :timeout} - def connect_socket(config, tries) do - uri = config[:mode] |> push_uri |> to_char_list - case connect_socket_options(config) do - {:ok, options} -> do_connect_socket(config, uri, options, tries) - error -> error - end + def encode_notification(notification) do + Pigeon.Notification.json_payload(notification.payload) end - def connect_socket_options(config) do + def socket_options(config) do cert = get_opt(config, :cert, :certfile) key = get_opt(config, :key, :keyfile) cond do @@ -72,25 +31,13 @@ defmodule Pigeon.APNSWorker do options = [cert, key, - {:password, ''}, - {:packet, 0}, - {:reuseaddr, true}, - {:active, true}, - :binary] - |> optional_add_2197(config) + {:password, ''}] {:ok, options} true -> {:error, :invalid_config} end end - defp optional_add_2197(options, config) do - case config[:use_2197] do - true -> options ++ [{:port, 2197}] - _ -> options - end - end - defp get_opt(config, key_1, key_2) do cond do config[key_1] -> {key_1, config[key_1]} @@ -99,44 +46,11 @@ defmodule Pigeon.APNSWorker do end end - defp do_connect_socket(config, uri, options, tries) do - case Kadabra.open(uri, :https, options) do - {:ok, socket} -> {:ok, socket} - {:error, reason} -> - Logger.error(inspect(reason)) - connect_socket(config, tries + 1) - end - end - - def handle_cast(:stop, state), do: { :noreply, state } - - def handle_cast({:push, :apns, notification}, state) do - send_push(state, notification, nil) - end - - def handle_cast({:push, :apns, notification, on_response}, state) do - send_push(state, notification, on_response) - end - - def handle_cast(msg, state) do - Logger.debug "Recv: #{inspect(msg)}" - {:noreply, state} - end - - def send_push(state, notification, on_response) do - %{apns_socket: socket, stream_id: stream_id, queue: queue} = state - json = Pigeon.Notification.json_payload(notification.payload) - req_headers = [ - {":method", "POST"}, - {":path", "/3/device/#{notification.device_token}"}, - {"content-length", "#{byte_size(json)}"}] - |> put_apns_id(notification) - |> put_apns_topic(notification) - - Kadabra.request(socket, req_headers, json) - new_q = Map.put(queue, "#{stream_id}", {notification, on_response}) - new_stream_id = stream_id + 2 - { :noreply, %{state | stream_id: new_stream_id, queue: new_q } } + def req_headers(_config, notification) do + [{"content-type", "application/json"}, + {"accept", "application/json"}] + |> put_apns_id(notification) + |> put_apns_topic(notification) end defp put_apns_id(headers, notification) do @@ -153,16 +67,24 @@ defmodule Pigeon.APNSWorker do end end - defp parse_error(data) do - {:ok, response} = Poison.decode(data) + def req_path(notification) do + "/3/device/#{notification.device_token}" + end + + defp parse_error(_notification, _headers, body) do + {:ok, response} = Poison.decode(body) response["reason"] |> Macro.underscore |> String.to_existing_atom end - defp log_error(reason, notification) do - Logger.error("#{reason}: #{error_msg(reason)}\n#{inspect(notification)}") + defp parse_response(notification, headers, _body) do + case List.keyfind(headers, "apns-id", 1) do + nil -> notification + apns_id -> + %{notification | id: apns_id} + end end - def error_msg(error) do + def error_msg(_code, error) do case error do # 400 :bad_collapse_id -> @@ -216,7 +138,7 @@ defmodule Pigeon.APNSWorker do :missing_provider_token -> """ No provider certificate was used to connect to APNs and Authorization header was missing - or no provider token was specified." + or no provider token was specified." """ # 404 @@ -257,51 +179,4 @@ defmodule Pigeon.APNSWorker do "" end end - - def handle_info(:ping, state) do - Kadabra.ping(state.apns_socket) - Process.send_after(self(), :ping, @ping_period) - - { :noreply, state } - end - - def handle_info({:end_stream, %Kadabra.Stream{id: stream_id, headers: headers, body: body}}, - %{apns_socket: _socket, queue: queue} = state) do - - {notification, on_response} = queue["#{stream_id}"] - - case get_status(headers) do - "200" -> - notification = %{notification | id: get_apns_id(headers)} - unless on_response == nil do on_response.({:ok, notification}) end - new_queue = Map.delete(queue, "#{stream_id}") - {:noreply, %{state | queue: new_queue}} - nil -> - {:noreply, state} - _error -> - reason = parse_error(body) - log_error(reason, notification) - unless on_response == nil do on_response.({:error, reason, notification}) end - new_queue = Map.delete(queue, "#{stream_id}") - {:noreply, %{state | queue: new_queue}} - end - end - - def handle_info({:ping, _from}, state), do: {:noreply, state} - - def handle_info({:ok, _from}, state), do: {:noreply, state} - - defp get_status(headers) do - case Enum.find(headers, fn({key, _val}) -> key == ":status" end) do - {":status", status} -> status - nil -> nil - end - end - - defp get_apns_id(headers) do - case Enum.find(headers, fn({key, _val}) -> key == "apns-id" end) do - {"apns-id", id} -> id - nil -> nil - end - end end diff --git a/lib/pigeon/gcm.ex b/lib/pigeon/gcm.ex index 87c7e659..8cbc031a 100644 --- a/lib/pigeon/gcm.ex +++ b/lib/pigeon/gcm.ex @@ -3,147 +3,120 @@ defmodule Pigeon.GCM do Handles all Google Cloud Messaging (GCM) request and response functionality. """ require Logger - - # defp gcm_uri, do: 'https://gcm-http.googleapis.com/gcm/send' - defp gcm_uri, do: 'fcm.googleapis.com/fcm/send' - - defp gcm_headers(key) do - [{ "Authorization", "key=#{key}" }, - { "Content-Type", "application/json" }, - { "Accept", "application/json" }] + import Supervisor.Spec + + alias Pigeon.GCM.NotificationResponse + + @default_timeout 5_000 + + def push(notification, opts \\ []) + def push(notification, opts) when is_list(notification) do + timeout = opts[:timeout] || @default_timeout + case opts[:on_response] do + nil -> + ref = make_ref + pid = self() + for n <- notification do + on_response = fn(x) -> send pid, {ref, x} end + send_push(n, on_response, opts) + end + List.foldl(notification, %{}, fn(_n, acc) -> + receive do + {^ref, %NotificationResponse{message_id: id} = response} -> + if Map.has_key?(acc, id) do + %{acc | id => merge(response, acc[:message_id])} + else + Map.merge(%{id => response}, acc) + end + after timeout -> + acc + end + end) + on_response -> send_push(notification, on_response, opts) + end end - defp default_gcm_key, do: Application.get_env(:pigeon, :gcm)[:key] - - @doc """ - Sends a push over GCM - """ - @spec push(Pigeon.GCM.Notification) :: none - def push(notification) do - do_push(notification, %{gcm_key: default_gcm_key()}) + def push(notification, opts) do + case opts[:on_response] do + nil -> do_sync_push(notification, opts) + on_response -> send_push(notification, on_response, opts) + end end - @doc """ - Sends a push over GCM and executes function on success/failure. - """ - @spec push(Pigeon.GCM.Notification, (() -> none)) :: none - def push(notification, on_response) when is_function(on_response) do - do_push(notification, %{gcm_key: default_gcm_key()}, on_response) + defp do_sync_push(notification, opts) do + ref = :erlang.make_ref + pid = self() + timeout = opts[:timeout] || @default_timeout + on_response = fn(x) -> send pid, {ref, x} end + send_push(notification, on_response, opts) + receive do + {^ref, x} -> x + after + timeout -> {:error, :timeout, notification} + end end - def push(notification, config, on_response \\ nil) do - do_push(notification, config, on_response) + def encode_requests(%{registration_id: regid} = notification) when is_binary(regid) do + encode_requests(%{notification | registration_id: [regid]}) end - - defp do_push(notification, %{gcm_key: gcm_key}, on_response \\ nil) do - requests = - notification.registration_id - |> chunk_registration_ids - |> encode_requests(notification.payload) - - response = - case on_response do - nil -> - fn({_reg_ids, payload}) -> - HTTPoison.post(gcm_uri(), payload, gcm_headers(gcm_key)) - end - _ -> - fn({reg_ids, payload}) -> - {:ok, %HTTPoison.Response{status_code: status, body: body}} = - HTTPoison.post(gcm_uri(), payload, gcm_headers(gcm_key)) - - notification = %{ notification | registration_id: reg_ids } - process_response(status, body, notification, on_response) - end - end - for r <- requests, do: Task.async(fn -> response.(r) end) - :ok + def encode_requests(%{registration_id: regid} = notification) when length(regid) < 1001 do + res = + regid + |> recipient_attr() + |> Map.merge(notification.payload) + |> Poison.encode! + formatted_regid = regid + |> List.wrap + + [{formatted_regid, res}] end - def chunk_registration_ids(reg_ids) when is_binary(reg_ids), do: [[reg_ids]] - def chunk_registration_ids(reg_ids), do: Enum.chunk(reg_ids, 1000, 1000, []) - - def encode_requests([[reg_id]|_rest], payload) do - to_send = Map.merge(%{"to" => reg_id}, payload) - [{reg_id, Poison.encode!(to_send)}] - end - def encode_requests(registration_ids, payload) do - Enum.map(registration_ids, fn(x) -> encode_payload(x, payload) end) + def encode_requests(notification) do + notification.registration_id + |> Enum.chunk(1000, 1000, []) + |> Enum.map(& encode_requests(%{notification | registration_id: &1})) + |> List.flatten end - defp encode_payload(x, payload) do - encoded = - %{"registration_ids" => x} - |> Map.merge(payload) - |> Poison.encode! - {x, encoded} - end + defp recipient_attr([regid]), do: %{"to" => regid} + defp recipient_attr(regid) when is_list(regid), do: %{"registration_ids" => regid} - def process_response(status, body, notification, on_response) do - case status do - 200 -> - handle_200_status(body, notification, on_response) - 400 -> - handle_error_status_code(:invalid_json, notification, on_response) - 401 -> - handle_error_status_code(:authentication_error, notification, on_response) - 500 -> - handle_error_status_code(:internal_server_error, notification, on_response) - _ -> - handle_error_status_code(:unknown_error, notification, on_response) - end + @doc """ + Sends a push over GCM. + """ + def send_push(notification, on_response, opts) do + worker = opts[:name] || Pigeon.GCMWorker.default_name() + notification + |> encode_requests() + |> Enum.map(& GenServer.cast(worker, generate_envelope(&1, on_response, opts))) end - def handle_error_status_code(reason, notification, on_response), - do: on_response.({:error, reason, notification}) - - def handle_200_status(body, %{registration_id: reg_id} = n, on_response) when is_list(reg_id) do - {:ok, json} = Poison.decode(body) - results = Enum.zip(n.registration_id, json["results"]) - for result <- results, do: process_callback(result, n, on_response) - end - def handle_200_status(body, %{registration_id: _reg_id} = notification, on_response) do - {:ok, json} = Poison.decode(body) - results = Enum.zip([notification.registration_id], json["results"]) - for result <- results, do: process_callback(result, notification, on_response) + def start_connection(name) do + config = %{ + name: name, + gcm_key: Application.get_env(:pigeon, :gcm)[:key] + } + Supervisor.start_child(:pigeon, worker(Pigeon.GCMWorker, [config], id: name)) end - def process_callback({reg_id, response}, notification, on_response) do - case parse_result(response) do - {:ok, message_id} -> - notification = %{ notification | registration_id: reg_id, message_id: message_id } - on_response.({:ok, notification}) - - {:ok, message_id, registration_id} -> - notification = - %{ notification | registration_id: reg_id, - message_id: message_id, - updated_registration_id: registration_id } - on_response.({:ok, notification}) - - {:error, reason} -> - notification = %{ notification | registration_id: reg_id } - on_response.({:error, reason, notification}) - end + def stop_connection(name) do + Supervisor.terminate_child(:pigeon, name) + Supervisor.delete_child(:pigeon, name) end - def parse_result(result) do - error = result["error"] - if is_nil(error) do - parse_success(result) - else - error_atom = error |> Macro.underscore |> String.to_atom - {:error, error_atom} - end + def generate_envelope(payload, on_response, opts) do + {:push, :gcm, payload, on_response, Map.new(opts)} end - def parse_success(result) do - message_id = result["message_id"] - registration_id = result["registration_id"] - if is_nil(registration_id) do - {:ok, message_id} - else - {:ok, message_id, registration_id} - end + def merge(response_1, response_2) do + Map.merge(response_1, response_2, fn(key, value_1, value_2) -> + cond do + key == :__struct__ -> value_1 + is_map(value_1) -> merge(value_1, value_2) + is_nil(value_1) -> value_2 + is_nil(value_2) -> value_1 + true -> value_1 ++ value_2 + end + end) end end diff --git a/lib/pigeon/gcm_worker.ex b/lib/pigeon/gcm_worker.ex new file mode 100644 index 00000000..9ec83841 --- /dev/null +++ b/lib/pigeon/gcm_worker.ex @@ -0,0 +1,110 @@ +defmodule Pigeon.GCMWorker do + @moduledoc """ + Handles all FCM request and response parsing over an HTTP2 connection. + """ + use Pigeon.GenericH2Worker, ping_interval: 60_000 + alias Pigeon.GCM.NotificationResponse + require Logger + + def default_name, do: :gcm_default + + def host(config) do + config[:endpoint] || "fcm.googleapis.com" + end + + def port(config) do + config[:port] || 443 + end + + def socket_options(_config) do + {:ok, []} + end + + def encode_notification({_registration_ids, notification}) do + notification + end + + def req_headers(config, _notification) do + [{"authorization", "key=#{config[:key]}"}, + {"content-type", "application/json"}, + {"accept", "application/json"}] + end + + def req_path(_notification) do + "/fcm/send" + end + + defp parse_error(_notification, _headers, body) do + {:ok, response} = Poison.decode(body) + response["reason"] |> Macro.underscore |> String.to_existing_atom + end + + defp parse_response({registration_ids, payload}, _headers, body) do + result = Poison.decode! body + parse_result(registration_ids, result) + end + + def parse_result(ids, %{"results" => results}) do + parse_result1(ids, results, %NotificationResponse{}) + end + + def parse_result1([], [], result) do + result + end + + def parse_result1(regid, results, result) when is_binary(regid) do + parse_result1([regid], results, result) + end + + def parse_result1([regid | reg_res], + [%{"message_id" => id, "registration_id" => new_regid} | rest_results], + %NotificationResponse{ update: update} = resp) do + + new_updates = [{regid, new_regid} | update] + parse_result1(reg_res, rest_results, %{resp | message_id: id, update: new_updates}) + end + + def parse_result1([regid | reg_res], + [%{"message_id" => id} | rest_results], + %NotificationResponse{ok: ok} = resp) do + + parse_result1(reg_res, rest_results, %{resp | message_id: id, ok: [regid | ok]}) + end + + def parse_result1([regid | reg_res], + [%{"error" => "Unavailable"} | rest_results], + %NotificationResponse{retry: retry} = resp) do + + parse_result1(reg_res, rest_results, %{resp | retry: [regid | retry]}) + end + + def parse_result1([regid | reg_res], + [%{"error" => invalid } | rest_results], + %NotificationResponse{remove: remove} = resp) when invalid == "NotRegistered" + or invalid == "InvalidRegistration" do + + parse_result1(reg_res, rest_results, %{resp | remove: [regid | remove]}) + end + + def parse_result1([regid | reg_res] = regs, + [%{"error" => error} | rest_results] = results, + %NotificationResponse{error: regs_in_error} = resp) do + + case Map.has_key?(regs_in_error, error) do + true -> + parse_result1(reg_res, rest_results, + %{resp | error: %{regs_in_error | error => regid}}) + false -> # create map key if required. + parse_result1(regs, results, + %{resp | error: Map.merge(%{error => []}, regs_in_error)}) + end + end + + def error_msg(code, _error) do + case code do + "400" -> "Malformed JSON" + "401" -> "Unauthorized" + _ -> "Unknown" + end + end +end diff --git a/lib/pigeon/generic_h2_worker.ex b/lib/pigeon/generic_h2_worker.ex new file mode 100644 index 00000000..c3f65013 --- /dev/null +++ b/lib/pigeon/generic_h2_worker.ex @@ -0,0 +1,208 @@ +defmodule Pigeon.GenericH2Worker do + defmacro __using__(opts) do + + [ + case opts[:ping_interval] do + nil -> + quote do + def schedule_ping(_pid) do + :ok + end + end + ping_interval -> + quote do + def schedule_ping(pid) do + Process.send_after(pid, :ping, unquote(ping_interval)) + end + end + end, + + quote do + use GenServer + require Logger + + def start_link(name, config) do + start_link(Keyword.merge(config, [name: name])) + end + + def start_link(config) do + GenServer.start_link(__MODULE__, config, name: config[:name]) + end + + def stop, do: :gen_server.cast(self(), :stop) + + def init(config) do + Process.flag(:trap_exit, true) + schedule_ping(self()) + {:ok, new_state(config)} + end + + defp new_state(config, socket \\ nil) do + %{ + socket: socket, + queue: %{}, + config: config + } + end + + def initialize_worker(config) do + mode = config[:mode] + case connect_socket(config, 0) do + {:ok, socket} -> + {:ok, new_state(config, socket)} + {:closed, _socket} -> + Logger.error """ + Socket closed unexpectedly. + """ + {:ok, new_state(config)} + {:error, :timeout} -> + Logger.error """ + Failed to establish SSL connection. Is the certificate signed for :#{mode} mode? + """ + {:ok, new_state(config)} + {:error, :invalid_config} -> + Logger.error """ + Invalid configuration. + """ + {:stop, {:error, :invalid_config}} + end + end + + def connect_socket(_config, 3), do: {:error, :timeout} + def connect_socket(config, tries) do + uri = host(config) + case socket_options(config) do + {:ok, options} -> do_connect_socket(config, uri, options, tries) + error -> error + end + end + + defp do_connect_socket(config, uri, options, tries) do + case Pigeon.H2.open(uri, port(config), options) do + {:ok, socket} -> {:ok, socket} + {:error, reason} -> + Logger.error(inspect(reason)) + connect_socket(config, tries + 1) + end + end + + def handle_cast(:stop, state), do: { :noreply, state } + + def handle_cast({:push, _, notification}, state) do + send_push(state, notification, nil, []) + end + + def handle_cast({:push, _, notification, on_response}, state) do + send_push(state, notification, on_response, []) + end + + def handle_cast({:push, _, notification, on_response, opts}, state) do + send_push(state, notification, on_response, opts) + end + + def handle_cast(_msg, state) do + {:noreply, state} + end + + def send_push(%{socket: nil, config: config}, notification, + on_reponse, opts) do + Logger.info "Reconnecting to push service provider before request" + case initialize_worker(config) do + {:ok, newstate} -> send_push(newstate, notification, on_reponse, opts) + error -> error + end + end + + def send_push(state, notification, on_response, _opts) do + %{socket: socket, queue: queue, config: config} = state + payload = encode_notification(notification) + + headers = req_headers(config, notification) + uri = host(config) + path = req_path(notification) + case Pigeon.H2.post(socket, uri, path, headers, payload) do + {:ok, stream_id} -> + new_q = Map.put(queue, "#{stream_id}", {notification, on_response}) + {:noreply, %{state | queue: new_q }} + {:error, reason} -> + maybe_respond({:error, reason}, on_response) + {:noreply, state} + end + end + + defp maybe_respond(response, on_response) do + unless on_response == nil, do: on_response.(response) + end + + def handle_info(:ping, state) do + case state do + %{socket: nil} -> + :ok + %{socket: conn} -> + Pigeon.H2.ping(conn) + end + schedule_ping(self()) + {:noreply, state} + end + + def handle_info({:END_STREAM, stream_id}, %{socket: socket, queue: queue} = state) do + {:ok, {headers, body}} = Pigeon.H2.receive(socket, stream_id) + + {notification, on_response} = queue["#{stream_id}"] + case get_status(headers) do + "200" -> + notification = parse_response(notification, headers, body) + maybe_respond({:ok, notification}, on_response) + new_queue = Map.delete(queue, "#{stream_id}") + nil -> + maybe_respond({:error, :no_respose, notification}, + on_response) + code -> + reason = + try do + parse_error(notification, headers, body) + catch + _, unknown -> unknown + end + log_error(code, reason, notification) + maybe_respond({:error, reason, notification}, on_response) + end + new_queue = Map.delete(queue, "#{stream_id}") + {:noreply, %{state | queue: new_queue}} + end + + def handle_info({:PONG, _from}, state) do + Logger.debug ~s"Received ping from the push service" + {:noreply, state} + end + + def handle_info({:ok, _from}, state), do: {:noreply, state} + + def handle_info({:EXIT, socket, _}, %{socket: socket} = state) do + {:noreply, %{state | socket: nil}} + end + def handle_info({:EXIT, _socket, _}, state) do + {:noreply, state} + end + + def handle_info(unknown, state) do + Logger.warn(~s"Unknown info #{inspect unknown}") + {:noreply, state} + end + + defp get_status(nil), do: nil + defp get_status(headers) do + case Enum.find(headers, fn({key, _val}) -> key == ":status" end) do + {":status", status} -> status + nil -> nil + end + end + + defp log_error(code, reason, notification) do + Logger.error("Error code #{code} (#{inspect reason}): #{error_msg(code, reason)}") + end + + end + ] + end +end diff --git a/lib/pigeon/h2.ex b/lib/pigeon/h2.ex new file mode 100644 index 00000000..06ef0ac1 --- /dev/null +++ b/lib/pigeon/h2.ex @@ -0,0 +1,51 @@ +defmodule Pigeon.H2 do + def open(uri, port, opts \\ []) do + try do + :h2_client.start_link(:https, to_charlist(uri), port, opts) + catch + _, reason -> + {:error, reason} + end + end + + def close(conn) do + :h2_client.stop(conn) + end + + def post(conn, uri, path, headers, body) do + case :h2_connection.new_stream(conn) do + {:error, _code} -> + {:error, :unable_to_add_stream} + stream_id -> + headers = make_headers(:post, uri, path, headers, body) + :ok = :h2_connection.send_headers(conn, stream_id, headers) + :ok = :h2_connection.send_body(conn, stream_id, body) + {:ok, stream_id} + end + end + + def receive(conn, stream_id) do + case :h2_connection.get_response(conn, stream_id) do + {:ok, {headers, :undefined}} -> + {:ok, {headers, ""}} + {:ok, {headers, body}} -> + {:ok, {headers, Enum.join(body)}} + {:error, reason} -> + {:error, reason} + end + end + + def ping(conn) do + :h2_client.send_ping(conn) + end + + defp make_headers(method, uri, path, headers, body) do + [ + {":method", String.upcase(Atom.to_string(method))}, + {":path", path}, + {":scheme", "https"}, + {":authority", uri}, + {"content-length", "#{byte_size(body)}"} + ] ++ headers + end +end diff --git a/lib/pigeon/notification.ex b/lib/pigeon/notification.ex index 48a78e0b..15edc4f7 100644 --- a/lib/pigeon/notification.ex +++ b/lib/pigeon/notification.ex @@ -21,11 +21,11 @@ defmodule Pigeon.APNS.Notification do """ defstruct device_token: nil, payload: %{"aps" => %{}}, expiration: nil, topic: nil, id: nil - def new(msg, token, topic \\ nil) do + def new(msg, token, topic \\ nil, data \\ nil) do %Pigeon.APNS.Notification{ device_token: token, topic: topic, - payload: %{"aps" => %{"alert" => msg}} + payload: Map.merge(%{"aps" => msg}, data || %{}), } end @@ -33,7 +33,7 @@ defmodule Pigeon.APNS.Notification do %Pigeon.APNS.Notification{ device_token: token, topic: topic, - payload: %{"aps" => %{"alert" => msg}}, + payload: %{"aps" => msg}, id: id } end @@ -91,6 +91,14 @@ defmodule Pigeon.GCM.Notification do end end +defmodule Pigeon.GCM.NotificationResponse do + @moduledoc """ + Passed to the GCM on_response callback + """ + defstruct message_id: nil, ok: [], retry: [], update: [], remove: [], error: %{} + +end + defmodule Pigeon.ADM.Notification do @moduledoc """ Defines Amazon ADM notification struct and convenience constructor functions. diff --git a/mix.exs b/mix.exs index 2939c6a6..f458ad88 100644 --- a/mix.exs +++ b/mix.exs @@ -24,14 +24,14 @@ defmodule Pigeon.Mixfile do end def application do - [applications: [:logger, :httpoison, :poolboy, :kadabra], + [applications: [:logger, :httpoison, :poolboy, :chatterbox, :hpack], mod: {Pigeon, []}] end defp deps do [{:poison, "~> 2.0 or ~> 3.0"}, {:httpoison, "~> 0.7"}, - {:kadabra, "~> 0.1.0"}, + {:chatterbox, github: "rslota/chatterbox", tag: "20f0096"}, {:poolboy, "~> 1.5"}, {:dogma, "~> 0.1", only: :dev}, {:earmark, "~> 1.0", only: :dev}, diff --git a/mix.lock b/mix.lock new file mode 100644 index 00000000..5f97b5ee --- /dev/null +++ b/mix.lock @@ -0,0 +1,19 @@ +%{"certifi": {:hex, :certifi, "1.0.0", "1c787a85b1855ba354f0b8920392c19aa1d06b0ee1362f9141279620a5be2039", [:rebar3], []}, + "chatterbox": {:git, "https://github.com/rslota/chatterbox.git", "20f00960127a5ffd1f2b79cf9f7b4b8fe85bc97b", [tag: "20f0096"]}, + "dogma": {:hex, :dogma, "0.1.14", "561abecb25a2408738e7d2bc34cc565a07e6fadbc4fb91b2cbb35a5a9234cf1c", [:mix], [{:poison, ">= 2.0.0", [hex: :poison, optional: false]}]}, + "earmark": {:hex, :earmark, "1.1.1", "433136b7f2e99cde88b745b3a0cfc3fbc81fe58b918a09b40fce7f00db4d8187", [:mix], []}, + "ex_doc": {:hex, :ex_doc, "0.15.0", "e73333785eef3488cf9144a6e847d3d647e67d02bd6fdac500687854dd5c599f", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, optional: false]}]}, + "excoveralls": {:hex, :excoveralls, "0.6.2", "0e993d096f1fbb6e70a3daced5c89aac066bda6bce57829622aa2d1e2b338cfb", [:mix], [{:exjsx, "~> 3.0", [hex: :exjsx, optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, optional: false]}]}, + "exjsx": {:hex, :exjsx, "3.2.1", "1bc5bf1e4fd249104178f0885030bcd75a4526f4d2a1e976f4b428d347614f0f", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, optional: false]}]}, + "goldrush": {:git, "https://github.com/basho/goldrush.git", "8f1b715d36b650ec1e1f5612c00e28af6ab0de82", [tag: "0.1.9"]}, + "hackney": {:hex, :hackney, "1.7.1", "e238c52c5df3c3b16ce613d3a51c7220a784d734879b1e231c9babd433ac1cb4", [:rebar3], [{:certifi, "1.0.0", [hex: :certifi, optional: false]}, {:idna, "4.0.0", [hex: :idna, optional: false]}, {:metrics, "1.0.1", [hex: :metrics, optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, optional: false]}]}, + "hpack": {:git, "https://github.com/joedevivo/hpack.git", "6b58b6231e9b6ab83096715120578976f72f4f7c", [tag: "0.2.3"]}, + "httpoison": {:hex, :httpoison, "0.11.1", "d06c571274c0e77b6cc50e548db3fd7779f611fbed6681fd60a331f66c143a0b", [:mix], [{:hackney, "~> 1.7.0", [hex: :hackney, optional: false]}]}, + "idna": {:hex, :idna, "4.0.0", "10aaa9f79d0b12cf0def53038547855b91144f1bfcc0ec73494f38bb7b9c4961", [:rebar3], []}, + "jsx": {:hex, :jsx, "2.8.2", "7acc7d785b5abe8a6e9adbde926a24e481f29956dd8b4df49e3e4e7bcc92a018", [:mix, :rebar3], []}, + "lager": {:git, "https://github.com/basho/lager.git", "81eaef0ce98fdbf64ab95665e3bc2ec4b24c7dac", [tag: "3.2.4"]}, + "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], []}, + "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], []}, + "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], []}, + "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], []}, + "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], []}} diff --git a/test/gcm_test.exs b/test/gcm_test.exs index b364c13d..19cee77f 100644 --- a/test/gcm_test.exs +++ b/test/gcm_test.exs @@ -1,79 +1,100 @@ defmodule Pigeon.GCMTest do use ExUnit.Case - + alias Pigeon.GCM.Notification + alias Pigeon.GCM.NotificationResponse + require Logger @data %{"message" => "Test push"} @payload %{"data" => @data} defp valid_gcm_reg_id, do: Application.get_env(:pigeon, :test)[:valid_gcm_reg_id] test "successfully sends a valid push" do - result = + {:ok, notification} = valid_gcm_reg_id() - |> Pigeon.GCM.Notification.new(%{}, @data) + |> Notification.new(%{}, @data) |> Pigeon.GCM.push - assert result == :ok + assert notification.ok == [valid_gcm_reg_id()] + end + + test "Can merge two responses" do + nr1 = %NotificationResponse{ + ok: ["42"], + retry: ["12"], + error: %{"error" => ["1"]} + } + nr2 = %NotificationResponse{ + ok: ["43"], + update: ["12"], + error: %{"error" => ["2"], "error2"=> ["1"]} + } + assert Pigeon.GCM.merge(nr1, nr2) == + %NotificationResponse{ + ok: ["42", "43"], + retry: ["12"], + update: ["12"], + error: %{"error" => ["1", "2"] , "error2" => ["1"]} + } + end + + test "Message for less than 1000 recipients should not be chunked" do + regs = Enum.to_list(1..999) + notification = Notification.new(regs, %{}, @data) + assert [{^regs, encoded}] = res = Pigeon.GCM.encode_requests(notification) + end + + test "Message for over 1000 recipients should be chunked" do + regs = Enum.to_list(1..2534) + notification = Notification.new(regs, %{}, @data) + res = Pigeon.GCM.encode_requests(notification) + assert [{r1, e1}, {r2, e2}, {r3, e3}] = res + assert length(r1) == 1000 + assert length(r2) == 1000 + assert length(r3) == 534 end test "successfully sends a valid push with an explicit config" do - result = + response = valid_gcm_reg_id() - |> Pigeon.GCM.Notification.new(%{}, @data) - |> Pigeon.GCM.push(%{gcm_key: System.get_env("GCM_KEY")}) + |> Notification.new(%{}, @data) + |> Pigeon.GCM.push(%{gcm_key: "explicit"}) - assert result == :ok + assert response == {:error, :unauthorized} end test "successfully sends a valid push with callback" do reg_id = valid_gcm_reg_id() - n = Pigeon.GCM.Notification.new(reg_id, %{}, @data) - - Pigeon.GCM.push(n, fn(x) -> send self(), x end) + n = Notification.new(reg_id, %{}, @data) + pid = self() + Pigeon.GCM.send_push(n, fn(x) -> send pid, x end, %{}) - assert_receive {_ref, [{:ok, notification}]}, 5000 - assert notification.registration_id == reg_id - assert notification.payload == %{"data" => @data} + assert_receive {:ok, notification}, 5000 + assert notification.ok == [reg_id] end test "returns an error on pushing with a bad registration_id" do reg_id = "bad_registration_id" - n = Pigeon.GCM.Notification.new(reg_id, %{}, @data) + n = Notification.new(reg_id, %{}, @data) + pid = self() + Pigeon.GCM.send_push(n, fn(x) -> send pid, x end, %{}) - Pigeon.GCM.push(n, fn(x) -> send self(), x end) - - assert_receive {_ref, [{:error, :invalid_registration, n}]}, 5000 + assert_receive {:ok, %Pigeon.GCM.NotificationResponse{remove: ["bad_registration_id"]}}, 5000 assert n.registration_id == reg_id assert n.payload == %{"data" => @data} end - test "parse_result with success" do - assert Pigeon.GCM.parse_result(%{ "message_id" => "1:0408" }) == {:ok, "1:0408"} - end - - test "parse_result with success and new registration_id" do - assert Pigeon.GCM.parse_result(%{ "message_id" => "1:2342", "registration_id" => "32" }) == - {:ok, "1:2342", "32"} - end - - test "parse_result with error unavailable" do - assert Pigeon.GCM.parse_result(%{ "error" => "Unavailable" }) == {:error, :unavailable} - end - test "encode_requests with one registration_id" do - registration_id = [["123456"]] - assert Pigeon.GCM.encode_requests(registration_id, @payload) == - [{"123456", ~S({"to":"123456","data":{"message":"Test push"}})}] + registration_id = "123456" + payload = Notification.new(registration_id, %{},@data) + assert Pigeon.GCM.encode_requests(payload) == + [{["123456"], ~S({"to":"123456","data":{"message":"Test push"}})}] end test "encode_requests with multiple registration_ids" do registration_id = ["aaaaaa", "bbbbbb", "cccccc"] + payload = Notification.new(registration_id, %{},@data) expected = ~S({"registration_ids":["aaaaaa","bbbbbb","cccccc"],"data":{"message":"Test push"}}) - assert Pigeon.GCM.encode_requests([registration_id], @payload) == [{registration_id, expected}] + assert Pigeon.GCM.encode_requests(payload) == [{registration_id, expected}] end - test "encode_requests with over 1000 registration_ids" do - reg_ids = Enum.chunk(Enum.to_list(1..2500), 1000, 1000, []) - result = Pigeon.GCM.encode_requests(reg_ids, @payload) - assert Enum.count(result) == 3 - end end diff --git a/test/gcm_worker_test.exs b/test/gcm_worker_test.exs new file mode 100644 index 00000000..8fcde1e1 --- /dev/null +++ b/test/gcm_worker_test.exs @@ -0,0 +1,63 @@ +defmodule Pigeon.GCMWorkerTest do + use ExUnit.Case + alias Pigeon.GCMWorker + alias Pigeon.GCM.NotificationResponse + + @data %{"message" => "Test push"} + @payload %{"data" => @data} + + defp valid_gcm_reg_id, do: Application.get_env(:pigeon, :test)[:valid_gcm_reg_id] + + test "parse_result with success" do + {:ok, response} = + GCMWorker.parse_result1( + ["regid"], + [%{ "message_id" => "1:0408" }], + &(&1), %NotificationResponse{} + ) + assert response.ok == ["regid"] + end + + test "parse_result with success and new registration_id" do + {:ok, response} = + GCMWorker.parse_result1( + ["regid"], + [%{ "message_id" => "1:2342", "registration_id" => "32" }], + &(&1), %NotificationResponse{} + ) + + assert response.update == [{"regid", "32"}] + assert response.message_id == "1:2342" + end + + test "parse_result with error unavailable" do + {:ok, response} = + GCMWorker.parse_result1( + ["regid"], + [%{ "error" => "Unavailable" }], + &(&1), + %NotificationResponse{} + ) + assert response.retry == ["regid"] + end + + test "parse_result with custom error" do + {:ok, response} = + GCMWorker.parse_result1( + ["regid"], + [%{ "error" => "CustomError" }], + &(&1), + %NotificationResponse{} + ) + assert response.error == %{"CustomError" => "regid"} + end + + test "send malformed JSON" do + {:ok, pid} = GCMWorker.start_link(:gonecrashing, key: Application.get_env(:pigeon, :gcm)[:key]) + me = self() + :gen_server.cast(pid, {:push, :gcm, {"toto", "this is not json"}, &(send me, &1), %{}}) + assert_receive {:error, :malformed_json}, 5000 + :gen_server.cast(pid, :stop) + end + +end