Skip to content

Commit

Permalink
Handle multiple network endpoints and choose a healthy one with offch…
Browse files Browse the repository at this point in the history
…ain indexing enabled (#1693)

* add simple `EndpointManager` class

* [EndpointManager] minor fixes

* [EndpointManager] Extract EndpointChecker

* [EndpointManager] fmt

* [EndpointManager] fix: change abstract class to mixin

* minor fixes

* [config/networks] add endpoints and rename them

* [api] use `EndpointManager` in API

* [api] check for offchain indexing in the healthcheck

* [dart_api] proper condition of offchain indexing enablement log.

* [dart_api] remove unused code

* [EndpointManager] randomize order of healthcheck.

* [const/network] better naming convention for address field

* [EndpointManager] make randomization optional

* fmt

* update pubspec_overrides.yaml
  • Loading branch information
clangenb authored Jul 16, 2024
1 parent 68e3ec7 commit 68ad019
Show file tree
Hide file tree
Showing 12 changed files with 589 additions and 43 deletions.
41 changes: 19 additions & 22 deletions app/lib/config/networks/networks.dart
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import 'dart:io';

import 'package:encointer_wallet/config/consts.dart';
import 'package:ew_endpoint_manager/endpoint_manager.dart';

class NetworkEndpoint {
NetworkEndpoint({required this.name, required this.address});
class NetworkEndpoint with Endpoint {
NetworkEndpoint({required this.name, required String address}) : _address = address;

final String name;
final String address;
final String _address;

@override
String address() => _address;
}

const String gesellId = 'nctr-gsl';
Expand Down Expand Up @@ -75,15 +79,13 @@ enum Network {
};
}

/// Exists for simple reverse compatibility.
/// Will be remove in the course of https://github.com/encointer/encointer-wallet-flutter/issues/1603.
String value() {
String defaultEndpoint() {
return switch (this) {
encointerKusama => networkEndpoints().first.address,
encointerRococo => networkEndpoints().first.address,
gesell => networkEndpoints().first.address,
encointerKusama => networkEndpoints().first.address(),
encointerRococo => networkEndpoints().first.address(),
gesell => networkEndpoints().first.address(),
// only dev network refers to the local one
gesellDev => networkEndpoints().first.address,
gesellDev => networkEndpoints().first.address(),
};
}

Expand All @@ -98,31 +100,26 @@ enum Network {
}

List<NetworkEndpoint> gesellEndpoints() {
return [
NetworkEndpoint(name: 'Encointer Gesell (Hosted by Encointer Association)', address: 'wss://gesell.encointer.org')
];
return [NetworkEndpoint(name: 'Encointer Association', address: 'wss://gesell.encointer.org')];
}

List<NetworkEndpoint> gesellDevEndpoints() {
return [
NetworkEndpoint(
name: 'Encointer Gesell Local DevNet',
address: 'ws://${Platform.isAndroid ? androidLocalHost : iosLocalHost}:9944')
NetworkEndpoint(name: 'Local DevNet', address: 'ws://${Platform.isAndroid ? androidLocalHost : iosLocalHost}:9944')
];
}

List<NetworkEndpoint> rococoEndpoints() {
return [
NetworkEndpoint(
name: 'Encointer Lietaer on Rococo (Hosted by Encointer Association)',
address: 'wss://rococo.api.encointer.org')
NetworkEndpoint(name: 'Encointer Association', address: 'wss://rococo.api.encointer.org'),
];
}

List<NetworkEndpoint> kusamaEndpoints() {
return [
NetworkEndpoint(
name: 'Encointer Network on Kusama (Hosted by Encointer Association)',
address: 'wss://kusama.api.encointer.org')
NetworkEndpoint(name: 'Encointer Association', address: 'wss://kusama.api.encointer.org'),
NetworkEndpoint(name: 'Dwellir', address: 'wss://encointer-kusama-rpc.dwellir.com'),
NetworkEndpoint(name: 'IBP1', address: 'wss://sys.ibp.network/encointer-kusama'),
NetworkEndpoint(name: 'IBP2', address: 'wss://sys.dotters.network/encointer-kusama'),
];
}
47 changes: 40 additions & 7 deletions app/lib/service/substrate_api/api.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'dart:async';

import 'package:encointer_wallet/config/networks/networks.dart';
import 'package:encointer_wallet/store/app.dart';
import 'package:encointer_wallet/mocks/ipfs_api.dart';
import 'package:encointer_wallet/service/ipfs/ipfs_api.dart';
Expand All @@ -9,6 +10,7 @@ import 'package:encointer_wallet/service/substrate_api/chain_api.dart';
import 'package:encointer_wallet/service/substrate_api/core/dart_api.dart';
import 'package:encointer_wallet/service/substrate_api/encointer/encointer_api.dart';
import 'package:encointer_wallet/service/substrate_api/core/reconnecting_ws_provider.dart';
import 'package:ew_endpoint_manager/endpoint_manager.dart';
import 'package:ew_http/ew_http.dart';
import 'package:ew_polkadart/ew_polkadart.dart';
import 'package:encointer_wallet/service/log/log_service.dart';
Expand All @@ -18,6 +20,31 @@ import 'package:encointer_wallet/service/log/log_service.dart';
/// `late final` because it will be initialized exactly once in lib/app.dart.
late Api webApi;

class NetworkEndpointChecker with EndpointChecker<NetworkEndpoint> {
// Trivial check if we can connect to an endpoint.
@override
Future<bool> checkHealth(NetworkEndpoint endpoint) async {
Log.d('[NetworkEndpointChecker] Checking health of: ${endpoint.address()}', 'Api');

final provider = WsProvider(Uri.parse(endpoint.address()));
final ready = await provider.ready();

Log.d('[NetworkEndpointChecker] Endpoint ${endpoint.address()} ready: $ready', 'Api');

if (!ready) {
await provider.disconnect();
return false;
}

final offchainIndexing = await SubstrateDartApi(provider).offchainIndexingEnabled();
Log.d('[NetworkEndpointChecker] Endpoint ${endpoint.address()} offchainIndexingEnabled: $offchainIndexing', 'Api');

await provider.disconnect();
// only allow nodes that have offchain indexing enabled
return offchainIndexing;
}
}

class Api {
Api(
this.store,
Expand All @@ -34,7 +61,9 @@ class Api {
EwHttp ewHttp, {
bool isIntegrationTest = false,
}) {
final provider = ReconnectingWsProvider(Uri.parse(store.settings.currentNetwork.value()), autoConnect: false);
// Initialize with default endpoint, will check for healthiness later.
final provider =
ReconnectingWsProvider(Uri.parse(store.settings.currentNetwork.defaultEndpoint()), autoConnect: false);
return Api(
store,
provider,
Expand Down Expand Up @@ -79,13 +108,17 @@ class Api {
});
}

Future<void> _connect() {
Log.d('[webApi] Connecting to endpoint: ${store.settings.currentNetwork.value()}', 'Api');
Future<void> _connect() async {
Log.p('[webApi] Looking for a healthy endpoint...', 'Api');
final manager =
EndpointManager.withEndpoints(NetworkEndpointChecker(), store.settings.currentNetwork.networkEndpoints());
final endpoint = await manager.pollHealthyEndpoint(randomize: true);

Log.p('[webApi] Connecting to healthy endpoint: ${endpoint.address()}', 'Api');

store.settings.setNetworkLoading(true);

final endpoint = store.settings.currentNetwork.value();
return provider.connectToNewEndpoint(Uri.parse(endpoint)).then((voidValue) async {
return provider.connectToNewEndpoint(Uri.parse(endpoint.address())).then((voidValue) async {
Log.p('[webApi] channel is ready...');
if (await isConnected()) {
return _onConnected();
Expand All @@ -99,7 +132,7 @@ class Api {
}

Future<void> _onConnected() async {
Log.d('[webApi] Connected to endpoint: ${store.settings.currentNetwork.value()}', 'Api');
Log.d('[webApi] Connected to endpoint: ${provider.url}', 'Api');

if (store.account.currentAddress.isNotEmpty) {
await store.encointer.initializeUninitializedStores(store.account.currentAddress);
Expand All @@ -118,7 +151,7 @@ class Api {

store.settings.setNetworkLoading(false);

Log.d('[webApi] Obtained basic network data: ${store.settings.currentNetwork.value()}');
Log.d('[webApi] Obtained basic network data: ${provider.url}');

// need to do this from here as we can't access instance fields in constructor.
account.setFetchAccountData(fetchAccountData);
Expand Down
24 changes: 12 additions & 12 deletions app/lib/service/substrate_api/core/dart_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,15 @@ class SubstrateDartApi {
/// Websocket client used to connect to the node.
final Provider _provider;

/// The rpc methods exposed by the connected node.
RpcMethods? _rpc;

/// Address of the node we connect to including ws(s).
String? _endpoint;

/// Returns the rpc nodes of the connected node or an empty list otherwise.
Future<RpcMethods> rpcMethods() async {
return rpc<Map<String, dynamic>>('rpc_methods', []).then(RpcMethods.fromJson);
}

/// Gets address of the node we connect to including ws(s).
String? get endpoint => _endpoint;

Future<void> connect(String endpoint) async {
try {
_rpc = await rpc<Map<String, dynamic>>('rpc_methods', []).then(RpcMethods.fromJson);

// Sanity check that we are running against valid node with offchain indexing enabled
if (!_rpc!.methods!.contains('encointer_getReputations')) {
if (!(await offchainIndexingEnabled())) {
Log.d(
"rpc_methods does not contain 'getReputations'. Are the following flags passed"
" to the node? \n '--enable-offchain-indexing true --rpc-methods unsafe'",
Expand All @@ -44,6 +33,17 @@ class SubstrateDartApi {
}
}

Future<bool> offchainIndexingEnabled() async {
try {
// Check reputation of Alice. This will return an exception if offchain
// indexing is disabled.
await rpc<List<dynamic>>('encointer_getReputations', ['5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY']);
return Future.value(true);
} catch (e) {
return Future.value(false);
}
}

/// Queries the rpc of the node.
///
/// Hints:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ class ReconnectingWsProvider extends Provider {
autoConnect: autoConnect,
);

final Uri url;
Uri url;
WsProvider provider;

Future<void> connectToNewEndpoint(Uri url) async {
await disconnect();
this.url = url;
provider = WsProvider(url);
await provider.ready();
}
Expand Down
7 changes: 7 additions & 0 deletions app/pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,13 @@ packages:
relative: true
source: path
version: "0.1.0+1"
ew_endpoint_manager:
dependency: "direct main"
description:
path: "../packages/endpoint_manager"
relative: true
source: path
version: "0.1.0+1"
ew_http:
dependency: "direct main"
description:
Expand Down
2 changes: 2 additions & 0 deletions app/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ dependencies:
pointycastle: ^3.7.3
connectivity_plus: ^5.0.2
timezone: ^0.9.2
ew_endpoint_manager:
path: ../packages/endpoint_manager
ew_encointer_utils:
path: ../packages/ew_encointer_utils/
ew_storage:
Expand Down
4 changes: 3 additions & 1 deletion app/pubspec_overrides.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# melos_managed_dependency_overrides: ew_encointer_utils,ew_http,ew_keyring,ew_polkadart,ew_primitives,ew_storage,ew_substrate_fixed,polkadart,polkadart_scale_codec,substrate_metadata,polkadart_keyring
# melos_managed_dependency_overrides: ew_encointer_utils,ew_http,ew_keyring,ew_polkadart,ew_primitives,ew_storage,ew_substrate_fixed,polkadart,polkadart_scale_codec,substrate_metadata,polkadart_keyring,ew_endpoint_manager
dependency_overrides:
ew_encointer_utils:
path: ../packages/ew_encointer_utils
ew_endpoint_manager:
path: ../packages/endpoint_manager
ew_http:
path: ../packages/ew_http
ew_keyring:
Expand Down
7 changes: 7 additions & 0 deletions packages/endpoint_manager/analysis_options.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
include: package:very_good_analysis/analysis_options.3.1.0.yaml

linter:
rules:
public_member_api_docs: false
lines_longer_than_80_chars: false
sort_pub_dependencies: false
71 changes: 71 additions & 0 deletions packages/endpoint_manager/lib/endpoint_manager.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import 'dart:math';

mixin Endpoint {
String address();
}

mixin EndpointChecker<E extends Endpoint> {
Future<bool> checkHealth(E endpoint);
}

class EndpointManager<C extends EndpointChecker, E extends Endpoint> {
EndpointManager(this._checker);

EndpointManager.withEndpoints(this._checker, List<E> endpoints) {
for (final e in endpoints) {
this.endpoints[e.address()] = e;
}
}

Map<String, E> endpoints = {};

final EndpointChecker _checker;

void addEndpoint(E endpoint) {
endpoints[endpoint.address()] = endpoint;
}

void removeEndpoint(E endpoint) {
endpoints.remove(endpoint.address());
}

List<E> getEndpoints() {
return endpoints.values.toList();
}

/// Returns the first endpoint that is healthy where the checks are optionally run in random order.
///
/// Will return null if all endpoints are unhealthy.
Future<E?> getHealthyEndpoint({bool randomize = false}) {
final values = endpoints.values.toList();

if (randomize) {
values.shuffle(Random());
}

return firstWhereAsync(values, _checker.checkHealth);
}

/// Returns a future that completes once a healthy endpoint has been found.
Future<E> pollHealthyEndpoint({bool randomize = false}) async {
E? endpoint;

while ((endpoint = await getHealthyEndpoint(randomize: randomize)) == null) {
await Future<void>.delayed(const Duration(seconds: 5));
}

return endpoint!;
}
}

Future<T?> firstWhereAsync<T>(
Iterable<T> items,
Future<bool> Function(T) test,
) async {
for (final item in items) {
if (await test(item).timeout(const Duration(seconds: 2), onTimeout: () => false)) {
return item;
}
}
return null;
}
Loading

0 comments on commit 68ad019

Please sign in to comment.