Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic HTTP/2 workers #61

Closed
wants to merge 35 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e018168
Customize endpoint values for GCM and APNS
cstar Jan 3, 2017
b9e6368
Port is configurable
cstar Jan 3, 2017
19b4600
port is simple to configure
cstar Jan 6, 2017
c17aabf
Initial happy path
cstar Jan 11, 2017
5f510f4
FCM over HTTP2, tests pass.
cstar Jan 13, 2017
d0bab72
Tuplify success passed to on_response
cstar Jan 13, 2017
085c1be
This use case is not handled yet.
cstar Jan 13, 2017
ad2a579
Merge branch 'fcm-http2'
cstar Jan 13, 2017
c8b5fc8
Configurable port and endpoint for FCM
cstar Jan 13, 2017
4b87855
Configurable port and endpoint for FCM
cstar Jan 13, 2017
7a248e3
%Pigeon.GCM.NotificationResponse as callback arg
cstar Jan 16, 2017
e5f451a
Chunk messages when sending to over 1000 recipients
cstar Jan 17, 2017
6103104
GCM Custom configuration ; `gcm_key` supported.
cstar Jan 17, 2017
1fe6a79
Merge branch 'fcm-http2' of https://github.com/cstar/pigeon into csta…
hpopp Jan 18, 2017
cb87dd6
refactor: GCM.merge/2 and various whitespace issues
hpopp Jan 18, 2017
f070c26
Merge branch 'fcm-http2'
cstar Jan 18, 2017
054b3e0
Ensure reply message is delivered to correct callback in sync mode
cstar Jan 19, 2017
0ae790b
Reconnect socket automatically.
cstar Jan 19, 2017
ccb446e
Remove pings
cstar Jan 30, 2017
22bf3e3
cleanup code
cstar Jan 30, 2017
8e6e1e1
Do not agressively reconnect, now only on demand.
cstar Jan 30, 2017
3d70150
Allow external configuration
rslota Feb 2, 2017
ad93c44
Allow for multiple GCM pools
rslota Feb 2, 2017
38ec10f
Allow for map payload in APNS
rslota Feb 3, 2017
f14e2f3
Fix FCM payload size
rslota Feb 8, 2017
6d1e4e3
Fix APNS port setting
rslota Feb 8, 2017
a07d2e3
Fix GCM crash
rslota Feb 28, 2017
64fddf1
Make timeout configurable
rslota Mar 2, 2017
3f94555
Use chatterbox for FCM connections
rslota Mar 3, 2017
f026c19
Use chatterbox for APNS connections
rslota Mar 6, 2017
267cc90
Update chatterbox
rslota Mar 6, 2017
1bf81a6
Introduce generic H2 worker implementation
rslota Mar 6, 2017
087bb38
Add ping feature
rslota Mar 7, 2017
0bbd55b
Handle empty responses
rslota Mar 20, 2017
c79ecd8
Add support for APNS custom data payload
rslota Apr 13, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion lib/pigeon.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ defmodule Pigeon do

def start(_type, _args) do
opts = [strategy: :one_for_one, name: :pigeon]
Application.put_env(:chatterbox, :ssl_options, [])
Supervisor.start_link(workers(), opts)
end

defp workers do
adm_worker() ++ apns_workers()
adm_worker() ++ apns_workers() ++ gcm_workers()
end

def adm_worker do
Expand All @@ -38,4 +39,13 @@ defmodule Pigeon do
true -> []
end
end

defp gcm_workers do
name = Pigeon.GCMWorker.default_name()
cond do
config = Application.get_env(:pigeon, :gcm) ->
[worker(Pigeon.GCMWorker, [name, config], id: name)]
true -> []
end
end
end
11 changes: 7 additions & 4 deletions lib/pigeon/apns.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ defmodule Pigeon.APNS do

def push(notification, opts \\ [])
def push(notification, opts) when is_list(notification) do
timeout = opts[:timeout] || @default_timeout
case opts[:on_response] do
nil ->
tasks = for n <- notification, do: Task.async(fn -> do_sync_push(n, opts) end)
tasks
|> Task.yield_many(@default_timeout + 500)
|> Task.yield_many(timeout + 500)
|> Enum.map(fn {task, response} -> response || Task.shutdown(task, :brutal_kill) end)
|> group_responses
on_response -> push(notification, on_response, opts)
Expand All @@ -31,15 +32,17 @@ defmodule Pigeon.APNS do

