From 2ee7c86dc8adce77db09ed3f238485e7e3c1c748 Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 23 Sep 2024 14:15:35 +0930
Subject: [PATCH 1/3] x-pack/filebeat/input/azureblobstorage: add support for
 CSV decoding

The test file txn.csv.gz was obtained from https://netskopepartnerlogfilebucket.s3.amazonaws.com/txn-1722875066329034-fe10b6a23cc643c4b282e6190de2352d.csv.gz
---
 CHANGELOG.next.asciidoc                       |   1 +
 .../inputs/input-azure-blob-storage.asciidoc  |  55 ++
 .../filebeat/input/azureblobstorage/config.go |   9 +
 .../input/azureblobstorage/decoding.go        |  47 ++
 .../input/azureblobstorage/decoding_config.go |  54 ++
 .../input/azureblobstorage/decoding_csv.go    | 139 ++++
 .../input/azureblobstorage/decoding_test.go   | 237 +++++++
 .../filebeat/input/azureblobstorage/input.go  |   2 +
 .../input/azureblobstorage/input_stateless.go |   1 +
 x-pack/filebeat/input/azureblobstorage/job.go |  70 ++-
 .../input/azureblobstorage/testdata/txn.csv   |   5 +
 .../azureblobstorage/testdata/txn.csv.gz      | Bin 0 -> 2527 bytes
 .../input/azureblobstorage/testdata/txn.json  | 594 ++++++++++++++++++
 .../filebeat/input/azureblobstorage/types.go  |   1 +
 14 files changed, 1195 insertions(+), 20 deletions(-)
 create mode 100644 x-pack/filebeat/input/azureblobstorage/decoding.go
 create mode 100644 x-pack/filebeat/input/azureblobstorage/decoding_config.go
 create mode 100644 x-pack/filebeat/input/azureblobstorage/decoding_csv.go
 create mode 100644 x-pack/filebeat/input/azureblobstorage/decoding_test.go
 create mode 100644 x-pack/filebeat/input/azureblobstorage/testdata/txn.csv
 create mode 100644 x-pack/filebeat/input/azureblobstorage/testdata/txn.csv.gz
 create mode 100644 x-pack/filebeat/input/azureblobstorage/testdata/txn.json

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 46a17a8fea97..ac99bb1574da 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -316,6 +316,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Add support to CEL for reading host environment variables. {issue}40762[40762] {pull}40779[40779]
 - Add CSV decoder to awss3 input. {pull}40896[40896]
 - Change request trace logging to include headers instead of complete request. {pull}41072[41072]
+- Add CSV decoding capacity to azureblobstorage input {pull}40978[40978]
 
 *Auditbeat*
 
diff --git a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc
index d6c1b4c90501..0ee02cf91d78 100644
--- a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc
+++ b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc
@@ -247,6 +247,61 @@ Example : `10s` would mean we would like the polling to occur every 10 seconds.
 This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always 
 take priority and override the root level values if both are specified.
 
+[id="input-{type}-encoding"]
+[float]
+==== `encoding`
+
+The file encoding to use for reading data that contains international
+characters. This only applies to non-JSON logs. See <<_encoding_3>>.
+
+[id="input-{type}-decoding"]
+[float]
+==== `decoding`
+
+The file decoding option is used to specify a codec that will be used to
+decode the file contents. This can apply to any file stream data.
+An example config is shown below:
+
+Currently supported codecs are given below:-
+
+    1. <<attrib-decoding-csv-azureblobstorage,CSV>>: This codec decodes RFC 4180 CSV data streams.
+
+[id="attrib-decoding-csv-azureblobstorage"]
+[float]
+==== `the CSV codec`
+The `CSV` codec is used to decode RFC 4180 CSV data streams.
+Enabling the codec without other options will use the default codec options.
+
+[source,yaml]
+----
+  decoding.codec.csv.enabled: true
+----
+
+The CSV codec supports five sub attributes to control aspects of CSV decoding.
+The `comma` attribute specifies the field separator character used by the CSV
+format. If it is not specified, the comma character '`,`' is used. The `comment`
+attribute specifies the character that should be interpreted as a comment mark.
+If it is specified, lines starting with the character will be ignored. Both
+`comma` and `comment` must be single characters. The `lazy_quotes` attribute
+controls how quoting in fields is handled. If `lazy_quotes` is true, a quote may
+appear in an unquoted field and a non-doubled quote may appear in a quoted field.
+The `trim_leading_space` attribute specifies that leading white space should be
+ignored, even if the `comma` character is white space. For complete details
+of the preceding configuration attribute behaviors, see the CSV decoder
+https://pkg.go.dev/encoding/csv#Reader[documentation] The `fields_names`
+attribute can be used to specify the column names for the data. If it is
+absent, the field names are obtained from the first non-comment line of
+data. The number of fields must match the number of field names.
+
+An example config is shown below:
+
+[source,yaml]
+----
+  decoding.codec.csv.enabled: true
+  decoding.codec.csv.comma: "\t"
+  decoding.codec.csv.comment: "#"
+----
+
 [id="attrib-file_selectors"]
 [float]
 ==== `file_selectors`
diff --git a/x-pack/filebeat/input/azureblobstorage/config.go b/x-pack/filebeat/input/azureblobstorage/config.go
index 5367596935a1..5923e8ce7bb9 100644
--- a/x-pack/filebeat/input/azureblobstorage/config.go
+++ b/x-pack/filebeat/input/azureblobstorage/config.go
@@ -11,6 +11,7 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
 
 	"github.com/elastic/beats/v7/libbeat/common/match"
+	"github.com/elastic/beats/v7/libbeat/reader/parser"
 )
 
 // MaxWorkers, Poll, PollInterval, FileSelectors, TimeStampEpoch & ExpandEventListFromField can
@@ -25,6 +26,7 @@ type config struct {
 	PollInterval             *time.Duration       `config:"poll_interval"`
 	Containers               []container          `config:"containers" validate:"required"`
 	FileSelectors            []fileSelectorConfig `config:"file_selectors"`
+	ReaderConfig             readerConfig         `config:",inline"`
 	TimeStampEpoch           *int64               `config:"timestamp_epoch"`
 	ExpandEventListFromField string               `config:"expand_event_list_from_field"`
 }
@@ -36,6 +38,7 @@ type container struct {
 	Poll                     *bool                `config:"poll"`
 	PollInterval             *time.Duration       `config:"poll_interval"`
 	FileSelectors            []fileSelectorConfig `config:"file_selectors"`
+	ReaderConfig             readerConfig         `config:",inline"`
 	TimeStampEpoch           *int64               `config:"timestamp_epoch"`
 	ExpandEventListFromField string               `config:"expand_event_list_from_field"`
 }
@@ -46,6 +49,12 @@ type fileSelectorConfig struct {
 	// TODO: Add support for reader config in future
 }
 
