diff --git a/server/lib/realtime/replication.ex b/server/lib/realtime/replication.ex index 6f96cfce8..ce3796e71 100644 --- a/server/lib/realtime/replication.ex +++ b/server/lib/realtime/replication.ex @@ -80,7 +80,11 @@ defmodule Realtime.Replication do # Feel free to delete after testing Logger.debug("Final Update of Columns " <> inspect(state.relations, limit: :infinity)) - notify_subscribers(%{state | transaction: {current_txn_lsn, %{txn | changes: Enum.reverse(changes)}}}) + notify_subscribers(%{ + state + | transaction: {current_txn_lsn, %{txn | changes: Enum.reverse(changes)}} + }) + :ok = adapter_impl(state.config).acknowledge_lsn(state.connection, end_lsn) %{state | transaction: nil} @@ -241,13 +245,52 @@ defmodule Realtime.Replication do state end - # TODO: Typecast to meaningful Elixir types here later - defp data_tuple_to_map(_columns, nil), do: %{} + defp data_tuple_to_map(columns, tuple_data) when is_list(columns) and is_tuple(tuple_data) do + columns + |> Enum.with_index() + |> Enum.reduce_while(%{}, fn {column_map, index}, acc -> + case column_map do + %Relation.Column{name: column_name, type: column_type} + when is_binary(column_name) and is_binary(column_type) -> + try do + {:ok, Kernel.elem(tuple_data, index)} + rescue + ArgumentError -> :error + end + |> case do + {:ok, record} -> + {:cont, Map.put(acc, column_name, convert_column_record(record, column_type))} + + :error -> + {:halt, acc} + end + + _ -> + {:cont, acc} + end + end) + end + + defp data_tuple_to_map(_columns, _tuple_data), do: %{} + + defp convert_column_record(record, "timestamp") when is_binary(record) do + with {:ok, %NaiveDateTime{} = naive_date_time} <- Timex.parse(record, "{RFC3339}"), + %DateTime{} = date_time <- Timex.to_datetime(naive_date_time) do + DateTime.to_iso8601(date_time) + else + _ -> record + end + end + + defp convert_column_record(record, "timestamptz") when is_binary(record) do + case Timex.parse(record, "{RFC3339}") do + {:ok, %DateTime{} = date_time} -> DateTime.to_iso8601(date_time) + _ -> record + end + end - defp data_tuple_to_map(columns, tuple_data) do - for {column, index} <- Enum.with_index(columns, 1), - do: {column.name, :erlang.element(index, tuple_data)}, - into: %{} + defp convert_column_record(record, _column_type) do + record end defp adapter_impl(config) do diff --git a/server/mix.exs b/server/mix.exs index aa1eedb9c..5036c7ccd 100644 --- a/server/mix.exs +++ b/server/mix.exs @@ -35,16 +35,17 @@ defmodule Realtime.MixProject do # Type `mix help deps` for examples and options. defp deps do [ - {:phoenix, "~> 1.4.10"}, - {:phoenix_pubsub, "~> 1.1"}, - {:phoenix_html, "~> 2.11"}, - {:phoenix_live_reload, "~> 1.2", only: :dev}, - {:gettext, "~> 0.11"}, - {:httpoison, "~> 1.6"}, + {:phoenix, "~> 1.5"}, + {:phoenix_pubsub, "~> 2.0"}, + {:phoenix_html, "~> 2.14"}, + {:phoenix_live_reload, "~> 1.3", only: :dev}, + {:gettext, "~> 0.18"}, + {:httpoison, "~> 1.8"}, {:jason, "~> 1.2.2"}, {:joken, "~> 2.3.0"}, - {:plug_cowboy, "~> 2.0"}, - {:epgsql, "~> 4.2"}, + {:plug_cowboy, "~> 2.4"}, + {:epgsql, "~> 4.5"}, + {:timex, "~> 3.0"}, {:mock, "~> 0.3.0", only: :test} ] end diff --git a/server/mix.lock b/server/mix.lock index 116bf12ce..c895d56d2 100644 --- a/server/mix.lock +++ b/server/mix.lock @@ -1,31 +1,35 @@ %{ - "certifi": {:hex, :certifi, "2.5.2", "b7cfeae9d2ed395695dd8201c57a2d019c0c43ecaf8b8bcb9320b40d6662f340", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "3b3b5f36493004ac3455966991eaf6e768ce9884693d9968055aeeeb1e575040"}, - "cowboy": {:hex, :cowboy, "2.6.3", "99aa50e94e685557cad82e704457336a453d4abcb77839ad22dbe71f311fcc06", [:rebar3], [{:cowlib, "~> 2.7.3", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "e5580029080f3f1ad17436fb97b0d5ed2ed4e4815a96bac36b5a992e20f58db6"}, - "cowlib": {:hex, :cowlib, "2.7.3", "a7ffcd0917e6d50b4d5fb28e9e2085a0ceb3c97dea310505f7460ff5ed764ce9", [:rebar3], [], "hexpm", "1e1a3d176d52daebbecbbcdfd27c27726076567905c2a9d7398c54da9d225761"}, - "epgsql": {:hex, :epgsql, "4.3.0", "26d9cf04d74773d1dc4da24ad39e926b34e107232591fe1866efdfbc0a098396", [:rebar3], [], "hexpm", "37961b8550eed7474423eba7961355c503e85dcb9ffa55fbb79ad9ba2b20af03"}, - "file_system": {:hex, :file_system, "0.2.7", "e6f7f155970975789f26e77b8b8d8ab084c59844d8ecfaf58cbda31c494d14aa", [:mix], [], "hexpm", "b4cfa2d69c7f0b18fd06db222b2398abeef743a72504e6bd7df9c52f171b047f"}, - "gettext": {:hex, :gettext, "0.17.0", "abe21542c831887a2b16f4c94556db9c421ab301aee417b7c4fbde7fbdbe01ec", [:mix], [], "hexpm", "e0b8598e802676c81e66b061a2148c37c03886b24a3ca86a1f98ed40693b94b3"}, - "hackney": {:hex, :hackney, "1.16.0", "5096ac8e823e3a441477b2d187e30dd3fff1a82991a806b2003845ce72ce2d84", [:rebar3], [{:certifi, "2.5.2", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.1", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.0", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.6", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "3bf0bebbd5d3092a3543b783bf065165fa5d3ad4b899b836810e513064134e18"}, - "httpoison": {:hex, :httpoison, "1.6.2", "ace7c8d3a361cebccbed19c283c349b3d26991eff73a1eaaa8abae2e3c8089b6", [:mix], [{:hackney, "~> 1.15 and >= 1.15.2", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "aa2c74bd271af34239a3948779612f87df2422c2fdcfdbcec28d9c105f0773fe"}, - "idna": {:hex, :idna, "6.0.1", "1d038fb2e7668ce41fbf681d2c45902e52b3cb9e9c77b55334353b222c2ee50c", [:rebar3], [{:unicode_util_compat, "0.5.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a02c8a1c4fd601215bb0b0324c8a6986749f807ce35f25449ec9e69758708122"}, + "certifi": {:hex, :certifi, "2.5.3", "70bdd7e7188c804f3a30ee0e7c99655bc35d8ac41c23e12325f36ab449b70651", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "ed516acb3929b101208a9d700062d520f3953da3b6b918d866106ffa980e1c10"}, + "combine": {:hex, :combine, "0.10.0", "eff8224eeb56498a2af13011d142c5e7997a80c8f5b97c499f84c841032e429f", [:mix], [], "hexpm", "1b1dbc1790073076580d0d1d64e42eae2366583e7aecd455d1215b0d16f2451b"}, + "cowboy": {:hex, :cowboy, "2.8.0", "f3dc62e35797ecd9ac1b50db74611193c29815401e53bac9a5c0577bd7bc667d", [:rebar3], [{:cowlib, "~> 2.9.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "4643e4fba74ac96d4d152c75803de6fad0b3fa5df354c71afdd6cbeeb15fac8a"}, + "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.3.1", "ebd1a1d7aff97f27c66654e78ece187abdc646992714164380d8a041eda16754", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3a6efd3366130eab84ca372cbd4a7d3c3a97bdfcfb4911233b035d117063f0af"}, + "cowlib": {:hex, :cowlib, "2.9.1", "61a6c7c50cf07fdd24b2f45b89500bb93b6686579b069a89f88cb211e1125c78", [:rebar3], [], "hexpm", "e4175dc240a70d996156160891e1c62238ede1729e45740bdd38064dad476170"}, + "epgsql": {:hex, :epgsql, "4.5.0", "ca863ee3a771e7696ae58ec924a29df8435cdaffa64dba70c02dd2571ad2122d", [:rebar3], [], "hexpm", "0a02d338cc1426c5873b412fed9d694f7b5143933c5f85f244655a5e77b23078"}, + "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, + "gettext": {:hex, :gettext, "0.18.2", "7df3ea191bb56c0309c00a783334b288d08a879f53a7014341284635850a6e55", [:mix], [], "hexpm", "f9f537b13d4fdd30f3039d33cb80144c3aa1f8d9698e47d7bcbcc8df93b1f5c5"}, + "hackney": {:hex, :hackney, "1.17.0", "717ea195fd2f898d9fe9f1ce0afcc2621a41ecfe137fae57e7fe6e9484b9aa99", [:rebar3], [{:certifi, "~>2.5", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "64c22225f1ea8855f584720c0e5b3cd14095703af1c9fbc845ba042811dc671c"}, + "httpoison": {:hex, :httpoison, "1.8.0", "6b85dea15820b7804ef607ff78406ab449dd78bed923a49c7160e1886e987a3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "28089eaa98cf90c66265b6b5ad87c59a3729bea2e74e9d08f9b51eb9729b3c3a"}, + "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"}, "joken": {:hex, :joken, "2.3.0", "62a979c46f2c81dcb8ddc9150453b60d3757d1ac393c72bb20fc50a7b0827dc6", [:mix], [{:jose, "~> 1.10", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "57b263a79c0ec5d536ac02d569c01e6b4de91bd1cb825625fe90eab4feb7bc1e"}, "jose": {:hex, :jose, "1.11.1", "59da64010c69aad6cde2f5b9248b896b84472e99bd18f246085b7b9fe435dcdb", [:mix, :rebar3], [], "hexpm", "078f6c9fb3cd2f4cfafc972c814261a7d1e8d2b3685c0a76eb87e158efff1ac5"}, "meck": {:hex, :meck, "0.8.13", "ffedb39f99b0b99703b8601c6f17c7f76313ee12de6b646e671e3188401f7866", [:rebar3], [], "hexpm", "d34f013c156db51ad57cc556891b9720e6a1c1df5fe2e15af999c84d6cebeb1a"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, - "mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm", "6cbe761d6a0ca5a31a0931bf4c63204bceb64538e664a8ecf784a9a6f3b875f1"}, + "mime": {:hex, :mime, "1.5.0", "203ef35ef3389aae6d361918bf3f952fa17a09e8e43b5aa592b93eba05d0fb8d", [:mix], [], "hexpm", "55a94c0f552249fc1a3dd9cd2d3ab9de9d3c89b559c2bd01121f824834f24746"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "mock": {:hex, :mock, "0.3.6", "e810a91fabc7adf63ab5fdbec5d9d3b492413b8cda5131a2a8aa34b4185eb9b4", [:mix], [{:meck, "~> 0.8.13", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "bcf1d0a6826fb5aee01bae3d74474669a3fa8b2df274d094af54a25266a1ebd2"}, - "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"}, - "phoenix": {:hex, :phoenix, "1.4.10", "619e4a545505f562cd294df52294372d012823f4fd9d34a6657a8b242898c255", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "256ad7a140efadc3f0290470369da5bd3de985ec7c706eba07c2641b228974be"}, - "phoenix_html": {:hex, :phoenix_html, "2.13.3", "850e292ff6e204257f5f9c4c54a8cb1f6fbc16ed53d360c2b780a3d0ba333867", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "8b01b3d6d39731ab18aa548d928b5796166d2500755f553725cfe967bafba7d9"}, - "phoenix_live_reload": {:hex, :phoenix_live_reload, "1.2.1", "274a4b07c4adbdd7785d45a8b0bb57634d0b4f45b18d2c508b26c0344bd59b8f", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.4", [hex: :phoenix, repo: "hexpm", optional: false]}], "hexpm", "41b4103a2fa282cfd747d377233baf213c648fdcc7928f432937676532490eee"}, - "phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.2", "496c303bdf1b2e98a9d26e89af5bba3ab487ba3a3735f74bf1f4064d2a845a3e", [:mix], [], "hexpm", "1f13f9f0f3e769a667a6b6828d29dec37497a082d195cc52dbef401a9b69bf38"}, - "plug": {:hex, :plug, "1.8.3", "12d5f9796dc72e8ac9614e94bda5e51c4c028d0d428e9297650d09e15a684478", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "164baaeb382d19beee0ec484492aa82a9c8685770aee33b24ec727a0971b34d0"}, - "plug_cowboy": {:hex, :plug_cowboy, "2.1.0", "b75768153c3a8a9e8039d4b25bb9b14efbc58e9c4a6e6a270abff1cd30cbe320", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "6cd8ddd1bd1fbfa54d3fc61d4719c2057dae67615395d58d40437a919a46f132"}, - "plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm", "73c1682f0e414cfb5d9b95c8e8cd6ffcfdae699e3b05e1db744e58b7be857759"}, + "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"}, + "phoenix": {:hex, :phoenix, "1.5.7", "2923bb3af924f184459fe4fa4b100bd25fa6468e69b2803dfae82698269aa5e0", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_html, "~> 2.13", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.10", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.2", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.1.2 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "774cd64417c5a3788414fdbb2be2eb9bcd0c048d9e6ad11a0c1fd67b7c0d0978"}, + "phoenix_html": {:hex, :phoenix_html, "2.14.3", "51f720d0d543e4e157ff06b65de38e13303d5778a7919bcc696599e5934271b8", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "efd697a7fff35a13eeeb6b43db884705cba353a1a41d127d118fda5f90c8e80f"}, + "phoenix_live_reload": {:hex, :phoenix_live_reload, "1.3.0", "f35f61c3f959c9a01b36defaa1f0624edd55b87e236b606664a556d6f72fd2e7", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.4", [hex: :phoenix, repo: "hexpm", optional: false]}], "hexpm", "02c1007ae393f2b76ec61c1a869b1e617179877984678babde131d716f95b582"}, + "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.0.0", "a1ae76717bb168cdeb10ec9d92d1480fec99e3080f011402c0a2d68d47395ffb", [:mix], [], "hexpm", "c52d948c4f261577b9c6fa804be91884b381a7f8f18450c5045975435350f771"}, + "plug": {:hex, :plug, "1.11.0", "f17217525597628298998bc3baed9f8ea1fa3f1160aa9871aee6df47a6e4d38e", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2d9c633f0499f9dc5c2fd069161af4e2e7756890b81adcbb2ceaa074e8308876"}, + "plug_cowboy": {:hex, :plug_cowboy, "2.4.1", "779ba386c0915027f22e14a48919a9545714f849505fa15af2631a0d298abf0f", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d72113b6dff7b37a7d9b2a5b68892808e3a9a752f2bf7e503240945385b70507"}, + "plug_crypto": {:hex, :plug_crypto, "1.2.0", "1cb20793aa63a6c619dd18bb33d7a3aa94818e5fd39ad357051a67f26dfa2df6", [:mix], [], "hexpm", "a48b538ae8bf381ffac344520755f3007cc10bd8e90b240af98ea29b69683fc2"}, "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, - "telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm", "e9e3cacfd37c1531c0ca70ca7c0c30ce2dbb02998a4f7719de180fe63f8d41e4"}, - "unicode_util_compat": {:hex, :unicode_util_compat, "0.5.0", "8516502659002cec19e244ebd90d312183064be95025a319a6c7e89f4bccd65b", [:rebar3], [], "hexpm", "d48d002e15f5cc105a696cf2f1bbb3fc72b4b770a184d8420c8db20da2674b38"}, + "telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"}, + "timex": {:hex, :timex, "3.6.3", "58ce6c9eda8ed47fc80c24dde09d481465838d3bcfc230949287fc1b0b0041c1", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5 or ~> 1.0.0", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "6d69f4f95fcf5684102a9cb3cf92c5ba6545bd60ed8d8a6a93cd2a4a4fb0d9ec"}, + "tzdata": {:hex, :tzdata, "1.0.5", "69f1ee029a49afa04ad77801febaf69385f3d3e3d1e4b56b9469025677b89a28", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "55519aa2a99e5d2095c1e61cc74c9be69688f8ab75c27da724eb8279ff402a5a"}, + "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, } diff --git a/server/test/realtime/replication_test.exs b/server/test/realtime/replication_test.exs index 0f02948fa..ad0f4de2d 100644 --- a/server/test/realtime/replication_test.exs +++ b/server/test/realtime/replication_test.exs @@ -1,10 +1,82 @@ defmodule Realtime.ReplicationTest do use ExUnit.Case - doctest Realtime.Replication, import: true + alias Realtime.Replication + + doctest Replication, import: true + + setup do + test_state = %Replication.State{ + config: [], + connection: "pid", + relations: %{ + 26725 => %Realtime.Decoder.Messages.Relation{ + columns: [ + %Realtime.Decoder.Messages.Relation.Column{ + flags: [:key], + name: "id", + type: "int8", + type_modifier: 4_294_967_295 + }, + %Realtime.Decoder.Messages.Relation.Column{ + flags: [], + name: "details", + type: "text", + type_modifier: 4_294_967_295 + }, + %Realtime.Decoder.Messages.Relation.Column{ + flags: [], + name: "user_id", + type: "int8", + type_modifier: 4_294_967_295 + }, + %Realtime.Decoder.Messages.Relation.Column{ + flags: [], + name: "inserted_at_with_time_zone", + type: "timestamptz", + type_modifier: 4_294_967_295 + }, + %Realtime.Decoder.Messages.Relation.Column{ + flags: [], + name: "inserted_at_without_time_zone", + type: "timestamp", + type_modifier: 4_294_967_295 + } + ], + id: 26725, + name: "todos", + namespace: "public", + replica_identity: :default + } + }, + subscribers: [], + transaction: + {{0, 688_510_024}, + %Realtime.Adapters.Changes.Transaction{ + changes: [], + commit_timestamp: %DateTime{ + calendar: Calendar.ISO, + day: 16, + hour: 23, + microsecond: {518_844, 0}, + minute: 47, + month: 2, + second: 47, + std_offset: 0, + time_zone: "Etc/UTC", + utc_offset: 0, + year: 2021, + zone_abbr: "UTC" + } + }}, + types: %{} + } + + {:ok, test_state: test_state} + end test "Integration Test: 0.2.0" do - assert Realtime.Replication.handle_info( + assert Replication.handle_info( {:epgsql, 0, {:x_log_data, 0, 0, <<82, 0, 0, 64, 2, 112, 117, 98, 108, 105, 99, 0, 117, 115, 101, 114, 115, 0, 100, @@ -14,10 +86,10 @@ defmodule Realtime.ReplicationTest do 111, 0, 0, 0, 14, 218, 255, 255, 255, 255, 0, 105, 110, 115, 101, 114, 116, 101, 100, 95, 97, 116, 0, 0, 0, 4, 90, 255, 255, 255, 255, 0, 117, 112, 100, 97, 116, 101, 100, 95, 97, 116, 0, 0, 0, 4, 90, 255, 255, 255, 255>>}}, - %Realtime.Replication.State{} + %Replication.State{} ) == {:noreply, - %Realtime.Replication.State{ + %Replication.State{ config: [], connection: nil, relations: %{ @@ -71,4 +143,104 @@ defmodule Realtime.ReplicationTest do types: %{} }} end + + test "insert record with data type conversion", %{test_state: test_state} do + {:noreply, + %Replication.State{ + transaction: {_lsn, %{changes: [%Realtime.Adapters.Changes.NewRecord{record: record}]}} + }} = + Replication.handle_info( + {:epgsql, "pid", + {:x_log_data, 0, 0, + <<73, 0, 0, 104, 101, 78, 0, 5, 116, 0, 0, 0, 1, 51, 116, 0, 0, 0, 17, 83, 117, 112, 97, + 98, 97, 115, 101, 32, 105, 115, 32, 103, 111, 111, 100, 33, 116, 0, 0, 0, 1, 49, 116, + 0, 0, 0, 28, 50, 48, 50, 49, 45, 48, 50, 45, 49, 54, 32, 50, 51, 58, 52, 55, 58, 52, + 55, 46, 53, 49, 54, 49, 51, 43, 48, 48, 116, 0, 0, 0, 25, 50, 48, 50, 49, 45, 48, 50, + 45, 49, 54, 32, 50, 51, 58, 52, 55, 58, 52, 55, 46, 53, 49, 54, 49, 51>>}}, + test_state + ) + + assert record == %{ + "details" => "Supabase is good!", + "id" => "3", + "inserted_at_with_time_zone" => "2021-02-16T23:47:47.51613Z", + "inserted_at_without_time_zone" => "2021-02-16T23:47:47.51613Z", + "user_id" => "1" + } + end + + test "update record with data type conversion", %{test_state: test_state} do + {:noreply, + %Replication.State{ + transaction: + {_lsn, + %{ + changes: [ + %Realtime.Adapters.Changes.UpdatedRecord{old_record: old_record, record: record} + ] + }} + }} = + Replication.handle_info( + {:epgsql, "pid", + {:x_log_data, 0, 0, + <<85, 0, 0, 104, 101, 79, 0, 5, 116, 0, 0, 0, 1, 51, 116, 0, 0, 0, 17, 83, 117, 112, 97, + 98, 97, 115, 101, 32, 105, 115, 32, 103, 111, 111, 100, 33, 116, 0, 0, 0, 1, 49, 116, + 0, 0, 0, 28, 50, 48, 50, 49, 45, 48, 50, 45, 49, 54, 32, 50, 51, 58, 52, 55, 58, 52, + 55, 46, 53, 49, 54, 49, 51, 43, 48, 48, 116, 0, 0, 0, 25, 50, 48, 50, 49, 45, 48, 50, + 45, 49, 54, 32, 50, 51, 58, 52, 55, 58, 52, 55, 46, 53, 49, 54, 49, 51, 78, 0, 5, 116, + 0, 0, 0, 1, 51, 116, 0, 0, 0, 22, 78, 111, 44, 32, 83, 117, 112, 97, 98, 97, 115, 101, + 32, 105, 115, 32, 103, 114, 101, 97, 116, 33, 116, 0, 0, 0, 1, 49, 116, 0, 0, 0, 28, + 50, 48, 50, 49, 45, 48, 50, 45, 49, 54, 32, 50, 51, 58, 52, 55, 58, 52, 55, 46, 53, + 49, 54, 49, 51, 43, 48, 48, 116, 0, 0, 0, 25, 50, 48, 50, 49, 45, 48, 50, 45, 49, 54, + 32, 50, 51, 58, 52, 55, 58, 52, 55, 46, 53, 49, 54, 49, 51>>}}, + test_state + ) + + assert old_record == %{ + "details" => "Supabase is good!", + "id" => "3", + "inserted_at_with_time_zone" => "2021-02-16T23:47:47.51613Z", + "inserted_at_without_time_zone" => "2021-02-16T23:47:47.51613Z", + "user_id" => "1" + } + + assert record == %{ + "details" => "No, Supabase is great!", + "id" => "3", + "inserted_at_with_time_zone" => "2021-02-16T23:47:47.51613Z", + "inserted_at_without_time_zone" => "2021-02-16T23:47:47.51613Z", + "user_id" => "1" + } + end + + test "delete record with data type conversion", %{test_state: test_state} do + {:noreply, + %Replication.State{ + transaction: + {_lsn, + %{ + changes: [ + %Realtime.Adapters.Changes.DeletedRecord{old_record: old_record} + ] + }} + }} = + Replication.handle_info( + {:epgsql, "pid", + {:x_log_data, 0, 0, + <<68, 0, 0, 104, 101, 79, 0, 5, 116, 0, 0, 0, 1, 52, 116, 0, 0, 0, 13, 83, 101, 101, 32, + 121, 97, 32, 108, 97, 116, 101, 114, 33, 116, 0, 0, 0, 1, 49, 116, 0, 0, 0, 29, 50, + 48, 50, 49, 45, 48, 50, 45, 49, 55, 32, 48, 49, 58, 48, 48, 58, 53, 54, 46, 50, 49, + 54, 54, 53, 52, 43, 48, 48, 116, 0, 0, 0, 26, 50, 48, 50, 49, 45, 48, 50, 45, 49, 55, + 32, 48, 49, 58, 48, 48, 58, 53, 54, 46, 50, 49, 54, 54, 53, 52>>}}, + test_state + ) + + assert old_record == %{ + "details" => "See ya later!", + "id" => "4", + "inserted_at_with_time_zone" => "2021-02-17T01:00:56.216654Z", + "inserted_at_without_time_zone" => "2021-02-17T01:00:56.216654Z", + "user_id" => "1" + } + end end