Skip to content

Commit

Permalink
Keepalive timeout config (#361)
Browse files Browse the repository at this point in the history
* Added config for keepalive timer
  • Loading branch information
sbylica-splunk authored Mar 21, 2024
1 parent a68a289 commit 929d30d
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 46 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ This is recommended for dev environments only.
* `STATUS_MONITOR_INTERVAL`: Time interval (in s/m/h. For example, 3600s or 60m or 1h) for Enabling Monitoring (Metric data of insights with in the connectors). Default is 0s (Disabled).
* `SPLUNK_METRIC_INDEX`: Index in which metric data will be ingested when monitoring module is enabled
* `SELECTED_MONITORING_METRICS`: Name of the metrics that you want to monitor and add using comma seprated values. List of the metrics that are supported in the metrics modules are given below
* `REFRESH_SPLUNK_CONNECTION`: If set to true, PCF will periodically refresh connection to Splunk (how often depends on KEEP_ALIVE_TIMER value). If set to false connection will be kept alive and reused. (Default: false)
* `KEEP_ALIVE_TIMER`: Time after which connection to Splunk will be refreshed, if REFRESH_SPLUNK_CONNECTION is set to true (in s/m/h. For example, 3600s or 60m or 1h). (Default: 30s)

__About app cache params:__

Expand Down
1 change: 0 additions & 1 deletion eventrouter/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func (r *router) Route(msg *events.Envelope) error {
// Ignore this event since we are not interested
return nil
}

_ = r.sink.Write(msg)

return nil
Expand Down
26 changes: 14 additions & 12 deletions eventsink/splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,20 @@ import (
const SPLUNK_HEC_FIELDS_SUPPORT_VERSION = "6.4"

type SplunkConfig struct {
FlushInterval time.Duration
QueueSize int // consumer queue buffer size
BatchSize int
Retries int // No of retries to post events to HEC before dropping events
Hostname string
SubscriptionID string
ExtraFields map[string]string
TraceLogging bool
UUID string
Logger lager.Logger
StatusMonitorInterval time.Duration
LoggingIndex string
FlushInterval time.Duration
QueueSize int // consumer queue buffer size
BatchSize int
Retries int // No of retries to post events to HEC before dropping events
Hostname string
SubscriptionID string
ExtraFields map[string]string
TraceLogging bool
UUID string
Logger lager.Logger
StatusMonitorInterval time.Duration
LoggingIndex string
RefreshSplunkConnection bool
KeepAliveTimer time.Duration
}

type ParseConfig = fevents.Config
Expand Down
22 changes: 13 additions & 9 deletions eventwriter/splunk_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
var keepAliveTimer = time.Now()

type SplunkConfig struct {
Host string
Token string
Index string
Fields map[string]string
SkipSSL bool
Debug bool
Version string
Host string
Token string
Index string
Fields map[string]string
SkipSSL bool
Debug bool
Version string
RefreshSplunkConnection bool
KeepAliveTimer time.Duration

Logger lager.Logger
}
Expand Down Expand Up @@ -121,8 +123,10 @@ func (s *SplunkEvent) send(postBody *[]byte) error {
responseBody, _ := io.ReadAll(resp.Body)
return errors.New(fmt.Sprintf("Non-ok response code [%d] from splunk: %s", resp.StatusCode, responseBody))
} else {
if time.Now().After(keepAliveTimer) {
keepAliveTimer = time.Now().Add(5 * time.Second)
if s.config.RefreshSplunkConnection && time.Now().After(keepAliveTimer) {
if s.config.KeepAliveTimer > 0 {
keepAliveTimer = time.Now().Add(s.config.KeepAliveTimer)
}
} else {
//Draining the response buffer, so that the same connection can be reused the next time
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
Expand Down
16 changes: 11 additions & 5 deletions splunknozzle/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ type Config struct {
WantedEvents string `json:"wanted-events"`
ExtraFields string `json:"extra-fields"`

FlushInterval time.Duration `json:"flush-interval"`
QueueSize int `json:"queue-size"`
BatchSize int `json:"batch-size"`
Retries int `json:"retries"`
HecWorkers int `json:"hec-workers"`
FlushInterval time.Duration `json:"flush-interval"`
QueueSize int `json:"queue-size"`
BatchSize int `json:"batch-size"`
Retries int `json:"retries"`
HecWorkers int `json:"hec-workers"`
RefreshSplunkConnection bool `json:"refresh-splunk-connection"`
KeepAliveTimer time.Duration `json:"keep-alive-timer"`

Version string `json:"version"`
Branch string `json:"branch"`
Expand Down Expand Up @@ -133,6 +135,10 @@ func NewConfigFromCmdFlags(version, branch, commit, buildos string) *Config {
OverrideDefaultFromEnvar("HEC_RETRIES").Default("5").IntVar(&c.Retries)
kingpin.Flag("hec-workers", "How many workers (concurrency) when post data to HEC").
OverrideDefaultFromEnvar("HEC_WORKERS").Default("8").IntVar(&c.HecWorkers)
kingpin.Flag("refresh-splunk-connection", "Periodically refresh connection to Splunk").
OverrideDefaultFromEnvar("REFRESH_SPLUNK_CONNECTION").Default("false").BoolVar(&c.RefreshSplunkConnection)
kingpin.Flag("keep-alive-timer", "Interval used to close and refresh connection to Splunk").
OverrideDefaultFromEnvar("KEEP_ALIVE_TIMER").Default("30s").DurationVar(&c.KeepAliveTimer)

kingpin.Flag("enable-event-tracing", "Enable event trace logging: Adds splunk trace logging fields to events. uuid, subscription-id, nozzle event counter").
OverrideDefaultFromEnvar("ENABLE_EVENT_TRACING").Default("false").BoolVar(&c.TraceLogging)
Expand Down
42 changes: 23 additions & 19 deletions splunknozzle/nozzle.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,15 @@ func (s *SplunkFirehoseNozzle) EventSink(cache cache.Cache) (eventsink.Sink, err

// EventWriter for writing events
writerConfig := &eventwriter.SplunkConfig{
Host: s.config.SplunkHost,
Token: s.config.SplunkToken,
Index: s.config.SplunkIndex,
SkipSSL: s.config.SkipSSLSplunk,
Debug: s.config.Debug,
Logger: s.logger,
Version: s.config.Version,
Host: s.config.SplunkHost,
Token: s.config.SplunkToken,
Index: s.config.SplunkIndex,
SkipSSL: s.config.SkipSSLSplunk,
Debug: s.config.Debug,
Logger: s.logger,
Version: s.config.Version,
RefreshSplunkConnection: s.config.RefreshSplunkConnection,
KeepAliveTimer: s.config.KeepAliveTimer,
}

var writers []eventwriter.Writer
Expand All @@ -112,18 +114,20 @@ func (s *SplunkFirehoseNozzle) EventSink(cache cache.Cache) (eventsink.Sink, err
nozzleUUID := uuid.New().String()

sinkConfig := &eventsink.SplunkConfig{
FlushInterval: s.config.FlushInterval,
QueueSize: s.config.QueueSize,
BatchSize: s.config.BatchSize,
Retries: s.config.Retries,
Hostname: s.config.JobHost,
SubscriptionID: s.config.SubscriptionID,
TraceLogging: s.config.TraceLogging,
ExtraFields: parsedExtraFields,
UUID: nozzleUUID,
Logger: s.logger,
LoggingIndex: s.config.SplunkLoggingIndex,
StatusMonitorInterval: s.config.StatusMonitorInterval,
FlushInterval: s.config.FlushInterval,
QueueSize: s.config.QueueSize,
BatchSize: s.config.BatchSize,
Retries: s.config.Retries,
Hostname: s.config.JobHost,
SubscriptionID: s.config.SubscriptionID,
TraceLogging: s.config.TraceLogging,
ExtraFields: parsedExtraFields,
UUID: nozzleUUID,
Logger: s.logger,
LoggingIndex: s.config.SplunkLoggingIndex,
StatusMonitorInterval: s.config.StatusMonitorInterval,
RefreshSplunkConnection: s.config.RefreshSplunkConnection,
KeepAliveTimer: s.config.KeepAliveTimer,
}

LowerAddAppInfo := strings.ToLower(s.config.AddAppInfo)
Expand Down

0 comments on commit 929d30d

Please sign in to comment.