Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

split into two crates/use async #102

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
622 changes: 486 additions & 136 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 2 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,2 @@
[package]
name = "ntripping"
version = "1.4.0"
authors = ["Swift Navigation <[email protected]>"]
edition = "2018"
publish = true

[dependencies]
curl = "^0.4.44"
clap = { version = "4", features = ["derive"] }
chrono = "0.4"

[build-dependencies]
vergen = "3"
[workspace]
members = ["ntripping", "ntripping-cli"]
21 changes: 21 additions & 0 deletions ntripping-cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "ntripping-cli"
version = "1.4.0"
authors = ["Swift Navigation <[email protected]>"]
edition = "2021"
publish = true

[[bin]]
name = "ntripping"
path = "src/main.rs"

[dependencies]
chrono = "0.4.23"
clap = { version = "4", features = ["derive"] }
futures-util = "0.3.25"
ntripping = { path = "../ntripping" }
tokio = { version = "1.23.0", features = ["rt", "time", "io-std", "io-util"] }
tracing-subscriber = "0.3.16"

[build-dependencies]
vergen = "3"
File renamed without changes.
183 changes: 183 additions & 0 deletions ntripping-cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use std::time::{Duration, SystemTime};

use chrono::{DateTime, Utc};
use clap::Parser;
use futures_util::{SinkExt, TryStreamExt};
use tokio::io::{self, AsyncWriteExt};
use tracing_subscriber::filter::LevelFilter;

use ntripping::{
sentence::{Cra, Gga, Sentence},
Auth, Client,
};

#[derive(Debug, clap::Parser)]
#[clap(
name = "ntripping",
about = "NTRIP command line client.",
version = env!("VERGEN_SEMVER_LIGHTWEIGHT"),
)]
struct Cli {
/// URL of the NTRIP caster
#[clap(long, default_value = "na.skylark.swiftnav.com:2101/CRS")]
url: String,

/// Receiver latitude to report, in degrees
#[clap(long, default_value_t = 37.77101999622968, allow_hyphen_values = true)]
lat: f64,

/// Receiver longitude to report, in degrees
#[clap(
long,
default_value_t = -122.40315159140708,
allow_hyphen_values = true
)]
lon: f64,

/// Receiver height to report, in meters
#[clap(long, default_value_t = -5.549358852471994, allow_hyphen_values = true)]
height: f64,

/// Client ID
#[clap(
long,
default_value = "00000000-0000-0000-0000-000000000000",
alias = "client"
)]
client_id: String,

/// Verbosity level, can be specified multiple times
#[clap(short, long, action = clap::ArgAction::Count)]
verbose: u8,

/// Receiver time to report, as a Unix time
#[clap(long)]
epoch: Option<u32>,

/// Username credentials
#[clap(long)]
username: Option<String>,

/// Password credentials
#[clap(long)]
password: Option<String>,

/// GGA update period, in seconds. 0 means to never send a GGA
#[clap(long, default_value_t = 10)]
gga_period: u64,

/// Set the ntrip-gga header
#[clap(long)]
gga_header: bool,

/// Request counter allows correlation between message sent and acknowledgment response from corrections stream
#[clap(long, default_value_t = 0)]
request_counter: u8,

/// Area ID to be used in generation of CRA message. If this flag is set, ntripping outputs messages of type CRA rather than the default GGA
#[clap(long)]
area_id: Option<u32>,

/// Field specifying which types of corrections are to be received
#[clap(long)]
corrections_mask: Option<u16>,

/// Solution ID, the identifier of the connection stream to reconnect to in the event of disconnections
#[clap(long)]
solution_id: Option<u8>,
}

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

async fn run(opt: Cli) -> Result<()> {
let now = || -> DateTime<Utc> {
if let Some(epoch) = opt.epoch {
SystemTime::UNIX_EPOCH + Duration::from_secs(epoch.into())
} else {
SystemTime::now()
}
.into()
};

let auth = if let (Some(username), Some(password)) = (opt.username, opt.password) {
Some(Auth::new(username, password))
} else {
None
};

let msg: Sentence = if let Some(area_id) = opt.area_id {
Cra::new()
.with_area_id(area_id)
.with_corrections_mask(opt.corrections_mask)
.with_solution_id(opt.solution_id)
.into()
} else {
Gga::new()
.with_time(now())
.with_lat(opt.lat)
.with_lon(opt.lon)
.with_height(opt.height)
.into()
};

let client = {
let client = Client::new().with_client_id(opt.client_id).with_auth(auth);
if opt.gga_header {
client.with_ntrip_gga(msg)
} else {
client
}
};

let (mut sink, mut stream) = {
let url = if !opt.url.starts_with("http://") && !opt.url.starts_with("https://") {
format!("https://{}", opt.url)
} else {
opt.url
};
let uri = url.parse()?;
client.connect(uri).await?.split()
};

let writer_task = tokio::spawn(async move {
let mut out = io::stdout();
while let Some(data) = stream.try_next().await? {
out.write_all(&data).await?;
}
Result::Ok(())
});

if opt.gga_period == 0 {
return writer_task.await?;
}

let mut request_counter = opt.request_counter;
let mut gga_interval = tokio::time::interval(Duration::from_secs(opt.gga_period));
loop {
if writer_task.is_finished() {
break;
}
gga_interval.tick().await;
let sentence = msg.with_time(now()).with_request_counter(request_counter);
let _ = sink.send(sentence).await;
request_counter = request_counter.wrapping_add(1);
}

Ok(())
}

