From b7be74dc4d491254fc79d0fa925e7b1de27f874c Mon Sep 17 00:00:00 2001 From: Josh Bavari Date: Thu, 25 Jan 2018 11:12:50 -0700 Subject: [PATCH 1/3] Add in the ability to pass in a timestamp and append it to the line protocol. Taken from https://github.com/ebostijancic/fluxter --- lib/fluxter.ex | 12 +++++++----- lib/fluxter/conn.ex | 8 ++++---- lib/fluxter/packet.ex | 9 +++++++-- test/fluxter_test.exs | 4 ++++ 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/lib/fluxter.ex b/lib/fluxter.ex index af785c3..32282a5 100644 --- a/lib/fluxter.ex +++ b/lib/fluxter.ex @@ -320,19 +320,21 @@ defmodule Fluxter do end end - def write(measurement, tags \\ [], fields) + def write(measurement, tags \\ [], fields) do + write(measurement, tags, fields, nil) + end - def write(measurement, tags, fields) when is_list(fields) do + def write(measurement, tags, fields, timestamp) when is_list(fields) do System.unique_integer([:positive]) |> rem(@pool_size) |> worker_name() - |> Fluxter.Conn.write(measurement, tags, fields) + |> Fluxter.Conn.write(measurement, tags, fields, timestamp) end - def write(measurement, tags, value) + def write(measurement, tags, value, timestamp) when is_float(value) or is_integer(value) when is_boolean(value) or is_binary(value) do - write(measurement, tags, [value: value]) + write(measurement, tags, [value: value], timestamp) end def measure(measurement, tags \\ [], fields \\ [], fun) diff --git a/lib/fluxter/conn.ex b/lib/fluxter/conn.ex index 2cf1bfe..3687869 100644 --- a/lib/fluxter/conn.ex +++ b/lib/fluxter/conn.ex @@ -23,11 +23,11 @@ defmodule Fluxter.Conn do GenServer.start_link(__MODULE__, conn, [name: worker]) end - def write(worker, name, tags, fields) + def write(worker, name, tags, fields, timestamp) when (is_binary(name) or is_list(name)) and is_list(tags) and is_list(fields) do # TODO: Remove `try` wrapping when we depend on Elixir ~> 1.3 try do - GenServer.cast(worker, {:write, name, tags, fields}) + GenServer.cast(worker, {:write, name, tags, fields, timestamp}) catch _, _ -> :ok end @@ -38,8 +38,8 @@ defmodule Fluxter.Conn do {:ok, %{conn | sock: sock}} end - def handle_cast({:write, name, tags, fields}, conn) do - packet = Packet.build(conn.header, name, tags, fields) + def handle_cast({:write, name, tags, fields, timestamp}, conn) do + packet = Packet.build(conn.header, name, tags, fields, timestamp) send(conn.sock, {self(), {:command, packet}}) {:noreply, conn} end diff --git a/lib/fluxter/packet.ex b/lib/fluxter/packet.ex index e227cd5..760aab7 100644 --- a/lib/fluxter/packet.ex +++ b/lib/fluxter/packet.ex @@ -17,10 +17,15 @@ defmodule Fluxter.Packet do ] end - def build(header, name, tags, fields) do + def build(header, name, tags, fields, timestamp) do tags = encode_tags(tags) fields = encode_fields(fields) - [header, encode_key(name), tags, ?\s, fields] + + case is_nil(timestamp) do + true -> [header, encode_key(name), tags, ?\s, fields] + false -> [header, encode_key(name), tags, ?\s, fields, ?\s, + Integer.to_string(DateTime.to_unix(timestamp, :microseconds))] + end end defp encode_tags([]), do: "" diff --git a/test/fluxter_test.exs b/test/fluxter_test.exs index f8b796b..5002584 100644 --- a/test/fluxter_test.exs +++ b/test/fluxter_test.exs @@ -89,6 +89,10 @@ defmodule FluxterTest do TestFluxter.write("foo", [bar: "baz", qux: "baz"], 0) assert_receive {:echo, "foo,bar=baz,qux=baz value=0i"} + TestFluxter.write("foo", [bar: "baz", qux: "baz"], 0, + DateTime.from_unix!(1415521167028459, :microseconds)) + assert_receive {:echo, "foo,bar=baz,qux=baz value=0i 1415521167028459"} + refute_receive _any end From 1439badafcb84777ece82bb1c3e8dff950d9c890 Mon Sep 17 00:00:00 2001 From: Josh Bavari Date: Thu, 25 Jan 2018 14:38:21 -0700 Subject: [PATCH 2/3] Alter parameters to be specific about expected timestamp in milliseconds. Convert to nanoseconds --- lib/fluxter.ex | 8 ++++---- lib/fluxter/conn.ex | 8 ++++---- lib/fluxter/packet.ex | 14 ++++++++++---- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/lib/fluxter.ex b/lib/fluxter.ex index 32282a5..7b55808 100644 --- a/lib/fluxter.ex +++ b/lib/fluxter.ex @@ -324,17 +324,17 @@ defmodule Fluxter do write(measurement, tags, fields, nil) end - def write(measurement, tags, fields, timestamp) when is_list(fields) do + def write(measurement, tags, fields, timestamp_milli_secs) when is_list(fields) do System.unique_integer([:positive]) |> rem(@pool_size) |> worker_name() - |> Fluxter.Conn.write(measurement, tags, fields, timestamp) + |> Fluxter.Conn.write(measurement, tags, fields, timestamp_milli_secs) end - def write(measurement, tags, value, timestamp) + def write(measurement, tags, value, timestamp_milli_secs) when is_float(value) or is_integer(value) when is_boolean(value) or is_binary(value) do - write(measurement, tags, [value: value], timestamp) + write(measurement, tags, [value: value], timestamp_milli_secs) end def measure(measurement, tags \\ [], fields \\ [], fun) diff --git a/lib/fluxter/conn.ex b/lib/fluxter/conn.ex index 3687869..02a8be0 100644 --- a/lib/fluxter/conn.ex +++ b/lib/fluxter/conn.ex @@ -23,11 +23,11 @@ defmodule Fluxter.Conn do GenServer.start_link(__MODULE__, conn, [name: worker]) end - def write(worker, name, tags, fields, timestamp) + def write(worker, name, tags, fields, timestamp_milli_secs) when (is_binary(name) or is_list(name)) and is_list(tags) and is_list(fields) do # TODO: Remove `try` wrapping when we depend on Elixir ~> 1.3 try do - GenServer.cast(worker, {:write, name, tags, fields, timestamp}) + GenServer.cast(worker, {:write, name, tags, fields, timestamp_milli_secs}) catch _, _ -> :ok end @@ -38,8 +38,8 @@ defmodule Fluxter.Conn do {:ok, %{conn | sock: sock}} end - def handle_cast({:write, name, tags, fields, timestamp}, conn) do - packet = Packet.build(conn.header, name, tags, fields, timestamp) + def handle_cast({:write, name, tags, fields, timestamp_milli_secs}, conn) do + packet = Packet.build(conn.header, name, tags, fields, timestamp_milli_secs) send(conn.sock, {self(), {:command, packet}}) {:noreply, conn} end diff --git a/lib/fluxter/packet.ex b/lib/fluxter/packet.ex index 760aab7..79db93c 100644 --- a/lib/fluxter/packet.ex +++ b/lib/fluxter/packet.ex @@ -17,14 +17,20 @@ defmodule Fluxter.Packet do ] end - def build(header, name, tags, fields, timestamp) do + def build(header, name, tags, fields, unix_timestamp_ms) do tags = encode_tags(tags) fields = encode_fields(fields) - case is_nil(timestamp) do + case is_nil(unix_timestamp_ms) do true -> [header, encode_key(name), tags, ?\s, fields] - false -> [header, encode_key(name), tags, ?\s, fields, ?\s, - Integer.to_string(DateTime.to_unix(timestamp, :microseconds))] + false -> + # Convert time to nanoseconds, which is the precision influxdb uses + unix_timestamp_nano_secs = unix_timestamp_ms + |> Kernel.*(1_000_000) + |> Integer.to_string() + + [header, encode_key(name), tags, ?\s, fields, ?\s, + unix_timestamp_nano_secs] end end From 7b9c542569213f5e6d17b5e736e98c9f6c07e671 Mon Sep 17 00:00:00 2001 From: Josh Bavari Date: Thu, 25 Jan 2018 14:38:37 -0700 Subject: [PATCH 3/3] Fix test to pass in milliseconds and expect nanoseconds sent out --- test/fluxter_test.exs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/test/fluxter_test.exs b/test/fluxter_test.exs index 5002584..dfd0419 100644 --- a/test/fluxter_test.exs +++ b/test/fluxter_test.exs @@ -89,9 +89,16 @@ defmodule FluxterTest do TestFluxter.write("foo", [bar: "baz", qux: "baz"], 0) assert_receive {:echo, "foo,bar=baz,qux=baz value=0i"} - TestFluxter.write("foo", [bar: "baz", qux: "baz"], 0, - DateTime.from_unix!(1415521167028459, :microseconds)) - assert_receive {:echo, "foo,bar=baz,qux=baz value=0i 1415521167028459"} + + timestamp_milli_secs = 1415521167028459 + TestFluxter.write("foo", [bar: "baz", qux: "baz"], 0, timestamp_milli_secs) + + timestamp_nanoseconds = (timestamp_milli_secs * 1_000_000) + |> Integer.to_string() + + expected_line_msg = "foo,bar=baz,qux=baz value=0i #{timestamp_nanoseconds}" + + assert_receive {:echo, ^expected_line_msg} refute_receive _any end