[datadog_logs_custom_pipeline] Add reference table logs processor #1800

Merged: 5 commits, Mar 17, 2023

145 changes: 101 additions & 44 deletions datadog/resource_datadog_logs_custom_pipeline.go
@@ -17,56 +17,61 @@ import (
var logCustomPipelineMutex = sync.Mutex{}

const (
tfArithmeticProcessor = "arithmetic_processor"
tfAttributeRemapperProcessor = "attribute_remapper"
tfCategoryProcessor = "category_processor"
tfDateRemapperProcessor = "date_remapper"
tfGeoIPParserProcessor = "geo_ip_parser"
tfGrokParserProcessor = "grok_parser"
tfLookupProcessor = "lookup_processor"
tfMessageRemapperProcessor = "message_remapper"
tfNestedPipelineProcessor = "pipeline"
tfServiceRemapperProcessor = "service_remapper"
tfStatusRemapperProcessor = "status_remapper"
tfStringBuilderProcessor = "string_builder_processor"
tfTraceIDRemapperProcessor = "trace_id_remapper"
tfURLParserProcessor = "url_parser"
tfUserAgentParserProcessor = "user_agent_parser"
tfArithmeticProcessor = "arithmetic_processor"
tfAttributeRemapperProcessor = "attribute_remapper"
tfCategoryProcessor = "category_processor"
tfDateRemapperProcessor = "date_remapper"
tfGeoIPParserProcessor = "geo_ip_parser"
tfGrokParserProcessor = "grok_parser"
tfLookupProcessor = "lookup_processor"
tfReferenceTableLookupProcessor = "reference_table_lookup_processor"
tfMessageRemapperProcessor = "message_remapper"
tfNestedPipelineProcessor = "pipeline"
tfServiceRemapperProcessor = "service_remapper"
tfStatusRemapperProcessor = "status_remapper"
tfStringBuilderProcessor = "string_builder_processor"
tfTraceIDRemapperProcessor = "trace_id_remapper"
tfURLParserProcessor = "url_parser"
tfUserAgentParserProcessor = "user_agent_parser"
// This type string is used to differentiate between LookupProcessor and ReferenceTableLookupProcessor, due to them sharing a `type` in the API.
ddReferenceTableLookupProcessor = "reference-table-" + string(datadogV1.LOGSLOOKUPPROCESSORTYPE_LOOKUP_PROCESSOR)
)
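
Both lookup flavors share a single API `type`, so the prefixed constant above is purely a provider-internal key. Below is a minimal sketch of that shared wire type, using only the client calls this change itself relies on; the attribute values are placeholders, and the exact JSON shape printed is an assumption, not something this diff asserts.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/DataDog/datadog-api-client-go/v2/api/datadogV1"
)

func main() {
	// Build a reference table lookup processor with the same setters this PR
	// uses; all values here are placeholders.
	p := datadogV1.NewReferenceTableLogsLookupProcessorWithDefaults()
	p.SetSource("ip_address")
	p.SetTarget("ip_metadata")
	p.SetLookupEnrichmentTable("my_reference_table")
	p.SetName("reference table lookup")
	p.SetIsEnabled(true)

	payload, err := json.Marshal(p)
	if err != nil {
		log.Fatal(err)
	}
	// Assuming the defaults constructor fills in the type, the "type" field in
	// this payload is the same "lookup-processor" string the classic lookup
	// processor uses, which is why the provider needs its own internal key.
	fmt.Println(string(payload))
}
```

The prefixed `reference-table-lookup-processor` string only keys the provider's maps and the switch in `buildDatadogProcessor`; the payload's own `type` comes from the client model.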

var tfProcessorTypes = map[string]string{
tfArithmeticProcessor: string(datadogV1.LOGSARITHMETICPROCESSORTYPE_ARITHMETIC_PROCESSOR),
tfAttributeRemapperProcessor: string(datadogV1.LOGSATTRIBUTEREMAPPERTYPE_ATTRIBUTE_REMAPPER),
tfCategoryProcessor: string(datadogV1.LOGSCATEGORYPROCESSORTYPE_CATEGORY_PROCESSOR),
tfDateRemapperProcessor: string(datadogV1.LOGSDATEREMAPPERTYPE_DATE_REMAPPER),
tfGeoIPParserProcessor: string(datadogV1.LOGSGEOIPPARSERTYPE_GEO_IP_PARSER),
tfGrokParserProcessor: string(datadogV1.LOGSGROKPARSERTYPE_GROK_PARSER),
tfLookupProcessor: string(datadogV1.LOGSLOOKUPPROCESSORTYPE_LOOKUP_PROCESSOR),
tfMessageRemapperProcessor: string(datadogV1.LOGSMESSAGEREMAPPERTYPE_MESSAGE_REMAPPER),
tfNestedPipelineProcessor: string(datadogV1.LOGSPIPELINEPROCESSORTYPE_PIPELINE),
tfServiceRemapperProcessor: string(datadogV1.LOGSSERVICEREMAPPERTYPE_SERVICE_REMAPPER),
tfStatusRemapperProcessor: string(datadogV1.LOGSSTATUSREMAPPERTYPE_STATUS_REMAPPER),
tfStringBuilderProcessor: string(datadogV1.LOGSSTRINGBUILDERPROCESSORTYPE_STRING_BUILDER_PROCESSOR),
tfTraceIDRemapperProcessor: string(datadogV1.LOGSTRACEREMAPPERTYPE_TRACE_ID_REMAPPER),
tfURLParserProcessor: string(datadogV1.LOGSURLPARSERTYPE_URL_PARSER),
tfUserAgentParserProcessor: string(datadogV1.LOGSUSERAGENTPARSERTYPE_USER_AGENT_PARSER),
tfArithmeticProcessor: string(datadogV1.LOGSARITHMETICPROCESSORTYPE_ARITHMETIC_PROCESSOR),
tfAttributeRemapperProcessor: string(datadogV1.LOGSATTRIBUTEREMAPPERTYPE_ATTRIBUTE_REMAPPER),
tfCategoryProcessor: string(datadogV1.LOGSCATEGORYPROCESSORTYPE_CATEGORY_PROCESSOR),
tfDateRemapperProcessor: string(datadogV1.LOGSDATEREMAPPERTYPE_DATE_REMAPPER),
tfGeoIPParserProcessor: string(datadogV1.LOGSGEOIPPARSERTYPE_GEO_IP_PARSER),
tfGrokParserProcessor: string(datadogV1.LOGSGROKPARSERTYPE_GROK_PARSER),
tfLookupProcessor: string(datadogV1.LOGSLOOKUPPROCESSORTYPE_LOOKUP_PROCESSOR),
tfReferenceTableLookupProcessor: ddReferenceTableLookupProcessor,
tfMessageRemapperProcessor: string(datadogV1.LOGSMESSAGEREMAPPERTYPE_MESSAGE_REMAPPER),
tfNestedPipelineProcessor: string(datadogV1.LOGSPIPELINEPROCESSORTYPE_PIPELINE),
tfServiceRemapperProcessor: string(datadogV1.LOGSSERVICEREMAPPERTYPE_SERVICE_REMAPPER),
tfStatusRemapperProcessor: string(datadogV1.LOGSSTATUSREMAPPERTYPE_STATUS_REMAPPER),
tfStringBuilderProcessor: string(datadogV1.LOGSSTRINGBUILDERPROCESSORTYPE_STRING_BUILDER_PROCESSOR),
tfTraceIDRemapperProcessor: string(datadogV1.LOGSTRACEREMAPPERTYPE_TRACE_ID_REMAPPER),
tfURLParserProcessor: string(datadogV1.LOGSURLPARSERTYPE_URL_PARSER),
tfUserAgentParserProcessor: string(datadogV1.LOGSUSERAGENTPARSERTYPE_USER_AGENT_PARSER),
}

var tfProcessors = map[string]*schema.Schema{
tfArithmeticProcessor: arithmeticProcessor,
tfAttributeRemapperProcessor: attributeRemapper,
tfCategoryProcessor: categoryProcessor,
tfDateRemapperProcessor: dateRemapper,
tfGeoIPParserProcessor: geoIPParser,
tfGrokParserProcessor: grokParser,
tfLookupProcessor: lookupProcessor,
tfMessageRemapperProcessor: messageRemapper,
tfServiceRemapperProcessor: serviceRemapper,
tfStatusRemapperProcessor: statusRemmaper,
tfStringBuilderProcessor: stringBuilderProcessor,
tfTraceIDRemapperProcessor: traceIDRemapper,
tfURLParserProcessor: urlParser,
tfUserAgentParserProcessor: userAgentParser,
tfArithmeticProcessor: arithmeticProcessor,
tfAttributeRemapperProcessor: attributeRemapper,
tfCategoryProcessor: categoryProcessor,
tfDateRemapperProcessor: dateRemapper,
tfGeoIPParserProcessor: geoIPParser,
tfGrokParserProcessor: grokParser,
tfLookupProcessor: lookupProcessor,
tfReferenceTableLookupProcessor: referenceTableLookupProcessor,
tfMessageRemapperProcessor: messageRemapper,
tfServiceRemapperProcessor: serviceRemapper,
tfStatusRemapperProcessor: statusRemmaper,
tfStringBuilderProcessor: stringBuilderProcessor,
tfTraceIDRemapperProcessor: traceIDRemapper,
tfURLParserProcessor: urlParser,
tfUserAgentParserProcessor: userAgentParser,
}

var ddProcessorTypes = map[string]string{
@@ -77,6 +82,7 @@ var ddProcessorTypes = map[string]string{
string(datadogV1.LOGSGEOIPPARSERTYPE_GEO_IP_PARSER): tfGeoIPParserProcessor,
string(datadogV1.LOGSGROKPARSERTYPE_GROK_PARSER): tfGrokParserProcessor,
string(datadogV1.LOGSLOOKUPPROCESSORTYPE_LOOKUP_PROCESSOR): tfLookupProcessor,
ddReferenceTableLookupProcessor: tfReferenceTableLookupProcessor,
string(datadogV1.LOGSMESSAGEREMAPPERTYPE_MESSAGE_REMAPPER): tfMessageRemapperProcessor,
string(datadogV1.LOGSPIPELINEPROCESSORTYPE_PIPELINE): tfNestedPipelineProcessor,
string(datadogV1.LOGSSERVICEREMAPPERTYPE_SERVICE_REMAPPER): tfServiceRemapperProcessor,
@@ -251,6 +257,22 @@ var lookupProcessor = &schema.Schema{
},
}

var referenceTableLookupProcessor = &schema.Schema{
Type: schema.TypeList,
MaxItems: 1,
Description: "Reference Table Lookup Processor. Reference Tables are in public beta. More information can be found in the [official docs](https://docs.datadoghq.com/logs/processing/processors/?tab=ui#lookup-processor)",
Optional: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"name": {Description: "Name of the processor", Type: schema.TypeString, Optional: true},
"is_enabled": {Description: "If the processor is enabled or not.", Type: schema.TypeBool, Optional: true},
"source": {Description: "Name of the source attribute used to do the lookup.", Type: schema.TypeString, Required: true},
"target": {Description: "Name of the attribute that contains the result of the lookup.", Type: schema.TypeString, Required: true},
"lookup_enrichment_table": {Description: "Name of the Reference Table for the source attribute and their associated target attribute values.", Type: schema.TypeString, Required: true},
},
},
}
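
As a hypothetical usage sketch (not taken from this PR), the new block would sit inside a `processor` block of `datadog_logs_custom_pipeline`. It is written here as a config string in the style of the provider's acceptance tests; the resource label, filter query, and reference table name are placeholders.

```go
// Hypothetical acceptance-test-style configuration exercising the new block.
// Attribute names mirror the schema above; all values are placeholders.
const referenceTableLookupPipelineConfig = `
resource "datadog_logs_custom_pipeline" "sample" {
  name       = "sample pipeline"
  is_enabled = true
  filter {
    query = "source:sample"
  }
  processor {
    reference_table_lookup_processor {
      name                    = "lookup against a reference table"
      is_enabled              = true
      source                  = "ip_address"
      target                  = "ip_metadata"
      lookup_enrichment_table = "my_reference_table"
    }
  }
}
`
```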

var messageRemapper = &schema.Schema{
Type: schema.TypeList,
MaxItems: 1,
@@ -511,6 +533,9 @@ func buildTerraformProcessor(ddProcessor datadogV1.LogsProcessor) (map[string]in
} else if ddProcessor.LogsLookupProcessor != nil {
tfProcessor = buildTerraformLookupProcessor(ddProcessor.LogsLookupProcessor)
processorType = string(datadogV1.LOGSLOOKUPPROCESSORTYPE_LOOKUP_PROCESSOR)
} else if ddProcessor.ReferenceTableLogsLookupProcessor != nil {
tfProcessor = buildTerraformReferenceTableLookupProcessor(ddProcessor.ReferenceTableLogsLookupProcessor)
processorType = ddReferenceTableLookupProcessor
} else if ddProcessor.LogsPipelineProcessor != nil {
tfProcessor, err = buildTerraformNestedPipeline(ddProcessor.LogsPipelineProcessor)
processorType = string(datadogV1.LOGSPIPELINEPROCESSORTYPE_PIPELINE)
@@ -570,6 +595,16 @@ func buildTerraformLookupProcessor(ddLookup *datadogV1.LogsLookupProcessor) map[
return tfProcessor
}

func buildTerraformReferenceTableLookupProcessor(ddLookup *datadogV1.ReferenceTableLogsLookupProcessor) map[string]interface{} {
return map[string]interface{}{
"source": ddLookup.GetSource(),
"target": ddLookup.GetTarget(),
"lookup_enrichment_table": ddLookup.GetLookupEnrichmentTable(),
"name": ddLookup.GetName(),
"is_enabled": ddLookup.GetIsEnabled(),
}
}

func buildTerraformNestedPipeline(ddNested *datadogV1.LogsPipelineProcessor) (map[string]interface{}, error) {
tfProcessors, err := buildTerraformProcessors(ddNested.GetProcessors())
if err != nil {
@@ -775,6 +810,8 @@ func buildDatadogProcessor(ddProcessorType string, tfProcessor map[string]interf
ddProcessor = datadogV1.LogsGrokParserAsLogsProcessor(buildDatadogGrokParser(tfProcessor))
case string(datadogV1.LOGSLOOKUPPROCESSORTYPE_LOOKUP_PROCESSOR):
ddProcessor = datadogV1.LogsLookupProcessorAsLogsProcessor(buildDatadogLookupProcessor(tfProcessor))
case ddReferenceTableLookupProcessor:
ddProcessor = datadogV1.ReferenceTableLogsLookupProcessorAsLogsProcessor(buildDatadogReferenceTableLookupProcessor(tfProcessor))
case string(datadogV1.LOGSPIPELINEPROCESSORTYPE_PIPELINE):
ddNestedPipeline, err := buildDatadogNestedPipeline(tfProcessor)
if err != nil {
@@ -865,6 +902,26 @@ func buildDatadogLookupProcessor(tfProcessor map[string]interface{}) *datadogV1.
return ddLookupProcessor
}

func buildDatadogReferenceTableLookupProcessor(tfProcessor map[string]interface{}) *datadogV1.ReferenceTableLogsLookupProcessor {
ddLookupProcessor := datadogV1.NewReferenceTableLogsLookupProcessorWithDefaults()
if tfSource, exists := tfProcessor["source"].(string); exists {
ddLookupProcessor.SetSource(tfSource)
}
if tfTarget, exists := tfProcessor["target"].(string); exists {
ddLookupProcessor.SetTarget(tfTarget)
}
if tfName, exists := tfProcessor["name"].(string); exists {
ddLookupProcessor.SetName(tfName)
}
if tfIsEnabled, exists := tfProcessor["is_enabled"].(bool); exists {
ddLookupProcessor.SetIsEnabled(tfIsEnabled)
}
if tfReferenceTable, exists := tfProcessor["lookup_enrichment_table"].(string); exists {
ddLookupProcessor.SetLookupEnrichmentTable(tfReferenceTable)
}
return ddLookupProcessor
}

func buildDatadogNestedPipeline(tfProcessor map[string]interface{}) (*datadogV1.LogsPipelineProcessor, error) {
ddNestedPipeline := datadogV1.NewLogsPipelineProcessorWithDefaults()
if tfFilter, exist := tfProcessor["filter"].([]interface{}); exist && len(tfFilter) > 0 {
@@ -1 +1 @@
2021-03-12T17:12:44.557609-05:00
2023-03-06T13:31:10.412492-05:00