Skip to content

Commit

Permalink
Replace loggers by telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
José Valim committed Oct 3, 2018
1 parent 69954b7 commit ee8a9ea
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 186 deletions.
4 changes: 2 additions & 2 deletions bench/support/repo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ Application.put_env(
)

defmodule Ecto.Bench.PgRepo do
use Ecto.Repo, otp_app: :ecto, adapter: Ecto.Adapters.Postgres, loggers: []
use Ecto.Repo, otp_app: :ecto, adapter: Ecto.Adapters.Postgres, log: false
end

defmodule Ecto.Bench.MySQLRepo do
use Ecto.Repo, otp_app: :ecto, adapter: Ecto.Adapters.MySQL, loggers: []
use Ecto.Repo, otp_app: :ecto, adapter: Ecto.Adapters.MySQL, log: false
end
15 changes: 7 additions & 8 deletions integration_test/cases/repo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1275,22 +1275,21 @@ defmodule Ecto.Integration.RepoTest do
## Logging

test "log entry logged on query" do
log = fn entry ->
log = fn latency, entry ->
assert %Ecto.LogEntry{result: {:ok, _}} = entry
assert is_integer(entry.query_time) and entry.query_time >= 0
assert is_integer(entry.decode_time) and entry.query_time >= 0
assert is_integer(entry.queue_time) and entry.queue_time >= 0
assert latency == entry.query_time + entry.decode_time + entry.queue_time
send(self(), :logged)
end
Process.put(:on_log, log)

Process.put(:telemetry, log)
_ = TestRepo.all(Post)
assert_received :logged
end

test "log entry not logged when log is false" do
Process.put(:on_log, fn _ -> flunk("logged") end)
TestRepo.insert!(%Post{title: "1"}, [log: false])
test "log entry with custom log level" do
assert ExUnit.CaptureLog.capture_log(fn ->
TestRepo.insert!(%Post{title: "1"}, [log: :error])
end) != ""
end

describe "upsert via insert" do
Expand Down
68 changes: 8 additions & 60 deletions integration_test/sql/transaction.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ defmodule Ecto.Integration.TransactionTest do
use Ecto.Integration.Case, async: true

import Ecto.Query
import ExUnit.CaptureLog
alias Ecto.Integration.PoolRepo # Used for writes
alias Ecto.Integration.TestRepo # Used for reads

Expand Down Expand Up @@ -205,15 +204,19 @@ defmodule Ecto.Integration.TransactionTest do

## Logging

defp register_telemetry() do
Process.put(:telemetry, fn _, event -> send(self(), event) end)
end

test "log begin, commit and rollback" do
Process.put(:on_log, &send(self(), &1))
register_telemetry()
PoolRepo.transaction(fn ->
assert_received %Ecto.LogEntry{params: [], result: {:ok, _}} = entry
assert is_integer(entry.query_time) and entry.query_time >= 0
assert is_integer(entry.queue_time) and entry.queue_time >= 0

refute_received %Ecto.LogEntry{}
Process.put(:on_log, &send(self(), &1))
register_telemetry()
end)

assert_received %Ecto.LogEntry{params: [], result: {:ok, _}} = entry
Expand All @@ -222,7 +225,7 @@ defmodule Ecto.Integration.TransactionTest do

assert PoolRepo.transaction(fn ->
refute_received %Ecto.LogEntry{}
Process.put(:on_log, &send(self(), &1))
register_telemetry()
PoolRepo.rollback(:log_rollback)
end) == {:error, :log_rollback}
assert_received %Ecto.LogEntry{params: [], result: {:ok, _}} = entry
Expand All @@ -232,7 +235,7 @@ defmodule Ecto.Integration.TransactionTest do

test "log queries inside transactions" do
PoolRepo.transaction(fn ->
Process.put(:on_log, &send(self(), &1))
register_telemetry()
assert [] = PoolRepo.all(Trans)

assert_received %Ecto.LogEntry{params: [], result: {:ok, _}} = entry
Expand All @@ -241,59 +244,4 @@ defmodule Ecto.Integration.TransactionTest do
assert is_nil(entry.queue_time)
end)
end

@tag :strict_savepoint
test "log raises after begin, drops transaction" do
try do
Process.put(:on_log, fn _ -> raise UniqueError end)
PoolRepo.transaction(fn -> :ok end)
rescue
UniqueError -> :ok
end

# If it doesn't fail, the transaction was not closed properly.
catch_error(PoolRepo.query!("savepoint foobar"))
end

test "log raises after commit, does commit" do
try do
PoolRepo.transaction(fn ->
PoolRepo.insert!(%Trans{text: "10"})
Process.put(:on_log, fn _ -> raise UniqueError end)
end)
rescue
UniqueError -> :ok
end

assert [%Trans{text: "10"}] = PoolRepo.all(Trans)
end

test "log raises after rollback, does rollback" do
try do
PoolRepo.transaction(fn ->
PoolRepo.insert!(%Trans{text: "11"})
Process.put(:on_log, fn _ -> raise UniqueError end)
PoolRepo.rollback(:rollback)
end)
rescue
UniqueError -> :ok
end

assert [] = PoolRepo.all(Trans)
end

