diff --git a/.github/workflows/cd_dev.yaml b/.github/workflows/cd_dev.yaml
index 024e7d2ace..a972813d4e 100644
--- a/.github/workflows/cd_dev.yaml
+++ b/.github/workflows/cd_dev.yaml
@@ -27,7 +27,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        os: [ubuntu-latest]
+        os: [ubuntu-latest,macos-latest]
         cluster_type: [local,k8]
     env:
       SLEEP: 10
@@ -36,25 +36,32 @@ jobs:
     steps:
       - uses: actions/checkout@v2
-      - uses: nolar/setup-k3d-k3s@v1
       - name: Install Fluvio CLI
-        run: |
-          curl -fsS https://packages.fluvio.io/v1/install.sh | VERSION=latest bash
+        run: curl -fsS https://packages.fluvio.io/v1/install.sh | VERSION=latest bash
+      - name: Set up K3d for Ubuntu
+        uses: nolar/setup-k3d-k3s@v1
+        if: ${{ matrix.os == 'ubuntu-latest' }}
+      - name: Set up K8 for ubuntu(kind)
+        if: ${{ matrix.os == 'ubuntu-latest' }}
+        run: ./k8-util/cluster/reset-k3d.sh
+      - name: Set up Minikube for Mac
+        if: ${{ matrix.os == 'macos-latest' }}
+        run: ./k8-util/cluster/start-minikube-mac.sh
       - name: Create Fluvio cluster and run E2E Test
         uses: nick-invision/retry@v2
         with:
          timeout_minutes: 7
          max_attempts: 3
          command: |
-            ./k8-util/cluster/reset-k3d.sh
            export PATH=~/.fluvio/bin:$PATH
            if [[ ${{ matrix.cluster_type }} == 'local' ]]; then
              echo "Installing local cluster"
+              fluvio cluster delete
              fluvio cluster start --local
            else
              echo "Installing k8 cluster"
+              fluvio cluster delete
              fluvio cluster start --image-version latest
-            fi
            sleep 30
            fluvio version
diff --git a/k8-util/cluster/reset-minikube.sh b/k8-util/cluster/reset-minikube.sh
index aeb954e4a9..e0e254d5d7 100755
--- a/k8-util/cluster/reset-minikube.sh
+++ b/k8-util/cluster/reset-minikube.sh
@@ -3,14 +3,23 @@
 # it uses docker as default driver
 set -e
 set -x
-ARG1=${1:-docker}
 K8_VERSION=${2:-1.21.2}
+# set up default driver, use hyperkit for mac
 if [ "$(uname)" == "Darwin" ]; then
-EXTRA_CONFIG=--extra-config=apiserver.service-node-port-range=32700-32800 --ports=127.0.0.1:32700-32800:32700-32800
+
+DRIVER=${1:-hyperkit}
+echo "Using driver: $DRIVER"
+# for mac, if driver is docker, set up proxy
+if [ ${DRIVER} == "docker" ]; then
+EXTRA_CONFIG="--extra-config=apiserver.service-node-port-range=32700-32800 --ports=127.0.0.1:32700-32800:32700-32800"
+fi
+
+else
+DRIVER=${1:-docker}
 fi
 minikube delete
-minikube start --driver $ARG1 --kubernetes-version=$K8_VERSION $EXTRA_CONFIG
+minikube start --driver $DRIVER --kubernetes-version=$K8_VERSION $EXTRA_CONFIG
 # minikube start --extra-config=apiserver.v=10
\ No newline at end of file
diff --git a/k8-util/cluster/start-minikube-mac.sh b/k8-util/cluster/start-minikube-mac.sh
new file mode 100755
index 0000000000..7f234bddc2
--- /dev/null
+++ b/k8-util/cluster/start-minikube-mac.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+set -x
+brew install minikube
+minikube config set memory 16384
+minikube start --driver virtualbox --kubernetes-version=1.21.2
+kubectl get nodes
\ No newline at end of file
diff --git a/src/cluster/src/cli/start/k8.rs b/src/cluster/src/cli/start/k8.rs
index ef779d51c6..dad7fc1882 100644
--- a/src/cluster/src/cli/start/k8.rs
+++ b/src/cluster/src/cli/start/k8.rs
@@ -33,6 +33,7 @@ pub async fn process_k8(
         .chart_values(opt.k8_config.chart_values)
         .render_checks(true)
         .upgrade(upgrade)
+        .proxy_addr(opt.proxy_addr)
         .with_if(skip_sys, |b| b.install_sys(false))
         .with_if(opt.skip_checks, |b| b.skip_checks(true));
diff --git a/src/cluster/src/cli/start/mod.rs b/src/cluster/src/cli/start/mod.rs
index 06f89b54cc..2f4b51c663 100644
--- a/src/cluster/src/cli/start/mod.rs
+++ b/src/cluster/src/cli/start/mod.rs
@@ -123,6 +123,10 @@ pub struct StartOpt {
     /// Tries to setup necessary environment for cluster startup
     #[structopt(long)]
     pub setup: bool,
+
+    /// Proxy address
+    #[structopt(long)]
+    pub proxy_addr: Option<String>,
 }
 
 impl StartOpt {
diff --git a/src/cluster/src/start/k8.rs b/src/cluster/src/start/k8.rs
index 553fd1742c..1b38ae4b8a 100644
--- a/src/cluster/src/start/k8.rs
+++ b/src/cluster/src/start/k8.rs
@@ -684,6 +684,9 @@ impl ClusterInstaller {
             }
         };
 
+        if let Some(proxy) = &self.config.proxy_addr {
+            println!("Using proxy addr: {}", proxy);
+        }
         self.install_app().await?;
         let namespace = &self.config.namespace;
         let (address, port) = self
@@ -760,30 +763,37 @@ impl ClusterInstaller {
             chart_values.push(np_conf_path.to_path_buf());
             debug!("Using NodePort service type");
 
-            debug!("Getting external IP from K8s node");
-            let kube_client = &self.kube_client;
-            debug!("Trying to query for Nodes");
+            let external_addr = if let Some(addr) = &self.config.proxy_addr {
+                debug!(?addr, "use proxying");
+                addr.to_owned()
+            } else {
+                debug!("Getting external IP from K8s node");
+                let kube_client = &self.kube_client;
 
-            let nodes = kube_client.retrieve_items::<NodeSpec, _>("").await?;
+                debug!("Trying to query for Nodes");
 
-            debug!("Results from Node query: {:#?}", &nodes);
+                let nodes = kube_client.retrieve_items::<NodeSpec, _>("").await?;
 
-            let mut node_addr: Vec<NodeAddress> = Vec::new();
-            for n in nodes.items.into_iter().map(|x| x.status.addresses) {
-                node_addr.extend(n)
-            }
+                debug!("Results from Node query: {:#?}", &nodes);
 
-            debug!("Node Addresses: {:#?}", node_addr);
+                let mut node_addr: Vec<NodeAddress> = Vec::new();
+                for n in nodes.items.into_iter().map(|x| x.status.addresses) {
+                    node_addr.extend(n)
+                }
 
-            let external_addr = node_addr
-                .into_iter()
-                .find(|a| a.r#type == "InternalIP")
-                .ok_or_else(|| K8InstallError::Other("No nodes with InternalIP set".into()))?;
+                debug!("Node Addresses: {:#?}", node_addr);
+
+                node_addr
+                    .into_iter()
+                    .find(|a| a.r#type == "InternalIP")
+                    .ok_or_else(|| K8InstallError::Other("No nodes with InternalIP set".into()))?
+                    .address
+            };
 
             // Set this annotation w/ the external address by overriding this Helm chart value:
             let mut ingress_address = BTreeMap::new();
-            ingress_address.insert("fluvio.io/ingress-address", external_addr.address);
+            ingress_address.insert("fluvio.io/ingress-address", external_addr);
 
             let mut service_annotation = BTreeMap::new();
             service_annotation.insert("serviceAnnotations", ingress_address);
@@ -791,6 +801,8 @@ impl ClusterInstaller {
             let mut helm_lb_config = BTreeMap::new();
             helm_lb_config.insert("loadBalancer", service_annotation);
 
+            debug!(?helm_lb_config, "helm_lb_config");
+
             serde_yaml::to_writer(&np_addr_fd, &helm_lb_config)
                 .map_err(|err| K8InstallError::Other(err.to_string()))?;
             Some((np_addr_fd, np_conf_path))
@@ -921,20 +933,27 @@ impl ClusterInstaller {
             LoadBalancerType::NodePort => {
                 let node_port = node_port.ok_or_else(|| K8InstallError::Other("Expecting a NodePort port".into()))?;
 
-                debug!("k8 node query");
-                let nodes = self.kube_client.retrieve_items::<NodeSpec, _>(ns).await?;
-                debug!("Output from k8 node query: {:#?}", &nodes);
+                let extern_addr = if let Some(addr) = &self.config.proxy_addr {
+                    debug!(?addr,"using proxy");
+                    addr.to_owned()
+                } else {
 
-                let mut node_addr : Vec<NodeAddress> = Vec::new();
-                for n in nodes.items.into_iter().map(|x| x.status.addresses ) {
-                    node_addr.extend(n)
-                }
+                    debug!("k8 node query");
+                    let nodes = self.kube_client.retrieve_items::<NodeSpec, _>(ns).await?;
+                    debug!("Output from k8 node query: {:#?}", &nodes);
+
+                    let mut node_addr : Vec<NodeAddress> = Vec::new();
+                    for n in nodes.items.into_iter().map(|x| x.status.addresses ) {
+                        node_addr.extend(n)
+                    }
+
+                    // Return the first node with type "InternalIP"
+                    node_addr.into_iter().find(|a| a.r#type == "InternalIP")
+                        .ok_or_else(|| K8InstallError::Other("No nodes with InternalIP set".into()))?.address
+                };
 
-                // Return the first node with type "InternalIP"
-                let external_addr = node_addr.into_iter().find(|a| a.r#type == "InternalIP")
-                    .ok_or_else(|| K8InstallError::Other("No nodes with InternalIP set".into()))?;
+                return Ok(Some((format!("{}:{}",extern_addr,node_port),node_port)))
 
-                return Ok(Some((format!("{}:{}",external_addr.address,node_port),node_port)))
             },
             LoadBalancerType::LoadBalancer => {
                 let ingress_addr = service
@@ -1023,12 +1042,20 @@ impl ClusterInstaller {
     /// Wait until the Fluvio SC public service appears in Kubernetes
     async fn wait_for_sc_port_check(&self, sock_addr_str: &str) -> Result<(), K8InstallError> {
-        info!(sock_addr = %sock_addr_str, "waiting for SC port check");
         for i in 0..*MAX_SC_NETWORK_LOOP {
-            let sock_addr = self.wait_for_sc_dns(sock_addr_str).await?;
-            if TcpStream::connect(&*sock_addr).await.is_ok() {
-                info!(sock_addr = %sock_addr_str, "finished SC port check");
-                return Ok(());
+            if self.config.proxy_addr.is_none() {
+                debug!("resolving socket addr: {}", sock_addr_str);
+                let sock_addr = self.wait_for_sc_dns(sock_addr_str).await?;
+                if TcpStream::connect(&*sock_addr).await.is_ok() {
+                    info!(sock_addr = %sock_addr_str, "finished SC port check");
+                    return Ok(());
+                }
+            } else {
+                debug!("trying to connect to proxy: {}", sock_addr_str);
+                if TcpStream::connect(&*sock_addr_str).await.is_ok() {
+                    info!(sock_addr = %sock_addr_str, "finished SC port check");
+                    return Ok(());
+                }
             }
             info!(
                 attempt = i,
diff --git a/tests/runner/src/utils/setup/environment/k8.rs b/tests/runner/src/utils/setup/environment/k8.rs
index 3e14877c20..f380172157 100644
--- a/tests/runner/src/utils/setup/environment/k8.rs
+++ b/tests/runner/src/utils/setup/environment/k8.rs
@@ -28,8 +28,15 @@ impl TestEnvironmentDriver for K8EnvironmentDriver {
         let mut builder = ClusterConfig::builder(version);
         if self.option.develop_mode() {
             builder.development().expect("should test in develop mode");
+        } else {
+            // check if image version is specified
+            if let Some(image) = &self.option.image_version {
+                builder.image_tag(image);
+            }
         }
+
         builder
+            .proxy_addr(self.option.proxy_addr.clone())
             .spu_replicas(self.option.spu())
             .skip_checks(self.option.skip_checks())
             .save_profile(true);
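How the new option is expected to be used: `proxy_addr` on `StartOpt` is declared with `#[structopt(long)]`, so it should surface as a `--proxy-addr` flag on `fluvio cluster start`; when it is set, the installer skips the node `InternalIP` lookup, advertises the given address in the `fluvio.io/ingress-address` annotation, and the SC port check connects to that address directly. A rough sketch of trying the macOS path locally, based on the scripts and flag added in this diff (the value `127.0.0.1` is only a placeholder for whatever proxy or port-forward fronts the minikube NodePorts):

```bash
# Bring up a Mac minikube cluster with the helper script added above
./k8-util/cluster/start-minikube-mac.sh

# Install Fluvio, advertising the proxy address instead of a node InternalIP.
# 127.0.0.1 is an illustrative value, not something fixed by this change.
fluvio cluster start --proxy-addr 127.0.0.1
```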