Skip to content

Commit

Permalink
Frame definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
AsafMah committed Oct 15, 2023
1 parent 71af405 commit 3315dee
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 0 deletions.
28 changes: 28 additions & 0 deletions azkustodata/query/error_frames.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package query

type OneApiError struct {
Error ErrorMessage `json:"error"`
}

type ErrorMessage struct {
Code string `json:"code"`
Message string `json:"message"`
Type string `json:"@type"`
Context ErrorContext `json:"@context"`
IsPermanent bool `json:"@permanent"`
}

type ErrorContext struct {
Timestamp string `json:"timestamp"`
ServiceAlias string `json:"serviceAlias"`
MachineName string `json:"machineName"`
ProcessName string `json:"processName"`
ProcessId int `json:"processId"`
ThreadId int `json:"threadId"`
ClientRequestId string `json:"clientRequestId"`
ActivityId string `json:"activityId"`
SubActivityId string `json:"subActivityId"`
ActivityType string `json:"activityType"`
ParentActivityId string `json:"parentActivityId"`
ActivityStack string `json:"activityStack"`
}
85 changes: 85 additions & 0 deletions azkustodata/query/frames_defs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package query

import (
"fmt"
)

type Frame interface {
fmt.Stringer
GetFrameType() string
}

type FrameColumn struct {
ColumnName string `json:"ColumnName"`
ColumnType string `json:"ColumnType"`
}

const DataSetHeaderFrameType = "DataSetHeader"

type DataSetHeader struct {
IsProgressive bool `json:"IsProgressive"`
Version string `json:"Version"`
IsFragmented bool `json:"IsFragmented"`
ErrorReportingPlacement string `json:"ErrorReportingPlacement"`
}

const DataTableFrameType = "DataTable"

type DataTable struct {
TableId int `json:"TableId"`
TableKind string `json:"TableKind"`
TableName string `json:"TableName"`
Columns []FrameColumn `json:"Columns"`
Rows [][]interface{} `json:"Rows"`
}

const TableHeaderFrameType = "TableHeader"

type TableHeader struct {
TableId int `json:"TableId"`
TableKind string `json:"TableKind"`
TableName string `json:"TableName"`
Columns []FrameColumn `json:"Columns"`
}

const TableFragmentFrameType = "TableFragment"

type TableFragment struct {
TableFragmentType string `json:"TableFragmentType"`
TableId int `json:"TableId"`
Rows [][]interface{} `json:"Rows"`
}

const TableCompletionFrameType = "TableCompletion"

type TableCompletion struct {
TableId int `json:"TableId"`
RowCount int `json:"RowCount"`
OneApiErrors []OneApiError `json:"OneApiErrors"`
}

const DataSetCompletionFrameType = "DataSetCompletion"

type DataSetCompletion struct {
HasErrors bool `json:"HasErrors"`
Cancelled bool `json:"Cancelled"`
OneApiErrors []OneApiError `json:"OneApiErrors"`
}

type EveryFrame struct {
FrameType string `json:"FrameType"`
IsProgressive bool `json:"IsProgressive"`
Version string `json:"Version"`
IsFragmented bool `json:"IsFragmented"`
ErrorReportingPlacement string `json:"ErrorReportingPlacement"`
TableId int `json:"TableId"`
TableKind string `json:"TableKind"`
TableName string `json:"TableName"`
Columns []FrameColumn `json:"Columns"`
Rows [][]interface{} `json:"Rows"`
TableFragmentType string `json:"TableFragmentType"`
RowCount int `json:"RowCount"`
OneApiErrors []OneApiError `json:"OneApiErrors"`
HasErrors bool `json:"HasErrors"`
Cancelled bool `json:"Cancelled"`
}
32 changes: 32 additions & 0 deletions azkustodata/query/frames_defs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package query

import (
"fmt"
"strings"
"testing"
)

