diff --git a/lib/pigeon/apns_worker.ex b/lib/pigeon/apns_worker.ex index 1e0c0081..b930f88f 100644 --- a/lib/pigeon/apns_worker.ex +++ b/lib/pigeon/apns_worker.ex @@ -2,7 +2,7 @@ defmodule Pigeon.APNSWorker do @moduledoc """ Handles all APNS request and response parsing over an HTTP2 connection. """ - use Pigeon.GenericH2Worker + use Pigeon.GenericH2Worker, ping_interval: 600_000 require Logger def host(config) do @@ -19,6 +19,10 @@ defmodule Pigeon.APNSWorker do end end + def encode_notification(notification) do + Pigeon.Notification.json_payload(notification.payload) + end + def socket_options(config) do cert = get_opt(config, :cert, :certfile) key = get_opt(config, :key, :keyfile) @@ -43,7 +47,8 @@ defmodule Pigeon.APNSWorker do end def req_headers(_config, notification) do - [] + [{"content-type", "application/json"}, + {"accept", "application/json"}] |> put_apns_id(notification) |> put_apns_topic(notification) end diff --git a/lib/pigeon/gcm_worker.ex b/lib/pigeon/gcm_worker.ex index a63bc32c..9ec83841 100644 --- a/lib/pigeon/gcm_worker.ex +++ b/lib/pigeon/gcm_worker.ex @@ -2,7 +2,7 @@ defmodule Pigeon.GCMWorker do @moduledoc """ Handles all FCM request and response parsing over an HTTP2 connection. """ - use Pigeon.GenericH2Worker + use Pigeon.GenericH2Worker, ping_interval: 60_000 alias Pigeon.GCM.NotificationResponse require Logger @@ -17,11 +17,17 @@ defmodule Pigeon.GCMWorker do end def socket_options(_config) do - [] + {:ok, []} + end + + def encode_notification({_registration_ids, notification}) do + notification end def req_headers(config, _notification) do - [{"authorization", "key=#{config[:key]}"}] + [{"authorization", "key=#{config[:key]}"}, + {"content-type", "application/json"}, + {"accept", "application/json"}] end def req_path(_notification) do @@ -33,9 +39,9 @@ defmodule Pigeon.GCMWorker do response["reason"] |> Macro.underscore |> String.to_existing_atom end - defp parse_response(notification, _headers, body) do + defp parse_response({registration_ids, payload}, _headers, body) do result = Poison.decode! body - parse_result(notification.registration_ids, result) + parse_result(registration_ids, result) end def parse_result(ids, %{"results" => results}) do diff --git a/lib/pigeon/generic_h2_worker.ex b/lib/pigeon/generic_h2_worker.ex index 89ebf988..d21a2961 100644 --- a/lib/pigeon/generic_h2_worker.ex +++ b/lib/pigeon/generic_h2_worker.ex @@ -1,169 +1,207 @@ defmodule Pigeon.GenericH2Worker do - defmacro __using__(_opts) do - quote do - use GenServer - require Logger + defmacro __using__(opts) do + + [ + case opts[:ping_interval] do + nil -> + quote do + def schedule_ping(_pid) do + :ok + end + end + ping_interval -> + quote do + def schedule_ping(pid) do + Process.send_after(pid, :ping, unquote(ping_interval)) + end + end + end, + + quote do + use GenServer + require Logger + + def start_link(name, config) do + start_link(Keyword.merge(config, [name: name])) + end - def start_link(name, config) do - start_link(Keyword.merge(config, [name: name])) - end + def start_link(config) do + GenServer.start_link(__MODULE__, config, name: config[:name]) + end - def start_link(config) do - GenServer.start_link(__MODULE__, config, name: config[:name]) - end + def stop, do: :gen_server.cast(self(), :stop) - def stop, do: :gen_server.cast(self(), :stop) + def init(config) do + Process.flag(:trap_exit, true) + schedule_ping(self()) + {:ok, new_state(config)} + end - def init(config) do - Process.flag(:trap_exit, true) - {:ok, new_state(config)} - end + defp new_state(config, socket \\ nil) do + %{ + socket: socket, + queue: %{}, + config: config + } + end - defp new_state(config, socket \\ nil) do - %{ - socket: socket, - queue: %{}, - config: config - } - end + def initialize_worker(config) do + mode = config[:mode] + case connect_socket(config, 0) do + {:ok, socket} -> + {:ok, new_state(config, socket)} + {:closed, _socket} -> + Logger.error """ + Socket closed unexpectedly. + """ + {:ok, new_state(config)} + {:error, :timeout} -> + Logger.error """ + Failed to establish SSL connection. Is the certificate signed for :#{mode} mode? + """ + {:ok, new_state(config)} + {:error, :invalid_config} -> + Logger.error """ + Invalid configuration. + """ + {:stop, {:error, :invalid_config}} + end + end - def initialize_worker(config) do - mode = config[:mode] - case connect_socket(config, 0) do - {:ok, socket} -> - {:ok, new_state(config, socket)} - {:closed, _socket} -> - Logger.error """ - Socket closed unexpectedly. - """ - {:ok, new_state(config)} - {:error, :timeout} -> - Logger.error """ - Failed to establish SSL connection. Is the certificate signed for :#{mode} mode? - """ - {:ok, new_state(config)} - {:error, :invalid_config} -> - Logger.error """ - Invalid configuration. - """ - {:stop, {:error, :invalid_config}} + def connect_socket(_config, 3), do: {:error, :timeout} + def connect_socket(config, tries) do + uri = host(config) + case socket_options(config) do + {:ok, options} -> do_connect_socket(config, uri, options, tries) + error -> error + end end - end - def connect_socket(_config, 3), do: {:error, :timeout} - def connect_socket(config, tries) do - uri = host(config) - case socket_options(config) do - {:ok, options} -> do_connect_socket(config, uri, options, tries) - error -> error + defp do_connect_socket(config, uri, options, tries) do + case Pigeon.H2.open(uri, port(config), options) do + {:ok, socket} -> {:ok, socket} + {:error, reason} -> + Logger.error(inspect(reason)) + connect_socket(config, tries + 1) + end end - end - defp do_connect_socket(config, uri, options, tries) do - case Pigeon.H2.open(uri, port(config), options) do - {:ok, socket} -> {:ok, socket} - {:error, reason} -> - Logger.error(inspect(reason)) - connect_socket(config, tries + 1) + def handle_cast(:stop, state), do: { :noreply, state } + + def handle_cast({:push, _, notification}, state) do + send_push(state, notification, nil, []) end - end - def handle_cast(:stop, state), do: { :noreply, state } + def handle_cast({:push, _, notification, on_response}, state) do + send_push(state, notification, on_response, []) + end - def handle_cast({:push, _, notification}, state) do - send_push(state, notification, nil) - end + def handle_cast({:push, _, notification, on_response, opts}, state) do + send_push(state, notification, on_response, opts) + end - def handle_cast({:push, _, notification, on_response}, state) do - send_push(state, notification, on_response) - end + def handle_cast(_msg, state) do + {:noreply, state} + end - def handle_cast(_msg, state) do - {:noreply, state} - end + def send_push(%{socket: nil, config: config}, notification, + on_reponse, opts) do + Logger.info "Reconnecting to push service provider before request" + case initialize_worker(config) do + {:ok, newstate} -> send_push(newstate, notification, on_reponse, opts) + error -> error + end + end - def send_push(%{socket: nil, config: config}, notification, on_reponse) do - Logger.info "Reconnecting to push service provider before request" - case initialize_worker(config) do - {:ok, newstate} -> send_push(newstate, notification, on_reponse) - error -> error + def send_push(state, notification, on_response, _opts) do + %{socket: socket, queue: queue, config: config} = state + payload = encode_notification(notification) + + headers = req_headers(config, notification) + uri = host(config) + path = req_path(notification) + case Pigeon.H2.post(socket, uri, path, headers, payload) do + {:ok, stream_id} -> + new_q = Map.put(queue, "#{stream_id}", {notification, on_response}) + {:noreply, %{state | queue: new_q }} + {:error, reason} -> + maybe_respond({:error, reason}, on_response) + {:noreply, state} + end end - end - def send_push(state, notification, on_response) do - %{socket: socket, queue: queue, config: config} = state - json = Pigeon.Notification.json_payload(notification.payload) - - headers = req_headers(config, notification) - uri = host(config) - path = req_path(notification) - case Pigeon.H2.post(socket, uri, path, headers, json) do - {:ok, stream_id} -> - new_q = Map.put(queue, "#{stream_id}", {notification, on_response}) - {:noreply, %{state | queue: new_q }} - {:error, reason} -> - maybe_respond({:error, reason}, on_response) - {:noreply, state} + defp maybe_respond(response, on_response) do + unless on_response == nil, do: on_response.(response) end - end - defp maybe_respond(response, on_response) do - unless on_response == nil, do: on_response.(response) - end + def handle_info(:ping, state) do + case state do + %{socket: nil} -> + :ok + %{socket: conn} -> + Pigeon.H2.ping(conn) + end + schedule_ping(self()) + {:noreply, state} + end - def handle_info({:END_STREAM, stream_id}, %{socket: socket, queue: queue} = state) do - {:ok, {headers, body}} = Pigeon.H2.receive(socket, stream_id) - - {notification, on_response} = queue["#{stream_id}"] - case get_status(headers) do - "200" -> - notification = parse_response(notification, headers, body) - maybe_respond({:ok, notification}, on_response) - new_queue = Map.delete(queue, "#{stream_id}") - nil -> - :ok - code -> - reason = - try do - parse_error(notification, headers, body) - catch - _, unknown -> unknown - end - log_error(code, reason, notification) - maybe_respond({:error, reason, notification}, on_response) - end - new_queue = Map.delete(queue, "#{stream_id}") - {:noreply, %{state | queue: new_queue}} - end + def handle_info({:END_STREAM, stream_id}, %{socket: socket, queue: queue} = state) do + {:ok, {headers, body}} = Pigeon.H2.receive(socket, stream_id) + + {notification, on_response} = queue["#{stream_id}"] + case get_status(headers) do + "200" -> + notification = parse_response(notification, headers, body) + maybe_respond({:ok, notification}, on_response) + new_queue = Map.delete(queue, "#{stream_id}") + nil -> + :ok + code -> + reason = + try do + parse_error(notification, headers, body) + catch + _, unknown -> unknown + end + log_error(code, reason, notification) + maybe_respond({:error, reason, notification}, on_response) + end + new_queue = Map.delete(queue, "#{stream_id}") + {:noreply, %{state | queue: new_queue}} + end - def handle_info({:ping, _from}, state), do: {:noreply, state} + def handle_info({:PONG, _from}, state) do + Logger.debug ~s"Received ping from the push service" + {:noreply, state} + end - def handle_info({:ok, _from}, state), do: {:noreply, state} + def handle_info({:ok, _from}, state), do: {:noreply, state} - def handle_info({:EXIT, socket, _}, %{socket: socket} = state) do - {:noreply, %{state | socket: nil}} - end - def handle_info({:EXIT, _socket, _}, state) do - {:noreply, state} - end + def handle_info({:EXIT, socket, _}, %{socket: socket} = state) do + {:noreply, %{state | socket: nil}} + end + def handle_info({:EXIT, _socket, _}, state) do + {:noreply, state} + end - def handle_info(unknown, state) do - Logger.warn(~s"Unknown info #{inspect unknown}") - {:noreply, state} - end + def handle_info(unknown, state) do + Logger.warn(~s"Unknown info #{inspect unknown}") + {:noreply, state} + end - defp get_status(nil), do: nil - defp get_status(headers) do - case Enum.find(headers, fn({key, _val}) -> key == ":status" end) do - {":status", status} -> status - nil -> nil + defp get_status(nil), do: nil + defp get_status(headers) do + case Enum.find(headers, fn({key, _val}) -> key == ":status" end) do + {":status", status} -> status + nil -> nil + end end - end - defp log_error(code, reason, notification) do - Logger.error("Error code #{{code}} (#{reason}): #{error_msg(code, reason)}") - end + defp log_error(code, reason, notification) do + Logger.error("Error code #{code} (#{inspect reason}): #{error_msg(code, reason)}") + end - end + end + ] end end diff --git a/lib/pigeon/h2.ex b/lib/pigeon/h2.ex index c5e8b85c..b6789cd6 100644 --- a/lib/pigeon/h2.ex +++ b/lib/pigeon/h2.ex @@ -33,6 +33,10 @@ defmodule Pigeon.H2 do end end + def ping(conn) do + :h2_client.send_ping(conn) + end + defp make_headers(method, uri, path, headers, body) do [ {":method", String.upcase(Atom.to_string(method))}, diff --git a/mix.exs b/mix.exs index 10baac8c..f458ad88 100644 --- a/mix.exs +++ b/mix.exs @@ -31,7 +31,7 @@ defmodule Pigeon.Mixfile do defp deps do [{:poison, "~> 2.0 or ~> 3.0"}, {:httpoison, "~> 0.7"}, - {:chatterbox, github: "rslota/chatterbox"}, + {:chatterbox, github: "rslota/chatterbox", tag: "20f0096"}, {:poolboy, "~> 1.5"}, {:dogma, "~> 0.1", only: :dev}, {:earmark, "~> 1.0", only: :dev}, diff --git a/mix.lock b/mix.lock index b96c627b..3a4718f3 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,5 @@ %{"certifi": {:hex, :certifi, "1.0.0", "1c787a85b1855ba354f0b8920392c19aa1d06b0ee1362f9141279620a5be2039", [:rebar3], []}, - "chatterbox": {:git, "https://github.com/rslota/chatterbox.git", "fcdc1214dfeedc14c138e6d248714883042c7268", []}, + "chatterbox": {:git, "https://github.com/rslota/chatterbox.git", "054edf85227be59c5a8bc85d02212240977f0325", [tag: "054edf8"]}, "dogma": {:hex, :dogma, "0.1.14", "561abecb25a2408738e7d2bc34cc565a07e6fadbc4fb91b2cbb35a5a9234cf1c", [:mix], [{:poison, ">= 2.0.0", [hex: :poison, optional: false]}]}, "earmark": {:hex, :earmark, "1.1.1", "433136b7f2e99cde88b745b3a0cfc3fbc81fe58b918a09b40fce7f00db4d8187", [:mix], []}, "ex_doc": {:hex, :ex_doc, "0.15.0", "e73333785eef3488cf9144a6e847d3d647e67d02bd6fdac500687854dd5c599f", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, optional: false]}]},