Skip to content

Commit

Permalink
Working json (kinda)
Browse files Browse the repository at this point in the history
  • Loading branch information
AsafMah committed Oct 15, 2023
1 parent 3315dee commit 2f74151
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 13 deletions.
14 changes: 14 additions & 0 deletions azkustodata/query/error_frames.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package query

import "fmt"

type OneApiError struct {
Error ErrorMessage `json:"error"`
}
Expand All @@ -26,3 +28,15 @@ type ErrorContext struct {
ParentActivityId string `json:"parentActivityId"`
ActivityStack string `json:"activityStack"`
}

func (e *OneApiError) String() string {
return fmt.Sprintf("OneApiError(Error=%v)", e.Error)
}

func (e *ErrorMessage) String() string {
return fmt.Sprintf("ErrorMessage(Code=%s, Message=%s, Type=%s, ErrorContext=%v, IsPermanent=%t)", e.Code, e.Message, e.Type, e.Context, e.IsPermanent)
}

func (e *ErrorContext) String() string {
return fmt.Sprintf("ErrorContext(Timestamp=%s, ServiceAlias=%s, MachineName=%s, ProcessName=%s, ProcessId=%d, ThreadId=%d, ClientRequestId=%s, ActivityId=%s, SubActivityId=%s, ActivityType=%s, ParentActivityId=%s, ActivityStack=%s)", e.Timestamp, e.ServiceAlias, e.MachineName, e.ProcessName, e.ProcessId, e.ThreadId, e.ClientRequestId, e.ActivityId, e.SubActivityId, e.ActivityType, e.ParentActivityId, e.ActivityStack)
}
16 changes: 3 additions & 13 deletions azkustodata/query/frames_interfaces.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package query

import "fmt"
import (
"fmt"
)

func (f *DataSetHeader) String() string {
return fmt.Sprintf("DataSetHeader(IsProgressive=%t, Version=%s, IsFragmented=%t, ErrorReportingPlacement=%s)", f.IsProgressive, f.Version, f.IsFragmented, f.ErrorReportingPlacement)
Expand All @@ -24,18 +26,6 @@ func (f *TableCompletion) String() string {
return fmt.Sprintf("TableCompletion(TableId=%d, RowCount=%d, OneApiErrors=%v)", f.TableId, f.RowCount, f.OneApiErrors)
}

func (e *OneApiError) String() string {
return fmt.Sprintf("OneApiError(Error=%v)", e.Error)
}

func (e *ErrorMessage) String() string {
return fmt.Sprintf("ErrorMessage(Code=%s, Message=%s, Type=%s, ErrorContext=%v, IsPermanent=%t)", e.Code, e.Message, e.Type, e.Context, e.IsPermanent)
}

func (e *ErrorContext) String() string {
return fmt.Sprintf("ErrorContext(Timestamp=%s, ServiceAlias=%s, MachineName=%s, ProcessName=%s, ProcessId=%d, ThreadId=%d, ClientRequestId=%s, ActivityId=%s, SubActivityId=%s, ActivityType=%s, ParentActivityId=%s, ActivityStack=%s)", e.Timestamp, e.ServiceAlias, e.MachineName, e.ProcessName, e.ProcessId, e.ThreadId, e.ClientRequestId, e.ActivityId, e.SubActivityId, e.ActivityType, e.ParentActivityId, e.ActivityStack)
}

func (f *DataSetCompletion) String() string {
return fmt.Sprintf("DataSetCompletion(HasErrors=%t, Cancelled=%t, OneApiErrors=%v)", f.HasErrors, f.Cancelled, f.OneApiErrors)
}
Expand Down
139 changes: 139 additions & 0 deletions azkustodata/query/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package query

import (
"encoding/json"
"fmt"
"io"
)

func (f *EveryFrame) Decode() (Frame, error) {
switch f.FrameType {
case DataSetHeaderFrameType:
return &DataSetHeader{
IsProgressive: f.IsProgressive,
Version: f.Version,
IsFragmented: f.IsFragmented,
ErrorReportingPlacement: f.ErrorReportingPlacement,
}, nil
case DataTableFrameType:
return &DataTable{
TableId: f.TableId,
TableKind: f.TableKind,
TableName: f.TableName,
Columns: f.Columns,
Rows: f.Rows,
}, nil
case TableHeaderFrameType:
return &TableHeader{
TableId: f.TableId,
TableKind: f.TableKind,
TableName: f.TableName,
Columns: f.Columns,
}, nil
case TableFragmentFrameType:
return &TableFragment{
TableFragmentType: f.TableFragmentType,
TableId: f.TableId,
Rows: f.Rows,
}, nil
case TableCompletionFrameType:
return &TableCompletion{
TableId: f.TableId,
RowCount: f.RowCount,
OneApiErrors: f.OneApiErrors,
}, nil
case DataSetCompletionFrameType:
return &DataSetCompletion{
HasErrors: f.HasErrors,
Cancelled: f.Cancelled,
OneApiErrors: f.OneApiErrors,
}, nil
default:
return nil, fmt.Errorf("unknown frame type: %s", f.FrameType)
}
}

func ReadFrames(r io.Reader, ch chan<- Frame) error {
dec := json.NewDecoder(&skipReader{r: r})

for {
frame := EveryFrame{}
err := dec.Decode(&frame)
if err != nil {
if err == io.EOF {
return nil
}
return err
}

f, err := frame.Decode()

if err != nil {
return err
}

ch <- f
}
}

type skipReader struct {
r io.Reader
afterStart bool
}

func (s *skipReader) Read(p []byte) (n int, err error) {
// skip '[' at the beginning
if !s.afterStart {
s.afterStart = true

buf := make([]byte, 1)
amt, err := s.r.Read(buf)
if err != nil {
return 0, err
}
if amt != 1 || buf[0] != '[' {
return 0, fmt.Errorf("expected '[' at the beginning of the stream, got '%c'", buf[0])
}
}

cp := make([]byte, len(p)+1)
amt, err := s.r.Read(cp[:len(p)])
pIndex := 0
skipNext := false

if err != nil {
return 0, err
}

if cp[amt-1] == '\n' {
_, err = s.r.Read(cp[amt:])
if err != nil {
return 0, err
}
if cp[amt] != ']' && cp[amt] != ',' {
return 0, fmt.Errorf("expected ']' or ',' at the end of the stream, got '%c'", cp[amt])
}
amt++
}

for i := 0; i < amt; i++ {
if skipNext {
skipNext = false
next := cp[i]
if next == ']' {
return i, io.EOF
} else if next != ',' {
return 0, fmt.Errorf("expected ',' between frames, got '%c'", next)
}
continue
}
if cp[i] == '\n' {
skipNext = true
}
p[pIndex] = cp[i]
pIndex++

}

return pIndex, err
}

0 comments on commit 2f74151

Please sign in to comment.