Skip to content

Commit

Permalink
serve communities
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyuyureka committed Feb 6, 2024
1 parent e1d9065 commit 70146dd
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 10 deletions.
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

preBuild = ''
cp -r ${final.buildPackages.fernglas-frontend} ./static
cp ${final.buildPackages.communities-json} ./src/communities.json
'';

version =
Expand Down
176 changes: 166 additions & 10 deletions src/api.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::store::{NetQuery, Query, QueryLimits, QueryResult, Store};
use axum::body::StreamBody;
use axum::extract::FromRef;
use axum::extract::{Query as AxumQuery, State};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
Expand All @@ -10,7 +11,11 @@ use hickory_resolver::config::LookupIpStrategy;
use hickory_resolver::TokioAsyncResolver;
use ipnet::IpNet;
use log::*;
use regex::Regex;
use regex::RegexSet;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::Infallible;
use std::net::IpAddr;
Expand All @@ -21,6 +26,8 @@ use std::sync::Arc;
#[cfg(feature = "embed-static")]
static STATIC_DIR: include_dir::Dir<'_> = include_dir::include_dir!("$CARGO_MANIFEST_DIR/static");

static COMMUNITIES_LIST: &[u8] = include_bytes!("communities.json");

fn default_asn_dns_zone() -> Option<String> {
Some("as{}.asn.cymru.com.".to_string())
}
Expand All @@ -36,6 +43,8 @@ pub struct ApiServerConfig {
/// Dns zone used for ASN lookups
#[serde(default = "default_asn_dns_zone")]
pub asn_dns_zone: Option<String>,
/// Path to alternative communities.json
communities_file: Option<String>,
}

#[derive(Debug, Clone, Serialize)]
Expand All @@ -49,6 +58,10 @@ pub enum ApiResult {
asn: u32,
asn_name: String,
},
CommunityDescription {
community: String,
community_description: String,
},
}

// Make our own error that wraps `anyhow::Error`.
Expand Down Expand Up @@ -76,6 +89,32 @@ where
}
}

#[derive(Clone)]
struct AppState<T: Clone> {
cfg: Arc<ApiServerConfig>,
resolver: TokioAsyncResolver,
community_lists: Arc<CompiledCommunitiesLists>,
store: T,
}

impl<T: Clone> FromRef<AppState<T>> for Arc<ApiServerConfig> {
fn from_ref(app_state: &AppState<T>) -> Self {
app_state.cfg.clone()
}
}

impl<T: Clone> FromRef<AppState<T>> for TokioAsyncResolver {
fn from_ref(app_state: &AppState<T>) -> Self {
app_state.resolver.clone()
}
}

impl<T: Clone> FromRef<AppState<T>> for Arc<CompiledCommunitiesLists> {
fn from_ref(app_state: &AppState<T>) -> Self {
app_state.community_lists.clone()
}
}

async fn parse_or_resolve(resolver: &TokioAsyncResolver, name: String) -> anyhow::Result<IpNet> {
if let Ok(net) = name.parse() {
return Ok(net);
Expand All @@ -93,8 +132,82 @@ async fn parse_or_resolve(resolver: &TokioAsyncResolver, name: String) -> anyhow
.into())
}

#[derive(Deserialize)]
struct CommunitiesLists {
regular: CommunitiesList,
large: CommunitiesList,
}
impl CommunitiesLists {
fn compile(self) -> anyhow::Result<CompiledCommunitiesLists> {
Ok(CompiledCommunitiesLists {
regular: self.regular.compile()?,
large: self.large.compile()?,
})
}
}

struct CompiledCommunitiesLists {
regular: CompiledCommunitiesList,
large: CompiledCommunitiesList,
}

#[derive(Deserialize)]
struct CommunitiesList(HashMap<String, String>);

impl CommunitiesList {
fn compile(self) -> anyhow::Result<CompiledCommunitiesList> {
let mut sorted = self.0.into_iter().collect::<Vec<_>>();
sorted.sort_by(|a, b| a.0.len().cmp(&b.0.len()));
Ok(CompiledCommunitiesList {
regex_set: RegexSet::new(sorted.iter().map(|(regex, _desc)| format!("^{}$", regex)))?,
list: sorted
.into_iter()
.map(|(key, value)| Ok((Regex::new(&format!("^{}$", key))?, value)))
.collect::<anyhow::Result<_>>()?,
})
}
}

struct CompiledCommunitiesList {
regex_set: RegexSet,
list: Vec<(Regex, String)>,
}
impl CompiledCommunitiesList {
fn lookup(&self, community: &str) -> Option<Cow<str>> {
self.regex_set
.matches(community)
.iter()
.next()
.map(|index| {
let (regex, desc) = &self.list[index];
let mut desc_templated: Cow<str> = desc.into();
for (i, subcapture) in regex
.captures(community)
.unwrap()
.iter()
.skip(1)
.enumerate()
{
if let Some(subcapture) = subcapture {
let searchstr = format!("${}", i);
if desc_templated.contains(&searchstr) {
desc_templated =
desc_templated.replace(&searchstr, subcapture.into()).into()
}
}
}
desc_templated
})
}
}

