From a435aa0161a8ab50d29241a8d6f578f69e7a0c06 Mon Sep 17 00:00:00 2001 From: LeeHyeonHee Date: Thu, 16 May 2024 16:45:03 +0900 Subject: [PATCH] (feature) Added column case-insensitivity option when reading parquet files --- reader/reader.go | 54 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 14 deletions(-) diff --git a/reader/reader.go b/reader/reader.go index 0c9253b6..795a19f6 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -17,6 +17,10 @@ import ( "github.com/xitongsys/parquet-go/source" ) +type ParquetReaderOptions struct { + CaseInsensitive bool +} + type ParquetReader struct { SchemaHandler *schema.SchemaHandler NP int64 //parallel number @@ -28,14 +32,23 @@ type ParquetReader struct { //One reader can only read one type objects ObjType reflect.Type ObjPartialType reflect.Type + + //Determines whether column names are matched case-insensitively + CaseInsensitive bool } -//Create a parquet reader: obj is a object with schema tags or a JSON schema string -func NewParquetReader(pFile source.ParquetFile, obj interface{}, np int64) (*ParquetReader, error) { +// Create a parquet reader: obj is a object with schema tags or a JSON schema string +func NewParquetReader(pFile source.ParquetFile, obj interface{}, np int64, opts ...ParquetReaderOptions) (*ParquetReader, error) { + var caseInsensitive bool + if len(opts) > 0 { + caseInsensitive = opts[0].CaseInsensitive + } + var err error res := new(ParquetReader) res.NP = np res.PFile = pFile + res.CaseInsensitive = caseInsensitive if err = res.ReadFooter(); err != nil { return nil, err } @@ -95,19 +108,32 @@ func (pr *ParquetReader) SetSchemaHandlerFromJSON(jsonSchema string) error { return nil } -//Rename schema name to inname +// Rename schema name to inname func (pr *ParquetReader) RenameSchema() { for i := 0; i < len(pr.SchemaHandler.Infos); i++ { pr.Footer.Schema[i].Name = pr.SchemaHandler.Infos[i].InName } + + exPathToInPath := make(map[string]string) + if pr.CaseInsensitive { + for 
exPath, inPath := range pr.SchemaHandler.ExPathToInPath { + lowerCaseKey := strings.ToLower(exPath) + exPathToInPath[lowerCaseKey] = inPath + } + } else { + exPathToInPath = pr.SchemaHandler.ExPathToInPath + } + for _, rowGroup := range pr.Footer.RowGroups { for _, chunk := range rowGroup.Columns { - exPath := make([]string, 0) - exPath = append(exPath, pr.SchemaHandler.GetRootExName()) - exPath = append(exPath, chunk.MetaData.GetPathInSchema()...) + exPath := append([]string{pr.SchemaHandler.GetRootExName()}, chunk.MetaData.GetPathInSchema()...) exPathStr := common.PathToStr(exPath) - inPathStr := pr.SchemaHandler.ExPathToInPath[exPathStr] + if pr.CaseInsensitive { + exPathStr = strings.ToLower(exPathStr) + } + + inPathStr := exPathToInPath[exPathStr] inPath := common.StrToPath(inPathStr)[1:] chunk.MetaData.PathInSchema = inPath } @@ -118,7 +144,7 @@ func (pr *ParquetReader) GetNumRows() int64 { return pr.Footer.GetNumRows() } -//Get the footer size +// Get the footer size func (pr *ParquetReader) GetFooterSize() (uint32, error) { var err error buf := make([]byte, 4) @@ -132,7 +158,7 @@ func (pr *ParquetReader) GetFooterSize() (uint32, error) { return size, err } -//Read footer from parquet file +// Read footer from parquet file func (pr *ParquetReader) ReadFooter() error { size, err := pr.GetFooterSize() if err != nil { @@ -149,7 +175,7 @@ func (pr *ParquetReader) ReadFooter() error { return pr.Footer.Read(context.TODO(), protocol) } -//Skip rows of parquet file +// Skip rows of parquet file func (pr *ParquetReader) SkipRows(num int64) error { var err error if num <= 0 { @@ -195,7 +221,7 @@ func (pr *ParquetReader) SkipRows(num int64) error { return err } -//Read rows of parquet file and unmarshal all to dst +// Read rows of parquet file and unmarshal all to dst func (pr *ParquetReader) Read(dstInterface interface{}) error { return pr.read(dstInterface, "") } @@ -226,7 +252,7 @@ func (pr *ParquetReader) ReadByNumber(maxReadNumber int) ([]interface{}, error) 
return ret, nil } -//Read rows of parquet file and unmarshal all to dst +// Read rows of parquet file and unmarshal all to dst func (pr *ParquetReader) ReadPartial(dstInterface interface{}, prefixPath string) error { prefixPath, err := pr.SchemaHandler.ConvertToInPathStr(prefixPath) if err != nil { @@ -262,7 +288,7 @@ func (pr *ParquetReader) ReadPartialByNumber(maxReadNumber int, prefixPath strin return ret, nil } -//Read rows of parquet file with a prefixPath +// Read rows of parquet file with a prefixPath func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error { var err error tmap := make(map[string]*layout.Table) @@ -352,7 +378,7 @@ func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error return err } -//Stop Read +// Stop Read func (pr *ParquetReader) ReadStop() { for _, cb := range pr.ColumnBuffers { if cb != nil {