Skip to content

Commit

Permalink
Merge pull request #91 from rgdoliveira/sync_main
Browse files Browse the repository at this point in the history
Sync main branch with Apache main branch
  • Loading branch information
rgdoliveira authored Nov 6, 2024
2 parents 66b7b14 + 8c5a919 commit a2fae79
Show file tree
Hide file tree
Showing 39 changed files with 373 additions and 145 deletions.
4 changes: 2 additions & 2 deletions api/go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module github.com/apache/incubator-kie-kogito-serverless-operator/api

go 1.22.8
go 1.22.0

require (
github.com/serverlessworkflow/sdk-go/v2 v2.4.1
github.com/serverlessworkflow/sdk-go/v2 v2.4.2
k8s.io/api v0.31.1
k8s.io/apimachinery v0.31.1
knative.dev/pkg v0.0.0-20231023151236-29775d7c9e5c
Expand Down
2 changes: 1 addition & 1 deletion api/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/relvacode/iso8601 v1.4.0 h1:GsInVSEJfkYuirYFxa80nMLbH2aydgZpIf52gYZXUJs=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/serverlessworkflow/sdk-go/v2 v2.4.1 h1:9NnaYGhSKZj19rRC8gTZ3IVJJ4EjFG0LJuQe/bt510Q=
github.com/serverlessworkflow/sdk-go/v2 v2.4.2 h1:dqRa/i5J885rk0bGIXzUVLwEFfRWB9gpQfOdXlbejsI=
github.com/sosodev/duration v1.3.1 h1:qtHBDMQ6lvMQsL15g4aopM4HEfOaYuhWBw3NPTtlqq4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
Expand Down
2 changes: 1 addition & 1 deletion bddframework/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/apache/incubator-kie-kogito-serverless-operator/bddframework

go 1.22.8
go 1.22.0

// Internal dependencies
replace github.com/apache/incubator-kie-kogito-serverless-operator => ../
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ data:
# If true, the workflow deployments will be configured to send accumulated workflow status change events to the Data
# Index Service reducing the number of produced events. Set to false to send individual events.
kogitoEventsGrouping: true
# If true, the accumulated workflow status change events will be sent in binary mode. (reduces the evens size)
kogitoEventsGroupingBinary: true
# If true, the accumulated workflow status change events, when sent in binary mode, will be gzipped at the cost of
# some performance.
kogitoEventsGroupingCompress: false
kind: ConfigMap
metadata:
name: sonataflow-operator-controllers-config
7 changes: 6 additions & 1 deletion config/manager/controllers_cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,9 @@ postgreSQLPersistenceExtensions:
version: 999-20240912-SNAPSHOT
# If true, the workflow deployments will be configured to send accumulated workflow status change events to the Data
# Index Service reducing the number of produced events. Set to false to send individual events.
kogitoEventsGrouping: true
kogitoEventsGrouping: true
# If true, the accumulated workflow status change events will be sent in binary mode. (reduces the evens size)
kogitoEventsGroupingBinary: true
# If true, the accumulated workflow status change events, when sent in binary mode, will be gzipped at the cost of
# some performance.
kogitoEventsGroupingCompress: false
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/apache/incubator-kie-kogito-serverless-operator

go 1.22.8
go 1.22.0

// Internal dependencies
replace (
Expand All @@ -24,7 +24,7 @@ require (
github.com/openshift/client-go v0.0.0-20240528061634-b054aa794d87
github.com/pkg/errors v0.9.1
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.55.1
github.com/serverlessworkflow/sdk-go/v2 v2.4.1
github.com/serverlessworkflow/sdk-go/v2 v2.4.2
github.com/stretchr/testify v1.9.0
k8s.io/api v0.31.1
k8s.io/apimachinery v0.31.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ github.com/santhosh-tekuri/jsonschema/v5 v5.3.0 h1:uIkTLo0AGRc8l7h5l9r+GcYi9qfVP
github.com/santhosh-tekuri/jsonschema/v5 v5.3.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/serverlessworkflow/sdk-go/v2 v2.4.1 h1:9NnaYGhSKZj19rRC8gTZ3IVJJ4EjFG0LJuQe/bt510Q=
github.com/serverlessworkflow/sdk-go/v2 v2.4.1/go.mod h1:gl5WYsxKseaozFkHJwWNO5EGcG7Zztqk2HGuqeCovj4=
github.com/serverlessworkflow/sdk-go/v2 v2.4.2 h1:dqRa/i5J885rk0bGIXzUVLwEFfRWB9gpQfOdXlbejsI=
github.com/serverlessworkflow/sdk-go/v2 v2.4.2/go.mod h1:WGJR0YhXp035Y/toMKwHeIT5/EDEkDEDozn6VIGSUqI=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
Expand Down
4 changes: 2 additions & 2 deletions go.work
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
go 1.22.8
go 1.22.0

use (
.
Expand All @@ -15,7 +15,7 @@ replace (
github.com/openshift/client-go => github.com/openshift/client-go v0.0.0-20230503144108-75015d2347cb

// Main dependencies sync
github.com/serverlessworkflow/sdk-go/v2 => github.com/serverlessworkflow/sdk-go/v2 v2.4.1
github.com/serverlessworkflow/sdk-go/v2 => github.com/serverlessworkflow/sdk-go/v2 v2.4.2
k8s.io/api => k8s.io/api v0.31.1
k8s.io/apimachinery => k8s.io/apimachinery v0.31.1
k8s.io/client-go => k8s.io/client-go v0.31.1
Expand Down
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2690,6 +2690,8 @@ github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.20/go.mod h1:fCa7OJZ/9DRTnOKmxvT6pn+LPWUptQAmHF/SBJUGEcg=
github.com/sclevine/spec v1.2.0 h1:1Jwdf9jSfDl9NVmt8ndHqbTZ7XCCPbh1jI3hkDBHVYA=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/serverlessworkflow/sdk-go/v2 v2.4.2 h1:dqRa/i5J885rk0bGIXzUVLwEFfRWB9gpQfOdXlbejsI=
github.com/serverlessworkflow/sdk-go/v2 v2.4.2/go.mod h1:WGJR0YhXp035Y/toMKwHeIT5/EDEkDEDozn6VIGSUqI=
github.com/shirou/gopsutil v0.0.0-20180427012116-c95755e4bcd7 h1:80VN+vGkqM773Br/uNNTSheo3KatTgV8IpjIKjvVLng=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 h1:udFKJ0aHUL60LboW/A+DfgoHVedieIzIXE8uylPue0U=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE=
Expand Down
2 changes: 1 addition & 1 deletion images/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

- name: operator-builder
version: 999.0.0-snapshot
from: 'golang:1.22.8'
from: 'golang:1.22'
description: Builder Image for the Operator

args:
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/cfg/controllers_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type ControllersCfg struct {
BuilderConfigMapName string `yaml:"builderConfigMapName,omitempty"`
PostgreSQLPersistenceExtensions []GAV `yaml:"postgreSQLPersistenceExtensions,omitempty"`
KogitoEventsGrouping bool `yaml:"kogitoEventsGrouping,omitempty"`
KogitoEventsGroupingBinary bool `yaml:"KogitoEventsGroupingBinary,omitempty"`
KogitoEventsGroupingCompress bool `yaml:"KogitoEventsGroupingCompress,omitempty"`
}

// InitializeControllersCfg initializes the platform configuration for this instance.
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/cfg/controllers_cfg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func TestInitializeControllersCfgAt_ValidFile(t *testing.T) {
Version: "999-SNAPSHOT",
}, postgresExtensions[2])
assert.True(t, cfg.KogitoEventsGrouping)
assert.True(t, cfg.KogitoEventsGroupingBinary)
assert.False(t, cfg.KogitoEventsGroupingCompress)
}

func TestInitializeControllersCfgAt_FileNotFound(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion internal/controller/cfg/testdata/controllers-cfg-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ postgreSQLPersistenceExtensions:
- groupId: org.kie
artifactId: kie-addons-quarkus-persistence-jdbc
version: 999-SNAPSHOT
kogitoEventsGrouping: true
kogitoEventsGrouping: true
kogitoEventsGroupingBinary: true
kogitoEventsGroupingCompress: false
6 changes: 3 additions & 3 deletions internal/controller/platform/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ package platform
import (
"context"

"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"

v08 "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
corev1 "k8s.io/api/core/v1"
)

// Action --.
Expand All @@ -38,7 +38,7 @@ type Action interface {
CanHandle(platform *v08.SonataFlowPlatform) bool

// executes the handling function
Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, error)
Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, *corev1.Event, error)
}

type baseAction struct {
Expand Down
6 changes: 4 additions & 2 deletions internal/controller/platform/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ package platform
import (
"context"

corev1 "k8s.io/api/core/v1"

"github.com/apache/incubator-kie-kogito-serverless-operator/api"
v08 "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
)
Expand All @@ -43,9 +45,9 @@ func (action *createAction) CanHandle(platform *v08.SonataFlowPlatform) bool {
return platform.Status.IsCreating()
}

func (action *createAction) Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, error) {
func (action *createAction) Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, *corev1.Event, error) {
//TODO: Perform the actions needed for the Platform creation
platform.Status.Manager().MarkTrue(api.SucceedConditionType)

return platform, nil
return platform, nil, nil
}
16 changes: 8 additions & 8 deletions internal/controller/platform/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,24 +61,24 @@ func (action *initializeAction) CanHandle(platform *operatorapi.SonataFlowPlatfo
return platform.Status.GetTopLevelCondition().IsUnknown() || platform.Status.IsDuplicated()
}

func (action *initializeAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
func (action *initializeAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, *corev1.Event, error) {
duplicate, err := action.isPrimaryDuplicate(ctx, platform)
if err != nil {
return nil, err
return nil, nil, err
}
if duplicate {
// another platform already present in the namespace
if !platform.Status.IsDuplicated() {
plat := platform.DeepCopy()
plat.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformDuplicatedReason, "")
return plat, nil
return plat, nil, nil
}

return nil, nil
return nil, nil, nil
}

if err = CreateOrUpdateWithDefaults(ctx, platform, true); err != nil {
return nil, err
return nil, nil, err
}
// nolint: staticcheck
if platform.Spec.Build.Config.BuildStrategy == operatorapi.OperatorBuildStrategy {
Expand All @@ -88,13 +88,13 @@ func (action *initializeAction) Handle(ctx context.Context, platform *operatorap
klog.V(log.I).InfoS("Create persistent volume claim")
err := createPersistentVolumeClaim(ctx, action.client, platform)
if err != nil {
return nil, err
return nil, nil, err
}
// Create the Kaniko warmer pod that caches the base image into the SonataFlow builder volume
klog.V(log.I).InfoS("Create Kaniko cache warmer pod")
err = createKanikoCacheWarmerPod(ctx, action.client, platform)
if err != nil {
return nil, err
return nil, nil, err
}
platform.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformWarmingReason, "")
} else {
Expand All @@ -106,7 +106,7 @@ func (action *initializeAction) Handle(ctx context.Context, platform *operatorap
}
platform.Status.Version = metadata.SpecVersion

return platform, nil
return platform, nil, nil
}

// TODO: move this to Kaniko packages based on the platform context
Expand Down
56 changes: 33 additions & 23 deletions internal/controller/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,38 +61,38 @@ func (action *serviceAction) CanHandle(platform *operatorapi.SonataFlowPlatform)
return platform.Status.IsReady()
}

func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, *corev1.Event, error) {
// Refresh applied configuration
if err := CreateOrUpdateWithDefaults(ctx, platform, false); err != nil {
return nil, err
return nil, nil, err
}

psDI := services.NewDataIndexHandler(platform)
if psDI.IsServiceSetInSpec() {
if err := createOrUpdateServiceComponents(ctx, action.client, platform, psDI); err != nil {
return nil, err
if event, err := createOrUpdateServiceComponents(ctx, action.client, platform, psDI); err != nil {
return nil, event, err
}
}

psJS := services.NewJobServiceHandler(platform)
if psJS.IsServiceSetInSpec() {
if err := createOrUpdateServiceComponents(ctx, action.client, platform, psJS); err != nil {
return nil, err
if event, err := createOrUpdateServiceComponents(ctx, action.client, platform, psJS); err != nil {
return nil, event, err
}
}

return platform, nil
return platform, nil, nil
}

func createOrUpdateServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
func createOrUpdateServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) (*corev1.Event, error) {
if err := createOrUpdateConfigMap(ctx, client, platform, psh); err != nil {
return err
return nil, err
}
if err := createOrUpdateDeployment(ctx, client, platform, psh); err != nil {
return err
return nil, err
}
if err := createOrUpdateService(ctx, client, platform, psh); err != nil {
return err
return nil, err
}
return createOrUpdateKnativeResources(ctx, client, platform, psh)
}
Expand Down Expand Up @@ -159,6 +159,7 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor
MatchLabels: selectorLbl,
},
Replicas: &replicas,
Strategy: psh.GetDeploymentStrategy(),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: lbl,
Expand Down Expand Up @@ -200,6 +201,9 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor
if op, err := controllerutil.CreateOrUpdate(ctx, client, serviceDeployment, func() error {
knative.SaveKnativeData(&serviceDeploymentSpec.Template.Spec, &serviceDeployment.Spec.Template.Spec)
err := mergo.Merge(&(serviceDeployment.Spec), serviceDeploymentSpec, mergo.WithOverride)
// mergo.Merge algorithm is not setting the serviceDeployment.Spec.Replicas when the
// *serviceDeploymentSpec.Replicas is 0. Making impossible to scale to zero. Ensure the value.
serviceDeployment.Spec.Replicas = serviceDeploymentSpec.Replicas
if err != nil {
return err
}
Expand Down Expand Up @@ -307,24 +311,24 @@ func setSonataFlowPlatformFinalizer(ctx context.Context, c client.Client, platfo
return nil
}

func createOrUpdateKnativeResources(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
func createOrUpdateKnativeResources(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) (*corev1.Event, error) {
lbl, _ := getLabels(platform, psh)
objs, err := psh.GenerateKnativeResources(platform, lbl)
objs, event, err := psh.GenerateKnativeResources(platform, lbl)
if err != nil {
return err
return event, err
}
// Create or update triggers
for _, obj := range objs {
if triggerDef, ok := obj.(*eventingv1.Trigger); ok {
if platform.Namespace == obj.GetNamespace() {
if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil {
return err
return nil, err
}
} else {
// This is for Knative trigger in a different namespace
// Set the finalizer for trigger cleanup when the platform is deleted
if err := setSonataFlowPlatformFinalizer(ctx, client, platform); err != nil {
return err
return nil, err
}
}
trigger := &eventingv1.Trigger{
Expand All @@ -335,21 +339,21 @@ func createOrUpdateKnativeResources(ctx context.Context, client client.Client, p
return nil
})
if err != nil {
return err
return nil, err
}
addToSonataFlowPlatformTriggerList(platform, trigger)
}
}

if err := SafeUpdatePlatformStatus(ctx, platform); err != nil {
return err
return nil, err
}

// Create or update sinkbindings
for _, obj := range objs {
if sbDef, ok := obj.(*sourcesv1.SinkBinding); ok {
if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil {
return err
return nil, err
}
sinkBinding := &sourcesv1.SinkBinding{
ObjectMeta: sbDef.ObjectMeta,
Expand All @@ -359,18 +363,24 @@ func createOrUpdateKnativeResources(ctx context.Context, client client.Client, p
return nil
})
if err != nil {
return err
return nil, err
}
kSinkInjected, err := psh.CheckKSinkInjected()
if err != nil {
return err
return nil, err
}
if !kSinkInjected {
return fmt.Errorf("waiting for K_SINK injection for %s to complete", psh.GetServiceName())
msg := fmt.Sprintf("waiting for K_SINK injection for service %s to complete", psh.GetServiceName())
event := &corev1.Event{
Type: corev1.EventTypeWarning,
Reason: services.WaitingKnativeEventing,
Message: msg,
}
return event, fmt.Errorf(msg)
}
}
}
return nil
return nil, nil
}

func addToSonataFlowPlatformTriggerList(platform *operatorapi.SonataFlowPlatform, trigger *eventingv1.Trigger) {
Expand Down
Loading

0 comments on commit a2fae79

Please sign in to comment.