Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add streaming e2e tests #391

Merged
merged 1 commit into from
May 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ jobs:
env:
- TEST_GROUP=cassandra

- stage: build
name: "Run e2e streaming tests"
install:
- "./.travis/setupMinikube.sh"
script:
- "./.travis/rune2eTests.sh"
env:
- TEST_GROUP=streaming

- stage: deploy
name: "Publish latest image"
env:
Expand Down
4 changes: 4 additions & 0 deletions .travis/rune2eTests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ then
echo "Running Cassandra Tests"
make cassandra
make e2e-tests-cassandra
elif [ "${TEST_GROUP}" = "streaming" ]
then
echo "Running Streaming Tests"
make e2e-tests-streaming
else
echo "Unknown TEST_GROUP [${TEST_GROUP}]"; exit 1
fi
Expand Down
13 changes: 13 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ VERSION_PKG ?= "github.com/jaegertracing/jaeger-operator/pkg/version"
JAEGER_VERSION ?= "$(shell grep -v '\#' jaeger.version)"
OPERATOR_VERSION ?= "$(shell git describe --tags)"
STORAGE_NAMESPACE ?= "${shell kubectl get sa default -o jsonpath='{.metadata.namespace}' || oc project -q}"
KAFKA_NAMESPACE ?= "kafka"
ES_OPERATOR_NAMESPACE = openshift-logging

LD_FLAGS ?= "-X $(VERSION_PKG).version=$(OPERATOR_VERSION) -X $(VERSION_PKG).buildDate=$(VERSION_DATE) -X $(VERSION_PKG).defaultJaeger=$(JAEGER_VERSION)"
Expand Down Expand Up @@ -104,6 +105,11 @@ e2e-tests-self-provisioned-es: prepare-e2e-tests deploy-es-operator
@echo Running Self provisioned Elasticsearch end-to-end tests...
@go test -tags=self_provisioned_elasticsearch ./test/e2e/... -kubeconfig $(KUBERNETES_CONFIG) -namespacedMan ../../deploy/test/namespace-manifests.yaml -globalMan ../../deploy/crds/jaegertracing_v1_jaeger_crd.yaml -root .

.PHONY: e2e-tests-streaming
e2e-tests-streaming: prepare-e2e-tests es kafka
@echo Running Streaming end-to-end tests...
@STORAGE_NAMESPACE=$(STORAGE_NAMESPACE) KAFKA_NAMESPACE=$(KAFKA_NAMESPACE) go test -tags=streaming ./test/e2e/... -kubeconfig $(KUBERNETES_CONFIG) -namespacedMan ../../deploy/test/namespace-manifests.yaml -globalMan ../../deploy/crds/jaegertracing_v1_jaeger_crd.yaml -root .

.PHONY: run
run: crd
@rm -rf /tmp/_cert*
Expand Down Expand Up @@ -137,6 +143,13 @@ storage:
@echo Creating namespace $(STORAGE_NAMESPACE)
@kubectl create namespace $(STORAGE_NAMESPACE) 2>&1 | grep -v "already exists" || true

.PHONY: kafka
kafka:
@echo Creating namespace $(KAFKA_NAMESPACE)
@kubectl create namespace $(KAFKA_NAMESPACE) 2>&1 | grep -v "already exists" || true
@sed 's/namespace: .*/namespace: kafka/' ./test/kafka-operator.yml | kubectl -n $(KAFKA_NAMESPACE) apply -f - 2>&1 | grep -v "already exists" || true
@kubectl apply -f ./test/kafka.yml -n $(KAFKA_NAMESPACE) 2>&1 | grep -v "already exists" || true