+// readerConfig defines the options for reading the content of an azure container.
+type readerConfig struct {
+	Parsers  parser.Config `config:",inline"`
+	Decoding decoderConfig `config:"decoding"`
+}
+
 type authConfig struct {
 	SharedCredentials *sharedKeyConfig        `config:"shared_credentials"`
 	ConnectionString  *connectionStringConfig `config:"connection_string"`
diff --git a/x-pack/filebeat/input/azureblobstorage/decoding.go b/x-pack/filebeat/input/azureblobstorage/decoding.go
new file mode 100644
index 000000000000..77f0a1e8c2c7
--- /dev/null
+++ b/x-pack/filebeat/input/azureblobstorage/decoding.go
@@ -0,0 +1,47 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package azureblobstorage
+
+import (
+	"fmt"
+	"io"
+)
+
+// decoder is an interface for decoding data from an io.Reader.
+type decoder interface {
+	// decode reads and decodes data from an io reader based on the codec type.
+	// It returns the decoded data and an error if the data cannot be decoded.
+	decode() ([]byte, error)
+	// next advances the decoder to the next data item and returns true if there is more data to be decoded.
+	next() bool
+	// close closes the decoder and releases any resources associated with it.
+	// It returns an error if the decoder cannot be closed.
+
+	// more returns whether there are more records to read.
+	more() bool
+
+	close() error
+}
+
+// valueDecoder is a decoder that can decode directly to a JSON serialisable value.
+type valueDecoder interface {
+	decoder
+
+	decodeValue() ([]byte, map[string]any, error)
+}
+
+// newDecoder creates a new decoder based on the codec type.
+// It returns a decoder type and an error if the codec type is not supported.
+// If the reader config codec option is not set, it returns a nil decoder and nil error.
+func newDecoder(cfg decoderConfig, r io.Reader) (decoder, error) {
+	switch {
+	case cfg.Codec == nil:
+		return nil, nil
+	case cfg.Codec.CSV != nil:
+		return newCSVDecoder(cfg, r)
+	default:
+		return nil, fmt.Errorf("unsupported config value: %v", cfg)
+	}
+}
diff --git a/x-pack/filebeat/input/azureblobstorage/decoding_config.go b/x-pack/filebeat/input/azureblobstorage/decoding_config.go
new file mode 100644
index 000000000000..3f680873bf78
--- /dev/null
+++ b/x-pack/filebeat/input/azureblobstorage/decoding_config.go
@@ -0,0 +1,54 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package azureblobstorage
+
+import (
+	"fmt"
+	"unicode/utf8"
+)
+
+// decoderConfig contains the configuration options for instantiating a decoder.
+type decoderConfig struct {
+	Codec *codecConfig `config:"codec"`
+}
+
+// codecConfig contains the configuration options for different codecs used by a decoder.
+type codecConfig struct {
+	CSV *csvCodecConfig `config:"csv"`
+}
+
+// csvCodecConfig contains the configuration options for the CSV codec.
+type csvCodecConfig struct {
+	Enabled bool `config:"enabled"`
+
+	// Fields is the set of field names. If it is present
+	// it is used to specify the object names of returned
+	// values and the FieldsPerRecord field in the csv.Reader.
+	// Otherwise, names are obtained from the first
+	// line of the CSV data.
+	Fields []string `config:"fields_names"`
+
+	// The fields below have the same meaning as the
+	// fields of the same name in csv.Reader.
+	Comma            *configRune `config:"comma"`
+	Comment          configRune  `config:"comment"`
+	LazyQuotes       bool        `config:"lazy_quotes"`
+	TrimLeadingSpace bool        `config:"trim_leading_space"`
+}
+
+type configRune rune
+
+func (r *configRune) Unpack(s string) error {
+	if s == "" {
+		return nil
+	}
+	n := utf8.RuneCountInString(s)
+	if n != 1 {
+		return fmt.Errorf("single character option given more than one character: %q", s)
+	}
+	_r, _ := utf8.DecodeRuneInString(s)
+	*r = configRune(_r)
+	return nil
+}
diff --git a/x-pack/filebeat/input/azureblobstorage/decoding_csv.go b/x-pack/filebeat/input/azureblobstorage/decoding_csv.go
new file mode 100644
index 000000000000..7409e96f1d55
--- /dev/null
+++ b/x-pack/filebeat/input/azureblobstorage/decoding_csv.go
@@ -0,0 +1,139 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package azureblobstorage
+
+import (
+	"bytes"
+	"encoding/csv"
+	"fmt"
+	"io"
+	"slices"
+)
+
+// csvDecoder is a decoder for CSV data.
+type csvDecoder struct {
+	r *csv.Reader
+
+	header  []string
+	current []string
+	coming  []string
+
+	err error
+}
+
+// newParquetDecoder creates a new CSV decoder.
+func newCSVDecoder(config decoderConfig, r io.Reader) (decoder, error) {
+	d := csvDecoder{r: csv.NewReader(r)}
+	d.r.ReuseRecord = true
+	if config.Codec.CSV.Comma != nil {
+		d.r.Comma = rune(*config.Codec.CSV.Comma)
+	}
+	d.r.Comment = rune(config.Codec.CSV.Comment)
+	d.r.LazyQuotes = config.Codec.CSV.LazyQuotes
+	d.r.TrimLeadingSpace = config.Codec.CSV.TrimLeadingSpace
+	if len(config.Codec.CSV.Fields) != 0 {
+		d.r.FieldsPerRecord = len(config.Codec.CSV.Fields)
+		d.header = config.Codec.CSV.Fields
+	} else {
+		h, err := d.r.Read()
+		if err != nil {
+			return nil, err
+		}
+		d.header = slices.Clone(h)
+	}
+	var err error
+	d.coming, err = d.r.Read()
+	if err != nil {
+		return nil, err
+	}
+	d.current = make([]string, 0, len(d.header))
+	return &d, nil
+}
+
+func (d *csvDecoder) more() bool { return len(d.coming) == len(d.header) }
+
+// next advances the decoder to the next data item and returns true if
+// there is more data to be decoded.
+func (d *csvDecoder) next() bool {
+	if !d.more() && d.err != nil {
+		return false
+	}
+	d.current = d.current[:len(d.header)]
+	copy(d.current, d.coming)
+	d.coming, d.err = d.r.Read()
+	if d.err == io.EOF {
+		d.coming = nil
+	}
+	return true
+}
+
+// decode returns the JSON encoded value of the current CSV line. next must
+// have been called before any calls to decode.
+func (d *csvDecoder) decode() ([]byte, error) {
+	err := d.check()
+	if err != nil {
+		return nil, err
+	}
+	var buf bytes.Buffer
+	buf.WriteByte('{')
+	for i, n := range d.header {
+		if i != 0 {
+			buf.WriteByte(',')
+		}
+		buf.WriteByte('"')
+		buf.WriteString(n)
+		buf.WriteString(`":"`)
+		buf.WriteString(d.current[i])
+		buf.WriteByte('"')
+	}
+	buf.WriteByte('}')
+	d.current = d.current[:0]
+	return buf.Bytes(), nil
+}
+
+// decodeValue returns the value of the current CSV line interpreted as
+// an object with fields based on the header held by the receiver. next must
+// have been called before any calls to decode.
+func (d *csvDecoder) decodeValue() ([]byte, map[string]any, error) {
+	err := d.check()
+	if err != nil {
+		return nil, nil, err
+	}
+	m := make(map[string]any, len(d.header))
+	for i, n := range d.header {
+		m[n] = d.current[i]
+	}
+	d.current = d.current[:0]
+	b, err := d.decode()
+	if err != nil {
+		return nil, nil, err
+	}
+	return b, m, nil
+}
+
+func (d *csvDecoder) check() error {
+	if d.err != nil {
+		if d.err == io.EOF && d.coming == nil {
+			return nil
+		}
+		return d.err
+	}
+	if len(d.current) == 0 {
+		return fmt.Errorf("decode called before next")
+	}
+	// By the time we are here, current must be the same
+	// length as header; if it was not read, it would be
+	// zero, but if it was, it must match by the contract
+	// of the csv.Reader.
+	return nil
+}
+
+// close closes the parquet decoder and releases the resources.
+func (d *csvDecoder) close() error {
+	if d.err == io.EOF {
+		return nil
+	}
+	return d.err
+}
diff --git a/x-pack/filebeat/input/azureblobstorage/decoding_test.go b/x-pack/filebeat/input/azureblobstorage/decoding_test.go
new file mode 100644
index 000000000000..e541fc742796
--- /dev/null
+++ b/x-pack/filebeat/input/azureblobstorage/decoding_test.go
@@ -0,0 +1,237 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package azureblobstorage
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"os"
+	"path/filepath"
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
+	azcontainer "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
+	"github.com/google/go-cmp/cmp"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/elastic/beats/v7/libbeat/beat"
+	conf "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/logp"
+)
+
+// all test files are read from the "testdata" directory
+const testDataPath = "testdata"
+
+func TestDecoding(t *testing.T) {
+	logp.TestingSetup()
+	log := logp.L()
+
+	testCases := []struct {
+		name          string
+		file          string
+		content       string
+		contentType   string
+		numEvents     int
+		assertAgainst string
+		config        decoderConfig
+	}{
+		{
+			name:          "gzip_csv",
+			file:          "txn.csv.gz",
+			content:       "text/csv",
+			numEvents:     4,
+			assertAgainst: "txn.json",
+			config: decoderConfig{
+				Codec: &codecConfig{
+					CSV: &csvCodecConfig{
+						Enabled: true,
+						Comma:   ptr[configRune](' '),
+					},
+				},
+			},
+		},
+		{
+			name:          "csv",
+			file:          "txn.csv",
+			content:       "text/csv",
+			numEvents:     4,
+			assertAgainst: "txn.json",
+			config: decoderConfig{
+				Codec: &codecConfig{
+					CSV: &csvCodecConfig{
+						Enabled: true,
+						Comma:   ptr[configRune](' '),
+					},
+				},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			file := filepath.Join(testDataPath, tc.file)
+			if tc.contentType == "" {
+				tc.contentType = "application/octet-stream"
+			}
+			f, err := os.Open(file)
+			if err != nil {
+				t.Fatalf("failed to open test data: %v", err)
+			}
+			defer f.Close()
+			p := &pub{t: t}
+			item := &azcontainer.BlobItem{
+				Name: ptr("test_blob"),
+				Properties: &azcontainer.BlobProperties{
+					ContentType:  ptr(tc.content),
+					LastModified: &time.Time{},
+				},
+			}
+			j := newJob(&blob.Client{}, item, "https://foo.blob.core.windows.net/", newState(), &Source{}, p, log)
+			j.src.ReaderConfig.Decoding = tc.config
+			err = j.decode(context.Background(), f, "test")
+			if err != nil {
+				t.Errorf("unexpected error calling decode: %v", err)
+			}
+
+			events := p.events
+			if tc.assertAgainst != "" {
+				targetData := readJSONFromFile(t, filepath.Join(testDataPath, tc.assertAgainst))
+				assert.Equal(t, len(targetData), len(events))
+
+				for i, event := range events {
+					msg, err := event.Fields.GetValue("message")
+					assert.NoError(t, err)
+					assert.JSONEq(t, targetData[i], msg.(string))
+				}
+			}
+		})
+	}
+}
+
+type pub struct {
+	t      *testing.T
+	events []beat.Event
+}
+
+func (p *pub) Publish(e beat.Event, _cursor interface{}) error {
+	p.t.Logf("%v\n", e.Fields)
+	p.events = append(p.events, e)
+	return nil
+}
+
+// readJSONFromFile reads the json file and returns the data as a slice of strings
+func readJSONFromFile(t *testing.T, filepath string) []string {
+	fileBytes, err := os.ReadFile(filepath)
+	assert.NoError(t, err)
+	var rawMessages []json.RawMessage
+	err = json.Unmarshal(fileBytes, &rawMessages)
+	assert.NoError(t, err)
+	var data []string
+
+	for _, rawMsg := range rawMessages {
+		data = append(data, string(rawMsg))
+	}
+	return data
+}
+
+var codecConfigTests = []struct {
+	name    string
+	yaml    string
+	want    decoderConfig
+	wantErr error
+}{
+	{
+		name: "handle_rune",
+		yaml: `
+codec:
+  csv:
+    enabled: true
+    comma: ' '
+    comment: '#'
+`,
+		want: decoderConfig{&codecConfig{
+			CSV: &csvCodecConfig{
+				Enabled: true,
+				Comma:   ptr[configRune](' '),
+				Comment: '#',
+			},
+		}},
+	},
+	{
+		name: "no_comma",
+		yaml: `
+codec:
+  csv:
+    enabled: true
+`,
+		want: decoderConfig{&codecConfig{
+			CSV: &csvCodecConfig{
+				Enabled: true,
+			},
+		}},
+	},
+	{
+		name: "null_comma",
+		yaml: `
+codec:
+  csv:
+    enabled: true
+    comma: "\u0000"
+`,
+		want: decoderConfig{&codecConfig{
+			CSV: &csvCodecConfig{
+				Enabled: true,
+				Comma:   ptr[configRune]('\x00'),
+			},
+		}},
+	},
+	{
+		name: "bad_rune",
+		yaml: `
+codec:
+  csv:
+    enabled: true
+    comma: 'this is too long'
+`,
+		wantErr: errors.New(`single character option given more than one character: "this is too long" accessing 'codec.csv.comma'`),
+	},
+}
+
+func TestCodecConfig(t *testing.T) {
+	for _, test := range codecConfigTests {
+		t.Run(test.name, func(t *testing.T) {
+			c, err := conf.NewConfigWithYAML([]byte(test.yaml), "")
+			if err != nil {
+				t.Fatalf("unexpected error unmarshaling config: %v", err)
+			}
+
+			var got decoderConfig
+			err = c.Unpack(&got)
+			if !sameError(err, test.wantErr) {
+				t.Errorf("unexpected error unpacking config: got:%v want:%v", err, test.wantErr)
+			}
+
+			if !reflect.DeepEqual(got, test.want) {
+				t.Errorf("unexpected result\n--- want\n+++ got\n%s", cmp.Diff(test.want, got))
+			}
+		})
+	}
+}
+
+func sameError(a, b error) bool {
+	switch {
+	case a == nil && b == nil:
+		return true
+	case a == nil, b == nil:
+		return false
+	default:
+		return a.Error() == b.Error()
+	}
+}
+
+func ptr[T any](v T) *T { return &v }
diff --git a/x-pack/filebeat/input/azureblobstorage/input.go b/x-pack/filebeat/input/azureblobstorage/input.go
index 245daaef6e5b..7c7989ccbce3 100644
--- a/x-pack/filebeat/input/azureblobstorage/input.go
+++ b/x-pack/filebeat/input/azureblobstorage/input.go
@@ -71,6 +71,7 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) {
 			TimeStampEpoch:           container.TimeStampEpoch,
 			ExpandEventListFromField: container.ExpandEventListFromField,
 			FileSelectors:            container.FileSelectors,
+			ReaderConfig:             container.ReaderConfig,
 		})
 	}
 
