From c2cb8f724d1f7086ab390b388ee1a72835c05a23 Mon Sep 17 00:00:00 2001 From: Alan Chen Date: Tue, 4 Jun 2024 16:09:26 -0700 Subject: [PATCH] add: sdf publish support (#4053) * fix: fluvio_hub_util, make push_package_api pub * add: debug lines * fluvio_hub_util: allow ':' for sdf namespaces * add(hub-util): sdf endpoint consts and refactor --- crates/fluvio-hub-protocol/src/constants.rs | 11 +++++++ crates/fluvio-hub-protocol/src/lib.rs | 3 +- .../fluvio-hub-protocol/src/package_meta.rs | 4 +-- .../fluvio-hub-util/src/package_meta_ext.rs | 13 +------- crates/fluvio-hub-util/src/utils.rs | 33 ++++++++++++++++++- 5 files changed, 48 insertions(+), 16 deletions(-) diff --git a/crates/fluvio-hub-protocol/src/constants.rs b/crates/fluvio-hub-protocol/src/constants.rs index 6bf7cc4dce..e426e5d61b 100644 --- a/crates/fluvio-hub-protocol/src/constants.rs +++ b/crates/fluvio-hub-protocol/src/constants.rs @@ -16,6 +16,12 @@ pub const HUB_API_LIST_META: &str = concatcp!(HUB_API_V, "/list_with_meta"); pub const HUB_API_CONN_PKG: &str = concatcp!(HUB_API_V, "/connector/pkg"); pub const HUB_API_CONN_LIST: &str = concatcp!(HUB_API_V, "/connector/list"); +// sdf specific api +pub const HUB_API_SDF_PKG: &str = concatcp!(HUB_API_V, "/sdf/pkg"); +pub const HUB_API_SDF_LIST: &str = concatcp!(HUB_API_V, "/sdf/list"); +pub const HUB_API_SDF_PKG_PUB: &str = concatcp!(HUB_API_V, "/sdf/pkg/pub/pkg"); +pub const HUB_API_SDF_DATAFLOW_PUB: &str = concatcp!(HUB_API_V, "/sdf/pkg/pub/dataflow"); + pub const HUB_MANIFEST_BLOB: &str = "manifest.tar.gz"; pub const HUB_PACKAGE_EXT: &str = "ipkg"; pub const HUB_PACKAGE_META: &str = "package-meta.yaml"; @@ -27,3 +33,8 @@ pub const HUB_SIGNFILE_BASE: &str = "signature"; pub const DEF_CARGO_TOML_PATH: &str = "Cargo.toml"; pub const DEF_HUB_INIT_DIR: &str = ".hub"; pub const DEF_HUB_PKG_META: &str = concatcp!(DEF_HUB_INIT_DIR, "/", HUB_PACKAGE_META); // .hub/package-meta.yaml + +// This is required in sdf hub package_meta manifests +pub const SDF_PKG_KIND: &str = "sdf-kind"; +pub const SDF_PKG_KIND_DATAFLOW: &str = "dataflow"; +pub const SDF_PKG_KIND_PACKAGE: &str = "pkg"; diff --git a/crates/fluvio-hub-protocol/src/lib.rs b/crates/fluvio-hub-protocol/src/lib.rs index 773c60b109..079ee71950 100644 --- a/crates/fluvio-hub-protocol/src/lib.rs +++ b/crates/fluvio-hub-protocol/src/lib.rs @@ -5,4 +5,5 @@ pub mod constants; pub mod infinyon_tok; pub use errors::{Result, HubError}; -pub use package_meta::{PackageMeta, PkgTag, PkgVisibility, validate_noleading_punct}; +pub use package_meta::{PackageMeta, PkgTag, PkgVisibility}; +pub use package_meta::{validate_allowedchars, validate_noleading_punct}; diff --git a/crates/fluvio-hub-protocol/src/package_meta.rs b/crates/fluvio-hub-protocol/src/package_meta.rs index a2052c50f0..dbe60b098f 100644 --- a/crates/fluvio-hub-protocol/src/package_meta.rs +++ b/crates/fluvio-hub-protocol/src/package_meta.rs @@ -236,10 +236,10 @@ pub fn validate_lowercase(val: &str, name: &str) -> String { pub fn validate_allowedchars(val: &str, name: &str) -> String { let good_chars = val .chars() - .all(|ch| matches!(ch, 'a'..='z' | '0'..='9' | '-' | '_')); + .all(|ch| matches!(ch, 'a'..='z' | '0'..='9' | ':' | '-' | '_')); if !good_chars { - format!("{name} {val} should be alphanumeric, '-' or '_'\n") + format!("{name} {val} should be alphanumeric, ':', '-' or '_'\n") } else { String::new() } diff --git a/crates/fluvio-hub-util/src/package_meta_ext.rs b/crates/fluvio-hub-util/src/package_meta_ext.rs index f6482639f0..38934a0c23 100644 --- a/crates/fluvio-hub-util/src/package_meta_ext.rs +++ b/crates/fluvio-hub-util/src/package_meta_ext.rs @@ -6,6 +6,7 @@ use tracing::debug; use fluvio_hub_protocol::{PackageMeta, HubError}; use fluvio_hub_protocol::constants::HUB_PACKAGE_META; +use fluvio_hub_protocol::validate_allowedchars; use crate::package_get_topfile; @@ -85,18 +86,6 @@ pub fn validate_lowercase(val: &str, name: &str) -> String { } } -pub fn validate_allowedchars(val: &str, name: &str) -> String { - let good_chars = val - .chars() - .all(|ch| matches!(ch, 'a'..='z' | '0'..='9' | '-' | '_')); - - if !good_chars { - format!("{name} {val} should be alphanumeric, '-' or '_'\n") - } else { - String::new() - } -} - /// certain output files are transformed in name vs their package name /// eg. a cargo package named example-smartmodule generates /// a release file of example_smartmodule.wasm diff --git a/crates/fluvio-hub-util/src/utils.rs b/crates/fluvio-hub-util/src/utils.rs index 525949ae54..b92e1cf578 100644 --- a/crates/fluvio-hub-util/src/utils.rs +++ b/crates/fluvio-hub-util/src/utils.rs @@ -168,7 +168,35 @@ pub async fn push_package_conn(pkgpath: &str, access: &HubAccess, target: &str) push_package_api(&url, pkgpath, access).await } -async fn push_package_api(put_url: &str, pkgpath: &str, access: &HubAccess) -> Result<()> { +/// push package to connector api +pub async fn push_package_sdf(pkgpath: &str, access: &HubAccess) -> Result<()> { + use crate::{ + SDF_PKG_KIND, SDF_PKG_KIND_DATAFLOW, SDF_PKG_KIND_PACKAGE, HUB_API_SDF_DATAFLOW_PUB, + HUB_API_SDF_PKG_PUB, + }; + + info!("sdf package push form: {pkgpath}"); + let pm = package_get_meta(pkgpath)?; + let Some(sdf_kind) = pm.tag_get(SDF_PKG_KIND) else { + let msg = format!("Invalid sdf hub package_meta: missing tag for {SDF_PKG_KIND}"); + return Err(HubError::PackagePublish(msg)); + }; + let sdf_kind = sdf_kind.first().cloned().unwrap_or_default().value; + let endpoint = match sdf_kind.as_str() { + SDF_PKG_KIND_DATAFLOW => HUB_API_SDF_DATAFLOW_PUB, + SDF_PKG_KIND_PACKAGE => HUB_API_SDF_PKG_PUB, + _ => { + let msg = format!("Invalid sdf hub package_meta {SDF_PKG_KIND}: {sdf_kind}"); + return Err(HubError::PackagePublish(msg)); + } + }; + let host = &access.remote; + let url = format!("{host}/{endpoint}/{}/{}/{}", pm.group, pm.name, pm.version); + debug!(url, "package url"); + push_package_api(&url, pkgpath, access).await +} + +pub async fn push_package_api(put_url: &str, pkgpath: &str, access: &HubAccess) -> Result<()> { let pm = package_get_meta(pkgpath)?; packagename_validate(&pm.name)?; @@ -185,8 +213,11 @@ async fn push_package_api(put_url: &str, pkgpath: &str, access: &HubAccess) -> R ))); } + tracing::debug!("get action token"); let pkg_bytes = std::fs::read(pkgpath)?; let actiontoken = access.get_publish_token().await?; + + tracing::debug!(url = put_url, "put package"); let req = http::Request::put(put_url) .header("Authorization", &actiontoken) .header(http::header::CONTENT_TYPE, mime::OCTET_STREAM.as_str())