Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hex boosting support #716

Merged
merged 2 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
637 changes: 502 additions & 135 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ debug = true

[workspace]
members = [
"boost_manager",
"db_store",
"denylist",
"file_store",
Expand Down
52 changes: 52 additions & 0 deletions boost_manager/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
[package]
name = "boost-manager"
version = "0.1.0"
description = "Hex boosting manager"
edition.workspace = true
authors.workspace = true
license.workspace = true


[dependencies]
anyhow = {workspace = true}
anchor-lang = "0.28"
anchor-spl = "0.28"
axum = {version = "0", features = ["tracing"]}
bs58 = {workspace = true}
config = {workspace = true}
clap = {workspace = true}
thiserror = {workspace = true}
serde = {workspace = true}
serde_json = {workspace = true}
sqlx = {workspace = true}
base64 = {workspace = true}
sha2 = {workspace = true}
lazy_static = {workspace = true}
triggered = {workspace = true}
futures = {workspace = true}
futures-util = {workspace = true}
prost = {workspace = true}
once_cell = {workspace = true}
mobile-config = {path = "../mobile_config"}
file-store = {path = "../file_store"}
db-store = { path = "../db_store" }
poc-metrics = {path = "../metrics"}
tokio = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
metrics = {workspace = true }
metrics-exporter-prometheus = { workspace = true }
helium-proto = { workspace = true }
helium-crypto = {workspace = true, features = ["sqlx-postgres", "multisig", "solana"]}
rust_decimal = {workspace = true}
rust_decimal_macros = {workspace = true}
tonic = {workspace = true}
rand = {workspace = true}
async-trait = {workspace = true}
task-manager = { path = "../task_manager" }
http = {workspace = true}
http-serde = {workspace = true}
solana = {path = "../solana"}
solana-sdk = {workspace = true}
10 changes: 10 additions & 0 deletions boost_manager/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Boost Manager

### S3 Inputs

| File Type | Pattern | |
| :--- |:-----------------------| :-- |
| RewardManifest | reward_manifest.\* | [Proto](https://github.com/helium/proto/blob/149997d2a74e08679e56c2c892d7e46f2d0d1c46/src/reward_manifest.proto#L5) |
| MobileRewardShare | mobile_reward_share.\* | [Proto](https://github.com/helium/proto/blob/149997d2a74e08679e56c2c892d7e46f2d0d1c46/src/service/poc_lora.proto#L171) |


27 changes: 27 additions & 0 deletions boost_manager/migrations/1_setup.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- This extension gives us `uuid_generate_v1mc()` which generates UUIDs that cluster better than `gen_random_uuid()`
-- while still being difficult to predict and enumerate.
-- Also, while unlikely, `gen_random_uuid()` can in theory produce collisions which can trigger spurious errors on
-- insertion, whereas it's much less likely with `uuid_generate_v1mc()`.
create extension if not exists "uuid-ossp";

create or replace function set_updated_at()
returns trigger as
$$
begin
NEW.updated_at = now();
return NEW;
end;
$$ language plpgsql;

create or replace function trigger_updated_at(tablename regclass)
returns void as
$$
begin
execute format('CREATE TRIGGER set_updated_at
BEFORE UPDATE
ON %s
FOR EACH ROW
WHEN (OLD is distinct from NEW)
EXECUTE FUNCTION set_updated_at();', tablename);
end;
$$ language plpgsql;
4 changes: 4 additions & 0 deletions boost_manager/migrations/2_meta.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
create table meta (
key text primary key not null,
value text
);
20 changes: 20 additions & 0 deletions boost_manager/migrations/3_activated_hexes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
CREATE TYPE onchain_status AS ENUM (
'queued',
'pending',
'success',
'failed',
'cancelled'
);

create table activated_hexes (
location bigint primary key not null,
activation_ts timestamptz not null,
boosted_hex_pubkey text not null,
boost_config_pubkey text not null,
status onchain_status not null,
txn_id text,
retries integer not null default 0,
inserted_at timestamptz default now(),
updated_at timestamptz default now()
);

7 changes: 7 additions & 0 deletions boost_manager/migrations/4_files_processed.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE files_processed (
file_name VARCHAR PRIMARY KEY,
file_type VARCHAR NOT NULL,
file_timestamp TIMESTAMPTZ NOT NULL,
processed_at TIMESTAMPTZ NOT NULL,
process_name text not null default 'default'
);
44 changes: 44 additions & 0 deletions boost_manager/pkg/settings-template.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
log = "boost_manager=info,solana=debug"

# Cache location for generated boost manager outputs; Required
cache = "/tmp/oracles/boost-manager"

start_after = 1702602001

enable_solana_integration = true

activation_check_interval = 30

[solana]
# Solana RPC. This may contain a secret
rpc_url = "https://api.devnet.solana.com"
# Path to the keypair used to sign data credit burn solana transactions
start_authority_keypair = ""
# Public key of the hex boost authority
hexboost_authority_pubkey = ""
# Solana cluster to use. "devnet" or "mainnet"
cluster = "devnet"

#
[database]
url = "postgresql://postgres:postgres@localhost:5432/hexboosting"
# Max connections to the database.
max_connections = 10

[verifier]
bucket = "mobile-verified"

[output]
bucket = "mobile-verified"

[mobile_config_client]
url = "http://localhost:6090"
config_pubkey = ""
signing_keypair = ""


[metrics]

# Endpoint for metrics. Default below
#
endpoint = "127.0.0.1:19001"
153 changes: 153 additions & 0 deletions boost_manager/src/activator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use crate::{db, telemetry};
use anyhow::Result;
use chrono::{DateTime, Utc};
use file_store::{
file_info_poller::FileInfoStream, reward_manifest::RewardManifest, FileInfo, FileStore,
};
use futures::{future::LocalBoxFuture, stream, StreamExt, TryFutureExt, TryStreamExt};
use helium_proto::{
services::poc_mobile::{
mobile_reward_share::Reward as MobileReward, BoostedHex as BoostedHexProto,
MobileRewardShare,
},
Message,
};
use mobile_config::{
boosted_hex_info::BoostedHexes,
client::{hex_boosting_client::HexBoostingInfoResolver, ClientError},
};
use poc_metrics::record_duration;
use sqlx::{Pool, Postgres, Transaction};
use std::str::FromStr;
use task_manager::ManagedTask;
use tokio::sync::mpsc::Receiver;

pub struct Activator<A> {
pool: Pool<Postgres>,
verifier_store: FileStore,
receiver: Receiver<FileInfoStream<RewardManifest>>,
hex_boosting_client: A,
}

impl<A> ManagedTask for Activator<A>
where
A: HexBoostingInfoResolver<Error = ClientError>,
{
fn start_task(
self: Box<Self>,
shutdown: triggered::Listener,
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
let handle = tokio::spawn(self.run(shutdown));
Box::pin(
handle
.map_err(anyhow::Error::from)
.and_then(|result| async move { result.map_err(anyhow::Error::from) }),
)
}
}

impl<A> Activator<A>
where
A: HexBoostingInfoResolver<Error = ClientError>,
{
pub async fn new(
pool: Pool<Postgres>,
receiver: Receiver<FileInfoStream<RewardManifest>>,
hex_boosting_client: A,
verifier_store: FileStore,
) -> Result<Self> {
Ok(Self {
pool,
receiver,
hex_boosting_client,
verifier_store,
})
}

pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> {
tracing::info!("starting Activator");
loop {
tokio::select! {
biased;
_ = shutdown.clone() => break,
msg = self.receiver.recv() => if let Some(file_info_stream) = msg {
let key = &file_info_stream.file_info.key.clone();
tracing::info!(file = %key, "Received reward manifest file");

let mut txn = self.pool.begin().await?;
let mut stream = file_info_stream.into_stream(&mut txn).await?;

while let Some(reward_manifest) = stream.next().await {
record_duration!(
"reward_index_duration",
self.handle_rewards(&mut txn, reward_manifest).await?
)
}
txn.commit().await?;
tracing::info!(file = %key, "Completed processing reward file");
telemetry::last_reward_processed_time(&self.pool, Utc::now()).await?;
}
}
}
tracing::info!("stopping Activator");
Ok(())
}

async fn handle_rewards(
&mut self,
txn: &mut Transaction<'_, Postgres>,
manifest: RewardManifest,
) -> Result<()> {
// get latest boosted hexes info from mobile config
let boosted_hexes = BoostedHexes::get_all(&self.hex_boosting_client).await?;

// get the rewards file from the manifest
let manifest_time = manifest.end_timestamp;
let reward_files = stream::iter(
manifest
.written_files
.into_iter()
.map(|file_name| FileInfo::from_str(&file_name)),
)
.boxed();

// read in the rewards file
let mut reward_shares = self.verifier_store.source_unordered(5, reward_files);

while let Some(msg) = reward_shares.try_next().await? {
let share = MobileRewardShare::decode(msg)?;
if let Some(MobileReward::RadioReward(r)) = share.reward {
for hex in r.boosted_hexes.into_iter() {
process_boosted_hex(txn, manifest_time, &boosted_hexes, &hex).await?
}
}
}
Ok(())
}
}

pub async fn process_boosted_hex(
txn: &mut Transaction<'_, Postgres>,
manifest_time: DateTime<Utc>,
boosted_hexes: &BoostedHexes,
hex: &BoostedHexProto,
) -> Result<()> {
match boosted_hexes.hexes.get(&hex.location) {
Some(info) => {
if info.start_ts.is_none() {
db::insert_activated_hex(
txn,
hex.location,
&info.boosted_hex_pubkey,
&info.boost_config_pubkey,
manifest_time,
)
.await?;
}
}
None => {
tracing::warn!(hex = %hex.location, "got an invalid boosted hex");
}
}
Ok(())
}
Loading
Loading