@@ -120,6 +121,7 @@ func tryOverrideOrDefault(cfg config, c container) container {
 	if len(c.FileSelectors) == 0 && len(cfg.FileSelectors) != 0 {
 		c.FileSelectors = cfg.FileSelectors
 	}
+	c.ReaderConfig = cfg.ReaderConfig
 	return c
 }
 
diff --git a/x-pack/filebeat/input/azureblobstorage/input_stateless.go b/x-pack/filebeat/input/azureblobstorage/input_stateless.go
index 73ae14a62e58..8bff050c1123 100644
--- a/x-pack/filebeat/input/azureblobstorage/input_stateless.go
+++ b/x-pack/filebeat/input/azureblobstorage/input_stateless.go
@@ -57,6 +57,7 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher
 			TimeStampEpoch:           container.TimeStampEpoch,
 			ExpandEventListFromField: container.ExpandEventListFromField,
 			FileSelectors:            container.FileSelectors,
+			ReaderConfig:             container.ReaderConfig,
 		}
 
 		st := newState()
diff --git a/x-pack/filebeat/input/azureblobstorage/job.go b/x-pack/filebeat/input/azureblobstorage/job.go
index f886f330af6a..ffc8fd04dcd4 100644
--- a/x-pack/filebeat/input/azureblobstorage/job.go
+++ b/x-pack/filebeat/input/azureblobstorage/job.go
@@ -134,20 +134,47 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error {
 		}
 	}()
 
-	err = j.readJsonAndPublish(ctx, reader, id)
-	if err != nil {
-		return fmt.Errorf("failed to read data from blob with error: %w", err)
-	}
-
-	return err
+	return j.decode(ctx, reader, id)
 }
 
-func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) error {
+func (j *job) decode(ctx context.Context, r io.Reader, id string) error {
 	r, err := j.addGzipDecoderIfNeeded(bufio.NewReader(r))
 	if err != nil {
 		return fmt.Errorf("failed to add gzip decoder to blob: %s, with error: %w", *j.blob.Name, err)
 	}
+	dec, err := newDecoder(j.src.ReaderConfig.Decoding, r)
+	if err != nil {
+		return err
+	}
+	var evtOffset int64
+	switch dec := dec.(type) {
+	case decoder:
+		defer dec.close()
+
+		for dec.next() {
+			msg, err := dec.decode()
+			if err != nil {
+				if err == io.EOF {
+					return nil
+				}
+				break
+			}
+			evt := j.createEvent(string(msg), evtOffset)
+			j.publish(evt, !dec.more(), id)
+		}
 
+	default:
+		err = j.readJsonAndPublish(ctx, r, id)
+		if err != nil {
+			return fmt.Errorf("failed to read data from blob with error: %w", err)
+		}
+	}
+
+	return err
+}
+
+func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) error {
+	var err error
 	// checks if the root element is an array or not
 	r, j.isRootArray, err = evaluateJSON(bufio.NewReader(r))
 	if err != nil {
@@ -183,22 +210,25 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
 			return err
 		}
 		evt := j.createEvent(string(data), offset)
+		j.publish(evt, !dec.More(), id)
+	}
+	return nil
+}
 