async fn query<T: Store>(
State((cfg, resolver, store)): State<(Arc<ApiServerConfig>, TokioAsyncResolver, T)>,
State(AppState {
cfg,
resolver,
store,
community_lists,
}): State<AppState<T>>,
AxumQuery(query): AxumQuery<Query<String>>,
) -> Result<impl IntoResponse, AppError> {
trace!("request: {}", serde_json::to_string_pretty(&query).unwrap());
Expand Down Expand Up @@ -126,6 +239,8 @@ async fn query<T: Store>(
// for deduplicating the nexthop resolutions
let mut have_resolved = HashSet::new();
let mut have_asn = HashSet::new();
let mut have_community = HashSet::new();
let mut have_large_community = HashSet::new();

let stream = store
.get_routes(query)
Expand Down Expand Up @@ -182,6 +297,35 @@ async fn query<T: Store>(
}
}
}
for community in route.attrs.communities.into_iter().flat_map(|x| x) {
if have_community.insert(community) {
let community_str = format!("{}:{}", community.0, community.1);
if let Some(lookup) = community_lists.regular.lookup(&community_str) {
futures.push(Box::pin(futures_util::future::ready(Some(
ApiResult::CommunityDescription {
community: community_str,
community_description: lookup.to_string(),
},
))));
}
}
}
for large_community in route.attrs.large_communities.into_iter().flat_map(|x| x) {
if have_large_community.insert(large_community) {
let large_community_str = format!(
"{}:{}:{}",
large_community.0, large_community.1, large_community.2
);
if let Some(lookup) = community_lists.large.lookup(&large_community_str) {
futures.push(Box::pin(futures_util::future::ready(Some(
ApiResult::CommunityDescription {
community: large_community_str,
community_description: lookup.to_string(),
},
))));
}
}
}

futures
})
Expand All @@ -194,22 +338,35 @@ async fn query<T: Store>(
Ok(StreamBody::new(stream))
}

async fn routers<T: Store>(
State((_, _, store)): State<(Arc<ApiServerConfig>, TokioAsyncResolver, T)>,
) -> impl IntoResponse {
async fn routers<T: Store>(State(AppState { store, .. }): State<AppState<T>>) -> impl IntoResponse {
serde_json::to_string(&store.get_routers()).unwrap()
}

fn make_api<T: Store>(cfg: ApiServerConfig, store: T) -> anyhow::Result<Router> {
async fn make_api<T: Store>(cfg: ApiServerConfig, store: T) -> anyhow::Result<Router> {
let resolver = {
let (rcfg, mut ropts) = hickory_resolver::system_conf::read_system_conf()?;
ropts.ip_strategy = LookupIpStrategy::Ipv6thenIpv4; // strange people set strange default settings
TokioAsyncResolver::tokio(rcfg, ropts)
};

let community_lists: CommunitiesLists = if let Some(ref path) = cfg.communities_file {
let path = path.clone();
serde_json::from_slice(&tokio::task::spawn_blocking(move || std::fs::read(path)).await??)?
} else {
serde_json::from_slice(COMMUNITIES_LIST)?
};

let community_lists = Arc::new(community_lists.compile()?);

Ok(Router::new()
.route("/query", get(query::<T>))
.route("/routers", get(routers::<T>))
.with_state((Arc::new(cfg), resolver, store)))
.with_state(AppState {
cfg: Arc::new(cfg),
resolver,
store,
community_lists,
}))
}

/// This handler serializes the metrics into a string for Prometheus to scrape
Expand All @@ -222,8 +379,8 @@ pub async fn get_metrics() -> (StatusCode, String) {

#[cfg(feature = "embed-static")]
async fn static_path(axum::extract::Path(path): axum::extract::Path<String>) -> impl IntoResponse {
use axum::body::Full;
use axum::body::Empty;
use axum::body::Full;
use axum::http::header;
use axum::http::header::HeaderValue;

Expand Down Expand Up @@ -259,11 +416,10 @@ pub async fn run_api_server<T: Store>(
}

router = router
.nest("/api", make_api(cfg.clone(), store)?)
.nest("/api", make_api(cfg.clone(), store).await?)
.route("/metrics", get(get_metrics));

let make_service = router
.into_make_service();
let make_service = router.into_make_service();

axum::Server::bind(&cfg.bind)
.serve(make_service)
Expand Down

0 comments on commit 70146dd

Please sign in to comment.