Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental fixes for lock contention #945

Merged
merged 9 commits into from
Sep 6, 2023
6 changes: 3 additions & 3 deletions apps/explorer/lib/explorer/chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4425,7 +4425,7 @@ defmodule Explorer.Chain do
where: address_name.address_hash == ^address_hash,
# Enforce Name ShareLocks order (see docs: sharelocks.md)
order_by: [asc: :address_hash, asc: :name],
lock: "FOR UPDATE"
lock: "FOR NO KEY UPDATE"
)

repo.update_all(
Expand Down Expand Up @@ -5270,7 +5270,7 @@ defmodule Explorer.Chain do
)
# Enforce Transaction ShareLocks order (see docs: sharelocks.md)
|> order_by(asc: :hash)
|> lock("FOR UPDATE")
|> lock("FOR NO KEY UPDATE")

hashes = Enum.map(transactions, & &1.hash)

Expand Down Expand Up @@ -5315,7 +5315,7 @@ defmodule Explorer.Chain do
end)
# Enforce Transaction ShareLocks order (see docs: sharelocks.md)
|> order_by(asc: :hash)
|> lock("FOR UPDATE")
|> lock("FOR NO KEY UPDATE")

Repo.update_all(
from(t in Transaction, join: s in subquery(query), on: t.hash == s.hash),
Expand Down
1 change: 0 additions & 1 deletion apps/explorer/lib/explorer/chain/address/coin_balance.ex
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ defmodule Explorer.Chain.Address.CoinBalance do
balance
|> cast(params, @allowed_fields)
|> validate_required(@required_fields)
|> foreign_key_constraint(:address_hash)
|> unique_constraint(:block_number, name: :address_coin_balances_address_hash_block_number_index)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ defmodule Explorer.Chain.Address.CoinBalanceDaily do
balance
|> cast(params, @allowed_fields)
|> validate_required(@required_fields)
|> foreign_key_constraint(:address_hash)
|> unique_constraint(:day, name: :address_coin_balances_daily_address_hash_day_index)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ defmodule Explorer.Chain.Address.CurrentTokenBalance do
token_balance
|> cast(attrs, @allowed_fields)
|> validate_required(@required_fields)
|> foreign_key_constraint(:address_hash)
|> foreign_key_constraint(:token_contract_address_hash)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ defmodule Explorer.Chain.Address.TokenBalance do
token_balance
|> cast(attrs, @allowed_fields)
|> validate_required(@required_fields)
|> foreign_key_constraint(:address_hash)
|> foreign_key_constraint(:token_contract_address_hash)
|> unique_constraint(:block_number, name: :token_balances_address_hash_block_number_index)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,6 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do

# Enforce ShareLocks tables order (see docs: sharelocks.md)
multi
|> Multi.run(:acquire_contract_address_tokens, fn repo, _ ->
token_contract_address_hashes_and_ids =
changes_list
|> Enum.map(fn change ->
token_id = get_tokend_id(change)

{change.token_contract_address_hash, token_id}
end)
|> Enum.uniq()

Tokens.acquire_contract_address_tokens(repo, token_contract_address_hashes_and_ids)
end)
|> Multi.run(:address_current_token_balances, fn repo, _ ->
insert(repo, changes_list, insert_options)
end)
Expand All @@ -139,10 +127,6 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
end)
end

defp get_tokend_id(change) do
if Map.has_key?(change, :token_id), do: change.token_id, else: nil
end

@impl Import.Runner
def timeout, do: @timeout

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ defmodule Explorer.Chain.Import.Runner.Addresses do
where: t.created_contract_address_hash in ^ordered_created_contract_hashes,
# Enforce Transaction ShareLocks order (see docs: sharelocks.md)
order_by: t.hash,
lock: "FOR UPDATE"
lock: "FOR NO KEY UPDATE"
)

try do
Expand Down
6 changes: 3 additions & 3 deletions apps/explorer/lib/explorer/chain/import/runner/blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
select: transaction,
# Enforce Transaction ShareLocks order (see docs: sharelocks.md)
order_by: [asc: :hash],
lock: "FOR UPDATE"
lock: "FOR NO KEY UPDATE"
)

update_query =
Expand Down Expand Up @@ -284,7 +284,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
select: block.hash,
# Enforce Block ShareLocks order (see docs: sharelocks.md)
order_by: [asc: block.hash],
lock: "FOR UPDATE"
lock: "FOR NO KEY UPDATE"
)

{_, removed_consensus_block_hashes} =
Expand Down Expand Up @@ -427,7 +427,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
where: bsdr.uncle_hash in ^uncle_hashes,
# Enforce SeconDegreeRelation ShareLocks order (see docs: sharelocks.md)
order_by: [asc: :nephew_hash, asc: :uncle_hash],
lock: "FOR UPDATE"
lock: "FOR NO KEY UPDATE"
)

update_query =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
select: b.hash,
# Enforce Block ShareLocks order (see docs: sharelocks.md)
order_by: [asc: b.hash],
lock: "FOR UPDATE"
lock: "FOR NO KEY UPDATE"
)