-		if !dec.More() {
-			// if this is the last object, then perform a complete state save
-			cp, done := j.state.saveForTx(*j.blob.Name, *j.blob.Properties.LastModified)
-			if err := j.publisher.Publish(evt, cp); err != nil {
-				j.log.Errorf(jobErrString, id, err)
-			}
-			done()
-		} else {
-			// since we don't update the cursor checkpoint, lack of a lock here should be fine
-			if err := j.publisher.Publish(evt, nil); err != nil {
-				j.log.Errorf(jobErrString, id, err)
-			}
+func (j *job) publish(evt beat.Event, last bool, id string) {
+	if last {
+		// if this is the last object, then perform a complete state save
+		cp, done := j.state.saveForTx(*j.blob.Name, *j.blob.Properties.LastModified)
+		if err := j.publisher.Publish(evt, cp); err != nil {
+			j.log.Errorf(jobErrString, id, err)
 		}
+		done()
+		return
+	}
+	// since we don't update the cursor checkpoint, lack of a lock here should be fine
+	if err := j.publisher.Publish(evt, nil); err != nil {
+		j.log.Errorf(jobErrString, id, err)
 	}
-	return nil
 }
 
 // splitEventList splits the event list into individual events and publishes them
diff --git a/x-pack/filebeat/input/azureblobstorage/testdata/txn.csv b/x-pack/filebeat/input/azureblobstorage/testdata/txn.csv
new file mode 100644
index 000000000000..80ca65df21ef
--- /dev/null
+++ b/x-pack/filebeat/input/azureblobstorage/testdata/txn.csv
@@ -0,0 +1,5 @@
+date time time-taken cs-bytes sc-bytes bytes c-ip s-ip cs-username cs-method cs-uri-scheme cs-uri-query cs-user-agent cs-content-type sc-status sc-content-type cs-dns cs-host cs-uri cs-uri-port cs-referer x-cs-session-id x-cs-access-method x-cs-app x-s-country x-s-latitude x-s-longitude x-s-location x-s-region x-s-zipcode x-c-country x-c-latitude x-c-longitude x-c-location x-c-region x-c-zipcode x-c-os x-c-browser x-c-browser-version x-c-device x-cs-site x-cs-timestamp x-cs-page-id x-cs-userip x-cs-traffic-type x-cs-tunnel-id x-category x-other-category x-type x-server-ssl-err x-client-ssl-err x-transaction-id x-request-id x-cs-sni x-cs-domain-fronted-sni x-category-id x-other-category-id x-sr-headers-name x-sr-headers-value x-cs-ssl-ja3 x-sr-ssl-ja3s x-ssl-bypass x-ssl-bypass-reason x-r-cert-subject-cn x-r-cert-issuer-cn x-r-cert-startdate x-r-cert-enddate x-r-cert-valid x-r-cert-expired x-r-cert-untrusted-root x-r-cert-incomplete-chain x-r-cert-self-signed x-r-cert-revoked x-r-cert-revocation-check x-r-cert-mismatch x-cs-ssl-fronting-error x-cs-ssl-handshake-error x-sr-ssl-handshake-error x-sr-ssl-client-certificate-error x-sr-ssl-malformed-ssl x-s-custom-signing-ca-error x-cs-ssl-engine-action x-cs-ssl-engine-action-reason x-sr-ssl-engine-action x-sr-ssl-engine-action-reason x-ssl-policy-src-ip x-ssl-policy-dst-ip x-ssl-policy-dst-host x-ssl-policy-dst-host-source x-ssl-policy-categories x-ssl-policy-action x-ssl-policy-name x-cs-ssl-version x-cs-ssl-cipher x-sr-ssl-version x-sr-ssl-cipher x-cs-src-ip-egress x-s-dp-name x-cs-src-ip x-cs-src-port x-cs-dst-ip x-cs-dst-port x-sr-src-ip x-sr-src-port x-sr-dst-ip x-sr-dst-port x-cs-ip-connect-xff x-cs-ip-xff x-cs-connect-host x-cs-connect-port x-cs-connect-user-agent x-cs-url x-cs-uri-path x-cs-http-version rs-status x-cs-app-category x-cs-app-cci x-cs-app-ccl x-cs-app-tags x-cs-app-suite x-cs-app-instance-id x-cs-app-instance-name x-cs-app-instance-tag x-cs-app-activity x-cs-app-from-user x-cs-app-to-user x-cs-app-object-type x-cs-app-object-name x-cs-app-object-id x-rs-file-type x-rs-file-category x-rs-file-language x-rs-file-size x-rs-file-md5 x-rs-file-sha256 x-error x-c-local-time x-policy-action x-policy-name x-policy-src-ip x-policy-dst-ip x-policy-dst-host x-policy-dst-host-source x-policy-justification-type x-policy-justification-reason x-sc-notification-name
+2024-08-05 16:24:20 64 2971 2050 5021 10.5.78.159 204.79.197.237 "vikash.ranjan@riverbed.com" GET https cc=US&setlang=en-US "Mozilla/5.0 (Windows NT 10.0; Win64; x64; Cortana 1.14.7.19041; 10.0.0.0.19045.2006) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19045" - 200 "application/json; charset=utf-8" www.bing.com www.bing.com /client/config?cc=US&setlang=en-US 443 - 3683772769278232507 "Client" "Microsoft Bing" "US" 47.682899 -122.120903 "Redmond" "Washington" "N/A" "US" 29.775400 -95.598000 "Houston" "Texas" "77079" "Windows 10" "Edge" "18.19045" "Windows Device" "bing" 1722875060 5762388460300455936 10.5.78.159 CloudApp - "Search Engines" - http_transaction - - 2696581500064586450 2901306739654139904 www.bing.com - 551 - - - 28a2c9bd18a11de089ef85a160da29e4 NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked NotChecked "NotChecked" NotChecked No No NotChecked NotChecked NotChecked No Allow "Established" None "NotEstablished" 10.5.78.159 69.192.139.97 www.bing.com Sni "Search Engines" Decrypt - TLSv1.2 ECDHE-RSA-AES256-GCM-SHA384 NotChecked NotChecked 208.185.23.18 "US-ATL2" 10.5.78.159 25941 69.192.139.97 443 - - 10.144.54.201 842 - - - - - https://www.bing.com/client/config?cc=US&setlang=en-US /client/config HTTP1.1 200 "Search Engines" 58 low "Consumer,Unsanctioned" - - - - "Browse" - - - - - - - - - - - - "2024-08-05 11:24:00" "allow" "NetskopeAllow" 10.5.78.159 204.79.197.237 www.bing.com HttpHostHeader - - -
+2024-08-05 16:24:19 - 18 0 18 10.70.0.19 - "nadav@skyformation.onmicrosoft.com" PRI - - - - - - - us-west1-b-osconfig.googleapis.com * 443 - 0 "Client" - - - - - - - "US" 45.605600 -121.180700 "The Dalles" "Oregon" "97058" - - - - - 1722875059 0 10.70.0.19 - - "Technology" "Cloud Storage" http_transaction - - 2035489204758272484 0 us-west1-b-osconfig.googleapis.com - 564 "7" - - 7a15285d4efc355608b304698cd7f9ab NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked NotChecked "NotChecked" NotChecked No No NotChecked NotChecked NotChecked No Allow "Established" None "NotEstablished" 10.70.0.19 142.250.99.95 us-west1-b-osconfig.googleapis.com Sni "Technology, Cloud Storage" Decrypt - TLSv1.3 TLS_AES_256_GCM_SHA384 NotChecked NotChecked 34.82.190.203 "US-SEA2" 10.70.0.19 32951 142.250.99.95 443 - - - - - - - - - - - HTTP1.1 - - - - - - - - - - - - - - - - - - - - - http-malformed "NotChecked" NotChecked - - - - - - - -
+2024-08-05 16:24:20 - 0 0 0 10.0.20.111 - "levente.fangli@cososys.com" - - - - - - - - achecker-alliances.eu.goskope.com - 443 - 0 "Client" - - - - - - - "RO" 46.765700 23.594300 "Cluj-Napoca" "Cluj County" "400027" - - - - - 1722875060 0 10.0.20.111 - - - - http_transaction - "HsFailure (error:14094418:SSL routines:ssl3_read_bytes:tlsv1 alert unknown ca)" 1350739992944030464 0 achecker-alliances.eu.goskope.com - - - - - bc29aa426fc99c0be1b9be941869f88a NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked NotChecked "NotChecked" NotChecked No Yes NotChecked NotChecked NotChecked No Block "SSL Error - SSL Handshake Error" None "NotEstablished" - - - Unknown - Decrypt - - - NotChecked NotChecked 81.196.156.53 "AT-VIE1" 10.0.20.111 57897 31.186.239.94 443 - - - - - - - - - - - UNKNOWN - - - - - - - - - - - - - - - - - - - - - client-ssl "NotChecked" NotChecked - - - - - - - -
+2024-08-05 16:24:23 - 0 0 0 10.0.20.111 - "levente.fangli@cososys.com" - - - - - - - - achecker-alliances.eu.goskope.com - 443 - 0 "Client" - - - - - - - "RO" 46.765700 23.594300 "Cluj-Napoca" "Cluj County" "400027" - - - - - 1722875063 0 10.0.20.111 - - - - http_transaction - "HsFailure (error:14094418:SSL routines:ssl3_read_bytes:tlsv1 alert unknown ca)" 1615432978285898071 0 achecker-alliances.eu.goskope.com - - - - - bc29aa426fc99c0be1b9be941869f88a NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked NotChecked "NotChecked" NotChecked No Yes NotChecked NotChecked NotChecked No Block "SSL Error - SSL Handshake Error" None "NotEstablished" - - - Unknown - Decrypt - - - NotChecked NotChecked 81.196.156.53 "AT-VIE1" 10.0.20.111 57897 31.186.239.94 443 - - - - - - - - - - - UNKNOWN - - - - - - - - - - - - - - - - - - - - - client-ssl "NotChecked" NotChecked - - - - - - - -
diff --git a/x-pack/filebeat/input/azureblobstorage/testdata/txn.csv.gz b/x-pack/filebeat/input/azureblobstorage/testdata/txn.csv.gz
new file mode 100644
index 0000000000000000000000000000000000000000..52e8fb20539a40e3127997a8877e3346ea1c6dfc
GIT binary patch
literal 2527
zcmV<52_W_#iwFP!00000|8!Mbn(H<Qe(x%>0bbhgbUzHnwq|6>2XLC4)%VlQk~5hX
z8-672pb3p{=C<+y!!HguQuufp>rJT=<xAl9o2Cyb9Lf@K<3w+4Yn(DF4wP0dc;bE^
zEYEfUaN%HyqZo{XfjxIlcGS?0Z1Ex?g(+zItx>J&Y^wJt84{jZSZ)s>1!Ywi1GuU1
zNW^-&SfG8sJs|mT1e0f`J({y);=;tRj2xl47&0TgH1dzzDdsaYsG?C7T~yJiin=Pg
zsEV3@9}@r55>%t%&Z6zg#CJzrV|G@lR7Ex1_QLMC@R^BJ09OSjIM0qsk_&Q#z`h<v
z<0a;BqOB|=P;V&9odr}quw*a2OI7QD2?j;7vlzm^4^}(<(clU(#SLA6SyJVvhF6p2
z_|=VqslduOu_)w5ydZb<N6U_S_{4vTyfP`riTrr(s8<GrQFD?7h{ZsUk54888Zf7N
zB!|XhhGKc1PKd*F1@7oeDysakJK+IHO>}5Vg2ZTpxj=E-JsSh9=tisTXHd6<QH}6D
ze!T*v^MF>SuP)h~ZfLajROK?bu%t3Z>G+Ca(iOk>6x4M7D5x!w?6}#b;=P{O(9cn}
zRCM)aCJ(i^<%p6Ggs*OJSX^Kzjr9ZEOIYo^UU~-)m(;@^y`v|n<fV7Prqsi4u)Lw(
zqM-4RJgkGTHnU^B;@!!P)&8!r4r`zQ<o1<^wDtq7674{`kV67qgiGhcw2uZ_#FfrV
zwv1_uyV6yXOq^Mg@s7%nD;e)ldPUNv4{5L-^L%0_-Gnu(1JZRgWO!Q2rW4-?2RmBr
z?$vBRb0gZ`oc1P+TdpDGM#gu>a2amuac<zmx}b)E`s%%aJ=Yi5aOr4jg1fW9m09AJ
z108&bFF-7dsxS2cyjrbbSw;%8^W3gAcF0D9Jci}KWjv+s&&Y1muTi+--@pF7J^Bpi
z7trTt>h}27@z-vD9l_V}`#yR<j$IXe9$Gh1n)e#-TUp-)1R)NB<lA5W2LJ&7|82Vh
zz%~g003VA800000000A>RBdnDHW2<kVE^Hu4@C=96iJaHnIPCy$B2O@K~Xu#$AXq<
zTa7{*l45uJ^*h?>YPaiJ6tYDfd9TlNM^3p&sZOZ`R-AEh#wip6xKRu^l@ui93>YPn
zC{36&xGRV<gc(J6t{~aA4|Z6S-ujQ$|Jk=&*DqW{>hPGr)pQQ)aoh}0*B5uyi@}YZ
z^{Wf#)4K|ioA7trbvBcP!pr;CH(@ux?L3B~=YSi9IEPR1H3|J_y#+=XM#tb(u=Bm%
z{)w_AoYLYIl$%ZG?%m>TJ7zLhBro9Q+u8i)`ZaXzgM%wqKZI8>S@+@5Wh$EIGRN8|
zhsrK(-+nfmHY=Ageej+@iltLXEc$P2JGQ}RABW)2p<de_E4z3am#I!*x7(3L>sJwi
zQz5In*7-53gI~6*KmH37A#x0q7dlsptHN-ldCnz8peK9d1Yv9IJ`7<w!W;CA8+TO#
zLXkpqZ49K0bHX?^G>7E9YaWAdQ0X37LiI6t6mGNf*^V2cloVK5Y9x_H(^%vzJPilU
zx%*@XT$Q5AL^sbgF^WP&2$wGy{d;`rwrw5H7W)*L;#@093v4J=@LX$A(43-;G>G(Z
ze@(jZ)F6onLsB{0*K3&iRqNdlDTv+lVZ>$|Y#k}UE###zh15)9>V=RR{}k(@ET@Ia
z@kp@TV1B1$q#z~Ro1+75d2JRA)0VNuQEl8(OUnw{SZ<tv+c1_}+je%*ITT`UxHVa0
zFS!Wr&k0|?fA#bG?hd~ndV;d+!VZ}mMy#bB)(1!4?OmVj9+OhUfyb`O4KeBz$I7?g
z5_Re7{;$o5IL)uCEh8MJlgrsOeP5MnIjy)X(yPf$TFuH_i|-`MDaO_~);TV*S<`ZU
z%}?{<(g^ldc83v4qaG82NP!mv11<P7+39l|4QE+)T**&N=cx+J=JR)WBOUIAuOW~c
z_DD~HAD$jv|N0Jl*6*7(qWd|M<V}zB;*z7}PM?GyUC}IF(KL>pjXcKwa^vt2Hm*F#
z|8rBHur@;gW?>j-&Ni+;g!=ure*h9uW=ReP001A02mk;800003ol{MZ+aM6VSL%Nl
zajXgg3^wekRyOKZ>c^^f&la)qht@HXVYAA=FHR24M%gy1#<GWjd3uj|=9CLe6{Zqc
zddI~b(|~~~pv#L<l2Jm*v4|$qnEfaJbZFhYHDPcQanp9FXXjlTNbR-=9)Es`N6+Kg
z;}`1#!yD{;?WT6<NatKPTC*Gc(ghzth$J4RfL7yRr+_Xn(60i4kR+utr4%sZj4(yB
zn4_|{aBs%ZdIadlY&$o_L}yg0a~PI!t};obUZkDtitwea`^k;2JD`Pvdv2fzZZ@4o
z(1)<A!fYmQ>P7RhgVRI`r8yOuR6OHCi3odxR17i|fU*;{%rME7Y=mv=L`H3DlTeXr
zRX16y%?7@^knhbfn$2kAV&NJ~tKQa6wn4Wp{^b9gz72PJ{)q(nIJy@=YagPP!S^Si
z$sVC!RnLaQ1SeclqBYU-mg_C1;e47O;bNHgww@2W5bv@4TI?A~mT#4>i;By%`n>v9
z6%Tn5hk0E`B8cLV7$wmFiz{)l&iO0K3D=Tc5I?#5T0E5HW3=)cyVqi|Bi@>E`3L_p
zn2Wae?|uVN)}`m^0{{RYiwFP!000001MO2yZxb;Py(94-ET7=8mcQ0(=R^rXMW9ro
z6vQd=I!?2!v7@y&DF43RO%SwAqzDd3sG7^nc>LbXXdcCs7g!d!1Vl_IpHK`6fJY-3
zVMH+^gybA_c58jIXr=wFYhIZUL)=AV!grXT@0&JjYg*eC+POx1V<WOXx(%@n8#{c-
zHJr1N3Qk><fgggvk5>?Ifhti}7z4^ssW`)9egCbP>rF5k?jtY_y-&Lwaf~soe#ase
zrLroqcs$bH)+ucksnb4clbYZM?#uXAH?Hq2cwxIPbQ8j{;+)9I_4NnPg+4Xj#z~Bh
zEjp{0i+Y!GoLt-zpq=d!=zZ(M4-brf2?1dxu9#AaW)#dY7d&@!&X#^!ZK%?kQ?W8i
z8C+XZtJ<pEl29utHJFEVvCU2E+SwcoQ{>ciWLUy;gJ1rix~Jj5R~yfP0<T>#Er7$S
zXT$dMEQ5ck{W7j~Yxmg!-P@6G7E>O(iR*!LzptCa`U~*Jnr^pAhx~C;Nf1#2ky4<N
z0eJDL`2220;Jte(E2%2La-|fAGKCaB-0(5GnZKW3eVLzlJ!WQ<L6L9j<PINvdH#$(
pk}vyT<;(u1d<7{vqpFfrmQqPvktg{+$@j12dk3gT%Y1$Z007o~;T`}0

