From 929d30d314eec8c122ff88dff131af63feddb7ef Mon Sep 17 00:00:00 2001 From: Szymon Bylica <142112966+sbylica-splunk@users.noreply.github.com> Date: Thu, 21 Mar 2024 12:27:13 +0100 Subject: [PATCH] Keepalive timeout config (#361) * Added config for keepalive timer --- README.md | 2 ++ eventrouter/default.go | 1 - eventsink/splunk.go | 26 ++++++++++++----------- eventwriter/splunk_event.go | 22 +++++++++++-------- splunknozzle/config.go | 16 +++++++++----- splunknozzle/nozzle.go | 42 ++++++++++++++++++++----------------- 6 files changed, 63 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index 71e39f13..27ee7687 100644 --- a/README.md +++ b/README.md @@ -111,6 +111,8 @@ This is recommended for dev environments only. * `STATUS_MONITOR_INTERVAL`: Time interval (in s/m/h. For example, 3600s or 60m or 1h) for Enabling Monitoring (Metric data of insights with in the connectors). Default is 0s (Disabled). * `SPLUNK_METRIC_INDEX`: Index in which metric data will be ingested when monitoring module is enabled * `SELECTED_MONITORING_METRICS`: Name of the metrics that you want to monitor and add using comma seprated values. List of the metrics that are supported in the metrics modules are given below +* `REFRESH_SPLUNK_CONNECTION`: If set to true, PCF will periodically refresh connection to Splunk (how often depends on KEEP_ALIVE_TIMER value). If set to false connection will be kept alive and reused. (Default: false) +* `KEEP_ALIVE_TIMER`: Time after which connection to Splunk will be refreshed, if REFRESH_SPLUNK_CONNECTION is set to true (in s/m/h. For example, 3600s or 60m or 1h). (Default: 30s) __About app cache params:__ diff --git a/eventrouter/default.go b/eventrouter/default.go index 08071e8b..689d9099 100644 --- a/eventrouter/default.go +++ b/eventrouter/default.go @@ -38,7 +38,6 @@ func (r *router) Route(msg *events.Envelope) error { // Ignore this event since we are not interested return nil } - _ = r.sink.Write(msg) return nil diff --git a/eventsink/splunk.go b/eventsink/splunk.go index ceb7c835..18331dfe 100644 --- a/eventsink/splunk.go +++ b/eventsink/splunk.go @@ -22,18 +22,20 @@ import ( const SPLUNK_HEC_FIELDS_SUPPORT_VERSION = "6.4" type SplunkConfig struct { - FlushInterval time.Duration - QueueSize int // consumer queue buffer size - BatchSize int - Retries int // No of retries to post events to HEC before dropping events - Hostname string - SubscriptionID string - ExtraFields map[string]string - TraceLogging bool - UUID string - Logger lager.Logger - StatusMonitorInterval time.Duration - LoggingIndex string + FlushInterval time.Duration + QueueSize int // consumer queue buffer size + BatchSize int + Retries int // No of retries to post events to HEC before dropping events + Hostname string + SubscriptionID string + ExtraFields map[string]string + TraceLogging bool + UUID string + Logger lager.Logger + StatusMonitorInterval time.Duration + LoggingIndex string + RefreshSplunkConnection bool + KeepAliveTimer time.Duration } type ParseConfig = fevents.Config diff --git a/eventwriter/splunk_event.go b/eventwriter/splunk_event.go index d891733d..f82c5c35 100644 --- a/eventwriter/splunk_event.go +++ b/eventwriter/splunk_event.go @@ -18,13 +18,15 @@ import ( var keepAliveTimer = time.Now() type SplunkConfig struct { - Host string - Token string - Index string - Fields map[string]string - SkipSSL bool - Debug bool - Version string + Host string + Token string + Index string + Fields map[string]string + SkipSSL bool + Debug bool + Version string + RefreshSplunkConnection bool + KeepAliveTimer time.Duration Logger lager.Logger } @@ -121,8 +123,10 @@ func (s *SplunkEvent) send(postBody *[]byte) error { responseBody, _ := io.ReadAll(resp.Body) return errors.New(fmt.Sprintf("Non-ok response code [%d] from splunk: %s", resp.StatusCode, responseBody)) } else { - if time.Now().After(keepAliveTimer) { - keepAliveTimer = time.Now().Add(5 * time.Second) + if s.config.RefreshSplunkConnection && time.Now().After(keepAliveTimer) { + if s.config.KeepAliveTimer > 0 { + keepAliveTimer = time.Now().Add(s.config.KeepAliveTimer) + } } else { //Draining the response buffer, so that the same connection can be reused the next time if _, err := io.Copy(io.Discard, resp.Body); err != nil { diff --git a/splunknozzle/config.go b/splunknozzle/config.go index a0d2c438..d0ec491f 100644 --- a/splunknozzle/config.go +++ b/splunknozzle/config.go @@ -42,11 +42,13 @@ type Config struct { WantedEvents string `json:"wanted-events"` ExtraFields string `json:"extra-fields"` - FlushInterval time.Duration `json:"flush-interval"` - QueueSize int `json:"queue-size"` - BatchSize int `json:"batch-size"` - Retries int `json:"retries"` - HecWorkers int `json:"hec-workers"` + FlushInterval time.Duration `json:"flush-interval"` + QueueSize int `json:"queue-size"` + BatchSize int `json:"batch-size"` + Retries int `json:"retries"` + HecWorkers int `json:"hec-workers"` + RefreshSplunkConnection bool `json:"refresh-splunk-connection"` + KeepAliveTimer time.Duration `json:"keep-alive-timer"` Version string `json:"version"` Branch string `json:"branch"` @@ -133,6 +135,10 @@ func NewConfigFromCmdFlags(version, branch, commit, buildos string) *Config { OverrideDefaultFromEnvar("HEC_RETRIES").Default("5").IntVar(&c.Retries) kingpin.Flag("hec-workers", "How many workers (concurrency) when post data to HEC"). OverrideDefaultFromEnvar("HEC_WORKERS").Default("8").IntVar(&c.HecWorkers) + kingpin.Flag("refresh-splunk-connection", "Periodically refresh connection to Splunk"). + OverrideDefaultFromEnvar("REFRESH_SPLUNK_CONNECTION").Default("false").BoolVar(&c.RefreshSplunkConnection) + kingpin.Flag("keep-alive-timer", "Interval used to close and refresh connection to Splunk"). + OverrideDefaultFromEnvar("KEEP_ALIVE_TIMER").Default("30s").DurationVar(&c.KeepAliveTimer) kingpin.Flag("enable-event-tracing", "Enable event trace logging: Adds splunk trace logging fields to events. uuid, subscription-id, nozzle event counter"). OverrideDefaultFromEnvar("ENABLE_EVENT_TRACING").Default("false").BoolVar(&c.TraceLogging) diff --git a/splunknozzle/nozzle.go b/splunknozzle/nozzle.go index 303c637f..90fa8253 100644 --- a/splunknozzle/nozzle.go +++ b/splunknozzle/nozzle.go @@ -86,13 +86,15 @@ func (s *SplunkFirehoseNozzle) EventSink(cache cache.Cache) (eventsink.Sink, err // EventWriter for writing events writerConfig := &eventwriter.SplunkConfig{ - Host: s.config.SplunkHost, - Token: s.config.SplunkToken, - Index: s.config.SplunkIndex, - SkipSSL: s.config.SkipSSLSplunk, - Debug: s.config.Debug, - Logger: s.logger, - Version: s.config.Version, + Host: s.config.SplunkHost, + Token: s.config.SplunkToken, + Index: s.config.SplunkIndex, + SkipSSL: s.config.SkipSSLSplunk, + Debug: s.config.Debug, + Logger: s.logger, + Version: s.config.Version, + RefreshSplunkConnection: s.config.RefreshSplunkConnection, + KeepAliveTimer: s.config.KeepAliveTimer, } var writers []eventwriter.Writer @@ -112,18 +114,20 @@ func (s *SplunkFirehoseNozzle) EventSink(cache cache.Cache) (eventsink.Sink, err nozzleUUID := uuid.New().String() sinkConfig := &eventsink.SplunkConfig{ - FlushInterval: s.config.FlushInterval, - QueueSize: s.config.QueueSize, - BatchSize: s.config.BatchSize, - Retries: s.config.Retries, - Hostname: s.config.JobHost, - SubscriptionID: s.config.SubscriptionID, - TraceLogging: s.config.TraceLogging, - ExtraFields: parsedExtraFields, - UUID: nozzleUUID, - Logger: s.logger, - LoggingIndex: s.config.SplunkLoggingIndex, - StatusMonitorInterval: s.config.StatusMonitorInterval, + FlushInterval: s.config.FlushInterval, + QueueSize: s.config.QueueSize, + BatchSize: s.config.BatchSize, + Retries: s.config.Retries, + Hostname: s.config.JobHost, + SubscriptionID: s.config.SubscriptionID, + TraceLogging: s.config.TraceLogging, + ExtraFields: parsedExtraFields, + UUID: nozzleUUID, + Logger: s.logger, + LoggingIndex: s.config.SplunkLoggingIndex, + StatusMonitorInterval: s.config.StatusMonitorInterval, + RefreshSplunkConnection: s.config.RefreshSplunkConnection, + KeepAliveTimer: s.config.KeepAliveTimer, } LowerAddAppInfo := strings.ToLower(s.config.AddAppInfo)