.PHONY: clean
clean:
@rm -f deploy/test/*.yaml
Expand Down
97 changes: 97 additions & 0 deletions test/e2e/streaming_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package e2e

import (
goctx "context"
"fmt"
"testing"

framework "github.com/operator-framework/operator-sdk/pkg/test"
"github.com/operator-framework/operator-sdk/pkg/test/e2eutil"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/jaegertracing/jaeger-operator/pkg/apis/jaegertracing/v1"
)

func SimpleStreaming(t *testing.T) {
ctx := prepare(t)
defer ctx.Cleanup()

if err := simpleStreaming(t, framework.Global, ctx); err != nil {
t.Fatal(err)
}
}

func simpleStreaming(t *testing.T, f *framework.Framework, ctx *framework.TestCtx) error {
namespace, err := ctx.GetNamespace()
if err != nil {
return fmt.Errorf("could not get namespace: %v", err)
}

// create jaeger custom resource
exampleJaeger := &v1.Jaeger{
TypeMeta: metav1.TypeMeta{
Kind: "Jaeger",
APIVersion: "jaegertracing.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "simple-streaming",
Namespace: namespace,
},
Spec: v1.JaegerSpec{
Strategy: "streaming",
Collector: v1.JaegerCollectorSpec{
Options: v1.NewOptions(map[string]interface{}{
"kafka.producer.topic": "jaeger-spans",
"kafka.producer.brokers": "my-cluster-kafka-brokers.kafka:9092",
}),
},
Ingester: v1.JaegerIngesterSpec{
Options: v1.NewOptions(map[string]interface{}{
"kafka.consumer.topic": "jaeger-spans",
"kafka.consumer.brokers": "my-cluster-kafka-brokers.kafka:9092",
}),
},
Storage: v1.JaegerStorageSpec{
Type: "elasticsearch",
Options: v1.NewOptions(map[string]interface{}{
"es.server-urls": esServerUrls,
}),
},
},
}
err = f.Client.Create(goctx.TODO(), exampleJaeger, &framework.CleanupOptions{TestContext: ctx, Timeout: timeout, RetryInterval: retryInterval})
if err != nil {
return err
}

err = e2eutil.WaitForDeployment(t, f.KubeClient, namespace, "simple-streaming-collector", 1, retryInterval, timeout)
if err != nil {
return err
}

err = e2eutil.WaitForDeployment(t, f.KubeClient, namespace, "simple-streaming-query", 1, retryInterval, timeout)
if err != nil {
return err
}
queryPod, err := GetPod(namespace, "simple-streaming-query", "jaegertracing/jaeger-query", f.KubeClient)
if err != nil {
return err
}
collectorPod, err := GetPod(namespace, "simple-streaming-collector", "jaegertracing/jaeger-collector", f.KubeClient)
if err != nil {
return err
}
portForw, closeChan, err := CreatePortForward(namespace, queryPod.Name, []string{"16686"}, f.KubeConfig)
if err != nil {
return err
}
defer portForw.Close()
defer close(closeChan)
portForwColl, closeChanColl, err := CreatePortForward(namespace, collectorPod.Name, []string{"14268"}, f.KubeConfig)
if err != nil {
return err
}
defer portForwColl.Close()
defer close(closeChanColl)
return SmokeTest("http://localhost:16686/api/traces", "http://localhost:14268/api/traces", "foobar", retryInterval, timeout)
}
39 changes: 39 additions & 0 deletions test/e2e/suite_streaming_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// +build streaming

package e2e

import (
"testing"

framework "github.com/operator-framework/operator-sdk/pkg/test"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/jaegertracing/jaeger-operator/pkg/apis"
"github.com/jaegertracing/jaeger-operator/pkg/apis/jaegertracing/v1"
)

func TestElasticsearch(t *testing.T) {
assert.NoError(t, framework.AddToFrameworkScheme(apis.AddToScheme, &v1.JaegerList{
TypeMeta: metav1.TypeMeta{
Kind: "Jaeger",
APIVersion: "jaegertracing.io/v1",
},
}))

// Don't start tests until elasticsearch is ready
err := WaitForStatefulset(t, framework.Global.KubeClient, storageNamespace, "elasticsearch", retryInterval, timeout)
if err != nil {
t.Fatal(err)
}

// Don't start tests until elasticsearch is ready
err = WaitForStatefulset(t, framework.Global.KubeClient, kafkaNamespace, "my-cluster-kafka", retryInterval, timeout)
if err != nil {
t.Fatal(err)
}

t.Run("streaming", func(t *testing.T) {
t.Run("simple-streaming", SimpleStreaming)
})
}
31 changes: 15 additions & 16 deletions test/e2e/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,32 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
osv1 "github.com/openshift/api/route/v1"
osv1sec "github.com/openshift/api/security/v1"
framework "github.com/operator-framework/operator-sdk/pkg/test"
"github.com/operator-framework/operator-sdk/pkg/test/e2eutil"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
osv1 "github.com/openshift/api/route/v1"
osv1sec "github.com/openshift/api/security/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/jaegertracing/jaeger-operator/pkg/apis/jaegertracing/v1"
"github.com/jaegertracing/jaeger-operator/pkg/apis"
"github.com/jaegertracing/jaeger-operator/pkg/apis/jaegertracing/v1"
)

var (
retryInterval = time.Second * 5
timeout = time.Minute * 2
storageNamespace = os.Getenv("STORAGE_NAMESPACE")
kafkaNamespace = os.Getenv("KAFKA_NAMESPACE")
esServerUrls = "http://elasticsearch." + storageNamespace + ".svc:9200"
cassandraServiceName = "cassandra." + storageNamespace + ".svc"
ctx *framework.TestCtx
fw *framework.Framework
namespace string
t *testing.T
ctx *framework.TestCtx
fw *framework.Framework
namespace string
t *testing.T
)

// GetPod returns pod name
Expand Down Expand Up @@ -118,7 +119,6 @@ func addToFrameworkSchemeForSmokeTests(t *testing.T) {
}
}


type resp struct {
Data []trace `json:"data"`
}
Expand All @@ -134,10 +134,9 @@ type span struct {
}

type services struct {
Data []string `json:"data"`
total int `json:"total"`
limit int `json:"limit"`
offset int `json:offset`
Data []string `json:"data"`
total int `json:"total"`
limit int `json:"limit"`
offset int `json:offset`
errors interface{} `json:"errors"`
}

Loading