fn main() -> Result<()> {
let opt = Cli::parse();
tracing_subscriber::fmt::fmt()
.with_max_level(match opt.verbose {
Copy link
Contributor

@silverjam silverjam Dec 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this to work, do we need to set RUST_LOG and add a logger (or is the tracing subscriber supposed to handle this)? env_logger? As of right now the -v option isn't showing anything for me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't need it set, try -vv? I guess I have nothing logging at the debug level

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually looks like I do, and I see them with just -v

    Finished dev [unoptimized + debuginfo] target(s) in 0.05s
     Running `target/debug/ntripping -v`
2022-12-15T22:54:51.013819Z DEBUG ntripping: connect addr=na.skylark.swiftnav.com:2101

Copy link
Contributor

@silverjam silverjam Dec 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little concerned that we're losing some user facing features, like connection logging from cURL:

image

Versus:

image

So we're losing logging around a few things:

  • Response headers
  • GGA/CRA string sent up
  • Connection state

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is not rush on this probably should have made this pr a draft. the metrics project I wanna use this for just got pushed back again. Just have been missing rust a bit

I also still want to add

  • connection pooling so requests to different mountpoints but the same host share a connection
  • automatic retry, plus a way to hook into the retries (so you can, for example, have the retry send a CRA which a certain correction mask)
  • maybe sourcetable types? I was thinking it might be nice to just point this too-be-built app at a caster and have it discover what mountpoints to monitor

So definitely can make it feature complete with the old version. Could also try a version with curl, but crate we use now is pretty much all !Send + !Sync which is a little annoying

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe https://docs.rs/isahc/latest/isahc/ would work as an alternative? It's purports to be a more "Rusty" wrapper of cURL.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there's no easy way to enable HTTP 0.9 in this library though, so might have to be hacked in.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's the first library I tried but saw it had and old open issue about that, but maybe it wouldn't be too hard to add

0 => LevelFilter::WARN,
1 => LevelFilter::DEBUG,
_ => LevelFilter::TRACE,
})
.compact()
.init();
tokio::runtime::Builder::new_current_thread()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default behavior has changed, with no user provided switches we now attempt to connect to Skylark and return error 403. We should return the old behavior of doing nothing, or emit a help message if they don't specify anything.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current default behavior is to try and connect as well

➜  ntripping git:(steve/updates) ✗ ntripping --verbose
*   Trying 50.112.176.233:2101...
* Connected to na.skylark.swiftnav.com (50.112.176.233) port 2101 (#0)
> GET /CRS HTTP/1.1
Host: na.skylark.swiftnav.com:2101
User-Agent: NTRIP ntrip-client/1.0
Accept: */*
Ntrip-Version: Ntrip/2.0
X-SwiftNav-Client-Id: 00000000-0000-0000-0000-000000000000
Expect: 100-continue

* Mark bundle as not supporting multiuse
< HTTP/1.1 401 Unauthorized
< Connection: close
< 
{ * Closing connection 0
➜  ntripping git:(steve/updates) ✗ echo $?
0

But right now this version returns a non-zero status code

    Finished dev [unoptimized + debuginfo] target(s) in 0.05s
     Running `target/debug/ntripping -v`
2022-12-15T22:56:06.736849Z DEBUG ntripping: connect addr=na.skylark.swiftnav.com:2101
2022-12-15T22:56:07.103324Z DEBUG hyper::proto::h1::io: flushed 215 bytes
2022-12-15T22:56:07.139357Z DEBUG hyper::proto::h1::io: parsed 1 headers
2022-12-15T22:56:07.139480Z DEBUG hyper::proto::h1::conn: incoming body is close-delimited
Error: BadStatus(401)
➜  ntripping git:(steve/updates) ✗ echo $?
1

But yeah the default behavior probably should exit successfully

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a public endpoint we could default to? Maybe something that returns a source table?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current default behavior is to try and connect as well

Hrmm, guess I never noticed it was trying to connect because it was returning success

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a public endpoint we could default to? Maybe something that returns a source table?

This returns a source table

image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, forgot that isn't protected by the auth

.enable_all()
.build()?
.block_on(run(opt))
}
16 changes: 16 additions & 0 deletions ntripping/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "ntripping"
version = "1.4.0"
authors = ["Swift Navigation <[email protected]>"]
edition = "2021"
publish = true

[dependencies]
base64 = "0.13"
bytes = "1.0"
chrono = "0.4"
futures-channel = { version = "0.3", features = ["sink"] }
futures-util = { version = "0.3", default-features = false, features = ["sink"] }
hyper = { version = "1.0.0-rc.1", features = ["client", "http1"] }
tokio = { version = "1.23", features = ["net", "rt"] }
tracing = "0.1"
7 changes: 7 additions & 0 deletions ntripping/src/header.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#![allow(clippy::declare_interior_mutable_const)]

pub use hyper::http::header::*;

pub const NTRIP_GGA: HeaderName = HeaderName::from_static("ntrip-gga");
pub const NTRIP_VERSION: HeaderName = HeaderName::from_static("ntrip-version");
pub const SWIFT_CLIENT_ID: HeaderName = HeaderName::from_static("x-swiftnav-client-id");
Loading