Skip to content
This repository has been archived by the owner on Sep 21, 2023. It is now read-only.

Commit

Permalink
Add StrictMode with event validation
Browse files Browse the repository at this point in the history
In `StrictMode` required fields are:

* Timestamp
* Datastream.Namespace
* Datastream.Dataset
* Datastream.Type
* Source.InputId
  • Loading branch information
rdner committed Aug 12, 2022
1 parent dcb8427 commit b85408e
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 16 deletions.
4 changes: 2 additions & 2 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/spf13/cobra"

_ "github.com/elastic/elastic-agent-libs/logp/configure"
"github.com/elastic/elastic-agent-shipper/server"
"github.com/elastic/elastic-agent-shipper/controller"
)

// NewCommand returns a new command structure
Expand All @@ -41,7 +41,7 @@ func runCmd() *cobra.Command {
Use: "run",
Short: "Start the elastic-agent-shipper.",
Run: func(_ *cobra.Command, _ []string) {
if err := server.LoadAndRun(); err != nil {
if err := controller.LoadAndRun(); err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n\n", err)
os.Exit(1)
}
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-shipper/monitoring"
"github.com/elastic/elastic-agent-shipper/queue"
"github.com/elastic/elastic-agent-shipper/server"
"github.com/elastic/go-ucfg/json"
)

Expand Down Expand Up @@ -43,6 +44,7 @@ type ShipperConfig struct {
Port int `config:"port"` //Port to listen on
Monitor monitoring.Config `config:"monitoring"` //Queue monitoring settings
Queue queue.Config `config:"queue"` //Queue settings
Server server.Config `config:"server"` //gRPC Server settings
}

// ReadConfig returns the populated config from the specified path
Expand All @@ -64,6 +66,7 @@ func ReadConfig() (ShipperConfig, error) {
Log: logp.DefaultConfig(logp.SystemdEnvironment),
Monitor: monitoring.DefaultConfig(),
Queue: queue.DefaultConfig(),
Server: server.DefaultConfig(),
}
err = raw.Unpack(&config)
if err != nil {
Expand All @@ -84,6 +87,7 @@ func ReadConfigFromJSON(raw string) (ShipperConfig, error) {
Log: logp.DefaultConfig(logp.SystemdEnvironment),
Monitor: monitoring.DefaultConfig(),
Queue: queue.DefaultConfig(),
Server: server.DefaultConfig(),
}
err = rawCfg.Unpack(&shipperConfig)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package server
package controller

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package server
package controller

import (
"context"
Expand Down
5 changes: 3 additions & 2 deletions server/run.go → controller/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package server
package controller

import (
"context"
Expand All @@ -22,6 +22,7 @@ import (
"github.com/elastic/elastic-agent-shipper/monitoring"
"github.com/elastic/elastic-agent-shipper/output"
"github.com/elastic/elastic-agent-shipper/queue"
"github.com/elastic/elastic-agent-shipper/server"

pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto"
)
Expand Down Expand Up @@ -104,7 +105,7 @@ func (c *clientHandler) Run(cfg config.ShipperConfig, unit *client.Unit) error {
opts = []grpc.ServerOption{grpc.Creds(creds)}
}
grpcServer := grpc.NewServer(opts...)
shipperServer, err := NewShipperServer(queue)
shipperServer, err := server.NewShipperServer(cfg.Server, queue)
if err != nil {
return fmt.Errorf("failed to initialise the server: %w", err)
}
Expand Down
20 changes: 20 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package server

type Config struct {
// StrictMode means that every incoming event will be validated against the
// list of required fields. This introduces some additional overhead but can
// be really handy for client developers on the debugging stage.
// Normally, it should be disabled during production use and enabled for testing.
StrictMode bool `config:"strict_mode"`
}

// DefaultConfig returns default configuration for the gRPC server
func DefaultConfig() Config {
return Config{
StrictMode: false,
}
}
77 changes: 76 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"io"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -52,11 +53,13 @@ type shipperServer struct {
ctx context.Context
stop func()

cfg Config

pb.UnimplementedProducerServer
}

// NewShipperServer creates a new server instance for handling gRPC endpoints.
func NewShipperServer(publisher Publisher) (ShipperServer, error) {
func NewShipperServer(cfg Config, publisher Publisher) (ShipperServer, error) {
if publisher == nil {
return nil, errors.New("publisher cannot be nil")
}
Expand All @@ -71,6 +74,7 @@ func NewShipperServer(publisher Publisher) (ShipperServer, error) {
logger: logp.NewLogger("shipper-server"),
publisher: publisher,
close: &sync.Once{},
cfg: cfg,
}

s.ctx, s.stop = context.WithCancel(context.Background())
Expand Down Expand Up @@ -103,6 +107,15 @@ func (serv *shipperServer) PublishEvents(_ context.Context, req *messages.Publis
return resp, status.Error(codes.FailedPrecondition, fmt.Sprintf("UUID does not match. Expected = %s, actual = %s", serv.uuid, req.Uuid))
}

if serv.cfg.StrictMode {
for _, e := range req.Events {
err := serv.validateEvent(e)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
}
}

for _, e := range req.Events {
_, err := serv.publisher.Publish(e)
if err == nil {
Expand Down Expand Up @@ -190,3 +203,65 @@ func (serv *shipperServer) Close() error {

return nil
}

func (serv *shipperServer) validateEvent(m *messages.Event) error {
var msgs []string

if err := m.Timestamp.CheckValid(); err != nil {
msgs = append(msgs, fmt.Sprintf("timestamp: %s", err))
}

if err := serv.validateDataStream(m.DataStream); err != nil {
msgs = append(msgs, fmt.Sprintf("datastream: %s", err))
}

if err := serv.validateSource(m.Source); err != nil {
msgs = append(msgs, fmt.Sprintf("source: %s", err))
}

if len(msgs) == 0 {
return nil
}

return errors.New(strings.Join(msgs, "; "))
}

func (serv *shipperServer) validateSource(s *messages.Source) error {
if s == nil {
return fmt.Errorf("cannot be nil")
}

var msgs []string
if s.InputId == "" {
msgs = append(msgs, "input_id is a required field")
}

if len(msgs) == 0 {
return nil
}

return errors.New(strings.Join(msgs, "; "))
}

func (serv *shipperServer) validateDataStream(ds *messages.DataStream) error {
if ds == nil {
return fmt.Errorf("cannot be nil")
}

var msgs []string
if ds.Dataset == "" {
msgs = append(msgs, "dataset is a required field")
}
if ds.Namespace == "" {
msgs = append(msgs, "namespace is a required field")
}
if ds.Type == "" {
msgs = append(msgs, "type is a required field")
}

if len(msgs) == 0 {
return nil
}

return errors.New(strings.Join(msgs, "; "))
}
Loading

0 comments on commit b85408e

Please sign in to comment.