diff --git a/common/common.go b/common/common.go index e5b571ca..75c44a80 100644 --- a/common/common.go +++ b/common/common.go @@ -3,6 +3,7 @@ package common import ( "bytes" "encoding/gob" + "errors" "fmt" "reflect" "strconv" @@ -69,7 +70,7 @@ func NewTag() *Tag { } } -func StringToTag(tag string) *Tag { +func StringToTag(tag string) (*Tag, error) { mp := NewTag() tagStr := strings.Replace(tag, "\t", "", -1) tags := strings.Split(tagStr, ",") @@ -78,7 +79,9 @@ func StringToTag(tag string) *Tag { tag = strings.TrimSpace(tag) kv := strings.SplitN(tag, "=", 2) - + if len(kv) != 2 { + return nil, fmt.Errorf("expect 'key=value' but got '%s'", tag) + } key := kv[0] key = strings.ToLower(key) key = strings.TrimSpace(key) @@ -86,22 +89,7 @@ func StringToTag(tag string) *Tag { val := kv[1] val = strings.TrimSpace(val) - valInt32 := func() int32 { - valInt, err := strconv.Atoi(val) - if err != nil { - panic(err) - } - return int32(valInt) - } - - valBoolean := func() bool { - valBoolean, err := strconv.ParseBool(val) - if err != nil { - panic(err) - } - return valBoolean - } - + var err error switch key { case "type": mp.Type = val @@ -116,35 +104,65 @@ func StringToTag(tag string) *Tag { case "valueconvertedtype": mp.ValueConvertedType = val case "length": - mp.Length = valInt32() + if mp.Length, err = Str2Int32(val); err != nil { + return nil, fmt.Errorf("failed to parse length: %s", err.Error()) + } case "keylength": - mp.KeyLength = valInt32() + if mp.KeyLength, err = Str2Int32(val); err != nil { + return nil, fmt.Errorf("failed to parse keylength: %s", err.Error()) + } case "valuelength": - mp.ValueLength = valInt32() + if mp.ValueLength, err = Str2Int32(val); err != nil { + return nil, fmt.Errorf("failed to parse valuelength: %s", err.Error()) + } case "scale": - mp.Scale = valInt32() + if mp.Scale, err = Str2Int32(val); err != nil { + return nil, fmt.Errorf("failed to parse scale: %s", err.Error()) + } case "keyscale": - mp.KeyScale = valInt32() + if mp.KeyScale, err 
= Str2Int32(val); err != nil { + return nil, fmt.Errorf("failed to parse keyscale: %s", err.Error()) + } case "valuescale": - mp.ValueScale = valInt32() + if mp.ValueScale, err = Str2Int32(val); err != nil { + return nil, fmt.Errorf("failed to parse valuescale: %s", err.Error()) + } case "precision": - mp.Precision = valInt32() + if mp.Precision, err = Str2Int32(val); err != nil { + return nil, fmt.Errorf("failed to parse precision: %s", err.Error()) + } case "keyprecision": - mp.KeyPrecision = valInt32() + if mp.KeyPrecision, err = Str2Int32(val); err != nil { + return nil, fmt.Errorf("failed to parse keyprecision: %s", err.Error()) + } case "valueprecision": - mp.ValuePrecision = valInt32() + if mp.ValuePrecision, err = Str2Int32(val); err != nil { + return nil, fmt.Errorf("failed to parse valueprecision: %s", err.Error()) + } case "fieldid": - mp.FieldID = valInt32() + if mp.FieldID, err = Str2Int32(val); err != nil { + return nil, fmt.Errorf("failed to parse fieldid: %s", err.Error()) + } case "keyfieldid": - mp.KeyFieldID = valInt32() + if mp.KeyFieldID, err = Str2Int32(val); err != nil { + return nil, fmt.Errorf("failed to parse keyfieldid: %s", err.Error()) + } case "valuefieldid": - mp.ValueFieldID = valInt32() + if mp.ValueFieldID, err = Str2Int32(val); err != nil { + return nil, fmt.Errorf("failed to parse valuefieldid: %s", err.Error()) + } case "isadjustedtoutc": - mp.IsAdjustedToUTC = valBoolean() + if mp.IsAdjustedToUTC, err = Str2Bool(val); err != nil { + return nil, fmt.Errorf("failed to parse isadjustedtoutc: %s", err.Error()) + } case "keyisadjustedtoutc": - mp.KeyIsAdjustedToUTC = valBoolean() + if mp.KeyIsAdjustedToUTC, err = Str2Bool(val); err != nil { + return nil, fmt.Errorf("failed to parse keyisadjustedtoutc: %s", err.Error()) + } case "valueisadjustedtoutc": - mp.ValueIsAdjustedToUTC = valBoolean() + if mp.ValueIsAdjustedToUTC, err = Str2Bool(val); err != nil { + return nil, fmt.Errorf("failed to parse valueisadjustedtoutc: %s", 
err.Error()) + } case "name": if mp.InName == "" { mp.InName = StringToVariableName(val) @@ -153,11 +171,17 @@ func StringToTag(tag string) *Tag { case "inname": mp.InName = val case "omitstats": - mp.OmitStats = valBoolean() + if mp.OmitStats, err = Str2Bool(val); err != nil { + return nil, fmt.Errorf("failed to parse omitstats: %s", err.Error()) + } case "keyomitstats": - mp.KeyOmitStats = valBoolean() + if mp.KeyOmitStats, err = Str2Bool(val); err != nil { + return nil, fmt.Errorf("failed to parse keyomitstats: %s", err.Error()) + } case "valueomitstats": - mp.ValueOmitStats = valBoolean() + if mp.ValueOmitStats, err = Str2Bool(val); err != nil { + return nil, fmt.Errorf("failed to parse valueomitstats: %s", err.Error()) + } case "repetitiontype": switch strings.ToLower(val) { case "repeated": @@ -167,7 +191,7 @@ func StringToTag(tag string) *Tag { case "optional": mp.RepetitionType = parquet.FieldRepetitionType_OPTIONAL default: - panic(fmt.Errorf("Unknown repetitiontype: '%v'", val)) + return nil, fmt.Errorf("unknown repetitiontype: '%v'", val) } case "keyrepetitiontype": switch strings.ToLower(val) { @@ -178,7 +202,7 @@ func StringToTag(tag string) *Tag { case "optional": mp.KeyRepetitionType = parquet.FieldRepetitionType_OPTIONAL default: - panic(fmt.Errorf("Unknown keyrepetitiontype: '%v'", val)) + return nil, fmt.Errorf("unknown keyrepetitiontype: '%v'", val) } case "valuerepetitiontype": switch strings.ToLower(val) { @@ -189,7 +213,7 @@ func StringToTag(tag string) *Tag { case "optional": mp.ValueRepetitionType = parquet.FieldRepetitionType_OPTIONAL default: - panic(fmt.Errorf("Unknown valuerepetitiontype: '%v'", val)) + return nil, fmt.Errorf("unknown valuerepetitiontype: '%v'", val) } case "encoding": switch strings.ToLower(val) { @@ -210,7 +234,7 @@ func StringToTag(tag string) *Tag { case "byte_stream_split": mp.Encoding = parquet.Encoding_BYTE_STREAM_SPLIT default: - panic(fmt.Errorf("Unknown encoding type: '%v'", val)) + return nil, 
fmt.Errorf("unknown encoding type: '%v'", val) } case "keyencoding": switch strings.ToLower(val) { @@ -227,7 +251,7 @@ func StringToTag(tag string) *Tag { case "byte_stream_split": mp.KeyEncoding = parquet.Encoding_BYTE_STREAM_SPLIT default: - panic(fmt.Errorf("Unknown keyencoding type: '%v'", val)) + return nil, fmt.Errorf("unknown keyencoding type: '%v'", val) } case "valueencoding": switch strings.ToLower(val) { @@ -244,7 +268,7 @@ func StringToTag(tag string) *Tag { case "byte_stream_split": mp.ValueEncoding = parquet.Encoding_BYTE_STREAM_SPLIT default: - panic(fmt.Errorf("Unknown valueencoding type: '%v'", val)) + return nil, fmt.Errorf("unknown valueencoding type: '%v'", val) } default: if strings.HasPrefix(key, "logicaltype") { @@ -256,14 +280,14 @@ func StringToTag(tag string) *Tag { newKey := key[5:] mp.ValueLogicalTypeFields[newKey] = val } else { - panic(fmt.Errorf("Unrecognized tag '%v'", key)) + return nil, fmt.Errorf("unrecognized tag '%v'", key) } } } - return mp + return mp, nil } -func NewSchemaElementFromTagMap(info *Tag) *parquet.SchemaElement { +func NewSchemaElementFromTagMap(info *Tag) (*parquet.SchemaElement, error) { schema := parquet.NewSchemaElement() schema.Name = info.InName schema.TypeLength = &info.Length @@ -277,7 +301,7 @@ func NewSchemaElementFromTagMap(info *Tag) *parquet.SchemaElement { schema.Type = &t } else { - panic("type " + info.Type + ": " + err.Error()) + return nil, fmt.Errorf("type %s: %s", info.Type, err.Error()) } if ct, err := parquet.ConvertedTypeFromString(info.ConvertedType); err == nil { @@ -285,22 +309,26 @@ func NewSchemaElementFromTagMap(info *Tag) *parquet.SchemaElement { } var logicalType *parquet.LogicalType + var err error if len(info.LogicalTypeFields) > 0 { - logicalType = NewLogicalTypeFromFieldsMap(info.LogicalTypeFields) + logicalType, err = NewLogicalTypeFromFieldsMap(info.LogicalTypeFields) + if err != nil { + return nil, fmt.Errorf("failed to create logicaltype from field map: %s", err.Error()) + 
} } else { logicalType = NewLogicalTypeFromConvertedType(schema, info) } schema.LogicalType = logicalType - return schema + return schema, nil } -func NewLogicalTypeFromFieldsMap(mp map[string]string) *parquet.LogicalType { +func NewLogicalTypeFromFieldsMap(mp map[string]string) (*parquet.LogicalType, error) { if val, ok := mp["logicaltype"]; !ok { - return nil - + return nil, errors.New("does not have logicaltype") } else { + var err error logicalType := parquet.NewLogicalType() switch val { case "STRING": @@ -314,15 +342,24 @@ func NewLogicalTypeFromFieldsMap(mp map[string]string) *parquet.LogicalType { case "DECIMAL": logicalType.DECIMAL = parquet.NewDecimalType() - logicalType.DECIMAL.Precision = Str2Int32(mp["logicaltype.precision"]) - logicalType.DECIMAL.Scale = Str2Int32(mp["logicaltype.scale"]) + logicalType.DECIMAL.Precision, err = Str2Int32(mp["logicaltype.precision"]) + if err != nil { + return nil, fmt.Errorf("cannot parse logicaltype.precision as int32: %s", err.Error()) + } + logicalType.DECIMAL.Scale, err = Str2Int32(mp["logicaltype.scale"]) + if err != nil { + return nil, fmt.Errorf("cannot parse logicaltype.scale as int32: %s", err.Error()) + } case "DATE": logicalType.DATE = parquet.NewDateType() case "TIME": logicalType.TIME = parquet.NewTimeType() - logicalType.TIME.IsAdjustedToUTC = Str2Bool(mp["logicaltype.isadjustedtoutc"]) + logicalType.TIME.IsAdjustedToUTC, err = Str2Bool(mp["logicaltype.isadjustedtoutc"]) + if err != nil { + return nil, fmt.Errorf("cannot parse logicaltype.isadjustedtoutc as boolean: %s", err.Error()) + } switch mp["logicaltype.unit"] { case "MILLIS": logicalType.TIME.Unit = parquet.NewTimeUnit() @@ -334,12 +371,15 @@ func NewLogicalTypeFromFieldsMap(mp map[string]string) *parquet.LogicalType { logicalType.TIME.Unit = parquet.NewTimeUnit() logicalType.TIME.Unit.NANOS = parquet.NewNanoSeconds() default: - panic("logicaltype time error") + return nil, fmt.Errorf("logicaltype time error, unknown unit: %s", 
mp["logicaltype.unit"]) } case "TIMESTAMP": logicalType.TIMESTAMP = parquet.NewTimestampType() - logicalType.TIMESTAMP.IsAdjustedToUTC = Str2Bool(mp["logicaltype.isadjustedtoutc"]) + logicalType.TIMESTAMP.IsAdjustedToUTC, err = Str2Bool(mp["logicaltype.isadjustedtoutc"]) + if err != nil { + return nil, fmt.Errorf("cannot parse logicaltype.isadjustedtoutc as boolean: %s", err.Error()) + } switch mp["logicaltype.unit"] { case "MILLIS": logicalType.TIMESTAMP.Unit = parquet.NewTimeUnit() @@ -351,13 +391,20 @@ func NewLogicalTypeFromFieldsMap(mp map[string]string) *parquet.LogicalType { logicalType.TIMESTAMP.Unit = parquet.NewTimeUnit() logicalType.TIMESTAMP.Unit.NANOS = parquet.NewNanoSeconds() default: - panic("logicaltype time error") + return nil, fmt.Errorf("logicaltype time error, unknown unit: %s", mp["logicaltype.unit"]) } case "INTEGER": logicalType.INTEGER = parquet.NewIntType() - logicalType.INTEGER.BitWidth = int8(Str2Int32(mp["logicaltype.bitwidth"])) - logicalType.INTEGER.IsSigned = Str2Bool(mp["logicaltype.issigned"]) + bitWidth, err := Str2Int32(mp["logicaltype.bitwidth"]) + if err != nil { + return nil, fmt.Errorf("cannot parse logicaltype.bitwidth as int32: %s", err.Error()) + } + logicalType.INTEGER.BitWidth = int8(bitWidth) + logicalType.INTEGER.IsSigned, err = Str2Bool(mp["logicaltype.issigned"]) + if err != nil { + return nil, fmt.Errorf("cannot parse logicaltype.issigned as boolean: %s", err.Error()) + } case "JSON": logicalType.JSON = parquet.NewJsonType() @@ -369,10 +416,10 @@ func NewLogicalTypeFromFieldsMap(mp map[string]string) *parquet.LogicalType { logicalType.UUID = parquet.NewUUIDType() default: - panic("unknow logicaltype: " + val) + return nil, fmt.Errorf("unknown logicaltype: %s", val) } - return logicalType + return logicalType, nil } } @@ -698,20 +745,20 @@ func FindFuncTable(pT *parquet.Type, cT *parquet.ConvertedType, logT *parquet.Lo panic("No known func table in FindFuncTable") } -func Str2Int32(val string) int32 { +func 
Str2Int32(val string) (int32, error) { - valInt, err := strconv.Atoi(val) + valInt, err := strconv.ParseInt(val, 10, 32) if err != nil { - panic(err) + return 0, err } - return int32(valInt) + return int32(valInt), nil } -func Str2Bool(val string) bool { +func Str2Bool(val string) (bool, error) { valBoolean, err := strconv.ParseBool(val) if err != nil { - panic(err) + return false, err } - return valBoolean + return valBoolean, nil } type FuncTable interface { diff --git a/schema/csv.go b/schema/csv.go index 375c11db..e544c17b 100644 --- a/schema/csv.go +++ b/schema/csv.go @@ -1,12 +1,14 @@ package schema import ( + "fmt" + "github.com/xitongsys/parquet-go/common" "github.com/xitongsys/parquet-go/parquet" ) //Create a schema handler from CSV metadata -func NewSchemaHandlerFromMetadata(mds []string) *SchemaHandler { +func NewSchemaHandlerFromMetadata(mds []string) (*SchemaHandler, error) { schemaList := make([]*parquet.SchemaElement, 0) infos := make([]*common.Tag, 0) @@ -25,14 +27,20 @@ func NewSchemaHandlerFromMetadata(mds []string) *SchemaHandler { infos = append(infos, rootInfo) for _, md := range mds { - info := common.StringToTag(md) + info, err := common.StringToTag(md) + if err != nil { + return nil, fmt.Errorf("failed to parse metadata: %s", err.Error()) + } infos = append(infos, info) - schema := common.NewSchemaElementFromTagMap(info) + schema, err := common.NewSchemaElementFromTagMap(info) + if err != nil { + return nil, fmt.Errorf("failed to create schema from tag map: %s", err.Error()) + } //schema.RepetitionType = parquet.FieldRepetitionTypePtr(parquet.FieldRepetitionType_OPTIONAL) schemaList = append(schemaList, schema) } res := NewSchemaHandlerFromSchemaList(schemaList) res.Infos = infos res.CreateInExMap() - return res + return res, nil } diff --git a/schema/json.go b/schema/json.go index 1fec0d8c..1cda8772 100644 --- a/schema/json.go +++ b/schema/json.go @@ -46,7 +46,10 @@ func NewSchemaHandlerFromJSON(str string) (sh *SchemaHandler, err error) { ln := len(stack) item := stack[ln-1] stack = 
stack[:ln-1] - info := common.StringToTag(item.Tag) + info, err := common.StringToTag(item.Tag) + if err != nil { + return nil, fmt.Errorf("failed parse tag: %s", err.Error()) + } var newInfo *common.Tag if info.Type == "" { //struct schema := parquet.NewSchemaElement() @@ -136,7 +139,10 @@ func NewSchemaHandlerFromJSON(str string) (sh *SchemaHandler, err error) { stack = append(stack, item.Fields[0]) //put key } else { //normal variable - schema := common.NewSchemaElementFromTagMap(info) + schema, err := common.NewSchemaElementFromTagMap(info) + if err != nil { + return nil, fmt.Errorf("failed to create schema from tag map: %s", err.Error()) + } schemaElements = append(schemaElements, schema) newInfo = common.NewTag() diff --git a/schema/schemahandler.go b/schema/schemahandler.go index 392200ab..75345b1b 100644 --- a/schema/schemahandler.go +++ b/schema/schemahandler.go @@ -282,7 +282,10 @@ func NewSchemaHandlerFromStruct(obj interface{}) (sh *SchemaHandler, err error) } newItem := NewItem() - newItem.Info = common.StringToTag(tagStr) + newItem.Info, err = common.StringToTag(tagStr) + if err != nil { + return nil, fmt.Errorf("failed parse tag: %s", err.Error()) + } newItem.Info.InName = f.Name newItem.GoType = f.Type if f.Type.Kind() == reflect.Ptr { @@ -383,7 +386,10 @@ func NewSchemaHandlerFromStruct(obj interface{}) (sh *SchemaHandler, err error) stack = append(stack, newItem) } else { - schema := common.NewSchemaElementFromTagMap(item.Info) + schema, err := common.NewSchemaElementFromTagMap(item.Info) + if err != nil { + return nil, fmt.Errorf("failed to create schema from tag map: %s", err.Error()) + } schemaElements = append(schemaElements, schema) newInfo = common.NewTag() common.DeepCopy(item.Info, newInfo) diff --git a/writer/csv.go b/writer/csv.go index 9395d0b8..def1c4b4 100644 --- a/writer/csv.go +++ b/writer/csv.go @@ -1,6 +1,7 @@ package writer import ( + "fmt" "io" "github.com/xitongsys/parquet-go-source/writerfile" @@ -23,8 +24,12 @@ func 
NewCSVWriterFromWriter(md []string, w io.Writer, np int64) (*CSVWriter, err //Create CSV writer func NewCSVWriter(md []string, pfile source.ParquetFile, np int64) (*CSVWriter, error) { + var err error res := new(CSVWriter) - res.SchemaHandler = schema.NewSchemaHandlerFromMetadata(md) + res.SchemaHandler, err = schema.NewSchemaHandlerFromMetadata(md) + if err != nil { + return nil, fmt.Errorf("failed to create schema from metadata: %s", err.Error()) + } res.PFile = pfile res.PageSize = 8 * 1024 //8K res.RowGroupSize = 128 * 1024 * 1024 //128M @@ -36,7 +41,7 @@ func NewCSVWriter(md []string, pfile source.ParquetFile, np int64) (*CSVWriter, res.Footer.Version = 1 res.Footer.Schema = append(res.Footer.Schema, res.SchemaHandler.SchemaElements...) res.Offset = 4 - _, err := res.PFile.Write([]byte("PAR1")) + _, err = res.PFile.Write([]byte("PAR1")) res.MarshalFunc = marshal.MarshalCSV return res, err }