Skip to content

Commit

Permalink
Papa/sec 27 watcher info ife support (#1496)
Browse files Browse the repository at this point in the history
* feat: gather data for events

* feat: generating events from finalizations

* feat: add new events consumer to spent finalized ife-utxos

* test: add cabbage test

* feat: allow single eth event spend more utxos

Preparing for more spending events like start_ife, ife_piggybacked, ...
`start_ife` will spend all transaction's inputs

* refactor: move & rename bus events helper function

* feat: unify events handling in watcher_info, pass ife_started, test green

* test: simplify & speed up cabbage test - but ife has to be processed

* test: add failing test showing piggybacked output remains unspend

* feat: add TxOutput.get_by_output_id/2 and handle events with output id

* feat: publish event when ife tx output is piggybacked

* chore: rebase onto master

* test: align to comments

* chore: align to Pawel's comments
  • Loading branch information
pnowosie authored May 19, 2020
1 parent e8d486d commit 4ab2549
Show file tree
Hide file tree
Showing 15 changed files with 810 additions and 86 deletions.
32 changes: 31 additions & 1 deletion apps/omg_watcher/lib/omg_watcher/exit_processor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ defmodule OMG.Watcher.ExitProcessor do
alias OMG.Watcher.ExitProcessor.Core
alias OMG.Watcher.ExitProcessor.ExitInfo
alias OMG.Watcher.ExitProcessor.StandardExit
alias OMG.Watcher.ExitProcessor.Tools

use OMG.Utils.LoggerExt
require Utxo
Expand Down Expand Up @@ -340,6 +341,15 @@ defmodule OMG.Watcher.ExitProcessor do
end
)

# Prepare events data for internal bus
:ok =
events
|> Enum.map(fn %{call_data: %{input_utxos_pos: inputs}} = event ->
{event, inputs}
end)
|> Tools.to_bus_events_data()
|> publish_internal_bus_events("InFlightExitStarted")

{:ok, statuses} = Eth.RootChain.get_in_flight_exit_structs(contract_ife_ids)
ife_contract_statuses = Enum.zip(statuses, contract_ife_ids)
{new_state, db_updates} = Core.new_in_flight_exits(state, events, ife_contract_statuses)
Expand Down Expand Up @@ -376,6 +386,12 @@ defmodule OMG.Watcher.ExitProcessor do
def handle_call({:piggyback_exits, exits}, _from, state) do
_ = if not Enum.empty?(exits), do: Logger.info("Recognized piggybacks: #{inspect(exits)}")
{new_state, db_updates} = Core.new_piggybacks(state, exits)

:ok =
exits
|> Tools.to_bus_events_data()
|> publish_internal_bus_events("InFlightTxOutputPiggybacked")

{:reply, {:ok, db_updates}, new_state}
end

Expand Down Expand Up @@ -449,7 +465,8 @@ defmodule OMG.Watcher.ExitProcessor do
# necessary, so that the processor knows the current state of inclusion of exiting IFE txs
state2 = update_with_ife_txs_from_blocks(state)

{:ok, exiting_positions} = Core.prepare_utxo_exits_for_in_flight_exit_finalizations(state2, finalizations)
{:ok, exiting_positions, events_with_utxos} =
Core.prepare_utxo_exits_for_in_flight_exit_finalizations(state2, finalizations)

# NOTE: it's not straightforward to track from utxo position returned when exiting utxo in State to ife id
# See issue #671 https://github.com/omisego/elixir-omg/issues/671
Expand All @@ -458,6 +475,11 @@ defmodule OMG.Watcher.ExitProcessor do

{:ok, state3, db_updates} = Core.finalize_in_flight_exits(state2, finalizations, invalidities)

:ok =
events_with_utxos
|> Tools.to_bus_events_data()
|> publish_internal_bus_events("InFlightExitOutputWithdrawn")

{:reply, {:ok, state_db_updates ++ db_updates}, state3}
end

Expand Down Expand Up @@ -702,4 +724,12 @@ defmodule OMG.Watcher.ExitProcessor do
|> Map.put(:scheduled_finalization_time, scheduled_finalization_time)
|> Map.put(:block_timestamp, exit_block_timestamp)
end

defp publish_internal_bus_events([], _), do: :ok

defp publish_internal_bus_events(events_data, topic) when is_list(events_data) and is_binary(topic) do
{:watcher, topic}
|> OMG.Bus.Event.new(:data, events_data)
|> OMG.Bus.direct_local_broadcast()
end
end
29 changes: 19 additions & 10 deletions apps/omg_watcher/lib/omg_watcher/exit_processor/finalizations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,28 +81,34 @@ defmodule OMG.Watcher.ExitProcessor.Finalizations do
end