func TestReadFrames(t *testing.T) {
src := `[{"FrameType":"DataSetHeader","IsProgressive":false,"Version":"v2.0","IsFragmented":true,"ErrorReportingPlacement":"EndOfTable"}
,{"FrameType":"DataTable","TableId":0,"TableKind":"QueryProperties","TableName":"@ExtendedProperties","Columns":[{"ColumnName":"TableId","ColumnType":"int"},{"ColumnName":"Key","ColumnType":"string"},{"ColumnName":"Value","ColumnType":"dynamic"}],"Rows":[[1,"Visualization","{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"AnomalyColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null,\"Ymin\":\"NaN\",\"Ymax\":\"NaN\",\"Xmin\":null,\"Xmax\":null}"]]}
,{"FrameType":"TableHeader","TableId":1,"TableKind":"PrimaryResult","TableName":"BigChunkus","Columns":[{"ColumnName":"AvgTicketPrice","ColumnType":"real"},{"ColumnName":"Cancelled","ColumnType":"bool"},{"ColumnName":"Carrier","ColumnType":"string"},{"ColumnName":"Dest","ColumnType":"string"},{"ColumnName":"DestAirportID","ColumnType":"string"},{"ColumnName":"DestCityName","ColumnType":"string"},{"ColumnName":"DestCountry","ColumnType":"string"},{"ColumnName":"DestLocation","ColumnType":"dynamic"},{"ColumnName":"DestRegion","ColumnType":"string"},{"ColumnName":"DestWeather","ColumnType":"string"},{"ColumnName":"DistanceKilometers","ColumnType":"real"},{"ColumnName":"DistanceMiles","ColumnType":"real"},{"ColumnName":"FlightDelay","ColumnType":"bool"},{"ColumnName":"FlightDelayMin","ColumnType":"long"},{"ColumnName":"FlightDelayType","ColumnType":"string"},{"ColumnName":"FlightNum","ColumnType":"string"},{"ColumnName":"FlightTimeHour","ColumnType":"real"},{"ColumnName":"FlightTimeMin","ColumnType":"real"},{"ColumnName":"Origin","ColumnType":"string"},{"ColumnName":"OriginAirportID","ColumnType":"string"},{"ColumnName":"OriginCityName","ColumnType":"string"},{"ColumnName":"OriginCountry","ColumnType":"string"},{"ColumnName":"OriginLocation","ColumnType":"dynamic"},{"ColumnName":"OriginRegion","ColumnType":"string"},{"ColumnName":"OriginWeather","ColumnType":"string"},{"ColumnName":"dayOfWeek","ColumnType":"int"},{"ColumnName":"timestamp","ColumnType":"datetime"}]}
,{"FrameType":"TableFragment","TableFragmentType":"DataAppend","TableId":1,"Rows":[[841.26564196770755,false,"Kibana Airlines","Sydney Kingsford Smith International Airport","SYD","Sydney","AU",{"lat":"-33.94609833","lon":"151.177002"},"SE-BD","Rain",16492.32665375846,10247.856675613455,false,0,"No Delay","9HY9SWR",17.179506930998397,1030.7704158599038,"Frankfurt am Main Airport","FRA","Frankfurt am Main","DE",{"lat":"50.033333","lon":"8.570556"},"DE-HE","Sunny",0,"2018-01-01T00:00:00Z"],[882.98266155955184,false,"Logstash Airways","Venice Marco Polo Airport","VE05","Venice","IT",{"lat":"45.505299","lon":"12.3519"},"IT-34","Sunny",8823.40014044213,5482.6066648535862,false,0,"No Delay","X98CCZO",7.73982468459836,464.38948107590159,"Cape Town International Airport","CPT","Cape Town","ZA",{"lat":"-33.96480179","lon":"18.60169983"},"SE-BD","Clear",0,"2018-01-01T18:27:00Z"],[190.63690385083561,false,"Logstash Airways","Venice Marco Polo Airport","VE05","Venice","IT",{"lat":"45.505299","lon":"12.3519"},"IT-34","Cloudy",0.0,0.0,false,0,"No Delay","UFK2WIZ",0.0,0.0,"Venice Marco Polo Airport","VE05","Venice","IT",{"lat":"45.505299","lon":"12.3519"},"IT-34","Rain",0,"2018-01-01T17:11:14Z"],[181.69421554118,true,"Kibana Airlines","Treviso-Sant'Angelo Airport","TV01","Treviso","IT",{"lat":"45.648399","lon":"12.1944"},"IT-34","Clear",555.73776687252655,345.31943877289535,true,180,"Weather Delay","EAYQW69",3.7124843165032391,222.74905899019436,"Naples International Airport","NA01","Naples","IT",{"lat":"40.886002","lon":"14.2908"},"IT-72","Thunder & Lightning",0,"2018-01-01T10:33:28Z"],[730.041778346198,false,"Kibana Airlines","Xi'an Xianyang International Airport","XIY","Xi'an","CN",{"lat":"34.447102","lon":"108.751999"},"SE-BD","Clear",13358.24419986236,8300.4281246659248,false,0,"No Delay","58U013N",13.096317843002314,785.77907058013886,"Licenciado Benito Juarez International Airport","AICM","Mexico City","MX",{"lat":"19.4363","lon":"-99.072098"},"MX-DIF","Damaging Wind",0,"2018-01-01T05:13:00Z"]]}
,{"FrameType":"TableCompletion","TableId":1,"RowCount":5,"OneApiErrors":[{"error":{"code":"LimitsExceeded","message":"Request is invalid and cannot be executed.","@type":"Kusto.Data.Exceptions.KustoServicePartialQueryFailureLimitsExceededException","@message":"Query execution has exceeded the allowed limits (80DA0003): The results of this query exceed the set limit of 5 records, so not all records were returned (E_QUERY_RESULT_SET_TOO_LARGE, 0x80DA0003). See https://aka.ms/kustoquerylimits for more information and possible solutions..","@context":{"timestamp":"2023-10-15T08:09:53.6389663Z","serviceAlias":"ASAF","machineName":"KEngine000000","processName":"Kusto.WinSvc.Svc","processId":2376,"threadId":4208,"clientRequestId":"blab6","activityId":"44da9487-b217-49cf-b2dd-d2a03410a83f","subActivityId":"44da9487-b217-49cf-b2dd-d2a03410a83f","activityType":"GW.Http.CallContext","parentActivityId":"44da9487-b217-49cf-b2dd-d2a03410a83f","activityStack":"(Activity stack: CRID=blab6 ARID=44da9487-b217-49cf-b2dd-d2a03410a83f > GW.Http.CallContext/44da9487-b217-49cf-b2dd-d2a03410a83f)"},"@permanent":false}}]}
,{"FrameType":"DataSetCompletion","HasErrors":true,"Cancelled":false,"OneApiErrors":[{"error":{"code":"LimitsExceeded","message":"Request is invalid and cannot be executed.","@type":"Kusto.Data.Exceptions.KustoServicePartialQueryFailureLimitsExceededException","@message":"Query execution has exceeded the allowed limits (80DA0003): The results of this query exceed the set limit of 5 records, so not all records were returned (E_QUERY_RESULT_SET_TOO_LARGE, 0x80DA0003). See https://aka.ms/kustoquerylimits for more information and possible solutions..","@context":{"timestamp":"2023-10-15T08:09:53.6389663Z","serviceAlias":"ASAF","machineName":"KEngine000000","processName":"Kusto.WinSvc.Svc","processId":2376,"threadId":4208,"clientRequestId":"blab6","activityId":"44da9487-b217-49cf-b2dd-d2a03410a83f","subActivityId":"44da9487-b217-49cf-b2dd-d2a03410a83f","activityType":"GW.Http.CallContext","parentActivityId":"44da9487-b217-49cf-b2dd-d2a03410a83f","activityStack":"(Activity stack: CRID=blab6 ARID=44da9487-b217-49cf-b2dd-d2a03410a83f > GW.Http.CallContext/44da9487-b217-49cf-b2dd-d2a03410a83f)"},"@permanent":false}}]}
]`

ch := make(chan Frame)

go func() {
err := ReadFrames(strings.NewReader(src), ch)
if err != nil {
panic(err)
}
close(ch)
}()

for frame := range ch {
fmt.Printf("%v\n", frame)
}

}
65 changes: 65 additions & 0 deletions azkustodata/query/frames_interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package query

