Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keepalive timeout config #361

Merged
merged 2 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ This is recommended for dev environments only.
* `STATUS_MONITOR_INTERVAL`: Time interval (in s/m/h. For example, 3600s or 60m or 1h) for Enabling Monitoring (Metric data of insights with in the connectors). Default is 0s (Disabled).
* `SPLUNK_METRIC_INDEX`: Index in which metric data will be ingested when monitoring module is enabled
* `SELECTED_MONITORING_METRICS`: Name of the metrics that you want to monitor and add using comma seprated values. List of the metrics that are supported in the metrics modules are given below
* `REFRESH_SPLUNK_CONNECTION`: If set to true, PCF will periodically refresh connection to Splunk (how often depends on KEEP_ALIVE_TIMER value). If set to false connection will be kept alive and reused. (Default: false)
* `KEEP_ALIVE_TIMER`: Time after which connection to Splunk will be refreshed, if REFRESH_SPLUNK_CONNECTION is set to true (in s/m/h. For example, 3600s or 60m or 1h). (Default: 30s)

__About app cache params:__

Expand Down
1 change: 0 additions & 1 deletion eventrouter/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func (r *router) Route(msg *events.Envelope) error {
// Ignore this event since we are not interested
return nil
}

_ = r.sink.Write(msg)

return nil
Expand Down
26 changes: 14 additions & 12 deletions eventsink/splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,20 @@ import (
const SPLUNK_HEC_FIELDS_SUPPORT_VERSION = "6.4"

type SplunkConfig struct {
FlushInterval time.Duration
QueueSize int // consumer queue buffer size
BatchSize int
Retries int // No of retries to post events to HEC before dropping events
Hostname string
SubscriptionID string
ExtraFields map[string]string
TraceLogging bool
UUID string
Logger lager.Logger
StatusMonitorInterval time.Duration
LoggingIndex string
FlushInterval time.Duration
QueueSize int // consumer queue buffer size
BatchSize int
Retries int // No of retries to post events to HEC before dropping events
Hostname string
SubscriptionID string
ExtraFields map[string]string
TraceLogging bool
UUID string
Logger lager.Logger
StatusMonitorInterval time.Duration
LoggingIndex string
RefreshSplunkConnection bool
KeepAliveTimer time.Duration
}

type ParseConfig = fevents.Config
Expand Down
22 changes: 13 additions & 9 deletions eventwriter/splunk_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
var keepAliveTimer = time.Now()

type SplunkConfig struct {
Host string
Token string
Index string
Fields map[string]string
SkipSSL bool
Debug bool
Version string
Host string
Token string
Index string
Fields map[string]string
SkipSSL bool
Debug bool
Version string
RefreshSplunkConnection bool
KeepAliveTimer time.Duration

Logger lager.Logger
}
Expand Down Expand Up @@ -121,8 +123,10 @@ func (s *SplunkEvent) send(postBody *[]byte) error {
responseBody, _ := io.ReadAll(resp.Body)
return errors.New(fmt.Sprintf("Non-ok response code [%d] from splunk: %s", resp.StatusCode, responseBody))
} else {
if time.Now().After(keepAliveTimer) {
keepAliveTimer = time.Now().Add(5 * time.Second)
if s.config.RefreshSplunkConnection && time.Now().After(keepAliveTimer) {
if s.config.KeepAliveTimer > 0 {
keepAliveTimer = time.Now().Add(s.config.KeepAliveTimer)
}
} else {
//Draining the response buffer, so that the same connection can be reused the next time
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
Expand Down
16 changes: 11 additions & 5 deletions splunknozzle/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ type Config struct {
WantedEvents string `json:"wanted-events"`
ExtraFields string `json:"extra-fields"`

FlushInterval time.Duration `json:"flush-interval"`
QueueSize int `json:"queue-size"`
BatchSize int `json:"batch-size"`
Retries int `json:"retries"`
HecWorkers int `json:"hec-workers"`
FlushInterval time.Duration `json:"flush-interval"`
QueueSize int `json:"queue-size"`
BatchSize int `json:"batch-size"`
Retries int `json:"retries"`
HecWorkers int `json:"hec-workers"`
RefreshSplunkConnection bool `json:"refresh-splunk-connection"`
KeepAliveTimer time.Duration `json:"keep-alive-timer"`

Version string `json:"version"`
Branch string `json:"branch"`
Expand Down Expand Up @@ -133,6 +135,10 @@ func NewConfigFromCmdFlags(version, branch, commit, buildos string) *Config {
OverrideDefaultFromEnvar("HEC_RETRIES").Default("5").IntVar(&c.Retries)
kingpin.Flag("hec-workers", "How many workers (concurrency) when post data to HEC").
OverrideDefaultFromEnvar("HEC_WORKERS").Default("8").IntVar(&c.HecWorkers)
kingpin.Flag("refresh-splunk-connection", "Periodically refresh connection to Splunk").
OverrideDefaultFromEnvar("REFRESH_SPLUNK_CONNECTION").Default("false").BoolVar(&c.RefreshSplunkConnection)
kingpin.Flag("keep-alive-timer", "Interval used to close and refresh connection to Splunk").
OverrideDefaultFromEnvar("KEEP_ALIVE_TIMER").Default("30s").DurationVar(&c.KeepAliveTimer)

kingpin.Flag("enable-event-tracing", "Enable event trace logging: Adds splunk trace logging fields to events. uuid, subscription-id, nozzle event counter").
OverrideDefaultFromEnvar("ENABLE_EVENT_TRACING").Default("false").BoolVar(&c.TraceLogging)
Expand Down
42 changes: 23 additions & 19 deletions splunknozzle/nozzle.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,15 @@ func (s *SplunkFirehoseNozzle) EventSink(cache cache.Cache) (eventsink.Sink, err

// EventWriter for writing events
writerConfig := &eventwriter.SplunkConfig{
Host: s.config.SplunkHost,
Token: s.config.SplunkToken,
Index: s.config.SplunkIndex,
SkipSSL: s.config.SkipSSLSplunk,
Debug: s.config.Debug,
Logger: s.logger,
Version: s.config.Version,
Host: s.config.SplunkHost,
Token: s.config.SplunkToken,
Index: s.config.SplunkIndex,
SkipSSL: s.config.SkipSSLSplunk,
Debug: s.config.Debug,
Logger: s.logger,
Version: s.config.Version,
RefreshSplunkConnection: s.config.RefreshSplunkConnection,
KeepAliveTimer: s.config.KeepAliveTimer,
}

var writers []eventwriter.Writer
Expand All @@ -112,18 +114,20 @@ func (s *SplunkFirehoseNozzle) EventSink(cache cache.Cache) (eventsink.Sink, err
nozzleUUID := uuid.New().String()

sinkConfig := &eventsink.SplunkConfig{
FlushInterval: s.config.FlushInterval,
QueueSize: s.config.QueueSize,
BatchSize: s.config.BatchSize,
Retries: s.config.Retries,
Hostname: s.config.JobHost,
SubscriptionID: s.config.SubscriptionID,
TraceLogging: s.config.TraceLogging,
ExtraFields: parsedExtraFields,
UUID: nozzleUUID,
Logger: s.logger,
LoggingIndex: s.config.SplunkLoggingIndex,
StatusMonitorInterval: s.config.StatusMonitorInterval,
FlushInterval: s.config.FlushInterval,
QueueSize: s.config.QueueSize,
BatchSize: s.config.BatchSize,
Retries: s.config.Retries,
Hostname: s.config.JobHost,
SubscriptionID: s.config.SubscriptionID,
TraceLogging: s.config.TraceLogging,
ExtraFields: parsedExtraFields,
UUID: nozzleUUID,
Logger: s.logger,
LoggingIndex: s.config.SplunkLoggingIndex,
StatusMonitorInterval: s.config.StatusMonitorInterval,
RefreshSplunkConnection: s.config.RefreshSplunkConnection,
KeepAliveTimer: s.config.KeepAliveTimer,
}

LowerAddAppInfo := strings.ToLower(s.config.AddAppInfo)
Expand Down
Loading