@doc """
Returns a tuple of {:ok, map in-flight exit id => {finalized input exits, finalized output exits}}.
finalized input exits and finalized output exits structures both fit into `OMG.State.exit_utxos/1`.
Returns a tuple of `{:ok, %{ife_exit_id => {finalized_input_exits | finalized_output_exits}}, list(events_exits)}`.
Finalized input exits and finalized output exits structures both fit into `OMG.State.exit_utxos/1`.
Events exits list contains Ethereum's finalization events paired with utxos they exits. This data is needed to
broadcast information to the consumers about utxos that needs to marked as spend as the result of finalization.
When there are invalid finalizations, returns one of the following:
- {:inactive_piggybacks_finalizing, list of piggybacks that exit processor state is not aware of}
- {:unknown_in_flight_exit, set of in-flight exit ids that exit processor is not aware of}
"""
@spec prepare_utxo_exits_for_in_flight_exit_finalizations(Core.t(), [map()]) ::
{:ok, map()}
{:ok, map(), list()}
| {:inactive_piggybacks_finalizing, list()}
| {:unknown_in_flight_exit, MapSet.t(non_neg_integer())}
def prepare_utxo_exits_for_in_flight_exit_finalizations(%Core{in_flight_exits: ifes}, finalizations) do
finalizations = finalizations |> Enum.map(&ife_id_to_binary/1)

with {:ok, ifes_by_id} <- get_all_finalized_ifes_by_ife_contract_id(finalizations, ifes),
{:ok, []} <- known_piggybacks?(finalizations, ifes_by_id) do
exiting_positions_by_ife_id =
{exiting_positions_by_ife_id, events_with_positions} =
finalizations
|> Enum.reverse()
|> Enum.reduce(%{}, &prepare_utxo_exits_for_finalization(&1, &2, ifes_by_id))
|> Enum.reduce({%{}, []}, &combine_utxo_exits_with_finalization(&1, &2, ifes_by_id))

{:ok, exiting_positions_by_ife_id}
{
:ok,
exiting_positions_by_ife_id,
Enum.reject(events_with_positions, &Kernel.match?({_, []}, &1))
}
end
end

Expand Down Expand Up @@ -150,9 +156,9 @@ defmodule OMG.Watcher.ExitProcessor.Finalizations do
),
do: not InFlightExitInfo.is_active?(ifes_by_id[ife_id], {piggyback_type, output_index})