import "fmt"

func (f *DataSetHeader) String() string {
return fmt.Sprintf("DataSetHeader(IsProgressive=%t, Version=%s, IsFragmented=%t, ErrorReportingPlacement=%s)", f.IsProgressive, f.Version, f.IsFragmented, f.ErrorReportingPlacement)
}
func (f *DataTable) String() string {
return fmt.Sprintf("DataTable(TableId=%d, TableKind=%s, TableName=%s, Columns=%v, Rows=%v)", f.TableId, f.TableKind, f.TableName, f.Columns, f.Rows)
}
func (c *FrameColumn) String() string {
return fmt.Sprintf("FrameColumn(ColumnName=%s, ColumnType=%s)", c.ColumnName, c.ColumnType)
}

func (f *TableHeader) String() string {
return fmt.Sprintf("TableHeader(TableId=%d, TableKind=%s, TableName=%s, Columns=%v)", f.TableId, f.TableKind, f.TableName, f.Columns)
}

func (f *TableFragment) String() string {
return fmt.Sprintf("TableFragment(TableFragmentType=%s, TableId=%d, Rows=%v)", f.TableFragmentType, f.TableId, f.Rows)
}

func (f *TableCompletion) String() string {
return fmt.Sprintf("TableCompletion(TableId=%d, RowCount=%d, OneApiErrors=%v)", f.TableId, f.RowCount, f.OneApiErrors)
}

