Skip to content

Commit

Permalink
Switch timestamp field from pointer to string
Browse files Browse the repository at this point in the history
  • Loading branch information
powersj committed Jul 24, 2024
1 parent 98399b6 commit 449c234
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 7 deletions.
10 changes: 5 additions & 5 deletions plugins/outputs/parquet/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type metricGroup struct {
type Parquet struct {
Directory string `toml:"directory"`
RotationInterval config.Duration `toml:"rotation_interval"`
TimestampFieldName *string `toml:"timestamp_field_name"`
TimestampFieldName string `toml:"timestamp_field_name"`
Log telegraf.Logger `toml:"-"`

metricGroups map[string]*metricGroup
Expand Down Expand Up @@ -156,7 +156,7 @@ func (p *Parquet) rotateIfNeeded(name string) error {
func (p *Parquet) createRecord(metrics []telegraf.Metric, builder *array.RecordBuilder, schema *arrow.Schema) (arrow.Record, error) {
for index, col := range schema.Fields() {
for _, m := range metrics {
if p.TimestampFieldName != nil && *p.TimestampFieldName != "" && col.Name == *p.TimestampFieldName {
if p.TimestampFieldName != "" && col.Name == p.TimestampFieldName {
builder.Field(index).(*array.Int64Builder).Append(m.Time().UnixNano())
continue
}
Expand Down Expand Up @@ -265,9 +265,9 @@ func (p *Parquet) createSchema(metrics []telegraf.Metric) (*arrow.Schema, error)
})
}

if p.TimestampFieldName != nil && *p.TimestampFieldName != "" {
if p.TimestampFieldName != "" {
fields = append(fields, arrow.Field{
Name: *p.TimestampFieldName,
Name: p.TimestampFieldName,
Type: arrow.PrimitiveTypes.Int64,
})
}
Expand Down Expand Up @@ -330,7 +330,7 @@ func goToArrowType(value interface{}) (arrow.DataType, error) {
func init() {
outputs.Add("parquet", func() telegraf.Output {
return &Parquet{
TimestampFieldName: &defaultTimestampFieldName,
TimestampFieldName: defaultTimestampFieldName,
}
})
}
38 changes: 36 additions & 2 deletions plugins/outputs/parquet/parquet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestCases(t *testing.T) {
testDir := t.TempDir()
plugin := &Parquet{
Directory: testDir,
TimestampFieldName: &defaultTimestampFieldName,
TimestampFieldName: defaultTimestampFieldName,
}
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestRotation(t *testing.T) {
plugin := &Parquet{
Directory: testDir,
RotationInterval: config.Duration(1 * time.Second),
TimestampFieldName: &defaultTimestampFieldName,
TimestampFieldName: defaultTimestampFieldName,
}

require.NoError(t, plugin.Init())
Expand Down Expand Up @@ -210,3 +210,37 @@ func TestOmitTimestamp(t *testing.T) {
require.Equal(t, 1, int(metadata.NumRows))
require.Equal(t, 1, metadata.Schema.NumColumns())
}

func TestTimestampDifferentName(t *testing.T) {
metrics := []telegraf.Metric{
testutil.MustMetric(
"test",
map[string]string{},
map[string]interface{}{
"value": 1.0,
},
time.Now(),
),
}

testDir := t.TempDir()
plugin := &Parquet{
Directory: testDir,
TimestampFieldName: "time",
}
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
require.NoError(t, plugin.Write(metrics))
require.NoError(t, plugin.Close())

files, err := os.ReadDir(testDir)
require.NoError(t, err)
require.Len(t, files, 1)
reader, err := file.OpenParquetFile(filepath.Join(testDir, files[0].Name()), false)
require.NoError(t, err)
defer reader.Close()

metadata := reader.MetaData()
require.Equal(t, 1, int(metadata.NumRows))
require.Equal(t, 2, metadata.Schema.NumColumns())
}

0 comments on commit 449c234

Please sign in to comment.