From cc8632e4b1b102c8360ea28599cc5b921571da94 Mon Sep 17 00:00:00 2001 From: Frederic BIDON Date: Wed, 13 Dec 2023 10:25:54 +0100 Subject: [PATCH] feat: more capable csv consumer and producer This PR allows to use the CSV consumer and producer in a more versatile way. There is no breaking change to the interface. * fixes #263 (types built with an io.Reader should be able to produce CSV) * csv/consumer can now consume CSV into *csv.Writer, io.Writer, io.ReaderFrom, encoding.BinaryUnmarshaler * also supports the new CSVWriter interface, i.e. anything that can Write([]string) error like *csv.Writer * also supports pointers with underlying type *[][]string, *[]byte and *string, not just *[]byte * csv/producer can now produce CSV from *csv.Reader, io.Reader, io.WriterTo, encoding.BinaryMarshaler * also supports the new CSVReader interface, i.e. anything that can Read() ([]string, error) like *csv.Reader * also supports underlying types [][]string, []byte and string, not just []byte * CSVConsumer and CSVProducer now stream CSV records whenever possible, * like ByteStreamConsumer and Producer, added the CSVCloseStream() option * added support to (optionally) configure the CSV format with CSVOpts, using the options made available by the standard library * doc: documented the above in the exported func signatures * test: added full unit test of the CSVConsumer and Producer Signed-off-by: Frederic BIDON --- bytestream_test.go | 5 + csv.go | 326 +++++++++++++++++++++-- csv_options.go | 121 +++++++++ csv_test.go | 624 ++++++++++++++++++++++++++++++++++++++++++--- go.mod | 1 + go.sum | 2 + 6 files changed, 1021 insertions(+), 58 deletions(-) create mode 100644 csv_options.go diff --git a/bytestream_test.go b/bytestream_test.go index 38a97fdb..49669ddb 100644 --- a/bytestream_test.go +++ b/bytestream_test.go @@ -390,6 +390,7 @@ func TestByteStreamProducer(t *testing.T) { } type binaryUnmarshalDummy struct { + err error str string } @@ -398,6 +399,10 @@ type binaryUnmarshalDummyZeroAlloc struct { } func (b *binaryUnmarshalDummy) UnmarshalBinary(data []byte) error { + if b.err != nil { + return b.err + } + if len(data) == 0 { return errors.New("no text given") } diff --git a/csv.go b/csv.go index d807bd91..f3f48906 100644 --- a/csv.go +++ b/csv.go @@ -16,62 +16,334 @@ package runtime import ( "bytes" + "context" + "encoding" "encoding/csv" "errors" + "fmt" "io" + "reflect" + + "golang.org/x/sync/errgroup" ) -// CSVConsumer creates a new CSV consumer -func CSVConsumer() Consumer { +// CSVConsumer creates a new CSV consumer. +// +// The consumer consumes CSV records from a provided reader into the data passed by reference. +// +// CSVOpts options may be specified to alter the default CSV behavior on the reader and the writer side (e.g. separator, skip header, ...). +// The defaults are those of the standard library's csv.Reader and csv.Writer. +// +// Supported output underlying types and interfaces, prioritized in this order: +// - *csv.Writer +// - CSVWriter (writer options are ignored) +// - io.Writer (as raw bytes) +// - io.ReaderFrom (as raw bytes) +// - encoding.BinaryUnmarshaler (as raw bytes) +// - *[][]string (as a collection of records) +// - *[]byte (as raw bytes) +// - *string (a raw bytes) +// +// The consumer prioritizes situations where buffering the input is not required. +func CSVConsumer(opts ...CSVOpt) Consumer { + o := csvOptsWithDefaults(opts) + return ConsumerFunc(func(reader io.Reader, data interface{}) error { if reader == nil { return errors.New("CSVConsumer requires a reader") } + if data == nil { + return errors.New("nil destination for CSVConsumer") + } csvReader := csv.NewReader(reader) - writer, ok := data.(io.Writer) - if !ok { - return errors.New("data type must be io.Writer") + o.applyToReader(csvReader) + closer := defaultCloser + if o.closeStream { + if cl, isReaderCloser := reader.(io.Closer); isReaderCloser { + closer = cl.Close + } } - csvWriter := csv.NewWriter(writer) - records, err := csvReader.ReadAll() - if err != nil { + defer func() { + _ = closer() + }() + + switch destination := data.(type) { + case *csv.Writer: + csvWriter := destination + o.applyToWriter(csvWriter) + + return pipeCSV(csvWriter, csvReader, o) + + case CSVWriter: + csvWriter := destination + // no writer options available + + return pipeCSV(csvWriter, csvReader, o) + + case io.Writer: + csvWriter := csv.NewWriter(destination) + o.applyToWriter(csvWriter) + + return pipeCSV(csvWriter, csvReader, o) + + case io.ReaderFrom: + var buf bytes.Buffer + csvWriter := csv.NewWriter(&buf) + o.applyToWriter(csvWriter) + if err := bufferedCSV(csvWriter, csvReader, o); err != nil { + return err + } + _, err := destination.ReadFrom(&buf) + return err - } - for _, r := range records { - if err := csvWriter.Write(r); err != nil { + + case encoding.BinaryUnmarshaler: + var buf bytes.Buffer + csvWriter := csv.NewWriter(&buf) + o.applyToWriter(csvWriter) + if err := bufferedCSV(csvWriter, csvReader, o); err != nil { return err } + + return destination.UnmarshalBinary(buf.Bytes()) + + default: + // support *[][]string, *[]byte, *string + if ptr := reflect.TypeOf(data); ptr.Kind() != reflect.Ptr { + return errors.New("destination must be a pointer") + } + + v := reflect.Indirect(reflect.ValueOf(data)) + t := v.Type() + + switch { + case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Slice && t.Elem().Elem().Kind() == reflect.String: + csvWriter := &csvRecordsWriter{} + // writer options are ignored + if err := pipeCSV(csvWriter, csvReader, o); err != nil { + return err + } + v.Grow(len(csvWriter.records)) + v.SetCap(len(csvWriter.records)) // in case Grow was unnessary, trim down the capacity + v.SetLen(len(csvWriter.records)) + reflect.Copy(v, reflect.ValueOf(csvWriter.records)) + + return nil + + case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8: + var buf bytes.Buffer + csvWriter := csv.NewWriter(&buf) + o.applyToWriter(csvWriter) + if err := bufferedCSV(csvWriter, csvReader, o); err != nil { + return err + } + v.SetBytes(buf.Bytes()) + + return nil + + case t.Kind() == reflect.String: + var buf bytes.Buffer + csvWriter := csv.NewWriter(&buf) + o.applyToWriter(csvWriter) + if err := bufferedCSV(csvWriter, csvReader, o); err != nil { + return err + } + v.SetString(buf.String()) + + return nil + + default: + return fmt.Errorf("%v (%T) is not supported by the CSVConsumer, %s", + data, data, "can be resolved by supporting CSVWriter/Writer/BinaryUnmarshaler interface", + ) + } } - csvWriter.Flush() - return nil }) } -// CSVProducer creates a new CSV producer -func CSVProducer() Producer { +// CSVProducer creates a new CSV producer. +// +// The producer takes input data then writes as CSV to an output writer (essentially as a pipe). +// +// Supported input underlying types and interfaces, prioritized in this order: +// - *csv.Reader +// - CSVReader (reader options are ignored) +// - io.Reader +// - io.WriterTo +// - encoding.BinaryMarshaler +// - [][]string +// - []byte +// - string +// +// The producer prioritizes situations where buffering the input is not required. +func CSVProducer(opts ...CSVOpt) Producer { + o := csvOptsWithDefaults(opts) + return ProducerFunc(func(writer io.Writer, data interface{}) error { if writer == nil { return errors.New("CSVProducer requires a writer") } + if data == nil { + return errors.New("nil data for CSVProducer") + } - dataBytes, ok := data.([]byte) - if !ok { - return errors.New("data type must be byte array") + csvWriter := csv.NewWriter(writer) + o.applyToWriter(csvWriter) + closer := defaultCloser + if o.closeStream { + if cl, isWriterCloser := writer.(io.Closer); isWriterCloser { + closer = cl.Close + } } + defer func() { + _ = closer() + }() - csvReader := csv.NewReader(bytes.NewBuffer(dataBytes)) - records, err := csvReader.ReadAll() - if err != nil { - return err + if rc, isDataCloser := data.(io.ReadCloser); isDataCloser { + defer rc.Close() } - csvWriter := csv.NewWriter(writer) - for _, r := range records { - if err := csvWriter.Write(r); err != nil { + + switch origin := data.(type) { + case *csv.Reader: + csvReader := origin + o.applyToReader(csvReader) + + return pipeCSV(csvWriter, csvReader, o) + + case CSVReader: + csvReader := origin + // no reader options available + + return pipeCSV(csvWriter, csvReader, o) + + case io.Reader: + csvReader := csv.NewReader(origin) + o.applyToReader(csvReader) + + return pipeCSV(csvWriter, csvReader, o) + + case io.WriterTo: + // async piping of the writes performed by WriteTo + r, w := io.Pipe() + csvReader := csv.NewReader(r) + o.applyToReader(csvReader) + + pipe, _ := errgroup.WithContext(context.Background()) + pipe.Go(func() error { + _, err := origin.WriteTo(w) + _ = w.Close() + return err + }) + + pipe.Go(func() error { + defer func() { + _ = r.Close() + }() + + return pipeCSV(csvWriter, csvReader, o) + }) + + return pipe.Wait() + + case encoding.BinaryMarshaler: + buf, err := origin.MarshalBinary() + if err != nil { return err } + rdr := bytes.NewBuffer(buf) + csvReader := csv.NewReader(rdr) + + return bufferedCSV(csvWriter, csvReader, o) + + default: + // support [][]string, []byte, string (or pointers to those) + v := reflect.Indirect(reflect.ValueOf(data)) + t := v.Type() + + switch { + case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Slice && t.Elem().Elem().Kind() == reflect.String: + csvReader := &csvRecordsWriter{ + records: make([][]string, v.Len()), + } + reflect.Copy(reflect.ValueOf(csvReader.records), v) + + return pipeCSV(csvWriter, csvReader, o) + + case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8: + buf := bytes.NewBuffer(v.Bytes()) + csvReader := csv.NewReader(buf) + o.applyToReader(csvReader) + + return bufferedCSV(csvWriter, csvReader, o) + + case t.Kind() == reflect.String: + buf := bytes.NewBufferString(v.String()) + csvReader := csv.NewReader(buf) + o.applyToReader(csvReader) + + return bufferedCSV(csvWriter, csvReader, o) + + default: + return fmt.Errorf("%v (%T) is not supported by the CSVProducer, %s", + data, data, "can be resolved by supporting CSVReader/Reader/BinaryMarshaler interface", + ) + } } - csvWriter.Flush() - return nil }) } + +// pipeCSV copies CSV records from a CSV reader to a CSV writer +func pipeCSV(csvWriter CSVWriter, csvReader CSVReader, opts csvOpts) error { + for ; opts.skippedLines > 0; opts.skippedLines-- { + _, err := csvReader.Read() + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + + return err + } + } + + for { + record, err := csvReader.Read() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + return err + } + + if err := csvWriter.Write(record); err != nil { + return err + } + } + + csvWriter.Flush() + + return csvWriter.Error() +} + +// bufferedCSV copies CSV records from a CSV reader to a CSV writer, +// by first reading all records then writing them at once. +func bufferedCSV(csvWriter *csv.Writer, csvReader *csv.Reader, opts csvOpts) error { + for ; opts.skippedLines > 0; opts.skippedLines-- { + _, err := csvReader.Read() + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + + return err + } + } + + records, err := csvReader.ReadAll() + if err != nil { + return err + } + + return csvWriter.WriteAll(records) +} diff --git a/csv_options.go b/csv_options.go new file mode 100644 index 00000000..c16464c5 --- /dev/null +++ b/csv_options.go @@ -0,0 +1,121 @@ +package runtime + +import ( + "encoding/csv" + "io" +) + +// CSVOpts alter the behavior of the CSV consumer or producer. +type CSVOpt func(*csvOpts) + +type csvOpts struct { + csvReader csv.Reader + csvWriter csv.Writer + skippedLines int + closeStream bool +} + +// WithCSVReaderOpts specifies the options to csv.Reader +// when reading CSV. +func WithCSVReaderOpts(reader csv.Reader) CSVOpt { + return func(o *csvOpts) { + o.csvReader = reader + } +} + +// WithCSVWriterOpts specifies the options to csv.Writer +// when writing CSV. +func WithCSVWriterOpts(writer csv.Writer) CSVOpt { + return func(o *csvOpts) { + o.csvWriter = writer + } +} + +// WithCSVSkipLines will skip header lines. +func WithCSVSkipLines(skipped int) CSVOpt { + return func(o *csvOpts) { + o.skippedLines = skipped + } +} + +func WithCSVClosesStream() CSVOpt { + return func(o *csvOpts) { + o.closeStream = true + } +} + +func (o csvOpts) applyToReader(in *csv.Reader) { + if o.csvReader.Comma != 0 { + in.Comma = o.csvReader.Comma + } + if o.csvReader.Comment != 0 { + in.Comment = o.csvReader.Comment + } + if o.csvReader.FieldsPerRecord != 0 { + in.FieldsPerRecord = o.csvReader.FieldsPerRecord + } + + in.LazyQuotes = o.csvReader.LazyQuotes + in.TrimLeadingSpace = o.csvReader.TrimLeadingSpace + in.ReuseRecord = o.csvReader.ReuseRecord +} + +func (o csvOpts) applyToWriter(in *csv.Writer) { + if o.csvWriter.Comma != 0 { + in.Comma = o.csvWriter.Comma + } + in.UseCRLF = o.csvWriter.UseCRLF +} + +func csvOptsWithDefaults(opts []CSVOpt) csvOpts { + var o csvOpts + for _, apply := range opts { + apply(&o) + } + + return o +} + +type CSVWriter interface { + Write([]string) error + Flush() + Error() error +} + +type CSVReader interface { + Read() ([]string, error) +} + +var ( + _ CSVWriter = &csvRecordsWriter{} + _ CSVReader = &csvRecordsWriter{} +) + +// csvRecordsWriter is an internal container to move CSV records back and forth +type csvRecordsWriter struct { + i int + records [][]string +} + +func (w *csvRecordsWriter) Write(record []string) error { + w.records = append(w.records, record) + + return nil +} + +func (w *csvRecordsWriter) Read() ([]string, error) { + if w.i >= len(w.records) { + return nil, io.EOF + } + defer func() { + w.i++ + }() + + return w.records[w.i], nil +} + +func (w *csvRecordsWriter) Flush() {} + +func (w *csvRecordsWriter) Error() error { + return nil +} diff --git a/csv_test.go b/csv_test.go index 7a7b6e56..a9d8edfe 100644 --- a/csv_test.go +++ b/csv_test.go @@ -16,62 +16,595 @@ package runtime import ( "bytes" + "encoding/csv" + "errors" "io" "net/http/httptest" + "strings" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -const consProdCSV = `name,country,age +const ( + csvFixture = `name,country,age John,US,19 Mike,US,20 ` + badCSVFixture = `name,country,age +John,US,19 +Mike,US +` + commentedCSVFixture = `# heading line +name,country,age +#John's record +John,US,19 +#Mike's record +Mike,US,20 +` +) -type csvEmptyReader struct{} - -func (r *csvEmptyReader) Read(_ []byte) (int, error) { - return 0, io.EOF +var testCSVRecords = [][]string{ + {"name", "country", "age"}, + {"John", "US", "19"}, + {"Mike", "US", "20"}, } func TestCSVConsumer(t *testing.T) { - cons := CSVConsumer() - reader := bytes.NewBufferString(consProdCSV) + consumer := CSVConsumer() + + t.Run("can consume as a *csv.Writer", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + var buf bytes.Buffer + dest := csv.NewWriter(&buf) + + err := consumer.Consume(reader, dest) + require.NoError(t, err) + assert.Equal(t, csvFixture, buf.String()) + }) + + t.Run("can consume as a CSVReader", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + var dest csvRecordsWriter + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + assertCSVRecords(t, dest.records) + }) + + t.Run("can consume as a Writer", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + var dest closingWriter + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + assert.Equal(t, csvFixture, dest.b.String()) + }) + + t.Run("can consume as a ReaderFrom", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + var dest readerFromDummy + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + assert.Equal(t, csvFixture, dest.b.String()) + }) + + t.Run("can consume as a BinaryUnmarshaler", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + var dest binaryUnmarshalDummy + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + assert.Equal(t, csvFixture, dest.str) + }) + + t.Run("can consume as a *[][]string", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + dest := [][]string{} + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + assertCSVRecords(t, dest) + }) + + t.Run("can consume as an alias to *[][]string", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + type records [][]string + var dest records + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + assertCSVRecords(t, dest) + }) + + t.Run("can consume as a *[]byte", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + var dest []byte + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + assert.Equal(t, csvFixture, string(dest)) + }) + + t.Run("can consume as an alias to *[]byte", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + type buffer []byte + var dest buffer + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + assert.Equal(t, csvFixture, string(dest)) + }) + + t.Run("can consume as a *string", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + var dest string + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + assert.Equal(t, csvFixture, dest) + }) + + t.Run("can consume as an alias to *string", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + type buffer string + var dest buffer + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + assert.Equal(t, csvFixture, string(dest)) + }) + + t.Run("can consume from an empty reader", func(t *testing.T) { + reader := &csvEmptyReader{} + var dest bytes.Buffer + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + assert.Empty(t, dest.String()) + }) + + t.Run("error cases", func(t *testing.T) { + t.Run("nil data is never accepted", func(t *testing.T) { + var rdr bytes.Buffer + + require.Error(t, consumer.Consume(&rdr, nil)) + }) + + t.Run("nil readers should also never be acccepted", func(t *testing.T) { + var buf bytes.Buffer - outBuf := new(bytes.Buffer) - err := cons.Consume(reader, outBuf) - require.NoError(t, err) - assert.Equal(t, consProdCSV, outBuf.String()) + err := consumer.Consume(nil, &buf) + require.Error(t, err) + }) - outBuf2 := new(bytes.Buffer) - err = cons.Consume(nil, outBuf2) - require.Error(t, err) + t.Run("data must be a pointer", func(t *testing.T) { + var rdr bytes.Buffer + var dest []byte - err = cons.Consume(reader, struct{}{}) - require.Error(t, err) + err := consumer.Consume(&rdr, dest) + require.Error(t, err) + }) - emptyOutBuf := new(bytes.Buffer) - err = cons.Consume(&csvEmptyReader{}, emptyOutBuf) - require.NoError(t, err) - assert.Equal(t, "", emptyOutBuf.String()) + t.Run("unsupported type", func(t *testing.T) { + var rdr bytes.Buffer + var dest struct{} + + err := consumer.Consume(&rdr, &dest) + require.Error(t, err) + }) + + t.Run("should propagate CSV error (buffered)", func(t *testing.T) { + reader := bytes.NewBufferString(badCSVFixture) + var dest []byte + + err := consumer.Consume(reader, &dest) + require.Error(t, err) + require.EqualError(t, err, "record on line 3: wrong number of fields") + }) + + t.Run("should propagate CSV error (buffered, string)", func(t *testing.T) { + reader := bytes.NewBufferString(badCSVFixture) + var dest string + + err := consumer.Consume(reader, &dest) + require.Error(t, err) + require.EqualError(t, err, "record on line 3: wrong number of fields") + }) + + t.Run("should propagate CSV error (buffered, ReaderFrom)", func(t *testing.T) { + reader := bytes.NewBufferString(badCSVFixture) + var dest readerFromDummy + + err := consumer.Consume(reader, &dest) + require.Error(t, err) + require.EqualError(t, err, "record on line 3: wrong number of fields") + }) + + t.Run("should propagate CSV error (buffered, BinaryUnmarshaler)", func(t *testing.T) { + reader := bytes.NewBufferString(badCSVFixture) + var dest binaryUnmarshalDummy + + err := consumer.Consume(reader, &dest) + require.Error(t, err) + require.EqualError(t, err, "record on line 3: wrong number of fields") + }) + + t.Run("should propagate CSV error (streaming)", func(t *testing.T) { + reader := bytes.NewBufferString(badCSVFixture) + var dest bytes.Buffer + + err := consumer.Consume(reader, &dest) + require.Error(t, err) + require.EqualError(t, err, "record on line 3: wrong number of fields") + }) + + t.Run("should propagate CSV error (streaming, write error)", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + var buf bytes.Buffer + dest := csvWriterDummy{err: errors.New("test error"), Writer: csv.NewWriter(&buf)} + + err := consumer.Consume(reader, &dest) + require.Error(t, err) + require.EqualError(t, err, "test error") + }) + + t.Run("should propagate ReaderFrom error", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + dest := readerFromDummy{err: errors.New("test error")} + + err := consumer.Consume(reader, &dest) + require.Error(t, err) + require.EqualError(t, err, "test error") + }) + + t.Run("should propagate BinaryUnmarshaler error", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + dest := binaryUnmarshalDummy{err: errors.New("test error")} + + err := consumer.Consume(reader, &dest) + require.Error(t, err) + require.EqualError(t, err, "test error") + }) + }) +} + +func TestCSVConsumerWithOptions(t *testing.T) { + semiColonFixture := strings.ReplaceAll(csvFixture, ",", ";") + + t.Run("with CSV reader Comma", func(t *testing.T) { + consumer := CSVConsumer(WithCSVReaderOpts(csv.Reader{Comma: ';', FieldsPerRecord: 3})) + + t.Run("should not read comma-separated input", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + var dest bytes.Buffer + + err := consumer.Consume(reader, &dest) + require.Error(t, err) + require.EqualError(t, err, "record on line 1: wrong number of fields") + }) + + t.Run("should read semicolon-separated input and convert it to colon-separated", func(t *testing.T) { + reader := bytes.NewBufferString(semiColonFixture) + var dest bytes.Buffer + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + assert.Equal(t, csvFixture, dest.String()) + }) + }) + + t.Run("with CSV reader Comment", func(t *testing.T) { + consumer := CSVConsumer(WithCSVReaderOpts(csv.Reader{Comment: '#'})) + + t.Run("should read input and skip commented lines", func(t *testing.T) { + reader := bytes.NewBufferString(commentedCSVFixture) + var dest [][]string + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + assertCSVRecords(t, dest) + }) + }) + + t.Run("with CSV writer Comma", func(t *testing.T) { + consumer := CSVConsumer(WithCSVWriterOpts(csv.Writer{Comma: ';'})) + + t.Run("should read comma-separated input and convert it to semicolon-separated", func(t *testing.T) { + reader := bytes.NewBufferString(csvFixture) + var dest bytes.Buffer + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + assert.Equal(t, semiColonFixture, dest.String()) + }) + }) + + t.Run("with SkipLines (streaming)", func(t *testing.T) { + consumer := CSVConsumer(WithCSVSkipLines(1)) + reader := bytes.NewBufferString(csvFixture) + var dest [][]string + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + + expected := testCSVRecords[1:] + assert.Equalf(t, expected, dest, "expected output to skip header") + }) + + t.Run("with SkipLines (buffered)", func(t *testing.T) { + consumer := CSVConsumer(WithCSVSkipLines(1)) + reader := bytes.NewBufferString(csvFixture) + var dest []byte + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + + r := csv.NewReader(bytes.NewReader(dest)) + consumed, err := r.ReadAll() + require.NoError(t, err) + expected := testCSVRecords[1:] + assert.Equalf(t, expected, consumed, "expected output to skip header") + }) + + t.Run("should detect errors on skipped lines (streaming)", func(t *testing.T) { + consumer := CSVConsumer(WithCSVSkipLines(1)) + reader := bytes.NewBufferString(strings.ReplaceAll(csvFixture, ",age", `,"age`)) + var dest [][]string + + err := consumer.Consume(reader, &dest) + require.Error(t, err) + require.ErrorContains(t, err, "record on line 1; parse error") + }) + + t.Run("should detect errors on skipped lines (buffered)", func(t *testing.T) { + consumer := CSVConsumer(WithCSVSkipLines(1)) + reader := bytes.NewBufferString(strings.ReplaceAll(csvFixture, ",age", `,"age`)) + var dest []byte + + err := consumer.Consume(reader, &dest) + require.Error(t, err) + require.ErrorContains(t, err, "record on line 1; parse error") + }) + + t.Run("with SkipLines greater than the total number of lines (streaming)", func(t *testing.T) { + consumer := CSVConsumer(WithCSVSkipLines(4)) + reader := bytes.NewBufferString(csvFixture) + var dest [][]string + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + + assert.Empty(t, dest) + }) + + t.Run("with SkipLines greater than the total number of lines (buffered)", func(t *testing.T) { + consumer := CSVConsumer(WithCSVSkipLines(4)) + reader := bytes.NewBufferString(csvFixture) + var dest []byte + + err := consumer.Consume(reader, &dest) + require.NoError(t, err) + + assert.Empty(t, dest) + }) + + t.Run("with CloseStream", func(t *testing.T) { + t.Run("wants to close stream", func(t *testing.T) { + closingConsumer := CSVConsumer(WithCSVClosesStream()) + var dest bytes.Buffer + r := &closingReader{b: bytes.NewBufferString(csvFixture)} + + require.NoError(t, closingConsumer.Consume(r, &dest)) + assert.Equal(t, csvFixture, dest.String()) + assert.EqualValues(t, 1, r.calledClose) + }) + + t.Run("don't want to close stream", func(t *testing.T) { + nonClosingConsumer := CSVConsumer() + var dest bytes.Buffer + r := &closingReader{b: bytes.NewBufferString(csvFixture)} + + require.NoError(t, nonClosingConsumer.Consume(r, &dest)) + assert.Equal(t, csvFixture, dest.String()) + assert.EqualValues(t, 0, r.calledClose) + }) + }) } func TestCSVProducer(t *testing.T) { - prod := CSVProducer() - data := []byte(consProdCSV) + producer := CSVProducer() + + t.Run("can produce CSV from *csv.Reader", func(t *testing.T) { + writer := new(bytes.Buffer) + buf := bytes.NewBufferString(csvFixture) + data := csv.NewReader(buf) + + err := producer.Produce(writer, data) + require.NoError(t, err) + assert.Equal(t, csvFixture, writer.String()) + }) + + t.Run("can produce CSV from CSVReader", func(t *testing.T) { + writer := new(bytes.Buffer) + data := &csvRecordsWriter{ + records: testCSVRecords, + } + + err := producer.Produce(writer, data) + require.NoError(t, err) + assert.Equal(t, csvFixture, writer.String()) + }) + + t.Run("can produce CSV from Reader", func(t *testing.T) { + writer := new(bytes.Buffer) + data := bytes.NewReader([]byte(csvFixture)) + + err := producer.Produce(writer, data) + require.NoError(t, err) + assert.Equal(t, csvFixture, writer.String()) + }) + + t.Run("can produce CSV from WriterTo", func(t *testing.T) { + writer := new(bytes.Buffer) + buf := bytes.NewBufferString(csvFixture) + data := &writerToDummy{ + b: *buf, + } + + err := producer.Produce(writer, data) + require.NoError(t, err) + assert.Equal(t, csvFixture, writer.String()) + }) + + t.Run("can produce CSV from BinaryMarshaler", func(t *testing.T) { + writer := new(bytes.Buffer) + data := &binaryMarshalDummy{str: csvFixture} + + err := producer.Produce(writer, data) + require.NoError(t, err) + assert.Equal(t, csvFixture, writer.String()) + }) + + t.Run("can produce CSV from [][]string", func(t *testing.T) { + writer := new(bytes.Buffer) + data := testCSVRecords + + err := producer.Produce(writer, data) + require.NoError(t, err) + assert.Equal(t, csvFixture, writer.String()) + }) + + t.Run("can produce CSV from alias to [][]string", func(t *testing.T) { + writer := new(bytes.Buffer) + type records [][]string + data := records(testCSVRecords) + + err := producer.Produce(writer, data) + require.NoError(t, err) + assert.Equal(t, csvFixture, writer.String()) + }) + + t.Run("can produce CSV from []byte", func(t *testing.T) { + writer := httptest.NewRecorder() + data := []byte(csvFixture) + + err := producer.Produce(writer, data) + require.NoError(t, err) + assert.Equal(t, csvFixture, writer.Body.String()) + }) + + t.Run("can produce CSV from alias to []byte", func(t *testing.T) { + writer := httptest.NewRecorder() + type buffer []byte + data := buffer(csvFixture) + + err := producer.Produce(writer, data) + require.NoError(t, err) + assert.Equal(t, csvFixture, writer.Body.String()) + }) + + t.Run("can produce CSV from string", func(t *testing.T) { + writer := httptest.NewRecorder() + data := csvFixture + + err := producer.Produce(writer, data) + require.NoError(t, err) + assert.Equal(t, csvFixture, writer.Body.String()) + }) + + t.Run("can produce CSV from alias to string", func(t *testing.T) { + writer := httptest.NewRecorder() + type buffer string + data := buffer(csvFixture) + + err := producer.Produce(writer, data) + require.NoError(t, err) + assert.Equal(t, csvFixture, writer.Body.String()) + }) - rw := httptest.NewRecorder() - err := prod.Produce(rw, data) - require.NoError(t, err) - assert.Equal(t, consProdCSV, rw.Body.String()) + t.Run("always close data reader whenever possible", func(t *testing.T) { + nonClosingProducer := CSVProducer() + r := &closingWriter{} + data := &closingReader{b: bytes.NewBufferString(csvFixture)} - rw2 := httptest.NewRecorder() - err = prod.Produce(rw2, struct{}{}) - require.Error(t, err) + require.NoError(t, nonClosingProducer.Produce(r, data)) + assert.Equal(t, csvFixture, r.String()) + assert.EqualValuesf(t, 0, r.calledClose, "expected the input reader NOT to be closed") + assert.EqualValuesf(t, 1, data.calledClose, "expected the data reader to be closed") + }) - err = prod.Produce(nil, data) - require.Error(t, err) + t.Run("error cases", func(t *testing.T) { + t.Run("unsupported type", func(t *testing.T) { + writer := httptest.NewRecorder() + var data struct{} + + err := producer.Produce(writer, data) + require.Error(t, err) + }) + + t.Run("data cannot be nil", func(t *testing.T) { + writer := httptest.NewRecorder() + + err := producer.Produce(writer, nil) + require.Error(t, err) + }) + + t.Run("writer cannot be nil", func(t *testing.T) { + data := []byte(csvFixture) + + err := producer.Produce(nil, data) + require.Error(t, err) + }) + + t.Run("should propagate error from BinaryMarshaler", func(t *testing.T) { + var rdr bytes.Buffer + data := new(binaryMarshalDummy) + + err := producer.Produce(&rdr, data) + require.Error(t, err) + require.ErrorContains(t, err, "no text set") + }) + }) +} + +func TestCSVProducerWithOptions(t *testing.T) { + t.Run("with CloseStream", func(t *testing.T) { + t.Run("wants to close stream", func(t *testing.T) { + closingProducer := CSVProducer(WithCSVClosesStream()) + r := &closingWriter{} + data := bytes.NewBufferString(csvFixture) + + require.NoError(t, closingProducer.Produce(r, data)) + assert.Equal(t, csvFixture, r.String()) + assert.EqualValues(t, 1, r.calledClose) + }) + + t.Run("don't want to close stream", func(t *testing.T) { + nonClosingProducer := CSVProducer() + r := &closingWriter{} + data := bytes.NewBufferString(csvFixture) + + require.NoError(t, nonClosingProducer.Produce(r, data)) + assert.Equal(t, csvFixture, r.String()) + assert.EqualValues(t, 0, r.calledClose) + }) + }) +} + +func assertCSVRecords(t testing.TB, dest [][]string) { + assert.Len(t, dest, 3) + for i, record := range dest { + assert.Equal(t, testCSVRecords[i], record) + } +} + +type csvEmptyReader struct{} + +func (r *csvEmptyReader) Read(_ []byte) (int, error) { + return 0, io.EOF } type readerFromDummy struct { @@ -86,3 +619,32 @@ func (r *readerFromDummy) ReadFrom(rdr io.Reader) (int64, error) { return r.b.ReadFrom(rdr) } + +type writerToDummy struct { + b bytes.Buffer +} + +func (w *writerToDummy) WriteTo(writer io.Writer) (int64, error) { + return w.b.WriteTo(writer) +} + +type csvWriterDummy struct { + err error + *csv.Writer +} + +func (w *csvWriterDummy) Write(record []string) error { + if w.err != nil { + return w.err + } + + return w.Writer.Write(record) +} + +func (w *csvWriterDummy) Error() error { + if w.err != nil { + return w.err + } + + return w.Writer.Error() +} diff --git a/go.mod b/go.mod index a2d0769f..befcfc8a 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( go.opentelemetry.io/otel v1.17.0 go.opentelemetry.io/otel/sdk v1.17.0 go.opentelemetry.io/otel/trace v1.17.0 + golang.org/x/sync v0.5.0 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 0e3a0d9c..495c3eb7 100644 --- a/go.sum +++ b/go.sum @@ -78,6 +78,8 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=