literal 0
HcmV?d00001

diff --git a/x-pack/filebeat/input/azureblobstorage/testdata/txn.json b/x-pack/filebeat/input/azureblobstorage/testdata/txn.json
new file mode 100644
index 000000000000..57e92b1c7601
--- /dev/null
+++ b/x-pack/filebeat/input/azureblobstorage/testdata/txn.json
@@ -0,0 +1,594 @@
+[
+    {
+        "bytes": "5021",
+        "c-ip": "10.5.78.159",
+        "cs-bytes": "2971",
+        "cs-content-type": "-",
+        "cs-dns": "www.bing.com",
+        "cs-host": "www.bing.com",
+        "cs-method": "GET",
+        "cs-referer": "-",
+        "cs-uri": "/client/config?cc=US&setlang=en-US",
+        "cs-uri-port": "443",
+        "cs-uri-query": "cc=US&setlang=en-US",
+        "cs-uri-scheme": "https",
+        "cs-user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; Cortana 1.14.7.19041; 10.0.0.0.19045.2006) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19045",
+        "cs-username": "vikash.ranjan@riverbed.com",
+        "date": "2024-08-05",
+        "rs-status": "200",
+        "s-ip": "204.79.197.237",
+        "sc-bytes": "2050",
+        "sc-content-type": "application/json; charset=utf-8",
+        "sc-status": "200",
+        "time": "16:24:20",
+        "time-taken": "64",
+        "x-c-browser": "Edge",
+        "x-c-browser-version": "18.19045",
+        "x-c-country": "US",
+        "x-c-device": "Windows Device",
+        "x-c-latitude": "29.775400",
+        "x-c-local-time": "2024-08-05 11:24:00",
+        "x-c-location": "Houston",
+        "x-c-longitude": "-95.598000",
+        "x-c-os": "Windows 10",
+        "x-c-region": "Texas",
+        "x-c-zipcode": "77079",
+        "x-category": "Search Engines",
+        "x-category-id": "551",
+        "x-client-ssl-err": "-",
+        "x-cs-access-method": "Client",
+        "x-cs-app": "Microsoft Bing",
+        "x-cs-app-activity": "Browse",
+        "x-cs-app-category": "Search Engines",
+        "x-cs-app-cci": "58",
+        "x-cs-app-ccl": "low",
+        "x-cs-app-from-user": "-",
+        "x-cs-app-instance-id": "-",
+        "x-cs-app-instance-name": "-",
+        "x-cs-app-instance-tag": "-",
+        "x-cs-app-object-id": "-",
+        "x-cs-app-object-name": "-",
+        "x-cs-app-object-type": "-",
+        "x-cs-app-suite": "-",
+        "x-cs-app-tags": "Consumer,Unsanctioned",
+        "x-cs-app-to-user": "-",
+        "x-cs-connect-host": "-",
+        "x-cs-connect-port": "-",
+        "x-cs-connect-user-agent": "-",
+        "x-cs-domain-fronted-sni": "-",
+        "x-cs-dst-ip": "69.192.139.97",
+        "x-cs-dst-port": "443",
+        "x-cs-http-version": "HTTP1.1",
+        "x-cs-ip-connect-xff": "-",
+        "x-cs-ip-xff": "-",
+        "x-cs-page-id": "5762388460300455936",
+        "x-cs-session-id": "3683772769278232507",
+        "x-cs-site": "bing",
+        "x-cs-sni": "www.bing.com",
+        "x-cs-src-ip": "10.5.78.159",
+        "x-cs-src-ip-egress": "208.185.23.18",
+        "x-cs-src-port": "25941",
+        "x-cs-ssl-cipher": "ECDHE-RSA-AES256-GCM-SHA384",
+        "x-cs-ssl-engine-action": "Allow",
+        "x-cs-ssl-engine-action-reason": "Established",
+        "x-cs-ssl-fronting-error": "No",
+        "x-cs-ssl-handshake-error": "No",
+        "x-cs-ssl-ja3": "28a2c9bd18a11de089ef85a160da29e4",
+        "x-cs-ssl-version": "TLSv1.2",
+        "x-cs-timestamp": "1722875060",
+        "x-cs-traffic-type": "CloudApp",
+        "x-cs-tunnel-id": "-",
+        "x-cs-uri-path": "/client/config",
+        "x-cs-url": "https://www.bing.com/client/config?cc=US&setlang=en-US",
+        "x-cs-userip": "10.5.78.159",
+        "x-error": "-",
+        "x-other-category": "-",
+        "x-other-category-id": "-",
+        "x-policy-action": "allow",
+        "x-policy-dst-host": "www.bing.com",
+        "x-policy-dst-host-source": "HttpHostHeader",
+        "x-policy-dst-ip": "204.79.197.237",
+        "x-policy-justification-reason": "-",
+        "x-policy-justification-type": "-",
+        "x-policy-name": "NetskopeAllow",
+        "x-policy-src-ip": "10.5.78.159",
+        "x-r-cert-enddate": "NotChecked",
+        "x-r-cert-expired": "NotChecked",
+        "x-r-cert-incomplete-chain": "NotChecked",
+        "x-r-cert-issuer-cn": "NotChecked",
+        "x-r-cert-mismatch": "NotChecked",
+        "x-r-cert-revocation-check": "NotChecked",
+        "x-r-cert-revoked": "NotChecked",
+        "x-r-cert-self-signed": "NotChecked",
+        "x-r-cert-startdate": "NotChecked",
+        "x-r-cert-subject-cn": "NotChecked",
+        "x-r-cert-untrusted-root": "NotChecked",
+        "x-r-cert-valid": "NotChecked",
+        "x-request-id": "2901306739654139904",
+        "x-rs-file-category": "-",
+        "x-rs-file-language": "-",
+        "x-rs-file-md5": "-",
+        "x-rs-file-sha256": "-",
+        "x-rs-file-size": "-",
+        "x-rs-file-type": "-",
+        "x-s-country": "US",
+        "x-s-custom-signing-ca-error": "No",
+        "x-s-dp-name": "US-ATL2",
+        "x-s-latitude": "47.682899",
+        "x-s-location": "Redmond",
+        "x-s-longitude": "-122.120903",
+        "x-s-region": "Washington",
+        "x-s-zipcode": "N/A",
+        "x-sc-notification-name": "-",
+        "x-server-ssl-err": "-",
+        "x-sr-dst-ip": "10.144.54.201",
+        "x-sr-dst-port": "842",
+        "x-sr-headers-name": "-",
+        "x-sr-headers-value": "-",
+        "x-sr-src-ip": "-",
+        "x-sr-src-port": "-",
+        "x-sr-ssl-cipher": "NotChecked",
+        "x-sr-ssl-client-certificate-error": "NotChecked",
+        "x-sr-ssl-engine-action": "None",
+        "x-sr-ssl-engine-action-reason": "NotEstablished",
+        "x-sr-ssl-handshake-error": "NotChecked",
+        "x-sr-ssl-ja3s": "NotAvailable",
+        "x-sr-ssl-malformed-ssl": "NotChecked",
+        "x-sr-ssl-version": "NotChecked",
+        "x-ssl-bypass": "No",
+        "x-ssl-bypass-reason": "-",
+        "x-ssl-policy-action": "Decrypt",
+        "x-ssl-policy-categories": "Search Engines",
+        "x-ssl-policy-dst-host": "www.bing.com",
+        "x-ssl-policy-dst-host-source": "Sni",
+        "x-ssl-policy-dst-ip": "69.192.139.97",
+        "x-ssl-policy-name": "-",
+        "x-ssl-policy-src-ip": "10.5.78.159",
+        "x-transaction-id": "2696581500064586450",
+        "x-type": "http_transaction"
+    },
+    {
+        "bytes": "18",
+        "c-ip": "10.70.0.19",
+        "cs-bytes": "18",
+        "cs-content-type": "-",
+        "cs-dns": "-",
+        "cs-host": "us-west1-b-osconfig.googleapis.com",
+        "cs-method": "PRI",
+        "cs-referer": "-",
+        "cs-uri": "*",
+        "cs-uri-port": "443",
+        "cs-uri-query": "-",
+        "cs-uri-scheme": "-",
+        "cs-user-agent": "-",
+        "cs-username": "nadav@skyformation.onmicrosoft.com",
+        "date": "2024-08-05",
+        "rs-status": "-",
+        "s-ip": "-",
+        "sc-bytes": "0",
+        "sc-content-type": "-",
+        "sc-status": "-",
+        "time": "16:24:19",
+        "time-taken": "-",
+        "x-c-browser": "-",
+        "x-c-browser-version": "-",
+        "x-c-country": "US",
+        "x-c-device": "-",
+        "x-c-latitude": "45.605600",
+        "x-c-local-time": "NotChecked",
+        "x-c-location": "The Dalles",
+        "x-c-longitude": "-121.180700",
+        "x-c-os": "-",
+        "x-c-region": "Oregon",
+        "x-c-zipcode": "97058",
+        "x-category": "Technology",
+        "x-category-id": "564",
+        "x-client-ssl-err": "-",
+        "x-cs-access-method": "Client",
+        "x-cs-app": "-",
+        "x-cs-app-activity": "-",
+        "x-cs-app-category": "-",
+        "x-cs-app-cci": "-",
+        "x-cs-app-ccl": "-",
+        "x-cs-app-from-user": "-",
+        "x-cs-app-instance-id": "-",
+        "x-cs-app-instance-name": "-",
+        "x-cs-app-instance-tag": "-",
+        "x-cs-app-object-id": "-",
+        "x-cs-app-object-name": "-",
+        "x-cs-app-object-type": "-",
+        "x-cs-app-suite": "-",
+        "x-cs-app-tags": "-",
+        "x-cs-app-to-user": "-",
+        "x-cs-connect-host": "-",
+        "x-cs-connect-port": "-",
+        "x-cs-connect-user-agent": "-",
+        "x-cs-domain-fronted-sni": "-",
+        "x-cs-dst-ip": "142.250.99.95",
+        "x-cs-dst-port": "443",
+        "x-cs-http-version": "HTTP1.1",
+        "x-cs-ip-connect-xff": "-",
+        "x-cs-ip-xff": "-",
+        "x-cs-page-id": "0",
+        "x-cs-session-id": "0",
+        "x-cs-site": "-",
+        "x-cs-sni": "us-west1-b-osconfig.googleapis.com",
+        "x-cs-src-ip": "10.70.0.19",
+        "x-cs-src-ip-egress": "34.82.190.203",
+        "x-cs-src-port": "32951",
+        "x-cs-ssl-cipher": "TLS_AES_256_GCM_SHA384",
+        "x-cs-ssl-engine-action": "Allow",
+        "x-cs-ssl-engine-action-reason": "Established",
+        "x-cs-ssl-fronting-error": "No",
+        "x-cs-ssl-handshake-error": "No",
+        "x-cs-ssl-ja3": "7a15285d4efc355608b304698cd7f9ab",
+        "x-cs-ssl-version": "TLSv1.3",
+        "x-cs-timestamp": "1722875059",
+        "x-cs-traffic-type": "-",
+        "x-cs-tunnel-id": "-",
+        "x-cs-uri-path": "-",
+        "x-cs-url": "-",
+        "x-cs-userip": "10.70.0.19",
+        "x-error": "http-malformed",
+        "x-other-category": "Cloud Storage",
+        "x-other-category-id": "7",
+        "x-policy-action": "NotChecked",
+        "x-policy-dst-host": "-",
+        "x-policy-dst-host-source": "-",
+        "x-policy-dst-ip": "-",
+        "x-policy-justification-reason": "-",
+        "x-policy-justification-type": "-",
+        "x-policy-name": "-",
+        "x-policy-src-ip": "-",
+        "x-r-cert-enddate": "NotChecked",
+        "x-r-cert-expired": "NotChecked",
+        "x-r-cert-incomplete-chain": "NotChecked",
+        "x-r-cert-issuer-cn": "NotChecked",
+        "x-r-cert-mismatch": "NotChecked",
+        "x-r-cert-revocation-check": "NotChecked",
+        "x-r-cert-revoked": "NotChecked",
+        "x-r-cert-self-signed": "NotChecked",
+        "x-r-cert-startdate": "NotChecked",
+        "x-r-cert-subject-cn": "NotChecked",
+        "x-r-cert-untrusted-root": "NotChecked",
+        "x-r-cert-valid": "NotChecked",
+        "x-request-id": "0",
+        "x-rs-file-category": "-",
+        "x-rs-file-language": "-",
+        "x-rs-file-md5": "-",
+        "x-rs-file-sha256": "-",
+        "x-rs-file-size": "-",
+        "x-rs-file-type": "-",
+        "x-s-country": "-",
+        "x-s-custom-signing-ca-error": "No",
+        "x-s-dp-name": "US-SEA2",
+        "x-s-latitude": "-",
+        "x-s-location": "-",
+        "x-s-longitude": "-",
+        "x-s-region": "-",
+        "x-s-zipcode": "-",
+        "x-sc-notification-name": "-",
+        "x-server-ssl-err": "-",
+        "x-sr-dst-ip": "-",
+        "x-sr-dst-port": "-",
+        "x-sr-headers-name": "-",
+        "x-sr-headers-value": "-",
+        "x-sr-src-ip": "-",
+        "x-sr-src-port": "-",
+        "x-sr-ssl-cipher": "NotChecked",
+        "x-sr-ssl-client-certificate-error": "NotChecked",
+        "x-sr-ssl-engine-action": "None",
+        "x-sr-ssl-engine-action-reason": "NotEstablished",
+        "x-sr-ssl-handshake-error": "NotChecked",
+        "x-sr-ssl-ja3s": "NotAvailable",
+        "x-sr-ssl-malformed-ssl": "NotChecked",
+        "x-sr-ssl-version": "NotChecked",
+        "x-ssl-bypass": "No",
+        "x-ssl-bypass-reason": "-",
+        "x-ssl-policy-action": "Decrypt",
+        "x-ssl-policy-categories": "Technology, Cloud Storage",
+        "x-ssl-policy-dst-host": "us-west1-b-osconfig.googleapis.com",
+        "x-ssl-policy-dst-host-source": "Sni",
+        "x-ssl-policy-dst-ip": "142.250.99.95",
+        "x-ssl-policy-name": "-",
+        "x-ssl-policy-src-ip": "10.70.0.19",
+        "x-transaction-id": "2035489204758272484",
+        "x-type": "http_transaction"
+    },
+    {
+        "bytes": "0",
+        "c-ip": "10.0.20.111",
+        "cs-bytes": "0",
+        "cs-content-type": "-",
+        "cs-dns": "-",
+        "cs-host": "achecker-alliances.eu.goskope.com",
+        "cs-method": "-",
+        "cs-referer": "-",
+        "cs-uri": "-",
+        "cs-uri-port": "443",
+        "cs-uri-query": "-",
+        "cs-uri-scheme": "-",
+        "cs-user-agent": "-",
+        "cs-username": "levente.fangli@cososys.com",
+        "date": "2024-08-05",
+        "rs-status": "-",
+        "s-ip": "-",
+        "sc-bytes": "0",
+        "sc-content-type": "-",
+        "sc-status": "-",
+        "time": "16:24:20",
+        "time-taken": "-",
+        "x-c-browser": "-",
+        "x-c-browser-version": "-",
+        "x-c-country": "RO",
+        "x-c-device": "-",
+        "x-c-latitude": "46.765700",
+        "x-c-local-time": "NotChecked",
+        "x-c-location": "Cluj-Napoca",
+        "x-c-longitude": "23.594300",
+        "x-c-os": "-",
+        "x-c-region": "Cluj County",
+        "x-c-zipcode": "400027",
+        "x-category": "-",
+        "x-category-id": "-",
+        "x-client-ssl-err": "HsFailure (error:14094418:SSL routines:ssl3_read_bytes:tlsv1 alert unknown ca)",
+        "x-cs-access-method": "Client",
+        "x-cs-app": "-",
+        "x-cs-app-activity": "-",
+        "x-cs-app-category": "-",
+        "x-cs-app-cci": "-",
+        "x-cs-app-ccl": "-",
+        "x-cs-app-from-user": "-",
+        "x-cs-app-instance-id": "-",
+        "x-cs-app-instance-name": "-",
+        "x-cs-app-instance-tag": "-",
+        "x-cs-app-object-id": "-",
+        "x-cs-app-object-name": "-",
+        "x-cs-app-object-type": "-",
+        "x-cs-app-suite": "-",
+        "x-cs-app-tags": "-",
+        "x-cs-app-to-user": "-",
+        "x-cs-connect-host": "-",
+        "x-cs-connect-port": "-",
+        "x-cs-connect-user-agent": "-",
+        "x-cs-domain-fronted-sni": "-",
+        "x-cs-dst-ip": "31.186.239.94",
+        "x-cs-dst-port": "443",
+        "x-cs-http-version": "UNKNOWN",
+        "x-cs-ip-connect-xff": "-",
+        "x-cs-ip-xff": "-",
+        "x-cs-page-id": "0",
+        "x-cs-session-id": "0",
+        "x-cs-site": "-",
+        "x-cs-sni": "achecker-alliances.eu.goskope.com",
+        "x-cs-src-ip": "10.0.20.111",
+        "x-cs-src-ip-egress": "81.196.156.53",
+        "x-cs-src-port": "57897",
+        "x-cs-ssl-cipher": "-",
+        "x-cs-ssl-engine-action": "Block",
+        "x-cs-ssl-engine-action-reason": "SSL Error - SSL Handshake Error",
+        "x-cs-ssl-fronting-error": "No",
+        "x-cs-ssl-handshake-error": "Yes",
+        "x-cs-ssl-ja3": "bc29aa426fc99c0be1b9be941869f88a",
+        "x-cs-ssl-version": "-",
+        "x-cs-timestamp": "1722875060",
+        "x-cs-traffic-type": "-",
+        "x-cs-tunnel-id": "-",
+        "x-cs-uri-path": "-",
+        "x-cs-url": "-",
+        "x-cs-userip": "10.0.20.111",
+        "x-error": "client-ssl",
+        "x-other-category": "-",
+        "x-other-category-id": "-",
+        "x-policy-action": "NotChecked",
+        "x-policy-dst-host": "-",
+        "x-policy-dst-host-source": "-",
+        "x-policy-dst-ip": "-",
+        "x-policy-justification-reason": "-",
+        "x-policy-justification-type": "-",
+        "x-policy-name": "-",
+        "x-policy-src-ip": "-",
+        "x-r-cert-enddate": "NotChecked",
+        "x-r-cert-expired": "NotChecked",
+        "x-r-cert-incomplete-chain": "NotChecked",
+        "x-r-cert-issuer-cn": "NotChecked",
+        "x-r-cert-mismatch": "NotChecked",
+        "x-r-cert-revocation-check": "NotChecked",
+        "x-r-cert-revoked": "NotChecked",
+        "x-r-cert-self-signed": "NotChecked",
+        "x-r-cert-startdate": "NotChecked",
+        "x-r-cert-subject-cn": "NotChecked",
+        "x-r-cert-untrusted-root": "NotChecked",
+        "x-r-cert-valid": "NotChecked",
+        "x-request-id": "0",
+        "x-rs-file-category": "-",
+        "x-rs-file-language": "-",
+        "x-rs-file-md5": "-",
+        "x-rs-file-sha256": "-",
+        "x-rs-file-size": "-",
+        "x-rs-file-type": "-",
+        "x-s-country": "-",
+        "x-s-custom-signing-ca-error": "No",
+        "x-s-dp-name": "AT-VIE1",
+        "x-s-latitude": "-",
+        "x-s-location": "-",
+        "x-s-longitude": "-",
+        "x-s-region": "-",
+        "x-s-zipcode": "-",
+        "x-sc-notification-name": "-",
+        "x-server-ssl-err": "-",
+        "x-sr-dst-ip": "-",
+        "x-sr-dst-port": "-",
+        "x-sr-headers-name": "-",
+        "x-sr-headers-value": "-",
+        "x-sr-src-ip": "-",
+        "x-sr-src-port": "-",
+        "x-sr-ssl-cipher": "NotChecked",
+        "x-sr-ssl-client-certificate-error": "NotChecked",
+        "x-sr-ssl-engine-action": "None",
+        "x-sr-ssl-engine-action-reason": "NotEstablished",
+        "x-sr-ssl-handshake-error": "NotChecked",
+        "x-sr-ssl-ja3s": "NotAvailable",
+        "x-sr-ssl-malformed-ssl": "NotChecked",
+        "x-sr-ssl-version": "NotChecked",
+        "x-ssl-bypass": "No",
+        "x-ssl-bypass-reason": "-",
+        "x-ssl-policy-action": "Decrypt",
+        "x-ssl-policy-categories": "-",
+        "x-ssl-policy-dst-host": "-",
+        "x-ssl-policy-dst-host-source": "Unknown",
+        "x-ssl-policy-dst-ip": "-",
+        "x-ssl-policy-name": "-",
+        "x-ssl-policy-src-ip": "-",
+        "x-transaction-id": "1350739992944030464",
+        "x-type": "http_transaction"
+    },
+    {
+        "bytes": "0",
+        "c-ip": "10.0.20.111",
+        "cs-bytes": "0",
+        "cs-content-type": "-",
+        "cs-dns": "-",
+        "cs-host": "achecker-alliances.eu.goskope.com",
+        "cs-method": "-",
+        "cs-referer": "-",
+        "cs-uri": "-",
+        "cs-uri-port": "443",
+        "cs-uri-query": "-",
+        "cs-uri-scheme": "-",
+        "cs-user-agent": "-",
+        "cs-username": "levente.fangli@cososys.com",
+        "date": "2024-08-05",
+        "rs-status": "-",
+        "s-ip": "-",
+        "sc-bytes": "0",
+        "sc-content-type": "-",
+        "sc-status": "-",
+        "time": "16:24:23",
+        "time-taken": "-",
+        "x-c-browser": "-",
+        "x-c-browser-version": "-",
+        "x-c-country": "RO",
+        "x-c-device": "-",
+        "x-c-latitude": "46.765700",
+        "x-c-local-time": "NotChecked",
+        "x-c-location": "Cluj-Napoca",
+        "x-c-longitude": "23.594300",
+        "x-c-os": "-",
+        "x-c-region": "Cluj County",
+        "x-c-zipcode": "400027",
+        "x-category": "-",
+        "x-category-id": "-",
+        "x-client-ssl-err": "HsFailure (error:14094418:SSL routines:ssl3_read_bytes:tlsv1 alert unknown ca)",
+        "x-cs-access-method": "Client",
+        "x-cs-app": "-",
+        "x-cs-app-activity": "-",
+        "x-cs-app-category": "-",
+        "x-cs-app-cci": "-",
+        "x-cs-app-ccl": "-",
+        "x-cs-app-from-user": "-",
+        "x-cs-app-instance-id": "-",
+        "x-cs-app-instance-name": "-",
+        "x-cs-app-instance-tag": "-",
+        "x-cs-app-object-id": "-",
+        "x-cs-app-object-name": "-",
+        "x-cs-app-object-type": "-",
+        "x-cs-app-suite": "-",
+        "x-cs-app-tags": "-",
+        "x-cs-app-to-user": "-",
+        "x-cs-connect-host": "-",
+        "x-cs-connect-port": "-",
+        "x-cs-connect-user-agent": "-",
+        "x-cs-domain-fronted-sni": "-",
+        "x-cs-dst-ip": "31.186.239.94",
+        "x-cs-dst-port": "443",
+        "x-cs-http-version": "UNKNOWN",
+        "x-cs-ip-connect-xff": "-",
+        "x-cs-ip-xff": "-",
+        "x-cs-page-id": "0",
+        "x-cs-session-id": "0",
+        "x-cs-site": "-",
+        "x-cs-sni": "achecker-alliances.eu.goskope.com",
+        "x-cs-src-ip": "10.0.20.111",
+        "x-cs-src-ip-egress": "81.196.156.53",
+        "x-cs-src-port": "57897",
+        "x-cs-ssl-cipher": "-",
+        "x-cs-ssl-engine-action": "Block",
+        "x-cs-ssl-engine-action-reason": "SSL Error - SSL Handshake Error",
+        "x-cs-ssl-fronting-error": "No",
+        "x-cs-ssl-handshake-error": "Yes",
+        "x-cs-ssl-ja3": "bc29aa426fc99c0be1b9be941869f88a",
+        "x-cs-ssl-version": "-",
+        "x-cs-timestamp": "1722875063",
+        "x-cs-traffic-type": "-",
+        "x-cs-tunnel-id": "-",
+        "x-cs-uri-path": "-",
+        "x-cs-url": "-",
+        "x-cs-userip": "10.0.20.111",
+        "x-error": "client-ssl",
+        "x-other-category": "-",
+        "x-other-category-id": "-",
+        "x-policy-action": "NotChecked",
+        "x-policy-dst-host": "-",
+        "x-policy-dst-host-source": "-",
+        "x-policy-dst-ip": "-",
+        "x-policy-justification-reason": "-",
+        "x-policy-justification-type": "-",
+        "x-policy-name": "-",
+        "x-policy-src-ip": "-",
+        "x-r-cert-enddate": "NotChecked",
+        "x-r-cert-expired": "NotChecked",
+        "x-r-cert-incomplete-chain": "NotChecked",
+        "x-r-cert-issuer-cn": "NotChecked",
+        "x-r-cert-mismatch": "NotChecked",
+        "x-r-cert-revocation-check": "NotChecked",
+        "x-r-cert-revoked": "NotChecked",
+        "x-r-cert-self-signed": "NotChecked",
+        "x-r-cert-startdate": "NotChecked",
+        "x-r-cert-subject-cn": "NotChecked",
+        "x-r-cert-untrusted-root": "NotChecked",
+        "x-r-cert-valid": "NotChecked",
+        "x-request-id": "0",
+        "x-rs-file-category": "-",
+        "x-rs-file-language": "-",
+        "x-rs-file-md5": "-",
+        "x-rs-file-sha256": "-",
+        "x-rs-file-size": "-",
+        "x-rs-file-type": "-",
+        "x-s-country": "-",
+        "x-s-custom-signing-ca-error": "No",
+        "x-s-dp-name": "AT-VIE1",
+        "x-s-latitude": "-",
+        "x-s-location": "-",
+        "x-s-longitude": "-",
+        "x-s-region": "-",
+        "x-s-zipcode": "-",
+        "x-sc-notification-name": "-",
+        "x-server-ssl-err": "-",
+        "x-sr-dst-ip": "-",
+        "x-sr-dst-port": "-",
+        "x-sr-headers-name": "-",
+        "x-sr-headers-value": "-",
+        "x-sr-src-ip": "-",
+        "x-sr-src-port": "-",
+        "x-sr-ssl-cipher": "NotChecked",
+        "x-sr-ssl-client-certificate-error": "NotChecked",
+        "x-sr-ssl-engine-action": "None",
+        "x-sr-ssl-engine-action-reason": "NotEstablished",
+        "x-sr-ssl-handshake-error": "NotChecked",
+        "x-sr-ssl-ja3s": "NotAvailable",
+        "x-sr-ssl-malformed-ssl": "NotChecked",
+        "x-sr-ssl-version": "NotChecked",
+        "x-ssl-bypass": "No",
+        "x-ssl-bypass-reason": "-",
+        "x-ssl-policy-action": "Decrypt",
+        "x-ssl-policy-categories": "-",
+        "x-ssl-policy-dst-host": "-",
+        "x-ssl-policy-dst-host-source": "Unknown",
+        "x-ssl-policy-dst-ip": "-",
+        "x-ssl-policy-name": "-",
+        "x-ssl-policy-src-ip": "-",
+        "x-transaction-id": "1615432978285898071",
+        "x-type": "http_transaction"
+    }
+]
diff --git a/x-pack/filebeat/input/azureblobstorage/types.go b/x-pack/filebeat/input/azureblobstorage/types.go
index 9b2b0a4eca72..5654ba54f645 100644
--- a/x-pack/filebeat/input/azureblobstorage/types.go
+++ b/x-pack/filebeat/input/azureblobstorage/types.go
@@ -21,6 +21,7 @@ type Source struct {
 	PollInterval             time.Duration
 	TimeStampEpoch           *int64
 	FileSelectors            []fileSelectorConfig
+	ReaderConfig             readerConfig
 	ExpandEventListFromField string
 }
 