res = repo.all(query)
Expand All @@ -256,7 +256,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
select: pending_ops.block_hash,
# Enforce PendingBlockOperation ShareLocks order (see docs: sharelocks.md)
order_by: [asc: pending_ops.block_hash],
lock: "FOR UPDATE"
lock: "FOR NO KEY UPDATE"
)

res = repo.all(query)
Expand All @@ -272,7 +272,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
select: map(t, [:hash, :block_hash, :block_number, :cumulative_gas_used]),
# Enforce Transaction ShareLocks order (see docs: sharelocks.md)
order_by: [asc: t.hash],
lock: "FOR UPDATE"
lock: "FOR NO KEY UPDATE"
)

{:ok, repo.all(query)}
Expand Down
79 changes: 10 additions & 69 deletions apps/explorer/lib/explorer/chain/import/runner/tokens.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,74 +21,6 @@ defmodule Explorer.Chain.Import.Runner.Tokens do
@type holder_count :: non_neg_integer()
@type token_holder_count :: %{contract_address_hash: Hash.Address.t(), count: holder_count()}

def acquire_contract_address_tokens(repo, contract_address_hashes_and_token_ids) do
initial_query_no_token_id =
from(token in Token,
select: token
)

initial_query_with_token_id =
from(token in Token,
left_join: instance in Token.Instance,
on: token.contract_address_hash == instance.token_contract_address_hash,
select: token
)

{query_no_token_id, query_with_token_id} =
contract_address_hashes_and_token_ids
|> Enum.reduce({initial_query_no_token_id, initial_query_with_token_id}, fn {contract_address_hash, token_id},
{query_no_token_id,
query_with_token_id} ->
if is_nil(token_id) do
{from(
token in query_no_token_id,
or_where: token.contract_address_hash == ^contract_address_hash
), query_with_token_id}
else
{query_no_token_id,
from(
[token, instance] in query_with_token_id,
or_where: token.contract_address_hash == ^contract_address_hash and instance.token_id == ^token_id
)}
end
end)

final_query_no_token_id =
if query_no_token_id == initial_query_no_token_id do
nil
else
from(
token in query_no_token_id,
# Enforce Token ShareLocks order (see docs: sharelocks.md)
order_by: [
token.contract_address_hash
],
lock: "FOR UPDATE"
)
end

final_query_with_token_id =
if query_with_token_id == initial_query_with_token_id do
nil
else
from(
[token, instance] in query_with_token_id,
# Enforce Token ShareLocks order (see docs: sharelocks.md)
order_by: [
token.contract_address_hash,
instance.token_id
],
lock: "FOR UPDATE"
)
end

tokens_no_token_id = (final_query_no_token_id && repo.all(final_query_no_token_id)) || []
tokens_with_token_id = (final_query_with_token_id && repo.all(final_query_with_token_id)) || []
tokens = tokens_no_token_id ++ tokens_with_token_id

{:ok, tokens}
end