func (e *OneApiError) String() string {
return fmt.Sprintf("OneApiError(Error=%v)", e.Error)
}

func (e *ErrorMessage) String() string {
return fmt.Sprintf("ErrorMessage(Code=%s, Message=%s, Type=%s, ErrorContext=%v, IsPermanent=%t)", e.Code, e.Message, e.Type, e.Context, e.IsPermanent)
}

func (e *ErrorContext) String() string {
return fmt.Sprintf("ErrorContext(Timestamp=%s, ServiceAlias=%s, MachineName=%s, ProcessName=%s, ProcessId=%d, ThreadId=%d, ClientRequestId=%s, ActivityId=%s, SubActivityId=%s, ActivityType=%s, ParentActivityId=%s, ActivityStack=%s)", e.Timestamp, e.ServiceAlias, e.MachineName, e.ProcessName, e.ProcessId, e.ThreadId, e.ClientRequestId, e.ActivityId, e.SubActivityId, e.ActivityType, e.ParentActivityId, e.ActivityStack)
}

func (f *DataSetCompletion) String() string {
return fmt.Sprintf("DataSetCompletion(HasErrors=%t, Cancelled=%t, OneApiErrors=%v)", f.HasErrors, f.Cancelled, f.OneApiErrors)
}

func (f *DataSetHeader) GetFrameType() string {
return DataSetHeaderFrameType
}

func (f *DataTable) GetFrameType() string {
return DataTableFrameType
}

func (f *TableHeader) GetFrameType() string {
return TableHeaderFrameType
}

func (f *TableFragment) GetFrameType() string {
return TableFragmentFrameType
}

func (f *TableCompletion) GetFrameType() string {
return TableCompletionFrameType
}

func (f *DataSetCompletion) GetFrameType() string {
return DataSetCompletionFrameType
}

0 comments on commit 3315dee

Please sign in to comment.