From 78f1610fab78ad712c9d8d8b25ef63f1a147cb30 Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Tue, 8 Oct 2024 07:02:55 +1030
Subject: [PATCH 2/3] address pr comment

---
 x-pack/filebeat/input/azureblobstorage/decoding_csv.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/x-pack/filebeat/input/azureblobstorage/decoding_csv.go b/x-pack/filebeat/input/azureblobstorage/decoding_csv.go
index 7409e96f1d55..ecde1017fb2c 100644
--- a/x-pack/filebeat/input/azureblobstorage/decoding_csv.go
+++ b/x-pack/filebeat/input/azureblobstorage/decoding_csv.go
@@ -23,7 +23,7 @@ type csvDecoder struct {
 	err error
 }
 
-// newParquetDecoder creates a new CSV decoder.
+// newCSVDecoder creates a new CSV decoder.
 func newCSVDecoder(config decoderConfig, r io.Reader) (decoder, error) {
 	d := csvDecoder{r: csv.NewReader(r)}
 	d.r.ReuseRecord = true

From 2dbe691021ec1bcc554c32685ed4f01db2d93d58 Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Tue, 8 Oct 2024 13:19:28 +1030
Subject: [PATCH 3/3] address pr comment

---
 x-pack/filebeat/input/azureblobstorage/decoding_csv.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/x-pack/filebeat/input/azureblobstorage/decoding_csv.go b/x-pack/filebeat/input/azureblobstorage/decoding_csv.go
index ecde1017fb2c..b28be94c5232 100644
--- a/x-pack/filebeat/input/azureblobstorage/decoding_csv.go
+++ b/x-pack/filebeat/input/azureblobstorage/decoding_csv.go
@@ -130,7 +130,7 @@ func (d *csvDecoder) check() error {
 	return nil
 }
 
-// close closes the parquet decoder and releases the resources.
+// close closes the csv decoder and releases the resources.
 func (d *csvDecoder) close() error {
 	if d.err == io.EOF {
 		return nil