Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom logical types #448

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 144 additions & 8 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,69 @@ const (
Duration LogicalType = "duration"
)

// customLogicalSchema is a custom logical type schema that is not part of the Avro specification.
// It wraps a primitive type schema and thus supports no additional properties.
type customLogicalSchema struct {
PrimitiveLogicalSchema
}

type customSchemaKey = struct {
typ Type
ltyp LogicalType
}

var customLogicalSchemas sync.Map // map[customSchemaKey]*CustomLogicalSchema
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This being global is an issue. I should be possible to parse with and without the custom logical types.


func addCustomLogicalSchema(typ Type, ltyp LogicalType) {
key := customSchemaKey{typ, ltyp}
customLogicalSchemas.Store(key, &customLogicalSchema{
PrimitiveLogicalSchema: PrimitiveLogicalSchema{typ: ltyp},
})
}

func getCustomLogicalSchema(typ Type, ltyp LogicalType) LogicalSchema {
key := customSchemaKey{typ, ltyp}
if ls, ok := customLogicalSchemas.Load(key); ok {
return ls.(*customLogicalSchema)
}
return nil
}

// RegisterCustomLogicalType registers a custom logical type that is not part of the
// Avro specification for the given types.
// It returns an error if the logical type conflicts with a predefined logical type.
func RegisterCustomLogicalType(ltyp LogicalType, types ...Type) error {
// Ensure that the custom logical type does not overwrite a primitive type
switch ltyp {
case Decimal,
UUID,
Date,
TimeMillis,
TimeMicros,
TimestampMillis,
TimestampMicros,
LocalTimestampMillis,
LocalTimestampMicros,
Duration:
return errors.New("logical type conflicts with a predefined logical type")
}

// Check that all of the given type supports logical types
for _, typ := range types {
switch typ {
case Ref, Union, Null:
return fmt.Errorf("type %q does not support logical types", typ)
}
}

// Register the custom logical type
for _, typ := range types {
addCustomLogicalSchema(typ, ltyp)
}

return nil
}

// Action is a field action used during decoding process.
type Action string

Expand Down Expand Up @@ -396,12 +459,13 @@ func (p properties) marshalPropertiesToJSON(buf *bytes.Buffer) error {
}

type schemaConfig struct {
aliases []string
doc string
def any
order Order
props map[string]any
wfp *[32]byte
aliases []string
doc string
def any
order Order
props map[string]any
wfp *[32]byte
customLogicalType LogicalType
}

// SchemaOption is a function that sets a schema option.
Expand All @@ -414,6 +478,16 @@ func WithAliases(aliases []string) SchemaOption {
}
}

// WithCustomLogicalType sets a custom logical type on a schema.
// Make sure to register the custom logical type before using it,
// otherwise it will be ignored.
// See RegisterCustomLogicalType.
func WithCustomLogicalType(ltyp LogicalType) SchemaOption {
return func(opts *schemaConfig) {
opts.customLogicalType = ltyp
}
}

