Skip to content

Commit

Permalink
feat: cache warmer for Users context (#2266)
Browse files Browse the repository at this point in the history
* feat: cache warmer for Users context

* docs: add spec and docs to list_ingesting_users/1

* feat: implement proper :cached format

* chore: typo

* feat: add db and cache health cehcks

* feat: refactor health checks to include caches

* chore: round uptime number

* perf: bump limit to 1k

* chore: version bump

* chore: revert get_by removal
  • Loading branch information
Ziinc authored Jan 2, 2025
1 parent 80a9d34 commit faed5c7
Show file tree
Hide file tree
Showing 17 changed files with 173 additions and 31 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.10.6
1.10.7
6 changes: 3 additions & 3 deletions lib/logflare/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ defmodule Logflare.Application do
defp get_children(:test) do
finch_pools() ++
[
Logflare.Repo,
Logflare.Vault,
ContextCache.Supervisor,
Counters,
RateCounters,
Logs.LogEvents.Cache,
ContextCache.Supervisor,
{Phoenix.PubSub, name: Logflare.PubSub},
PubSubRates,
Logs.RejectedLogEvents,
Logflare.Repo,
Logflare.Vault,
Logflare.Backends,
{Registry,
name: Logflare.V1SourceRegistry,
Expand Down
23 changes: 12 additions & 11 deletions lib/logflare/context_cache/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,20 @@ defmodule Logflare.ContextCache.Supervisor do

@impl Supervisor
def init(_) do
res = Supervisor.init(get_children(@env), strategy: :one_for_one)
res
Supervisor.init(get_children(@env), strategy: :one_for_one)
end

defp get_children(:test) do
defp get_children(:test), do: list_caches()

defp get_children(_) do
list_caches() ++
[
ContextCache.TransactionBroadcaster,
ContextCache.CacheBuster
]
end

def list_caches do
[
ContextCache,
TeamUsers.Cache,
Expand All @@ -43,14 +52,6 @@ defmodule Logflare.ContextCache.Supervisor do
]
end

defp get_children(_) do
get_children(:test) ++
[
ContextCache.TransactionBroadcaster,
ContextCache.CacheBuster
]
end

@doc """
Returns the publisher :via name used for syn registry.
"""
Expand Down
2 changes: 1 addition & 1 deletion lib/logflare/google/bigquery/bigquery.ex
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ defmodule Logflare.Google.BigQuery do
access = access_emails ++ access_defaults

%Plan{name: plan} =
Users.Cache.get_by(id: user_id)
Users.Cache.get(user_id)
|> Billing.Cache.get_plan_by_user()

body = %Model.Dataset{
Expand Down
4 changes: 2 additions & 2 deletions lib/logflare/google/bigquery/gen_utils/gen_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ defmodule Logflare.Google.BigQuery.GenUtils do
@spec get_project_id(atom()) :: String.t()
def get_project_id(source_id) when is_atom(source_id) do
%Source{user_id: user_id} = Sources.Cache.get_by(token: source_id)
%User{bigquery_project_id: project_id} = Users.Cache.get_by(id: user_id)
%User{bigquery_project_id: project_id} = Users.Cache.get(user_id)

project_id || env_project_id()
end
Expand All @@ -44,7 +44,7 @@ defmodule Logflare.Google.BigQuery.GenUtils do
bigquery_project_id: project_id,
bigquery_dataset_location: dataset_location,
bigquery_dataset_id: dataset_id
} = Users.Cache.get_by(id: user_id)
} = Users.Cache.get(user_id)

new_ttl =
cond do
Expand Down
21 changes: 21 additions & 0 deletions lib/logflare/repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,25 @@ defmodule Logflare.Repo do
adapter: Ecto.Adapters.Postgres

use Scrivener
require Logger

def get_uptime do
query = "SELECT EXTRACT(epoch FROM (current_timestamp - pg_postmaster_start_time()));"

__MODULE__.query(query, [])
|> case do
{:ok,
%{
rows: [
[uptime]
]
}}
when is_number(uptime) ->
ceil(uptime)

{:error, _err} = err ->
Logger.warning("Could not get Postgres uptime, error: #{err}")
0
end
end
end
2 changes: 1 addition & 1 deletion lib/logflare/source/bigquery/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ defmodule Logflare.Source.BigQuery.Pipeline do

defp disconnect_backend_and_email(source_id, message) when is_atom(source_id) do
source = Sources.Cache.get_by(token: source_id)
user = Users.Cache.get_by(id: source.user_id)
user = Users.Cache.get(source.user_id)

defaults = %{
bigquery_dataset_location: nil,
Expand Down
2 changes: 1 addition & 1 deletion lib/logflare/source/email_notification_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ defmodule Logflare.Source.EmailNotificationServer do
check_rate(state.notifications_every)

source = Sources.Cache.get_by_id(state.source_token)
user = Users.Cache.get_by(id: source.user_id)
user = Users.Cache.get(source.user_id)

if source.notifications.user_email_notifications do
AccountEmail.source_notification(user, rate, source) |> Mailer.deliver()
Expand Down
2 changes: 1 addition & 1 deletion lib/logflare/source/recent_logs_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Logflare.Source.RecentLogsServer do

require Logger

@touch_timer :timer.minutes(45)
@touch_timer :timer.seconds(1)
@broadcast_every 1_800

## Server
Expand Down
6 changes: 5 additions & 1 deletion lib/logflare/users/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Logflare.Users.Cache do

alias Logflare.Users
alias Logflare.Utils
import Cachex.Spec

def child_spec(_) do
stats = Application.get_env(:logflare, :cache_stats, false)
Expand All @@ -16,6 +17,9 @@ defmodule Logflare.Users.Cache do
[
__MODULE__,
[
warmers: [
warmer(required: false, module: Users.CacheWarmer, name: Users.CacheWarmer)
],
hooks:
[
if(stats, do: Utils.cache_stats()),
Expand All @@ -29,8 +33,8 @@ defmodule Logflare.Users.Cache do
end

def get(id), do: apply_repo_fun(__ENV__.function, [id])
def get_by(keyword), do: apply_repo_fun(__ENV__.function, [keyword])

def get_by(keyword), do: apply_repo_fun(__ENV__.function, [keyword])
def get_by_and_preload(keyword), do: apply_repo_fun(__ENV__.function, [keyword])
def preload_defaults(user), do: apply_repo_fun(__ENV__.function, [user])
def preload_sources(user), do: apply_repo_fun(__ENV__.function, [user])
Expand Down
31 changes: 31 additions & 0 deletions lib/logflare/users/cache_warmer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule Logflare.Users.CacheWarmer do
alias Logflare.Users

use Cachex.Warmer
@impl true
def execute(_state) do
users = Users.list_ingesting_users(limit: 1_000)

get_kv =
for u <- users do
[
{{:get, [u.id]}, u},
{{:get_by, [api_key: u.api_key]}, u}
]
end

preloaded_kv =
for {u, preloaded} <-
Enum.zip([
users,
Users.preload_defaults(users)
]) do
[
{{:get_by_and_preload, [api_key: u.api_key]}, preloaded},
{{:preload_defaults, [u]}, preloaded}
]
end

{:ok, List.flatten(get_kv ++ preloaded_kv) |> Enum.map(&{:cached, &1})}
end
end
27 changes: 26 additions & 1 deletion lib/logflare/users/users.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ defmodule Logflare.Users do
Repo.aggregate(User, :count)
end

@doc "Lists users with sources that are actively ingesting events"
@spec list_ingesting_users(keyword()) :: [User.t()]
def list_ingesting_users(limit: limit) do
from(u in User,
join: s in assoc(u, :sources),
where: s.log_events_updated_at >= ago(1, "day"),
order_by: {:desc, s.log_events_updated_at},
limit: ^limit,
select: u
)
|> Repo.all()
end

@doc """
Lists users and performs filtering based on filter keywords.
Expand Down Expand Up @@ -81,14 +94,26 @@ defmodule Logflare.Users do

def preload_defaults(nil), do: nil

def preload_defaults(user) do
def preload_defaults(%User{} = user) do
user
|> Repo.preload([:sources, :billing_account, :team])
|> Map.update!(:sources, fn sources ->
Enum.map(sources, &Sources.put_retention_days/1)
end)
end

def preload_defaults(users) when is_list(users) do
users
|> Repo.preload([:sources, :billing_account, :team])
|> Enum.map(fn user ->
user
|> maybe_put_bigquery_defaults()
|> Map.update!(:sources, fn sources ->
Enum.map(sources, &Sources.put_retention_days/1)
end)
end)
end

def preload_team(user) do
Repo.preload(user, :team)
end
Expand Down
33 changes: 29 additions & 4 deletions lib/logflare_web/controllers/health_check_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,17 @@ defmodule LogflareWeb.HealthCheckController do
alias Logflare.Sources

def check(conn, _params) do
repo_uptime = Logflare.Repo.get_uptime()

caches = check_caches()

common_checks_ok? =
[
Sources.ingest_ets_tables_started?(),
Source.Supervisor.booting?() == false
Source.Supervisor.booting?() == false,
# checks that db can execute query and that repo is connected and up
repo_uptime > 0,
Enum.all?(Map.values(caches), &(&1 == :ok))
]
|> Enum.all?()

Expand All @@ -36,15 +43,16 @@ defmodule LogflareWeb.HealthCheckController do

response =
status
|> build_payload()
|> build_payload(repo_uptime: repo_uptime, caches: caches)
|> JSON.encode!()

conn
|> put_resp_content_type("application/json")
|> send_resp(code, response)
end

defp build_payload(status) when status in [:ok, :coming_up] do
defp build_payload(status, repo_uptime: repo_uptime, caches: caches)
when status in [:ok, :coming_up] do
nodes = Cluster.Utils.node_list_all()
proc_count = Process.list() |> Enum.count()

Expand All @@ -53,7 +61,24 @@ defmodule LogflareWeb.HealthCheckController do
proc_count: proc_count,
this_node: Node.self(),
nodes: nodes,
nodes_count: Enum.count(nodes)
nodes_count: Enum.count(nodes),
repo_uptime: repo_uptime,
caches: caches
}
end

defp check_caches() do
for cache <-
Logflare.ContextCache.Supervisor.list_caches() ++
[
Logflare.Logs.LogEvents.Cache
],
into: %{} do
# call is O(1)
case Cachex.size(cache) do
{:ok, _} -> {cache, :ok}
{:error, :no_cache} -> {cache, :no_cache}
end
end
end
end
23 changes: 21 additions & 2 deletions test/logflare/users/users_cache_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Logflare.Users.CacheTest do
@moduledoc false
alias Logflare.Users
alias Logflare.User
alias Logflare.Users.CacheWarmer
use Logflare.DataCase

setup do
Expand All @@ -19,13 +20,31 @@ defmodule Logflare.Users.CacheTest do
end

describe "users cache" do
test "get_by_id/1", %{user: user} do
test "get/1", %{user: user} do
%_{id: user_id} =
Users.Cache.get_by(id: user.id)
Users.Cache.get(user.id)
|> Users.preload_defaults()
|> Map.update!(:sources, &Enum.map(&1, fn s -> %{s | rules: []} end))

assert %User{id: ^user_id} = user
end
end

test "warmer" do
assert {:ok, []} = CacheWarmer.execute(nil)
user = insert(:user)

insert(:source,
user: user,
log_events_updated_at: NaiveDateTime.shift(NaiveDateTime.utc_now(), hour: -2)
)

assert {:ok, pairs} = CacheWarmer.execute(nil)
assert {:ok, true} = Cachex.put_many(Users.Cache, pairs)

Logflare.Users
|> reject(:get, 1)

assert Users.Cache.get(user.id)
end
end
9 changes: 9 additions & 0 deletions test/logflare/users/users_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ defmodule Logflare.UsersTest do
{:ok, user: user, source: source}
end

describe "Users.list_ingesting_users/1" do
test "lists ingesting users based on source activity" do
assert [] = Users.list_ingesting_users(limit: 500)
user = insert(:user)
insert(:source, user: user, log_events_updated_at: NaiveDateTime.utc_now())
assert [_] = Users.list_ingesting_users(limit: 500)
end
end

describe "Users.list_users/1" do
test "lists all users created by a partner" do
[user | others] = insert_list(3, :user)
Expand Down
10 changes: 9 additions & 1 deletion test/logflare_web/controllers/health_check_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@ defmodule LogflareWeb.HealthCheckControllerTest do

conn = get(conn, "/health")

assert %{"nodes" => [_], "nodes_count" => 1, "status" => "ok"} = json_response(conn, 200)
assert %{
"nodes" => [_],
"nodes_count" => 1,
"status" => "ok",
"caches" => %{
"Elixir.Logflare.Auth.Cache" => "ok"
}
} = json_response(conn, 200)
end

test "coming_up while RLS boot warming", %{conn: conn} do
Expand Down Expand Up @@ -77,6 +84,7 @@ defmodule LogflareWeb.HealthCheckControllerTest do
end

test "ok", %{conn: conn} do
# :timer.sleep(500)
SingleTenant.create_supabase_sources()
SingleTenant.create_supabase_endpoints()

Expand Down
Loading

0 comments on commit faed5c7

Please sign in to comment.