Skip to content

Commit

Permalink
Create queries in a safe way
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Jul 7, 2023
1 parent f09054f commit 610a5fc
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 15 deletions.
37 changes: 22 additions & 15 deletions plugins/outputs/azure_data_explorer/azure_data_explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@ const (
maxBuffers = 5
)

const createTableCommand = `.create-merge table ['TABLE'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`
const createTableMappingCommand = `.create-or-alter table ['TABLE'] ingestion json mapping 'TABLEMAPPING' '[{"column":"fields", ` +
`"Properties":{"Path":"$[\'fields\']"}},{"column":"name", ` +
`"Properties":{"Path":"$[\'name\']"}},{"column":"tags", ` +
`"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", ` +
`"Properties":{"Path":"$[\'timestamp\']"}}]'`
const managedIngestion = "managed"
const queuedIngestion = "queued"

Expand Down Expand Up @@ -166,7 +160,7 @@ func (adx *AzureDataExplorer) pushMetrics(ctx context.Context, format ingest.Fil
}

length := len(metricsArray)
adx.Log.Debugf("Writing %s metrics to table %q", length, tableName)
adx.Log.Debugf("Writing %d metrics to table %q", length, tableName)
reader := bytes.NewReader(metricsArray)
mapping := ingest.IngestionMappingRef(fmt.Sprintf("%s_mapping", tableName), ingest.JSON)
if metricIngestor != nil {
Expand Down Expand Up @@ -202,17 +196,11 @@ func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context,
return nil
}

createStmt := kql.New(createTableCommand)
createParams := kusto.QueryParameters(kql.NewParameters().AddString("TABLE", tableName))
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createStmt, createParams); err != nil {
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableCommand(tableName)); err != nil {
return err
}

createTableMappingStmt := kql.New(createTableMappingCommand)
createTableMappingParams := kusto.QueryParameters(
kql.NewParameters().AddString("TABLE", tableName).AddString("TABLEMAPPING", tableName+"_mapping"),
)
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableMappingStmt, createTableMappingParams); err != nil {
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableMappingCommand(tableName)); err != nil {
return err
}

Expand Down Expand Up @@ -276,3 +264,22 @@ func createIngestorByTable(client *kusto.Client, database string, tableName stri
}
return nil, fmt.Errorf(`ingestion_type has to be one of %q or %q`, managedIngestion, queuedIngestion)
}

func createTableCommand(table string) kusto.Statement {
builder := kql.New(`.create-merge table ['`).AddTable(table).AddLiteral(`'] `)
builder.AddLiteral(`(['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`)

return builder
}

func createTableMappingCommand(table string) kusto.Statement {
builder := kql.New(`.create-or-alter table ['`).AddTable(table).AddLiteral(`'] `)
builder.AddLiteral(`ingestion json mapping '`).AddTable(table + "_mapping").AddLiteral(`' `)
builder.AddLiteral(`'[{"column":"fields", `)
builder.AddLiteral(`"Properties":{"Path":"$[\'fields\']"}},{"column":"name", `)
builder.AddLiteral(`"Properties":{"Path":"$[\'name\']"}},{"column":"tags", `)
builder.AddLiteral(`"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", `)
builder.AddLiteral(`"Properties":{"Path":"$[\'timestamp\']"}}]'`)

return builder
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,14 @@ func TestInitBlankEndpointData(t *testing.T) {
require.Equal(t, "endpoint configuration cannot be empty", errorInit.Error())
}

func TestQueryConstruction(t *testing.T) {
const tableName = "mytable"
const expectedCreate = `.create-merge table ['mytable'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`
const expectedMapping = `.create-or-alter table ['mytable'] ingestion json mapping 'mytable_mapping' '[{"column":"fields", "Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", "Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'`
require.Equal(t, expectedCreate, createTableCommand(tableName).String())
require.Equal(t, expectedMapping, createTableMappingCommand(tableName).String())
}

type fakeIngestor struct {
actualOutputMetric map[string]interface{}
}
Expand Down

0 comments on commit 610a5fc

Please sign in to comment.