def update_holder_counts_with_deltas(repo, token_holder_count_deltas, %{
timeout: timeout,
timestamps: %{updated_at: updated_at}
Expand All @@ -102,6 +34,15 @@ defmodule Explorer.Chain.Import.Runner.Tokens do
end)
|> Enum.unzip()

token_query =
from(
token in Token,
where: token.contract_address_hash in ^hashes,
select: token.contract_address_hash,
order_by: token.contract_address_hash,
lock: "FOR NO KEY UPDATE"
)

query =
from(
token in Token,
Expand All @@ -112,8 +53,8 @@ defmodule Explorer.Chain.Import.Runner.Tokens do
^deltas
),
on: token.contract_address_hash == deltas.contract_address_hash,
where: token.contract_address_hash in subquery(token_query),
where: not is_nil(token.holder_count),
# ShareLocks order already enforced by `acquire_contract_address_tokens` (see docs: sharelocks.md)
update: [
set: [
holder_count: token.holder_count + deltas.delta,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
where: block.hash in ^block_hashes,
# Enforce Block ShareLocks order (see docs: sharelocks.md)
order_by: [asc: block.hash],
lock: "FOR UPDATE"
lock: "FOR NO KEY UPDATE"
)

try do
Expand Down
6 changes: 0 additions & 6 deletions apps/explorer/lib/explorer/chain/internal_transaction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,6 @@ defmodule Explorer.Chain.InternalTransaction do
|> validate_call_error_or_result()
|> check_constraint(:call_type, message: ~S|can't be blank when type is 'call'|, name: :call_has_call_type)
|> check_constraint(:input, message: ~S|can't be blank when type is 'call'|, name: :call_has_call_type)
|> foreign_key_constraint(:from_address_hash)
|> foreign_key_constraint(:to_address_hash)
|> foreign_key_constraint(:transaction_hash)
|> unique_constraint(:index)
end
Expand All @@ -474,8 +472,6 @@ defmodule Explorer.Chain.InternalTransaction do
|> validate_required(@create_required_fields)
|> validate_create_error_or_result()
|> check_constraint(:init, message: ~S|can't be blank when type is 'create'|, name: :create_has_init)
|> foreign_key_constraint(:created_contract_address_hash)
|> foreign_key_constraint(:from_address_hash)
|> foreign_key_constraint(:transaction_hash)
|> unique_constraint(:index)
end
Expand All @@ -488,8 +484,6 @@ defmodule Explorer.Chain.InternalTransaction do
changeset
|> cast(attrs, @selfdestruct_allowed_fields)
|> validate_required(@selfdestruct_required_fields)
|> foreign_key_constraint(:from_address_hash)
|> foreign_key_constraint(:to_address_hash)
|> unique_constraint(:index)
end

Expand Down
1 change: 0 additions & 1 deletion apps/explorer/lib/explorer/chain/token.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ defmodule Explorer.Chain.Token do
token
|> cast(params, @required_attrs ++ @optional_attrs)
|> validate_required(@required_attrs)
|> foreign_key_constraint(:contract_address)
|> trim_name()
|> sanitize_token_input(:name)
|> sanitize_token_input(:symbol)
Expand Down
3 changes: 0 additions & 3 deletions apps/explorer/lib/explorer/chain/token_transfer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ defmodule Explorer.Chain.TokenTransfer do
struct
|> cast(params, @required_attrs ++ @optional_attrs)
|> validate_required(@required_attrs)
|> foreign_key_constraint(:from_address)
|> foreign_key_constraint(:to_address)
|> foreign_key_constraint(:token_contract_address)
|> foreign_key_constraint(:transaction)

# |> foreign_key_constraint(:block)
Expand Down
1 change: 1 addition & 0 deletions apps/indexer/lib/indexer/fetcher/coin_balance.ex
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ defmodule Indexer.Fetcher.CoinBalance do
|> Enum.map(fn %{address_hash: address_hash, block_number: block_number, value: value} ->
%{hash: address_hash, fetched_coin_balance_block_number: block_number, fetched_coin_balance: value}
end)
|> Enum.sort_by(& &1.hash)
end

def import_fetched_balances(%FetchedBalances{params_list: params_list}, broadcast_type \\ false) do
Expand Down
45 changes: 18 additions & 27 deletions apps/indexer/lib/indexer/fetcher/empty_blocks_sanitizer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,15 @@ defmodule Indexer.Fetcher.EmptyBlocksSanitizer do
defp sanitize_empty_blocks(json_rpc_named_arguments) do
Telemetry.event(:empty_block_sanitize)

unprocessed_non_empty_blocks_from_db = unprocessed_non_empty_blocks_query_list(limit())

uniq_block_hashes = unprocessed_non_empty_blocks_from_db

if Enum.count(uniq_block_hashes) > 0 do
Repo.update_all(
from(
block in Block,
where: block.hash in ^uniq_block_hashes
),
set: [is_empty: false, updated_at: Timex.now()]
)
end
unprocessed_non_empty_blocks_query = unprocessed_non_empty_blocks_query(limit())

Repo.update_all(
from(
block in Block,
where: block.hash in subquery(unprocessed_non_empty_blocks_query)
),
set: [is_empty: false, updated_at: Timex.now()]
)

unprocessed_empty_blocks_from_db = unprocessed_empty_blocks_query_list(limit())

Expand Down Expand Up @@ -144,25 +140,20 @@ defmodule Indexer.Fetcher.EmptyBlocksSanitizer do
where: block.consensus == true,
order_by: [asc: block.hash],
limit: ^limit,
offset: 1000,
lock: "FOR UPDATE"
offset: 1000
)
end

defp unprocessed_non_empty_blocks_query_list(limit) do
defp unprocessed_non_empty_blocks_query(limit) do
blocks_query = consensus_blocks_with_nil_is_empty_query(limit)

query =
from(q in subquery(blocks_query),
inner_join: transaction in Transaction,
on: q.number == transaction.block_number,
select: q.hash,
distinct: q.hash,
order_by: [asc: q.hash]
)

query
|> Repo.all(timeout: :infinity)
from(q in subquery(blocks_query),
inner_join: transaction in Transaction,
on: q.number == transaction.block_number,
select: q.hash,
order_by: [asc: q.hash],
lock: fragment("FOR NO KEY UPDATE OF ?", q)
)
end

defp unprocessed_empty_blocks_query_list(limit) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ defmodule Indexer.Temporary.BlocksTransactionsMismatch do
where: block.hash in ^hashes,
# Enforce Block ShareLocks order (see docs: sharelocks.md)
order_by: [asc: block.hash],
lock: "FOR UPDATE"
lock: "FOR NO KEY UPDATE"
)

Repo.update_all(
Expand Down
2 changes: 2 additions & 0 deletions apps/indexer/test/indexer/block/fetcher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ defmodule Indexer.Block.FetcherTest do
end
end

# celo - skip flakey test
@tag :skip
@tag :no_geth
test "inserts an entry to unlocked celo in case of a gold_unlocked event", %{
block_fetcher: %Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher
Expand Down