defp do_sync_push(notification, opts) do
pid = self()
on_response = fn(x) -> send pid, {:ok, x} end
ref = :erlang.make_ref
on_response = fn(x) -> send pid, {ref, x} end
timeout = opts[:timeout] || @default_timeout

worker_name = opts[:name] || Config.default_name
GenServer.cast(worker_name, {:push, :apns, notification, on_response})

receive do
{:ok, x} -> x
{^ref, x} -> x
after
@default_timeout -> {:error, :timeout, notification}
timeout -> {:error, :timeout, notification}
end
end

Expand Down
9 changes: 8 additions & 1 deletion lib/pigeon/apns_config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@ defmodule Pigeon.APNS.Config do
Validates configuration settings that initialize APNSWorkers.
"""

def default_name, do: :default
def default_name, do: :apns_default

def config(name) do
config = Application.get_env(:pigeon, :apns)[name]
config(name, config)
end

def config(name, config) do
%{
production_endpoint: config[:production_endpoint] || "api.push.apple.com",
development_endpoint: config[:development_endpoint] || "api.development.push.apple.com",
port: config[:port] || 443,
name: name,
mode: config[:mode],
cert: cert(config[:cert]),
Expand Down
189 changes: 32 additions & 157 deletions lib/pigeon/apns_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,95 +2,42 @@ defmodule Pigeon.APNSWorker do
@moduledoc """
Handles all APNS request and response parsing over an HTTP2 connection.
"""
use GenServer
use Pigeon.GenericH2Worker, ping_interval: 600_000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rslota any chance you can make this ping_interval configurable? some networks (such as AWS) require activity at a higher frequency so the ping has to be lower

require Logger

@ping_period 600_000 # 10 minutes

defp apns_production_api_uri, do: "api.push.apple.com"
defp apns_development_api_uri, do: "api.development.push.apple.com"

def push_uri(mode) do
case mode do
:dev -> apns_development_api_uri()
:prod -> apns_production_api_uri()
def host(config) do
case config[:mode] do
:dev -> config[:development_endpoint]
:prod -> config[:production_endpoint]
end
end

def start_link(config) do
GenServer.start_link(__MODULE__, {:ok, config}, name: config[:name])
end

def stop, do: :gen_server.cast(self(), :stop)

def init({:ok, config}), do: initialize_worker(config)

def initialize_worker(config) do
mode = config[:mode]
case connect_socket(config, 0) do
{:ok, socket} ->
Process.send_after(self(), :ping, @ping_period)
{:ok, %{
apns_socket: socket,
mode: mode,
config: config,
stream_id: 1,
queue: %{}
}}
{:closed, _socket} ->
Logger.error """
Socket closed unexpectedly.
"""
{:stop, {:error, :bad_connection}}
{:error, :timeout} ->
Logger.error """
Failed to establish SSL connection. Is the certificate signed for :#{mode} mode?
"""
{:stop, {:error, :timeout}}
{:error, :invalid_config} ->
Logger.error """
Invalid configuration.
"""
{:stop, {:error, :invalid_config}}
def port(config) do
case config[:use_2197] do
true -> 2197
_ -> config[:port]
end
end

def connect_socket(_config, 3), do: {:error, :timeout}
def connect_socket(config, tries) do
uri = config[:mode] |> push_uri |> to_char_list
case connect_socket_options(config) do
{:ok, options} -> do_connect_socket(config, uri, options, tries)
error -> error
end
def encode_notification(notification) do
Pigeon.Notification.json_payload(notification.payload)
end

def connect_socket_options(config) do
def socket_options(config) do
cert = get_opt(config, :cert, :certfile)
key = get_opt(config, :key, :keyfile)
cond do
cert && key ->
options =
[cert,
key,
{:password, ''},
{:packet, 0},
{:reuseaddr, true},
{:active, true},
:binary]
|> optional_add_2197(config)
{:password, ''}]
{:ok, options}
true ->
{:error, :invalid_config}
end
end

defp optional_add_2197(options, config) do
case config[:use_2197] do
true -> options ++ [{:port, 2197}]
_ -> options
end
end

defp get_opt(config, key_1, key_2) do
cond do
config[key_1] -> {key_1, config[key_1]}
Expand All @@ -99,44 +46,11 @@ defmodule Pigeon.APNSWorker do
end
end

defp do_connect_socket(config, uri, options, tries) do
case Kadabra.open(uri, :https, options) do
{:ok, socket} -> {:ok, socket}
{:error, reason} ->
Logger.error(inspect(reason))
connect_socket(config, tries + 1)
end
end

def handle_cast(:stop, state), do: { :noreply, state }

def handle_cast({:push, :apns, notification}, state) do
send_push(state, notification, nil)
end

def handle_cast({:push, :apns, notification, on_response}, state) do
send_push(state, notification, on_response)
end

def handle_cast(msg, state) do
Logger.debug "Recv: #{inspect(msg)}"
{:noreply, state}
end

def send_push(state, notification, on_response) do
%{apns_socket: socket, stream_id: stream_id, queue: queue} = state
json = Pigeon.Notification.json_payload(notification.payload)
req_headers = [
{":method", "POST"},
{":path", "/3/device/#{notification.device_token}"},
{"content-length", "#{byte_size(json)}"}]
|> put_apns_id(notification)
|> put_apns_topic(notification)

Kadabra.request(socket, req_headers, json)
new_q = Map.put(queue, "#{stream_id}", {notification, on_response})
new_stream_id = stream_id + 2
{ :noreply, %{state | stream_id: new_stream_id, queue: new_q } }
def req_headers(_config, notification) do
[{"content-type", "application/json"},
{"accept", "application/json"}]
|> put_apns_id(notification)
|> put_apns_topic(notification)
end

defp put_apns_id(headers, notification) do
Expand All @@ -153,16 +67,24 @@ defmodule Pigeon.APNSWorker do
end
end

defp parse_error(data) do
{:ok, response} = Poison.decode(data)
def req_path(notification) do
"/3/device/#{notification.device_token}"
end

defp parse_error(_notification, _headers, body) do
{:ok, response} = Poison.decode(body)
response["reason"] |> Macro.underscore |> String.to_existing_atom
end

defp log_error(reason, notification) do
Logger.error("#{reason}: #{error_msg(reason)}\n#{inspect(notification)}")
defp parse_response(notification, headers, _body) do
case List.keyfind(headers, "apns-id", 1) do
nil -> notification
apns_id ->
%{notification | id: apns_id}
end
end

def error_msg(error) do
def error_msg(_code, error) do
case error do
# 400
:bad_collapse_id ->
Expand Down Expand Up @@ -216,7 +138,7 @@ defmodule Pigeon.APNSWorker do
:missing_provider_token ->
"""
No provider certificate was used to connect to APNs and Authorization header was missing
or no provider token was specified."
or no provider token was specified."
"""

# 404
Expand Down Expand Up @@ -257,51 +179,4 @@ defmodule Pigeon.APNSWorker do
""
end
end

def handle_info(:ping, state) do
Kadabra.ping(state.apns_socket)
Process.send_after(self(), :ping, @ping_period)

{ :noreply, state }
end

def handle_info({:end_stream, %Kadabra.Stream{id: stream_id, headers: headers, body: body}},
%{apns_socket: _socket, queue: queue} = state) do

{notification, on_response} = queue["#{stream_id}"]

case get_status(headers) do
"200" ->
notification = %{notification | id: get_apns_id(headers)}
unless on_response == nil do on_response.({:ok, notification}) end
new_queue = Map.delete(queue, "#{stream_id}")
{:noreply, %{state | queue: new_queue}}
nil ->
{:noreply, state}
_error ->
reason = parse_error(body)
log_error(reason, notification)
unless on_response == nil do on_response.({:error, reason, notification}) end
new_queue = Map.delete(queue, "#{stream_id}")
{:noreply, %{state | queue: new_queue}}
end
end

def handle_info({:ping, _from}, state), do: {:noreply, state}

def handle_info({:ok, _from}, state), do: {:noreply, state}

defp get_status(headers) do
case Enum.find(headers, fn({key, _val}) -> key == ":status" end) do
{":status", status} -> status
nil -> nil
end
end

defp get_apns_id(headers) do
case Enum.find(headers, fn({key, _val}) -> key == "apns-id" end) do
{"apns-id", id} -> id
nil -> nil
end
end
end
Loading