diff --git a/plugins/outputs/parquet/parquet.go b/plugins/outputs/parquet/parquet.go index 39e459cb1c4bf..7961e03c645d3 100644 --- a/plugins/outputs/parquet/parquet.go +++ b/plugins/outputs/parquet/parquet.go @@ -34,7 +34,7 @@ type metricGroup struct { type Parquet struct { Directory string `toml:"directory"` RotationInterval config.Duration `toml:"rotation_interval"` - TimestampFieldName *string `toml:"timestamp_field_name"` + TimestampFieldName string `toml:"timestamp_field_name"` Log telegraf.Logger `toml:"-"` metricGroups map[string]*metricGroup @@ -156,7 +156,7 @@ func (p *Parquet) rotateIfNeeded(name string) error { func (p *Parquet) createRecord(metrics []telegraf.Metric, builder *array.RecordBuilder, schema *arrow.Schema) (arrow.Record, error) { for index, col := range schema.Fields() { for _, m := range metrics { - if p.TimestampFieldName != nil && *p.TimestampFieldName != "" && col.Name == *p.TimestampFieldName { + if p.TimestampFieldName != "" && col.Name == p.TimestampFieldName { builder.Field(index).(*array.Int64Builder).Append(m.Time().UnixNano()) continue } @@ -265,9 +265,9 @@ func (p *Parquet) createSchema(metrics []telegraf.Metric) (*arrow.Schema, error) }) } - if p.TimestampFieldName != nil && *p.TimestampFieldName != "" { + if p.TimestampFieldName != "" { fields = append(fields, arrow.Field{ - Name: *p.TimestampFieldName, + Name: p.TimestampFieldName, Type: arrow.PrimitiveTypes.Int64, }) } @@ -330,7 +330,7 @@ func goToArrowType(value interface{}) (arrow.DataType, error) { func init() { outputs.Add("parquet", func() telegraf.Output { return &Parquet{ - TimestampFieldName: &defaultTimestampFieldName, + TimestampFieldName: defaultTimestampFieldName, } }) } diff --git a/plugins/outputs/parquet/parquet_test.go b/plugins/outputs/parquet/parquet_test.go index 0c54a068f135b..e4ab820d3e4f8 100644 --- a/plugins/outputs/parquet/parquet_test.go +++ b/plugins/outputs/parquet/parquet_test.go @@ -126,7 +126,7 @@ func TestCases(t *testing.T) { testDir := t.TempDir() plugin := &Parquet{ Directory: testDir, - TimestampFieldName: &defaultTimestampFieldName, + TimestampFieldName: defaultTimestampFieldName, } require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -164,7 +164,7 @@ func TestRotation(t *testing.T) { plugin := &Parquet{ Directory: testDir, RotationInterval: config.Duration(1 * time.Second), - TimestampFieldName: &defaultTimestampFieldName, + TimestampFieldName: defaultTimestampFieldName, } require.NoError(t, plugin.Init()) @@ -210,3 +210,37 @@ func TestOmitTimestamp(t *testing.T) { require.Equal(t, 1, int(metadata.NumRows)) require.Equal(t, 1, metadata.Schema.NumColumns()) } + +func TestTimestampDifferentName(t *testing.T) { + metrics := []telegraf.Metric{ + testutil.MustMetric( + "test", + map[string]string{}, + map[string]interface{}{ + "value": 1.0, + }, + time.Now(), + ), + } + + testDir := t.TempDir() + plugin := &Parquet{ + Directory: testDir, + TimestampFieldName: "time", + } + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + require.NoError(t, plugin.Write(metrics)) + require.NoError(t, plugin.Close()) + + files, err := os.ReadDir(testDir) + require.NoError(t, err) + require.Len(t, files, 1) + reader, err := file.OpenParquetFile(filepath.Join(testDir, files[0].Name()), false) + require.NoError(t, err) + defer reader.Close() + + metadata := reader.MetaData() + require.Equal(t, 1, int(metadata.NumRows)) + require.Equal(t, 2, metadata.Schema.NumColumns()) +}