From 32ac6bf0467da0754193cc29d78a62e6c3ca6bd9 Mon Sep 17 00:00:00 2001
From: Hang Xie <7977860+hangxie@users.noreply.github.com>
Date: Mon, 3 Jul 2023 14:59:05 -0700
Subject: [PATCH] fix memory pressure in cat command with large skip number

---
 cmd/cat.go      | 33 +++++++++++++++++++++++++++++----
 cmd/cat_test.go |  2 +-
 2 files changed, 30 insertions(+), 5 deletions(-)

diff --git a/cmd/cat.go b/cmd/cat.go
index d49cc717..37af8744 100644
--- a/cmd/cat.go
+++ b/cmd/cat.go
@@ -42,6 +42,15 @@ var delimiter = map[string]struct {
 	"tsv": {"", "\n", '\t', ""},
 }
 
+// here are performance numbers from a MacBook for different settings:
+//   page_size   max_memory_usage   time_taken
+//   1K          1.9G               25s
+//   10K         1.8G               15s
+//   100K        2.4G               12s
+//   1M          7.1G               15s
+//   10M         52.1G              1m14s
+const pageSize int64 = 100_000
+
 // Run does actual cat job
 func (c CatCmd) Run() error {
 	if c.PageSize < 1 {
@@ -92,6 +101,23 @@ func (c CatCmd) outputHeader(fileReader *reader.ParquetReader, schemaRoot *inter
 	return fieldList, nil
 }
 
+// skipRows skips c.Skip rows in pages of at most pageSize rows to avoid excessive
+// memory usage, see https://github.com/xitongsys/parquet-go/issues/545
+func (c CatCmd) skipRows(fileReader *reader.ParquetReader) error {
+	// Do not abort if c.Skip is greater than total number of rows
+	// This gives users flexibility to handle this scenario by themselves
+	rowsToSkip := int64(c.Skip)
+	for ; rowsToSkip > pageSize; rowsToSkip -= pageSize {
+		if err := fileReader.SkipRows(pageSize); err != nil {
+			return fmt.Errorf("failed to skip %d rows: %s", c.Skip, err)
+		}
+	}
+	if err := fileReader.SkipRows(rowsToSkip); err != nil {
+		return fmt.Errorf("failed to skip %d rows: %s", c.Skip, err)
+	}
+	return nil
+}
+
 func (c CatCmd) outputRows(fileReader *reader.ParquetReader) error {
 	schemaRoot := internal.NewSchemaTree(fileReader)
 
@@ -104,10 +130,9 @@ func (c CatCmd) outputRows(fileReader *reader.ParquetReader) error {
 	// retrieve schema for better formatting
 	reinterpretFields := schemaRoot.GetReinterpretFields("", true)
 
-	// Do not abort if c.Skip is greater than total number of rows
-	// This gives users flexibility to handle this scenario by themselves
-	if err := fileReader.SkipRows(int64(c.Skip)); err != nil {
-		return fmt.Errorf("failed to skip %d rows: %s", c.Skip, err)
+	// skip rows
+	if err := c.skipRows(fileReader); err != nil {
+		return err
 	}
 
 	// Output rows one by one to avoid running out of memory with a jumbo list
diff --git a/cmd/cat_test.go b/cmd/cat_test.go
index 4b4d890d..5520f230 100644
--- a/cmd/cat_test.go
+++ b/cmd/cat_test.go
@@ -142,7 +142,7 @@ func Test_CatCmd_Run_good_skip(t *testing.T) {
 
 func Test_CatCmd_Run_good_all_skip(t *testing.T) {
 	cmd := &CatCmd{}
-	cmd.Skip = 12
+	cmd.Skip = 100_002
 	cmd.Limit = 10
 	cmd.PageSize = 10
 	cmd.SampleRatio = 1.0