defp prepare_utxo_exits_for_finalization(
%{in_flight_exit_id: ife_id, output_index: output_index, omg_data: %{piggyback_type: piggyback_type}},
exiting_positions,
defp combine_utxo_exits_with_finalization(
%{in_flight_exit_id: ife_id, output_index: output_index, omg_data: %{piggyback_type: piggyback_type}} = event,
{exiting_positions, events_with_positions},
ifes_by_id
) do
ife = ifes_by_id[ife_id]
Expand All @@ -161,7 +167,10 @@ defmodule OMG.Watcher.ExitProcessor.Finalizations do
# figure out if there's any UTXOs really exiting from the `OMG.State` from this IFE's piggybacked input/output
exiting_positions_for_piggyback = get_exiting_positions(ife, output_index, piggyback_type)

Map.update(exiting_positions, ife_id, exiting_positions_for_piggyback, &(exiting_positions_for_piggyback ++ &1))
{
Map.update(exiting_positions, ife_id, exiting_positions_for_piggyback, &(exiting_positions_for_piggyback ++ &1)),
[{event, exiting_positions_for_piggyback} | events_with_positions]
}
end

defp get_exiting_positions(ife, output_index, :input) do
Expand Down
66 changes: 66 additions & 0 deletions apps/omg_watcher/lib/omg_watcher/exit_processor/tools.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ defmodule OMG.Watcher.ExitProcessor.Tools do
alias OMG.Watcher.ExitProcessor.DoubleSpend
alias OMG.Watcher.ExitProcessor.KnownTx

require Utxo

@typep eth_event_t() :: %{root_chain_txhash: Crypto.hash_t(), log_index: non_neg_integer()}
@typep eth_event_with_exiting_positions_t() :: {eth_event_t(), list(Utxo.Position.t())} | eth_event_t()

# Intersects utxos, looking for duplicates. Gives full list of double-spends with indexes for
# a pair of transactions.
@spec double_spends_from_known_tx(list({Utxo.Position.t(), non_neg_integer()}), KnownTx.t()) ::
Expand Down Expand Up @@ -82,4 +87,65 @@ defmodule OMG.Watcher.ExitProcessor.Tools do
value -> {:ok, value}
end
end

@doc """
Transforms Ethereum events like InFlightExitStarted or InFlightExitOutputWithdrawn
to form that can be consumed by subscribers
"""
@spec to_bus_events_data(list(eth_event_with_exiting_positions_t())) ::
list(%{
call_data: map(),
root_chain_txhash: charlist(),
log_index: non_neg_integer()
})
def to_bus_events_data(eth_events_with_exiting_utxos) do
Enum.reduce(eth_events_with_exiting_utxos, [], &to_bus_events_reducer/2)
end

defp to_bus_events_reducer(
{%{root_chain_txhash: root_chain_txhash, log_index: log_index}, utxo_positions},
bus_events
) do
utxo_pos_transform = fn
Utxo.position(_, _, _) = u -> Utxo.Position.encode(u)
encoded when is_integer(encoded) -> encoded
end

utxo_positions
|> Enum.map(
&%{
call_data: %{utxo_pos: utxo_pos_transform.(&1)},
root_chain_txhash: root_chain_txhash,
log_index: log_index
}
)
|> Enum.concat(bus_events)
end

defp to_bus_events_reducer(%{omg_data: %{piggyback_type: :input}}, bus_events) do
# In-flight transaction's inputs are spend when IFE is started we are not interested with input piggybacks
bus_events
end

defp to_bus_events_reducer(
%{
root_chain_txhash: root_chain_txhash,
log_index: log_index,
omg_data: %{piggyback_type: :output},
tx_hash: txhash,
output_index: oindex
},
bus_events
) do
# Note: It cannot be deposit as it is piggyback to output, so output is created by in-flight transaction
# If transaction was included in plasma block, output is created and could be spend by this event
[
%{
call_data: %{txhash: txhash, oindex: oindex},
root_chain_txhash: root_chain_txhash,
log_index: log_index
}
| bus_events
]
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ defmodule OMG.Watcher.ExitProcessor.Core.StateInteractionTest do

alias OMG.Eth.Configuration
alias OMG.State
alias OMG.State.Transaction
alias OMG.TestHelper
alias OMG.Utxo
alias OMG.Watcher.Event
Expand Down Expand Up @@ -213,9 +214,14 @@ defmodule OMG.Watcher.ExitProcessor.Core.StateInteractionTest do

ife_id1 = <<ife_id1::192>>
ife_id2 = <<ife_id2::192>>
ife_tx1_output_pos = Utxo.position(1000, 0, 0)
ife_tx2_input_pos = Utxo.position(2, 0, 0)

{:ok, %{^ife_id1 => exiting_positions1, ^ife_id2 => exiting_positions2}} =
Core.prepare_utxo_exits_for_in_flight_exit_finalizations(processor, finalizations)
{:ok, %{^ife_id1 => exiting_positions1, ^ife_id2 => exiting_positions2},
[
{%{in_flight_exit_id: ^ife_id1}, [^ife_tx1_output_pos]},
{%{in_flight_exit_id: ^ife_id2}, [^ife_tx2_input_pos]}
]} = Core.prepare_utxo_exits_for_in_flight_exit_finalizations(processor, finalizations)

assert {:ok,
{[{:delete, :utxo, _}, {:delete, :utxo, _}], {[Utxo.position(1000, 0, 0), Utxo.position(2, 0, 0)], []}},
Expand All @@ -240,7 +246,7 @@ defmodule OMG.Watcher.ExitProcessor.Core.StateInteractionTest do
finalizations = [%{in_flight_exit_id: ife_id, output_index: 0, omg_data: %{piggyback_type: :output}}]
ife_id = <<ife_id::192>>

{:ok, %{^ife_id => exiting_positions}} =
{:ok, %{^ife_id => exiting_positions}, []} =
Core.prepare_utxo_exits_for_in_flight_exit_finalizations(processor, finalizations)

{:ok, {_, {[], [] = invalidities}}, _} = State.Core.exit_utxos(exiting_positions, state)
Expand Down Expand Up @@ -272,8 +278,9 @@ defmodule OMG.Watcher.ExitProcessor.Core.StateInteractionTest do

finalizations = [%{in_flight_exit_id: ife_id, output_index: 0, omg_data: %{piggyback_type: :output}}]
ife_id = <<ife_id::192>>
[exiting_utxo] = Transaction.get_inputs(spending_tx)

{:ok, %{^ife_id => exiting_positions}} =
{:ok, %{^ife_id => exiting_positions}, [{%{in_flight_exit_id: ^ife_id}, [^exiting_utxo]}]} =
Core.prepare_utxo_exits_for_in_flight_exit_finalizations(processor, finalizations)

{:ok, {_, {[], [_] = invalidities}}, _} = State.Core.exit_utxos(exiting_positions, state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ defmodule OMG.Watcher.ExitProcessor.FinalizationsTest do
test "can process empty finalizations", %{processor_empty: empty, processor_filled: filled} do
assert {^empty, []} = Core.finalize_exits(empty, {[], []})
assert {^filled, []} = Core.finalize_exits(filled, {[], []})
assert {:ok, %{}} = Core.prepare_utxo_exits_for_in_flight_exit_finalizations(empty, [])
assert {:ok, %{}} = Core.prepare_utxo_exits_for_in_flight_exit_finalizations(filled, [])
assert {:ok, %{}, []} = Core.prepare_utxo_exits_for_in_flight_exit_finalizations(empty, [])
assert {:ok, %{}, []} = Core.prepare_utxo_exits_for_in_flight_exit_finalizations(filled, [])
assert {:ok, ^empty, []} = Core.finalize_in_flight_exits(empty, [], %{})
assert {:ok, ^filled, []} = Core.finalize_in_flight_exits(filled, [], %{})
end
Expand Down Expand Up @@ -74,8 +74,11 @@ defmodule OMG.Watcher.ExitProcessor.FinalizationsTest do
tx1_first_output = Utxo.position(tx1_blknum, 0, 0)
tx1_second_output = Utxo.position(tx1_blknum, 0, 1)

assert {:ok, %{^ife_id1 => [^tx1_first_output, ^tx1_second_output]}} =
Core.prepare_utxo_exits_for_in_flight_exit_finalizations(processor, finalizations)
assert {
:ok,
%{^ife_id1 => [^tx1_first_output, ^tx1_second_output]},
[{%{output_index: 0}, [^tx1_first_output]}, {%{output_index: 1}, [^tx1_second_output]}]
} = Core.prepare_utxo_exits_for_in_flight_exit_finalizations(processor, finalizations)
end

test "doesn't signal non-included txs' outputs as exiting when piggybacked output exits",
Expand Down Expand Up @@ -103,7 +106,7 @@ defmodule OMG.Watcher.ExitProcessor.FinalizationsTest do

ife_id1 = <<ife_id1::192>>

assert {:ok, %{^ife_id1 => []}} =
assert {:ok, %{^ife_id1 => []}, []} =
Core.prepare_utxo_exits_for_in_flight_exit_finalizations(processor, finalizations)
end

Expand Down Expand Up @@ -137,16 +140,22 @@ defmodule OMG.Watcher.ExitProcessor.FinalizationsTest do
%{in_flight_exit_id: ife_id2, output_index: 0, omg_data: %{piggyback_type: :output}}
]

assert {:ok, %{}} = Core.prepare_utxo_exits_for_in_flight_exit_finalizations(processor, [])
assert {:ok, %{}, []} = Core.prepare_utxo_exits_for_in_flight_exit_finalizations(processor, [])

ife_id1 = <<ife_id1::192>>
ife_id2 = <<ife_id2::192>>

tx1_first_input = tx1 |> Transaction.get_inputs() |> hd()
tx2_first_output = Utxo.position(tx2_blknum, 1, 0)

assert {:ok, %{^ife_id1 => [^tx1_first_input], ^ife_id2 => [^tx2_first_output]}} =
Core.prepare_utxo_exits_for_in_flight_exit_finalizations(processor, finalizations)
assert {
:ok,
%{^ife_id1 => [^tx1_first_input], ^ife_id2 => [^tx2_first_output]},
[
{%{in_flight_exit_id: ^ife_id1}, [^tx1_first_input]},
{%{in_flight_exit_id: ^ife_id2}, [^tx2_first_output]}
]
} = Core.prepare_utxo_exits_for_in_flight_exit_finalizations(processor, finalizations)
end

test "fails when unknown in-flight exit is being finalized", %{processor_empty: processor} do
Expand Down
Loading

0 comments on commit 4ab2549

Please sign in to comment.