Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Csv header delimiter #370

Merged
merged 4 commits into from
Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- File input: Added optional labels for resolved symlink file name and path [PR 364](https://github.com/observIQ/stanza/pull/364)
- CSV Parser: Added optional configuration field `header_delimiter` [PR 370](https://github.com/observIQ/stanza/pull/370)

## 1.1.5 - 2021-07-15

Expand Down
45 changes: 45 additions & 0 deletions docs/operators/csv_parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The `csv_parser` operator parses the string-type field selected by `parse_from`
| `id` | `csv_parser` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `header` | required | A string of delimited field names. The values in the delimited header will be used as keys |
| `header_delimiter` | value of delimiter | A character that will be used as a delimiter for the header. Values `\r` and `\n` cannot be used as a delimiter |
| `delimiter` | `,` | A character that will be used as a delimiter. Values `\r` and `\n` cannot be used as a delimiter |
| `parse_from` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed |
| `parse_to` | $ | A [field](/docs/types/field.md) that indicates where the parsed values will be written |
Expand Down Expand Up @@ -144,6 +145,50 @@ Configuration:
}
```

</td>
</tr>
</table>

#### Parse the field `message` with differing delimiters for header and fields

Configuration:

```yaml
- type: csv_parser
  parse_from: message
  delimiter: "+"
  header_delimiter: ","
  header: 'id,severity,message'
```

<table>
<tr><td> Input record </td> <td> Output record </td></tr>
<tr>
<td>

```json
{
"timestamp": "",
"record": {
"message": "1+debug+\"\"Debug Message\"\""
}
}
```

</td>
<td>

```json
{
"timestamp": "",
"record": {
"id": "1",
"severity": "debug",
"message": "\"Debug Message\""
}
}
```

</td>
</tr>
</table>
43 changes: 25 additions & 18 deletions operator/builtin/parser/csv/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ func NewCSVParserConfig(operatorID string) *CSVParserConfig {
type CSVParserConfig struct {
helper.ParserConfig `yaml:",inline"`

Header string `json:"header" yaml:"header"`
FieldDelimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
Header string `json:"header" yaml:"header"`
HeaderDelimiter string `json:"header_delimiter,omitempty" yaml:"header_delimiter,omitempty"`
FieldDelimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
}

// Build will build a csv parser operator.
Expand All @@ -52,17 +53,24 @@ func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operat

fieldDelimiter := []rune(c.FieldDelimiter)[0]

if !strings.Contains(c.Header, c.FieldDelimiter) {
return nil, fmt.Errorf("missing field delimiter in header")
if c.HeaderDelimiter == "" {
c.HeaderDelimiter = c.FieldDelimiter
}

numFields := len(strings.Split(c.Header, c.FieldDelimiter))
headerDelimiter := []rune(c.HeaderDelimiter)[0]

if !strings.Contains(c.Header, c.HeaderDelimiter) {
return nil, fmt.Errorf("missing header delimiter in header")
}

numFields := len(strings.Split(c.Header, c.HeaderDelimiter))

csvParser := &CSVParser{
ParserOperator: parserOperator,
header: c.Header,
fieldDelimiter: fieldDelimiter,
numFields: numFields,
ParserOperator: parserOperator,
header: c.Header,
headerDelimiter: headerDelimiter,
fieldDelimiter: fieldDelimiter,
numFields: numFields,
}

return []operator.Operator{csvParser}, nil
Expand All @@ -71,9 +79,10 @@ func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operat
// CSVParser is an operator that parses csv in an entry.
type CSVParser struct {
helper.ParserOperator
header string
fieldDelimiter rune
numFields int
header string
headerDelimiter rune
fieldDelimiter rune
numFields int
}

// Process will parse an entry for csv.
Expand All @@ -84,17 +93,15 @@ func (r *CSVParser) Process(ctx context.Context, entry *entry.Entry) error {
// parse will parse a value using the supplied csv header.
func (r *CSVParser) parse(value interface{}) (interface{}, error) {
var csvLine string
switch value.(type) {
switch t := value.(type) {
case string:
csvLine += value.(string)
csvLine += t
case []byte:
csvLine += string(value.([]byte))
csvLine += string(t)
default:
return nil, fmt.Errorf("type '%T' cannot be parsed as csv", value)
}

delimiterStr := string([]rune{r.fieldDelimiter})

reader := csvparser.NewReader(strings.NewReader(csvLine))
reader.Comma = r.fieldDelimiter
reader.FieldsPerRecord = r.numFields
Expand All @@ -110,7 +117,7 @@ func (r *CSVParser) parse(value interface{}) (interface{}, error) {
return nil, err
}

for i, key := range strings.Split(r.header, delimiterStr) {
for i, key := range strings.Split(r.header, string([]rune{r.headerDelimiter})) {
parsedValues[key] = record[i]
}
}
Expand Down
36 changes: 34 additions & 2 deletions operator/builtin/parser/csv/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,19 @@ func TestParserCSV(t *testing.T) {
"position": "agent",
},
},
{
"header-delimiter",
func(p *CSVParserConfig) {
p.Header = "name+sev+msg"
p.HeaderDelimiter = "+"
},
"stanza,INFO,started agent",
map[string]interface{}{
"name": "stanza",
"sev": "INFO",
"msg": "started agent",
},
},
}

for _, tc := range cases {
Expand Down Expand Up @@ -270,7 +283,7 @@ func TestBuildParserCSV(t *testing.T) {
c.Header = "name"
_, err := c.Build(testutil.NewBuildContext(t))
require.Error(t, err)
require.Contains(t, err.Error(), "missing field delimiter in header")
require.Contains(t, err.Error(), "missing header delimiter in header")
})

t.Run("InvalidHeaderFieldWrongDelimiter", func(t *testing.T) {
Expand All @@ -286,6 +299,25 @@ func TestBuildParserCSV(t *testing.T) {
c.FieldDelimiter = ":"
_, err := c.Build(testutil.NewBuildContext(t))
require.Error(t, err)
require.Contains(t, err.Error(), "missing field delimiter in header")
require.Contains(t, err.Error(), "missing header delimiter in header")
})

t.Run("HeaderDelimiter", func(t *testing.T) {
c := newBasicCSVParser()
c.Header = "name+position+number"
c.HeaderDelimiter = "+"
c.FieldDelimiter = ":"
_, err := c.Build(testutil.NewBuildContext(t))
require.NoError(t, err)
})

t.Run("InvalidHeaderDelimiter", func(t *testing.T) {
c := newBasicCSVParser()
c.Header = "name,position,number"
c.HeaderDelimiter = "+"
c.FieldDelimiter = ":"
_, err := c.Build(testutil.NewBuildContext(t))
require.Error(t, err)
require.Contains(t, err.Error(), "missing header delimiter in header")
})
}