Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

re-architecting around gateways and sensors #92

Merged
merged 37 commits into from
Oct 1, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
12af598
added gateway for processing outside events, making gateway long runn…
VaibhavPage Aug 7, 2018
4bb4457
refactoring k8s manifests
VaibhavPage Aug 22, 2018
95c2548
adding s3 gateway
VaibhavPage Aug 22, 2018
5839ee9
making s3 and webhook gateway mult configurable
VaibhavPage Aug 23, 2018
c3542da
refactor of calendar, s3 and webhook gateway done. started with streams
VaibhavPage Aug 23, 2018
543f698
added nats gateway
VaibhavPage Aug 24, 2018
44956f4
renamed signals to gateways. moved out-of-box gateways to core.
VaibhavPage Aug 24, 2018
78843be
refactor gateways
VaibhavPage Aug 24, 2018
cdc20f3
removing signals folder
VaibhavPage Aug 24, 2018
2ab9407
added kafka gateway
VaibhavPage Aug 24, 2018
7caee70
gateway and sensor dirs under a common controllers and clients directory
VaibhavPage Aug 27, 2018
2db02fe
adding resource, amqp and mqtt gateways. refactored sensor watcher
VaibhavPage Aug 28, 2018
62ccce9
adding gateway examples, readme etc
VaibhavPage Aug 28, 2018
721bac5
adding gateway docs. refactor gateway and sensor examples
VaibhavPage Aug 29, 2018
57791a9
added watcher for sensor. Completion count for sensor. Updated gatewa…
VaibhavPage Aug 30, 2018
ee5fefb
sensor's signal are gateway-name/configuration-name
VaibhavPage Aug 30, 2018
a039fa1
making gateway processor as gRPC server
VaibhavPage Sep 4, 2018
16d175a
giving user options to implement gateway either by mimicing core gate…
VaibhavPage Sep 5, 2018
66d3247
gateway spec changed. divided extending gateways into 3 parts. starte…
VaibhavPage Sep 5, 2018
28db2b8
updated gateway examples with deploySpec. started writing custom gate…
VaibhavPage Sep 6, 2018
11110d3
adding wiki for writing custom gateways
VaibhavPage Sep 6, 2018
8947fe5
updating wiki. grpc gateway updated DialOptions
VaibhavPage Sep 7, 2018
6828b2e
added REST gateways. wiki to follow
VaibhavPage Sep 8, 2018
50ec67c
refactor rest gateways
VaibhavPage Sep 10, 2018
b8c79c0
added k8 events for gateway. Gateway configuration state is now maint…
VaibhavPage Sep 11, 2018
99b04d5
minor refactor in k8 events
VaibhavPage Sep 11, 2018
655028a
added gateways as watchers for gateway along with sensors. started ad…
VaibhavPage Sep 12, 2018
3b0c7b0
added k8 event for escalation. nodes are deleted from gateway resourc…
VaibhavPage Sep 14, 2018
b43c934
added unit tests. refactored sensor notification handler. added k8 ev…
VaibhavPage Sep 17, 2018
970c63b
adding test cases for sensor controller, operator and notification ha…
VaibhavPage Sep 17, 2018
9ec8c44
updating webhook gateway
VaibhavPage Sep 18, 2018
6433b77
fixed issue with syncing gateway nodes and gateway configmap
VaibhavPage Sep 20, 2018
6691b07
test: cleaning up test cases
VaibhavPage Sep 21, 2018
d08ac54
updating dependencies
VaibhavPage Sep 21, 2018
92b31a5
update docker registry to argoproj
VaibhavPage Sep 22, 2018
ac90e6b
exposing gateway and sensor validate method
VaibhavPage Sep 27, 2018
edfa3a1
update dependencies
VaibhavPage Sep 27, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ required = [

[[constraint]]
name = "github.com/ghodss/yaml"
version = "1.0.0"
branch = "master"

[[constraint]]
name = "github.com/minio/minio-go"
Expand All @@ -49,6 +49,10 @@ required = [
name = "github.com/Shopify/sarama"
version = "1.16.0"

[[constraint]]
name = "github.com/stretchr/testify"
version = "1.1.4"

[[override]]
branch = "release-1.10"
name = "k8s.io/api"
Expand Down
20 changes: 0 additions & 20 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"math/rand"
"net/http"
"time"
)

// DefaultConfigMapName returns a formulated name for a configmap name based on the sensor-controller deployment name
Expand Down Expand Up @@ -96,21 +94,3 @@ func SendErrorResponse(writer http.ResponseWriter) {
writer.WriteHeader(http.StatusBadRequest)
writer.Write([]byte(ErrorResponse))
}

// RandomStringGenerator generates a random string
func RandomStringGenerator() string {
var n int
for {
n = rand.Intn(10)
if n != 0 {
break
}
}
rand.Seed(time.Now().UnixNano())
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
2 changes: 1 addition & 1 deletion controllers/gateway/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (goc *gwOperationCtx) operate() error {
goc.log.Info().Msg("operating on the gateway...")

// performs a basic validation on gateway resource.
err := goc.validate()
err := Validate(goc.gw)
if err != nil {
goc.log.Error().Err(err).Msg("gateway validation failed")
goc.markGatewayPhase(v1alpha1.NodePhaseError, "validation failed")
Expand Down
13 changes: 7 additions & 6 deletions controllers/gateway/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ import (
)

// Validates the gateway resource.
func (goc *gwOperationCtx) validate() error {
if goc.gw.Spec.DeploySpec == nil {
// Exporting this function so that external APIs can use this to validate gateway resource.
func Validate(gw *v1alpha1.Gateway) error {
if gw.Spec.DeploySpec == nil {
return fmt.Errorf("gateway deploy specification is not specified")
}
if goc.gw.Spec.Type == "" {
if gw.Spec.Type == "" {
return fmt.Errorf("gateway type is not specified")
}
if goc.gw.Spec.Version == "" {
if gw.Spec.Version == "" {
return fmt.Errorf("gateway version is not specified")
}
switch goc.gw.Spec.DispatchMechanism {
switch gw.Spec.DispatchMechanism {
case v1alpha1.HTTPGateway:
if goc.gw.Spec.Watchers == nil || (goc.gw.Spec.Watchers.Gateways == nil && goc.gw.Spec.Watchers.Sensors == nil) {
if gw.Spec.Watchers == nil || (gw.Spec.Watchers.Gateways == nil && gw.Spec.Watchers.Sensors == nil) {
return fmt.Errorf("no associated watchers with gateway")
}
case v1alpha1.NATSGateway:
Expand Down
2 changes: 1 addition & 1 deletion controllers/gateway/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ func TestGwOperationCtx_validate(t *testing.T) {
gateway, err := getGateway()
assert.Nil(t, err)
goc := newGatewayOperationCtx(gateway, fakeController)
err = goc.validate()
err = Validate(goc.gw)
assert.Nil(t, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably beef up this unit tests at some point later.. we can make a note of it and address in a future ticket.

}
2 changes: 1 addition & 1 deletion controllers/sensor/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (soc *sOperationCtx) operate() error {
// non nil err indicates failed validation
// we do not want to requeue a sensor in this case
// since validation will fail every time
err := validateSensor(soc.s)
err := ValidateSensor(soc.s)
if err != nil {
soc.log.Error().Err(err).Msg("failed to validate sensor")
soc.markSensorPhase(v1alpha1.NodePhaseError, true, err.Error())
Expand Down
5 changes: 3 additions & 2 deletions controllers/sensor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// validateSensor accepts a sensor and performs validation against it
// ValidateSensor accepts a sensor and performs validation against it
// we return an error so that it can be logged as a message on the sensor status
// the error is ignored by the operation context as subsequent re-queues would produce the same error.
func validateSensor(s *v1alpha1.Sensor) error {
// Exporting this function so that external APIs can use this to validate sensor resource.
func ValidateSensor(s *v1alpha1.Sensor) error {
if err := validateSignals(s.Spec.Signals); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/sensor/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ import (
func Test_validateSensor(t *testing.T) {
sensor, err := getSensor()
assert.Nil(t, err)
err = validateSensor(sensor)
err = ValidateSensor(sensor)
assert.Nil(t, err)
}
1 change: 0 additions & 1 deletion examples/sensors/calendar.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ spec:
repeat: true
signals:
- name: calendar-gateway/calendar.fooConfig
- name:
triggers:
- name: calendar-workflow-trigger
resource:
Expand Down
2 changes: 1 addition & 1 deletion gateways/core/artifact/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s3ce *s3ConfigExecutor) StartConfig(config *gateways.ConfigContext) error
var errMessage string

// mark final gateway state
defer gatewayConfig.GatewayCleanup(config, errMessage, err)
defer gatewayConfig.GatewayCleanup(config, &errMessage, err)

gatewayConfig.Log.Info().Str("config-name", config.Data.Src).Msg("parsing configuration...")

Expand Down
2 changes: 1 addition & 1 deletion gateways/core/calendar/calendar.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (ce *calendarConfigExecutor) StartConfig(config *gateways.ConfigContext) er
var errMessage string

// mark final gateway state
defer gatewayConfig.GatewayCleanup(config, errMessage, err)
defer gatewayConfig.GatewayCleanup(config, &errMessage, err)

gatewayConfig.Log.Info().Str("config-name", config.Data.Src).Msg("parsing configuration...")

Expand Down
2 changes: 1 addition & 1 deletion gateways/core/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (rce *resourceConfigExecutor) StartConfig(config *gateways.ConfigContext) e
var errMessage string

// mark final gateway state
defer gatewayConfig.GatewayCleanup(config, errMessage, err)
defer gatewayConfig.GatewayCleanup(config, &errMessage, err)

gatewayConfig.Log.Info().Str("config-key", config.Data.Src).Msg("parsing configuration...")

Expand Down
2 changes: 1 addition & 1 deletion gateways/core/stream/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (ace *amqpConfigExecutor) StartConfig(config *gateways.ConfigContext) error
var errMessage string

// mark final gateway state
defer gatewayConfig.GatewayCleanup(config, errMessage, err)
defer gatewayConfig.GatewayCleanup(config, &errMessage, err)

gatewayConfig.Log.Info().Str("config-key", config.Data.Src).Msg("parsing configuration...")

Expand Down
2 changes: 1 addition & 1 deletion gateways/core/stream/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (kce *kafkaConfigExecutor) StartConfig(config *gateways.ConfigContext) erro
var errMessage string

// mark final gateway state
defer gatewayConfig.GatewayCleanup(config, errMessage, err)
defer gatewayConfig.GatewayCleanup(config, &errMessage, err)

var s *stream.Stream
err = yaml.Unmarshal([]byte(config.Data.Config), &s)
Expand Down
2 changes: 1 addition & 1 deletion gateways/core/stream/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (mce *mqttConfigExecutor) StartConfig(config *gateways.ConfigContext) error
var errMessage string

// mark final gateway state
defer gatewayConfig.GatewayCleanup(config, errMessage, err)
defer gatewayConfig.GatewayCleanup(config, &errMessage, err)

gatewayConfig.Log.Info().Str("config-key", config.Data.Src).Msg("parsing configuration...")
var wg sync.WaitGroup
Expand Down
2 changes: 1 addition & 1 deletion gateways/core/stream/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (nce *natsConfigExecutor) StartConfig(config *gateways.ConfigContext) error
var errMessage string

// mark final gateway state
defer gatewayConfig.GatewayCleanup(config, errMessage, err)
defer gatewayConfig.GatewayCleanup(config, &errMessage, err)

gatewayConfig.Log.Info().Str("config-key", config.Data.Src).Msg("parsing configuration...")

Expand Down
2 changes: 1 addition & 1 deletion gateways/core/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (wce *webhookConfigExecutor) StartConfig(config *gateways.ConfigContext) er
var errMessage string

// mark final gateway state
defer gatewayConfig.GatewayCleanup(config, errMessage, err)
defer gatewayConfig.GatewayCleanup(config, &errMessage, err)

gatewayConfig.Log.Info().Str("config-name", config.Data.Src).Msg("parsing configuration...")

Expand Down
6 changes: 3 additions & 3 deletions gateways/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,16 +691,16 @@ func (gc *GatewayConfig) GetK8Event(reason string, action v1alpha1.NodePhase, co
}

// GatewayCleanup marks configuration as non-active and marks final gateway state
func (gc *GatewayConfig) GatewayCleanup(config *ConfigContext, errMessage string, err error) {
func (gc *GatewayConfig) GatewayCleanup(config *ConfigContext, errMessage *string, err error) {
var event *corev1.Event
// mark configuration as deactivated so gateway processor client won't run configStopper in case if there
// was configuration error.
config.Active = false
// check if gateway configuration is in error condition.
if err != nil {
gc.Log.Error().Err(err).Str("config-key", config.Data.Src).Msg(errMessage)
gc.Log.Error().Err(err).Str("config-key", config.Data.Src).Msg(*errMessage)
// create k8 event for error state
event = gc.GetK8Event(errMessage, v1alpha1.NodePhaseError, config.Data)
event = gc.GetK8Event(*errMessage, v1alpha1.NodePhaseError, config.Data)
} else {
// gateway successfully completed/deactivated this configuration.
gc.Log.Info().Str("config-key", config.Data.Src).Msg("configuration completed")
Expand Down
2 changes: 1 addition & 1 deletion gateways/grpc/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (gce *grpcConfigExecutor) StartConfig(config *gateways.ConfigContext) error
var err error
var errMessage string

defer gatewayConfig.GatewayCleanup(config, errMessage, err)
defer gatewayConfig.GatewayCleanup(config, &errMessage, err)

opts := []grpc.DialOption{
grpc.WithInsecure(),
Expand Down
2 changes: 1 addition & 1 deletion gateways/rest/calendar/calendar.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func runGateway(config *gateways.ConfigContext) error {
var errMessage string

// mark final gateway state
defer httpGatewayServerConfig.GwConfig.GatewayCleanup(config, errMessage, err)
defer httpGatewayServerConfig.GwConfig.GatewayCleanup(config, &errMessage, err)

httpGatewayServerConfig.GwConfig.Log.Info().Str("config-name", config.Data.Src).Msg("parsing calendar schedule...")

Expand Down
6 changes: 4 additions & 2 deletions gateways/rest/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func (hce *httpConfigExecutor) StartConfig(config *gateways.ConfigContext) error
Config: config.Data.Config,
}, httpGatewayServerConfig.ConfigActivateEndpoint)
if err != nil {
httpGatewayServerConfig.GwConfig.GatewayCleanup(config, "failed to send new configuration to gateway processor server", err)
errMsg := "failed to send new configuration to gateway processor server"
httpGatewayServerConfig.GwConfig.GatewayCleanup(config, &errMsg, err)
}
return err
}
Expand All @@ -38,7 +39,8 @@ func (hce *httpConfigExecutor) StopConfig(config *gateways.ConfigContext) error
Config: config.Data.Config,
}, httpGatewayServerConfig.ConfigurationDeactivateEndpoint)
if err != nil {
httpGatewayServerConfig.GwConfig.GatewayCleanup(config, "failed to send configuration to stop to gateway processor server", err)
errMsg := "failed to send configuration to stop to gateway processor server"
httpGatewayServerConfig.GwConfig.GatewayCleanup(config, &errMsg, err)
}
return err
}
Expand Down