Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: sources boot optimizations, health checks #1874

Merged
6 commits merged on Dec 4, 2023
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
58 changes: 33 additions & 25 deletions lib/logflare/source/rate_counter_server.ex
Original file line number | Diff line number | Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Logflare.Source.RateCounterServer do
use GenServer

require Logger
alias __MODULE__, as: RCS
alias Logflare.Source.RateCounterServer
alias Logflare.Google.BigQuery.GenUtils
alias Logflare.Source
alias Logflare.Source.Data
Expand Down Expand Up @@ -63,21 +63,21 @@ defmodule Logflare.Source.RateCounterServer do
bigquery_project_id = GenUtils.get_project_id(source_id)
init_counters(source_id, bigquery_project_id)

get_data_from_ets(source_id)
|> broadcast()
RateCounterServer.get_data_from_ets(source_id)
|> RateCounterServer.broadcast()

{:noreply, source_id}
end

def handle_info(:put_rate, source_id) when is_atom(source_id) do
{:ok, new_count} = get_insert_count(source_id)
state = get_data_from_ets(source_id)
%RCS{} = state = update_state(state, new_count)
state = RateCounterServer.get_data_from_ets(source_id)
%RateCounterServer{} = state = update_state(state, new_count)

update_ets_table(state)

if should_broadcast?(source_id) do
broadcast(state)
RateCounterServer.broadcast(state)
end

put_current_rate()
Expand All @@ -95,22 +95,22 @@ defmodule Logflare.Source.RateCounterServer do

@spec new(atom) :: __MODULE__.t()
def new(source_id) when is_atom(source_id) do
%RCS{begin_time: System.monotonic_time(), source_id: source_id}
%RateCounterServer{begin_time: System.monotonic_time(), source_id: source_id}
end

@spec update_state(RCS.t(), non_neg_integer) :: RCS.t()
def update_state(%RCS{} = state, new_count) do
@spec update_state(RateCounterServer.t(), non_neg_integer) :: RateCounterServer.t()
def update_state(%RateCounterServer{} = state, new_count) do
state
|> update_current_rate(new_count)
|> update_max_rate()
|> update_buckets()
end

def update_ets_table(%RCS{} = state) do
def update_ets_table(%RateCounterServer{} = state) do
insert_to_ets_table(state.source_id, state)
end

def state_to_external(%RCS{} = state) do
def state_to_external(%RateCounterServer{} = state) do
%{
last_rate: lr,
max_rate: mr,
Expand All @@ -126,15 +126,15 @@ defmodule Logflare.Source.RateCounterServer do
%{last_rate: lr, average_rate: bucket.average, max_rate: mr, limiter_metrics: limiter_metrics}
end

def update_max_rate(%RCS{max_rate: mx, last_rate: lr} = s) do
def update_max_rate(%RateCounterServer{max_rate: mx, last_rate: lr} = s) do
%{s | max_rate: Enum.max([mx, lr])}
end

def update_current_rate(%RCS{} = state, new_count) do
def update_current_rate(%RateCounterServer{} = state, new_count) do
%{state | last_rate: new_count - state.count, count: new_count}
end

def update_buckets(%RCS{} = state) do
def update_buckets(%RateCounterServer{} = state) do
Map.update!(state, :buckets, fn buckets ->
for {length, bucket} <- buckets, into: Map.new() do
# TODO: optimize by not recalculating total sum and average
Expand All @@ -156,7 +156,7 @@ defmodule Logflare.Source.RateCounterServer do
@spec get_rate(atom) :: integer
def get_rate(source_id) when is_atom(source_id) do
source_id
|> get_data_from_ets()
|> RateCounterServer.get_data_from_ets()
|> Map.get(:last_rate)
end

Expand All @@ -166,7 +166,7 @@ defmodule Logflare.Source.RateCounterServer do
@spec get_avg_rate(atom) :: integer
def get_avg_rate(source_id) when is_atom(source_id) do
source_id
|> get_data_from_ets()
|> RateCounterServer.get_data_from_ets()
|> Map.get(:buckets)
|> Map.get(@default_bucket_width)
|> Map.get(:average)
Expand All @@ -175,13 +175,13 @@ defmodule Logflare.Source.RateCounterServer do
@spec get_max_rate(atom) :: integer
def get_max_rate(source_id) when is_atom(source_id) do
source_id
|> get_data_from_ets()
|> RateCounterServer.get_data_from_ets()
|> Map.get(:max_rate)
end

def should_broadcast?(source_id) when is_atom(source_id) do
source_id
|> get_data_from_ets()
|> RateCounterServer.get_data_from_ets()
|> Map.get(:buckets)
|> Map.get(@default_bucket_width)
|> Map.get(:queue)
Expand All @@ -192,14 +192,14 @@ defmodule Logflare.Source.RateCounterServer do
def get_rate_metrics(source_id, bucket \\ :default)
when bucket == :default and is_atom(source_id) do
source_id
|> get_data_from_ets()
|> RateCounterServer.get_data_from_ets()
|> Map.get(:buckets)
|> Map.get(@default_bucket_width)
|> Map.drop([:queue])
end

defp setup_ets_table(source_id) when is_atom(source_id) do
initial = RCS.new(source_id)
initial = RateCounterServer.new(source_id)

insert_to_ets_table(source_id, initial)
end
Expand All @@ -208,15 +208,15 @@ defmodule Logflare.Source.RateCounterServer do
def get_data_from_ets(source_id) do
if ets_table_is_undefined?(source_id) do
Logger.error("RateCounterServer: ETS table #{name(source_id)} is undefined")
data = [{source_id, RCS.new(source_id)}]
data = [{source_id, RateCounterServer.new(source_id)}]
data[source_id]
else
data = :ets.lookup(@ets_table_name, source_id)

if data[source_id] do
data[source_id]
else
data = [{source_id, RCS.new(source_id)}]
data = [{source_id, RateCounterServer.new(source_id)}]
data[source_id]
end
end
Expand Down Expand Up @@ -246,7 +246,7 @@ defmodule Logflare.Source.RateCounterServer do
String.to_atom("#{source_id}" <> "-rate")
end

def broadcast(%RCS{} = state) do
def broadcast(%RateCounterServer{} = state) do
shard = :erlang.phash2(state.source_id, @pool_size)
local_rates = %{Node.self() => state_to_external(state)}

Expand Down Expand Up @@ -302,8 +302,16 @@ defmodule Logflare.Source.RateCounterServer do

defp init_counters(source_id, bigquery_project_id) when is_atom(source_id) do
log_count = Data.get_log_count(source_id, bigquery_project_id)
Counters.delete(source_id)
Counters.create(source_id)

try do
Counters.delete(source_id)
rescue
_e in ArgumentError ->
:noop
after
Counters.create(source_id)
end

Counters.increment_ets_count(source_id, 0)
Counters.increment_bq_count(source_id, log_count)
end
Expand Down
Loading