Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/add_influx_timestamp #24

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions lib/fluxter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -320,19 +320,21 @@ defmodule Fluxter do
end
end

def write(measurement, tags \\ [], fields)
def write(measurement, tags \\ [], fields) do
write(measurement, tags, fields, nil)
end

def write(measurement, tags, fields) when is_list(fields) do
def write(measurement, tags, fields, timestamp_milli_secs) when is_list(fields) do
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be a braking change. Why not add default value for timestamp_milli_secs?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hauleth is right—we need to have a default value, otherwise all existing code will be broken.

System.unique_integer([:positive])
|> rem(@pool_size)
|> worker_name()
|> Fluxter.Conn.write(measurement, tags, fields)
|> Fluxter.Conn.write(measurement, tags, fields, timestamp_milli_secs)
end

def write(measurement, tags, value)
def write(measurement, tags, value, timestamp_milli_secs)
when is_float(value) or is_integer(value)
when is_boolean(value) or is_binary(value) do
write(measurement, tags, [value: value])
write(measurement, tags, [value: value], timestamp_milli_secs)
end

def measure(measurement, tags \\ [], fields \\ [], fun)
Expand Down
8 changes: 4 additions & 4 deletions lib/fluxter/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ defmodule Fluxter.Conn do
GenServer.start_link(__MODULE__, conn, [name: worker])
end

def write(worker, name, tags, fields)
def write(worker, name, tags, fields, timestamp_milli_secs)
when (is_binary(name) or is_list(name)) and is_list(tags) and is_list(fields) do
# TODO: Remove `try` wrapping when we depend on Elixir ~> 1.3
try do
GenServer.cast(worker, {:write, name, tags, fields})
GenServer.cast(worker, {:write, name, tags, fields, timestamp_milli_secs})
catch
_, _ -> :ok
end
Expand All @@ -38,8 +38,8 @@ defmodule Fluxter.Conn do
{:ok, %{conn | sock: sock}}
end

def handle_cast({:write, name, tags, fields}, conn) do
packet = Packet.build(conn.header, name, tags, fields)
def handle_cast({:write, name, tags, fields, timestamp_milli_secs}, conn) do
packet = Packet.build(conn.header, name, tags, fields, timestamp_milli_secs)
send(conn.sock, {self(), {:command, packet}})
{:noreply, conn}
end
Expand Down
15 changes: 13 additions & 2 deletions lib/fluxter/packet.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,21 @@ defmodule Fluxter.Packet do
]
end

def build(header, name, tags, fields) do
def build(header, name, tags, fields, unix_timestamp_ms) do
tags = encode_tags(tags)
fields = encode_fields(fields)
[header, encode_key(name), tags, ?\s, fields]

case is_nil(unix_timestamp_ms) do
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use if?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if is a macro that just compiles down to a case statement, just went with the case instead.

Copy link
Owner

@lexmag lexmag Feb 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if would be more expressive in this case.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jbavari then why not use if if this will be the same after compilation? If you would use Credo it would warn you that this is bad practice.

true -> [header, encode_key(name), tags, ?\s, fields]
false ->
# Convert time to nanoseconds, which is the precision influxdb uses
unix_timestamp_nano_secs = unix_timestamp_ms
|> Kernel.*(1_000_000)
|> Integer.to_string()

[header, encode_key(name), tags, ?\s, fields, ?\s,
unix_timestamp_nano_secs]
end
end

defp encode_tags([]), do: ""
Expand Down
11 changes: 11 additions & 0 deletions test/fluxter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ defmodule FluxterTest do
TestFluxter.write("foo", [bar: "baz", qux: "baz"], 0)
assert_receive {:echo, "foo,bar=baz,qux=baz value=0i"}


timestamp_milli_secs = 1415521167028459
TestFluxter.write("foo", [bar: "baz", qux: "baz"], 0, timestamp_milli_secs)

timestamp_nanoseconds = (timestamp_milli_secs * 1_000_000)
|> Integer.to_string()

expected_line_msg = "foo,bar=baz,qux=baz value=0i #{timestamp_nanoseconds}"

assert_receive {:echo, ^expected_line_msg}

refute_receive _any
end

Expand Down