Skip to content

Commit

Permalink
APIGOV-27681 - event generator updates for processing (#864)
Browse files Browse the repository at this point in the history
* create event report interface and builder

* use new event report builder

* linting fixes

* update gitignore

* enhance event report for single log events

* use enhanced event report interfaces

* updates from tests

* fix message

* return event from process summary

* updates to event report handling
  • Loading branch information
jcollins-axway authored Jan 16, 2025
1 parent 59072bc commit cdee035
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 102 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ main-resources.json
sub-resources.json
vendor
sample.sequence
**/data/cache
220 changes: 144 additions & 76 deletions pkg/transaction/eventgenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transaction

import (
"encoding/json"
"errors"
"strings"
"time"

Expand All @@ -16,7 +17,7 @@ import (
"github.com/Axway/agent-sdk/pkg/transaction/metric"
"github.com/Axway/agent-sdk/pkg/transaction/models"
transutil "github.com/Axway/agent-sdk/pkg/transaction/util"
"github.com/Axway/agent-sdk/pkg/util/errors"
sdkErrors "github.com/Axway/agent-sdk/pkg/util/errors"
hc "github.com/Axway/agent-sdk/pkg/util/healthcheck"
"github.com/Axway/agent-sdk/pkg/util/log"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -28,6 +29,7 @@ type EventGenerator interface {
CreateEvent(logEvent LogEvent, eventTime time.Time, metaData common.MapStr, fields common.MapStr, privateData interface{}) (event beat.Event, err error) // DEPRECATED
CreateEvents(summaryEvent LogEvent, detailEvents []LogEvent, eventTime time.Time, metaData common.MapStr, fields common.MapStr, privateData interface{}) (events []beat.Event, err error)
SetUseTrafficForAggregation(useTrafficForAggregation bool)
CreateFromEventReport(eventReport EventReport) (events []beat.Event, err error)
}

// Generator - Create the events to be published to Condor
Expand Down Expand Up @@ -59,19 +61,127 @@ func (e *Generator) SetUseTrafficForAggregation(useTrafficForAggregation bool) {

// CreateEvent - Creates a new event to be sent to Amplify Observability, expects sampling is handled by agent
func (e *Generator) CreateEvent(logEvent LogEvent, eventTime time.Time, metaData common.MapStr, eventFields common.MapStr, privateData interface{}) (beat.Event, error) {
// if CreateEvent is being used, sampling will not work, so all events need to be sent
if metaData == nil {
metaData = common.MapStr{}
builder := NewEventReportBuilder().
SetEventTime(eventTime).
SetMetadata(metaData).
SetFields(eventFields).
SetPrivateData(privateData).
SetForceSample()

// set the proper log event type
if logEvent.TransactionSummary != nil {
builder = builder.SetSummaryEvent(logEvent)
} else {
builder = builder.SetDetailEvents([]LogEvent{logEvent}).SetSkipMetricTracking()
}
metaData.Put(sampling.SampleKey, true)

if logEvent.TransactionSummary != nil {
report, err := builder.Build()
if err != nil {
return beat.Event{}, err
}

events, err := e.CreateFromEventReport(report)
if err != nil {
return beat.Event{}, err
}
if len(events) == 0 {
return beat.Event{}, errors.New("an event was not created")
}
if len(events) > 1 {
return events[0], errors.New("unexpectedly, more than one event was created, only returning the first")
}

// will only ever have 1 beat event returned
return events[0], nil
}

// CreateEvents - Creates new events to be sent to Amplify Observability
func (e *Generator) CreateEvents(summaryEvent LogEvent, detailEvents []LogEvent, eventTime time.Time, metaData common.MapStr, eventFields common.MapStr, privateData interface{}) ([]beat.Event, error) {
report, err := NewEventReportBuilder().
SetSummaryEvent(summaryEvent).
SetDetailEvents(detailEvents).
SetEventTime(eventTime).
SetMetadata(metaData).
SetFields(eventFields).
SetPrivateData(privateData).
Build()
if err != nil {
return []beat.Event{}, err
}

return e.CreateFromEventReport(report)
}

// CreateEvent - Creates a new event to be sent to Amplify Observability, expects sampling is handled by agent
func (e *Generator) CreateFromEventReport(eventReport EventReport) ([]beat.Event, error) {
events := make([]beat.Event, 0)
logger := e.logger

// add logging fields from summary event
if eventReport.GetSummaryEvent() != (LogEvent{}) {
logger.WithField("transactionID", eventReport.GetSummaryEvent().TransactionID)
} else if len(eventReport.GetDetailEvents()) > 0 {
logger.WithField("transactionID", eventReport.GetDetailEvents()[0].TransactionID)
}

bytes := e.getBytesSent(eventReport.GetDetailEvents())
if eventReport.ShouldTrackMetrics() && eventReport.GetSummaryEvent() != (LogEvent{}) {
e.trackMetrics(eventReport.GetSummaryEvent(), int64(bytes))
}

if eventReport.ShouldOnlyTrackMetrics() {
logger.Trace("not generating events, only tracking for metrics")
return events, nil
}

// See if the uri is in the api exceptions list
if e.isInAPIExceptionsList(eventReport.GetDetailEvents()) {
logger.Debug("found api path in traceability api exceptions list, ignore transaction event")
return events, nil
}

//set up the sampling metadata if set to force it
metadata := eventReport.GetMetadata()
if eventReport.ShouldForceSample() {
logger.Trace("sampling event")
metadata = SetSampleInMetadata(metadata)
}

//if no summary is sent then prepare the array of TransactionEvents for publishing
if eventReport.GetSummaryEvent() == (LogEvent{}) {
return e.handleTransactionEvents(eventReport.GetDetailEvents(), eventReport.GetEventTime(), metadata, eventReport.GetFields(), eventReport.GetPrivateData())
}

// Check to see if marketplace provisioning/subs is enabled
newSummaryEvent, err := e.processTxnSummary(eventReport.GetSummaryEvent())
if err != nil {
logger.WithError(err).Trace("handling summary event")
return events, err
}

if eventReport.ShouldHandleSampling() && !eventReport.ShouldForceSample() {
shouldSample, err := sampling.ShouldSampleTransaction(e.createSamplingTransactionDetails(eventReport.GetSummaryEvent()))
if err != nil || !shouldSample {
// do not need to create the event structure if it will not be sampled
return events, err
}
metadata = SetSampleInMetadata(metadata)
}

newEvent, err := e.createEvent(newSummaryEvent, eventReport.GetEventTime(), metadata, eventReport.GetFields(), eventReport.GetPrivateData())
if err != nil {
logger.WithError(err).Trace("handling summary event")
return events, err
}

e.processTxnSummary(logEvent)
e.trackMetrics(logEvent, 0)
detailEvents, err := e.handleTransactionEvents(eventReport.GetDetailEvents(), eventReport.GetEventTime(), metadata, eventReport.GetFields(), eventReport.GetPrivateData())
if err != nil {
logger.WithError(err).Trace("handling detail event(s)")
return events, err
}

return e.createEvent(logEvent, eventTime, metaData, eventFields, privateData)
events = append(events, newEvent)
return append(events, detailEvents...), nil
}

func (e *Generator) trackMetrics(summaryEvent LogEvent, bytes int64) {
Expand All @@ -93,7 +203,7 @@ func (e *Generator) trackMetrics(summaryEvent LogEvent, bytes int64) {
appDetails := models.AppDetails{}
if summaryEvent.TransactionSummary.Application != nil {
appDetails.Name = summaryEvent.TransactionSummary.Application.Name
appDetails.ID = strings.TrimLeft(summaryEvent.TransactionSummary.Application.ID, SummaryEventApplicationIDPrefix)
appDetails.ID = strings.ReplaceAll(summaryEvent.TransactionSummary.Application.ID, SummaryEventApplicationIDPrefix, "")
}

collector := metric.GetMetricCollector()
Expand All @@ -118,11 +228,8 @@ func (e *Generator) createEvent(logEvent LogEvent, eventTime time.Time, metaData
return event, err
}

eventData := eventFields
// No need to get the other field data if not being sampled
if sampled, found := metaData[sampling.SampleKey]; found && sampled.(bool) {
eventData, err = e.createEventData(serializedLogEvent, eventFields)
}
eventData, err := e.createEventData(serializedLogEvent, eventFields)
if err != nil {
return event, err
}
Expand All @@ -135,61 +242,14 @@ func (e *Generator) createEvent(logEvent LogEvent, eventTime time.Time, metaData
}, nil
}

// CreateEvents - Creates new events to be sent to Amplify Observability
func (e *Generator) CreateEvents(summaryEvent LogEvent, detailEvents []LogEvent, eventTime time.Time, metaData common.MapStr, eventFields common.MapStr, privateData interface{}) ([]beat.Event, error) {
events := make([]beat.Event, 0)

// See if the uri is in the api exceptions list
if e.isInAPIExceptionsList(detailEvents) {
e.logger.Debug("Found api path in traceability api exceptions list. Ignore transaction event")
return events, nil
func (e *Generator) getBytesSent(detailEvents []LogEvent) int {
if len(detailEvents) == 0 || detailEvents[0].TransactionEvent == nil || detailEvents[0].TransactionEvent.Protocol == nil {
return 0
}

// Check to see if marketplace provisioning/subs is enabled
err := e.processTxnSummary(summaryEvent)
if err != nil {
return nil, err
}

//if no summary is sent then prepare the array of TransactionEvents for publishing
if summaryEvent == (LogEvent{}) {
return e.handleTransactionEvents(detailEvents, eventTime, metaData, eventFields, privateData)
}

shouldSample, err := sampling.ShouldSampleTransaction(e.createSamplingTransactionDetails(summaryEvent))
if err != nil {
return events, err
if httpEvent, ok := detailEvents[0].TransactionEvent.Protocol.(*Protocol); ok {
return httpEvent.BytesSent
}
if shouldSample {
if metaData == nil {
metaData = common.MapStr{}
}
metaData.Put(sampling.SampleKey, true)
}

newEvent, err := e.createEvent(summaryEvent, eventTime, metaData, eventFields, privateData)

if err != nil {
return events, err
}

events = append(events, newEvent)
for _, event := range detailEvents {
newEvent, err := e.createEvent(event, eventTime, metaData, eventFields, privateData)
if err == nil {
events = append(events, newEvent)
}
}

bytes := 0
if len(detailEvents) > 0 {
if httpEvent, ok := detailEvents[0].TransactionEvent.Protocol.(*Protocol); ok {
bytes = httpEvent.BytesSent
}
}
e.trackMetrics(summaryEvent, int64(bytes))

return events, nil
return 0
}

func (e *Generator) handleTransactionEvents(detailEvents []LogEvent, eventTime time.Time, metaData common.MapStr, eventFields common.MapStr, privateData interface{}) ([]beat.Event, error) {
Expand All @@ -200,32 +260,32 @@ func (e *Generator) handleTransactionEvents(detailEvents []LogEvent, eventTime t
}
metaData.Put(sampling.SampleKey, true)
newEvent, err := e.createEvent(event, eventTime, metaData, eventFields, privateData)
if err == nil {
events = append(events, newEvent)
if err != nil {
return nil, err
}
events = append(events, newEvent)
}

return events, nil

}

func (e *Generator) processTxnSummary(summaryEvent LogEvent) error {
func (e *Generator) processTxnSummary(summaryEvent LogEvent) (LogEvent, error) {
// only process if there is a central client and marketplace subs are enabled
if agent.GetCentralClient() == nil {
return nil
return summaryEvent, nil
}
if summaryEvent.TransactionSummary != nil {
txnSummary := e.updateTxnSummaryByAccessRequest(summaryEvent)
if txnSummary != nil {
jsonData, err := json.Marshal(&txnSummary)
if err != nil {
return err
return summaryEvent, err
}
e.logger.Trace(string(jsonData))
summaryEvent.TransactionSummary = txnSummary
}
}
return nil
return summaryEvent, nil
}

// updateTxnSummaryByAccessRequest - get the consumer information to add to transaction event. If we don't have any
Expand Down Expand Up @@ -329,7 +389,7 @@ func (e *Generator) createSamplingTransactionDetails(summaryEvent LogEvent) samp
func (e *Generator) isInAPIExceptionsList(logEvents []LogEvent) bool {

// Sanity check.
if len(logEvents) == 0 {
if len(logEvents) == 0 || logEvents[0].TransactionEvent == nil || logEvents[0].TransactionEvent.Protocol == nil {
return false
}

Expand Down Expand Up @@ -362,7 +422,7 @@ func (e *Generator) healthcheck(name string) *hc.Status {
if err != nil {
status = &hc.Status{
Result: hc.FAIL,
Details: errors.Wrap(apic.ErrAuthenticationCall, err.Error()).Error(),
Details: sdkErrors.Wrap(apic.ErrAuthenticationCall, err.Error()).Error(),
}
}

Expand Down Expand Up @@ -403,6 +463,14 @@ func (e *Generator) createEventFields() (fields map[string]string, err error) {
return
}

func SetSampleInMetadata(metadata common.MapStr) common.MapStr {
if metadata == nil {
metadata = common.MapStr{}
}
metadata.Put(sampling.SampleKey, true)
return metadata
}

// updateWithProviderDetails -
func updateWithProviderDetails(accessRequest *management.AccessRequest, managedApp *v1.ResourceInstance, summaryEvent *Summary, log log.FieldLogger) *Summary {

Expand Down
Loading

0 comments on commit cdee035

Please sign in to comment.