// WithDoc sets the doc on a schema.
func WithDoc(doc string) SchemaOption {
return func(opts *schemaConfig) {
Expand Down Expand Up @@ -477,6 +551,11 @@ func NewPrimitiveSchema(t Type, l LogicalSchema, opts ...SchemaOption) *Primitiv
opt(&cfg)
}

// If the logical schema is nil, use the custom logical schema.
if l == nil {
l = getCustomLogicalSchema(t, cfg.customLogicalType)
}

return &PrimitiveSchema{
properties: newProperties(cfg.props, schemaReserved),
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
Expand Down Expand Up @@ -552,6 +631,7 @@ type RecordSchema struct {
isError bool
fields []*Field
doc string
logical LogicalSchema
}

// NewRecordSchema creates a new record schema instance.
Expand All @@ -572,6 +652,7 @@ func NewRecordSchema(name, namespace string, fields []*Field, opts ...SchemaOpti
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
fields: fields,
doc: cfg.doc,
logical: getCustomLogicalSchema(Record, cfg.customLogicalType),
}, nil
}

Expand All @@ -592,6 +673,11 @@ func (s *RecordSchema) Type() Type {
return Record
}

// Logical returns the logical schema or nil.
func (s *RecordSchema) Logical() LogicalSchema {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that every schema now implements LogicalTypeSchema. From a spec PoV this is really confusing.

return s.logical
}

// Doc returns the documentation of a record.
func (s *RecordSchema) Doc() string {
return s.doc
Expand Down Expand Up @@ -622,6 +708,12 @@ func (s *RecordSchema) String() string {
fields = fields[:len(fields)-1]
}

if s.logical != nil {
return fmt.Sprintf("{\"name\":\"%s\", \"type\":\"%s\", \"fields\":[%s]\", %s}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fmt.Sprintf is slow and includes many allocations. Given this function is called often, this is a performance bottleneck.

s.FullName(), typ, fields, s.logical.String(),
)
}

return `{"name":"` + s.FullName() + `","type":"` + typ + `","fields":[` + fields + `]}`
}

Expand Down Expand Up @@ -659,6 +751,9 @@ func (s *RecordSchema) MarshalJSON() ([]byte, error) {
if err := s.marshalPropertiesToJSON(buf); err != nil {
return nil, err
}
if s.logical != nil {
buf.WriteString(`,"logicalType":"` + string(s.logical.Type()) + `"`)
}
buf.WriteString("}")
return buf.Bytes(), nil
}
Expand Down Expand Up @@ -876,6 +971,7 @@ type EnumSchema struct {
symbols []string
def string
doc string
logical LogicalSchema

// encodedSymbols is the symbols of the encoded value.
// It's only used in the context of write-read schema resolution.
Expand Down Expand Up @@ -918,6 +1014,7 @@ func NewEnumSchema(name, namespace string, symbols []string, opts ...SchemaOptio
symbols: symbols,
def: def,
doc: cfg.doc,
logical: getCustomLogicalSchema(Enum, cfg.customLogicalType),
}, nil
}

Expand Down Expand Up @@ -979,6 +1076,11 @@ func (s *EnumSchema) HasDefault() bool {
return s.def != ""
}

// Logical returns the logical schema or nil.
func (s *EnumSchema) Logical() LogicalSchema {
return s.logical
}

// String returns the canonical form of the schema.
func (s *EnumSchema) String() string {
symbols := ""
Expand All @@ -989,6 +1091,11 @@ func (s *EnumSchema) String() string {
symbols = symbols[:len(symbols)-1]
}

if s.logical != nil {
return fmt.Sprintf("{\"name\":\"%s\", \"type\":\"enum\", \"symbols\":[%s]\", %s}",
s.FullName(), symbols, s.logical.String())
}

return `{"name":"` + s.FullName() + `","type":"enum","symbols":[` + symbols + `]}`
}

Expand Down Expand Up @@ -1025,6 +1132,9 @@ func (s *EnumSchema) MarshalJSON() ([]byte, error) {
if err := s.marshalPropertiesToJSON(buf); err != nil {
return nil, err
}
if s.logical != nil {
buf.WriteString(`,"logicalType":"` + string(s.logical.Type()) + `"`)
}
buf.WriteString("}")
return buf.Bytes(), nil
}
Expand Down Expand Up @@ -1055,7 +1165,8 @@ type ArraySchema struct {
fingerprinter
cacheFingerprinter

items Schema
items Schema
logical LogicalSchema
}

// NewArraySchema creates an array schema instance.
Expand All @@ -1069,6 +1180,7 @@ func NewArraySchema(items Schema, opts ...SchemaOption) *ArraySchema {
properties: newProperties(cfg.props, schemaReserved),
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
items: items,
logical: getCustomLogicalSchema(Array, cfg.customLogicalType),
}
}

Expand All @@ -1082,8 +1194,16 @@ func (s *ArraySchema) Items() Schema {
return s.items
}

// Logical returns the logical schema or nil.
func (s *ArraySchema) Logical() LogicalSchema {
return s.logical
}

// String returns the canonical form of the schema.
func (s *ArraySchema) String() string {
if s.logical != nil {
return `{"type":"array","items":` + s.items.String() + `,"` + s.logical.String() + `"}`
}
return `{"type":"array","items":` + s.items.String() + `}`
}

Expand All @@ -1100,6 +1220,9 @@ func (s *ArraySchema) MarshalJSON() ([]byte, error) {
if err = s.marshalPropertiesToJSON(buf); err != nil {
return nil, err
}
if s.logical != nil {
buf.WriteString(`,"logicalType":"` + string(s.logical.Type()) + `"`)
}
buf.WriteString("}")
return buf.Bytes(), nil
}
Expand All @@ -1125,7 +1248,8 @@ type MapSchema struct {
fingerprinter
cacheFingerprinter

values Schema
values Schema
logical LogicalSchema
}

// NewMapSchema creates a map schema instance.
Expand All @@ -1139,6 +1263,7 @@ func NewMapSchema(values Schema, opts ...SchemaOption) *MapSchema {
properties: newProperties(cfg.props, schemaReserved),
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
values: values,
logical: getCustomLogicalSchema(Map, cfg.customLogicalType),
}
}

Expand All @@ -1152,8 +1277,16 @@ func (s *MapSchema) Values() Schema {
return s.values
}

// Logical returns the logical schema or nil.
func (s *MapSchema) Logical() LogicalSchema {
return s.logical
}

// String returns the canonical form of the schema.
func (s *MapSchema) String() string {
if s.logical != nil {
return `{"type":"map","values":` + s.values.String() + `,"` + s.logical.String() + `"}`
}
return `{"type":"map","values":` + s.values.String() + `}`
}

Expand All @@ -1170,6 +1303,9 @@ func (s *MapSchema) MarshalJSON() ([]byte, error) {
if err := s.marshalPropertiesToJSON(buf); err != nil {
return nil, err
}
if s.logical != nil {
buf.WriteString(`,"logicalType":"` + string(s.logical.Type()) + `"`)
}
buf.WriteString("}")
return buf.Bytes(), nil
}
Expand Down
Loading
Loading