test "log raises on nested transaction, does not drop the transaction" do
PoolRepo.transaction(fn ->
PoolRepo.insert!(%Trans{text: "8"})
Process.put(:on_log, fn _ -> raise UniqueError end)

assert capture_log(fn ->
PoolRepo.transaction(fn ->
assert [_] = PoolRepo.all(Trans)
end)
end) =~ "** (Ecto.Integration.TransactionTest.UniqueError) unique error"
end)

assert [_] = PoolRepo.all(Trans)
end
end
16 changes: 10 additions & 6 deletions integration_test/support/repo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@ defmodule Ecto.Integration.Repo do
quote do
use Ecto.Repo, unquote(opts)

@query_event __MODULE__
|> Module.split()
|> Enum.map(& &1 |> Macro.underscore() |> String.to_atom())
|> Kernel.++([:query])

def init(_, opts) do
loggers = [Ecto.LogEntry, {Ecto.Integration.Repo, :log, [:on_log]}]
{:ok, Keyword.put(opts, :loggers, loggers)}
Telemetry.attach(__MODULE__, @query_event, Ecto.Integration.Repo, :handle_event)
{:ok, opts}
end
end
end

def log(entry, key) do
on_log = Process.delete(key) || fn _ -> :ok end
on_log.(entry)
entry
def handle_event(_event, latency, metadata, _config) do
handler = Process.delete(:telemetry) || fn _, _ -> :ok end
handler.(latency, metadata)
end
end
41 changes: 24 additions & 17 deletions lib/ecto/adapters/sql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,9 @@ defmodule Ecto.Adapters.SQL do
end

defp sql_call(adapter_meta, callback, args, params, opts) do
%{pid: pool, loggers: loggers, sql: sql, opts: default_opts} = adapter_meta
%{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta
conn = get_conn_or_pool(pool)
opts = with_log(loggers, params, opts ++ default_opts)
opts = with_log(telemetry, params, opts ++ default_opts)
args = args ++ [params, opts]
apply(sql, callback, [conn | args])
end
Expand Down Expand Up @@ -437,10 +437,13 @@ defmodule Ecto.Adapters.SQL do
"""
end

loggers = Keyword.fetch!(config, :loggers)
log = Keyword.get(config, :log, :debug)
telemetry_prefix = Keyword.fetch!(config, :telemetry_prefix)
telemetry = {log, telemetry_prefix ++ [:query]}

config = adapter_config(config)
opts = Keyword.take(config, [:timeout, :pool, :pool_size, :pool_timeout])
meta = %{loggers: loggers, sql: connection, opts: opts}
meta = %{telemetry: telemetry, sql: connection, opts: opts}
{:ok, connection.child_spec(config), meta}
end

Expand Down Expand Up @@ -581,8 +584,8 @@ defmodule Ecto.Adapters.SQL do

@doc false
def reduce(adapter_meta, statement, params, opts, acc, fun) do
%{pid: pool, loggers: loggers, sql: sql, opts: default_opts} = adapter_meta
opts = with_log(loggers, params, opts ++ default_opts)
%{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta
opts = with_log(telemetry, params, opts ++ default_opts)

case get_conn(pool) do
nil ->
Expand All @@ -597,8 +600,8 @@ defmodule Ecto.Adapters.SQL do

@doc false
def into(adapter_meta, statement, params, opts) do
%{pid: pool, loggers: loggers, sql: sql, opts: default_opts} = adapter_meta
opts = with_log(loggers, params, opts ++ default_opts)
%{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta
opts = with_log(telemetry, params, opts ++ default_opts)

case get_conn(pool) do
nil ->
Expand Down Expand Up @@ -712,14 +715,11 @@ defmodule Ecto.Adapters.SQL do

## Log

defp with_log(loggers, params, opts) do
case Keyword.pop(opts, :log, true) do
{true, opts} -> [log: &log(loggers, params, &1, opts)] ++ opts
{false, opts} -> opts
end
defp with_log(telemetry, params, opts) do
[log: &log(telemetry, params, &1, opts)] ++ opts
end

defp log(loggers, params, entry, opts) do
defp log({log, event_name}, params, entry, opts) do
%{
connection_time: query_time,
decode_time: decode_time,
Expand All @@ -744,7 +744,14 @@ defmodule Ecto.Adapters.SQL do
caller_pid: caller_pid
}

Ecto.LogEntry.apply(entry, loggers)
total = (query_time || 0) + (decode_time || 0) + (queue_time || 0)
Telemetry.execute(event_name, total, entry)

if level = Keyword.get(opts, :log, log) do
Ecto.LogEntry.log(entry, level)
end

entry
end

defp log_result({:ok, _query, res}), do: {:ok, res}
Expand All @@ -753,8 +760,8 @@ defmodule Ecto.Adapters.SQL do
## Connection helpers

defp checkout_or_transaction(fun, adapter_meta, opts, callback) do
%{pid: pool, loggers: loggers, opts: default_opts} = adapter_meta
opts = with_log(loggers, [], opts ++ default_opts)
%{pid: pool, telemetry: telemetry, opts: default_opts} = adapter_meta
opts = with_log(telemetry, [], opts ++ default_opts)

callback = fn conn ->
previous_conn = put_conn(pool, conn)
Expand Down
Loading

0 comments on commit ee8a9ea

Please sign in to comment.