
Commit

Fixed data races. Removed fixed table (for now)
AsafMah committed Dec 3, 2023
1 parent b9efae0 commit 03e1639
Showing 5 changed files with 157 additions and 87 deletions.
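The races fixed here all follow one pattern: the goroutine that decodes frames writes fields such as header, completion, and currentStreamingTable while consumer goroutines read them, so each field becomes private and is accessed only through sync.RWMutex-guarded getters and setters. A minimal standalone sketch of the original problem (illustrative names, not the library's actual types), which go run -race flags as a data race:

package main

import (
	"fmt"
	"sync"
)

// completion stands in for an unguarded DataSet field: written by the
// decoder goroutine, read concurrently by the consumer.
type dataset struct {
	completion *string
}

func main() {
	d := &dataset{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // decoder: writes the field
		defer wg.Done()
		done := "done"
		d.completion = &done
	}()
	go func() { // consumer: reads it with no synchronization
		defer wg.Done()
		fmt.Println(d.completion != nil)
	}()
	wg.Wait()
}

The diff below applies the standard fix: a read lock in every getter and a write lock in every setter, as in Header()/setHeader().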
107 changes: 72 additions & 35 deletions azkustodata/query/dataset.go
@@ -4,6 +4,7 @@ import (
"context"
"github.com/Azure/azure-kusto-go/azkustodata/errors"
"io"
"sync"
)

// DefaultFrameCapacity is the default capacity of the channel that receives frames from the Kusto service. Lower capacity means less memory usage, but might cause the channel to block if the frames are not consumed fast enough.
@@ -28,17 +29,14 @@ type DataSet struct {
// ctx is the context of the data set, as received from the request.
ctx context.Context
// DataSetHeader is the header of the data set. It's the first frame received.
DataSetHeader *DataSetHeader
header *DataSetHeader
// Completion is the completion status of the data set. It's the last frame received.
Completion *DataSetCompletion

// queryProperties contains the information from the "QueryProperties" table.
queryProperties []QueryProperties
// queryCompletionInformation contains the information from the "QueryCompletionInformation" table.
queryCompletionInformation []QueryCompletionInformation

// SecondaryResults is all the non-primary tables, which are always saved.
SecondaryResults []Table
completion *DataSetCompletion
/*
// queryProperties contains the information from the "QueryProperties" table.
queryProperties []QueryProperties
// queryCompletionInformation contains the information from the "QueryCompletionInformation" table.
queryCompletionInformation []QueryCompletionInformation*/

// frames is a channel that receives all the frames from the data set as they are parsed.
frames chan Frame
@@ -50,6 +48,44 @@ type DataSet struct {

// currentStreamingTable is a reference to the current streamed table, which is still receiving rows.
currentStreamingTable *streamingTable

lock sync.RWMutex
}

func (d *DataSet) Header() *DataSetHeader {
d.lock.RLock()
defer d.lock.RUnlock()
return d.header
}

func (d *DataSet) setHeader(dataSetHeader *DataSetHeader) {
d.lock.Lock()
defer d.lock.Unlock()
d.header = dataSetHeader
}

func (d *DataSet) Completion() *DataSetCompletion {
d.lock.RLock()
defer d.lock.RUnlock()
return d.completion
}

func (d *DataSet) setCompletion(completion *DataSetCompletion) {
d.lock.Lock()
defer d.lock.Unlock()
d.completion = completion
}

func (d *DataSet) getCurrentStreamingTable() *streamingTable {
d.lock.RLock()
defer d.lock.RUnlock()
return d.currentStreamingTable
}

func (d *DataSet) setCurrentStreamingTable(currentStreamingTable *streamingTable) {
d.lock.Lock()
defer d.lock.Unlock()
d.currentStreamingTable = currentStreamingTable
}

func (d *DataSet) Tables() chan TableResult {
@@ -82,8 +118,9 @@ func (d *DataSet) readFrames() {
func (d *DataSet) decodeTables() {
defer func() {
close(d.tables)
if d.currentStreamingTable != nil {
d.currentStreamingTable.close([]OneApiError{})
table := d.getCurrentStreamingTable()
if table != nil {
table.close([]OneApiError{})
}
}()
op := d.op()
@@ -108,7 +145,7 @@ func (d *DataSet) decodeTables() {
break
}

if d.Completion != nil {
if d.Completion() != nil {
d.tables <- TableResult{Table: nil, Err: errors.ES(op, errors.KInternal, "received a frame after DataSetCompletion")}
break
}
@@ -118,16 +155,13 @@ func (d *DataSet) decodeTables() {
break
}
} else if completion, ok := f.(*DataSetCompletion); ok {
d.Completion = completion
d.setCompletion(completion)
} else if dt, ok := f.(*DataTable); ok {
t, err := NewFullTable(d, dt)
d.tables <- TableResult{Table: t, Err: err}
if err != nil {
break
}
if !t.IsPrimaryResult() {
d.SecondaryResults = append(d.SecondaryResults, t)
}
} else if d.parseStreamingTable(f, op) {
continue
} else {
@@ -139,8 +173,11 @@ }
}

func (d *DataSet) parseStreamingTable(f Frame, op errors.Op) bool {

table := d.getCurrentStreamingTable()

if th, ok := f.(*TableHeader); ok {
if d.currentStreamingTable != nil {
if table != nil {
err := errors.ES(op, errors.KInternal, "received a TableHeader frame while a streaming table was still open")
d.tables <- TableResult{Table: nil, Err: err}
return false
@@ -150,63 +187,63 @@ func (d *DataSet) parseStreamingTable(f Frame, op errors.Op) bool {
d.tables <- TableResult{Table: nil, Err: err}
return false
}
d.currentStreamingTable = t.(*streamingTable)
d.setCurrentStreamingTable(t.(*streamingTable))
d.tables <- TableResult{Table: t, Err: nil}
} else if tf, ok := f.(*TableFragment); ok {
if d.currentStreamingTable == nil {
if table == nil {
err := errors.ES(op, errors.KInternal, "received a TableFragment frame while no streaming table was open")
d.tables <- TableResult{Table: nil, Err: err}
return false
}
if d.currentStreamingTable.Id() != tf.TableId {
err := errors.ES(op, errors.KInternal, "received a TableFragment frame for table %d while table %d was open", tf.TableId, d.currentStreamingTable.Id())
if table.Id() != tf.TableId {
err := errors.ES(op, errors.KInternal, "received a TableFragment frame for table %d while table %d was open", tf.TableId, table.Id())
d.tables <- TableResult{Table: nil, Err: err}
}

d.currentStreamingTable.rawRows <- tf.Rows
table.rawRows <- tf.Rows
} else if tc, ok := f.(*TableCompletion); ok {
if d.currentStreamingTable == nil {
if table == nil {
err := errors.ES(op, errors.KInternal, "received a TableCompletion frame while no streaming table was open")
d.tables <- TableResult{Table: nil, Err: err}
return false
}
if d.currentStreamingTable.Id() != tc.TableId {
err := errors.ES(op, errors.KInternal, "received a TableCompletion frame for table %d while table %d was open", tc.TableId, d.currentStreamingTable.Id())
if table.Id() != tc.TableId {
err := errors.ES(op, errors.KInternal, "received a TableCompletion frame for table %d while table %d was open", tc.TableId, table.Id())
d.tables <- TableResult{Table: nil, Err: err}
}

d.currentStreamingTable.close(tc.OneApiErrors)
table.close(tc.OneApiErrors)

if d.currentStreamingTable.rowCount != tc.RowCount {
err := errors.ES(op, errors.KInternal, "received a TableCompletion frame for table %d with row count %d while %d rows were received", tc.TableId, tc.RowCount, d.currentStreamingTable.rowCount)
if table.rowCount != tc.RowCount {
err := errors.ES(op, errors.KInternal, "received a TableCompletion frame for table %d with row count %d while %d rows were received", tc.TableId, tc.RowCount, table.rowCount)
d.tables <- TableResult{Table: nil, Err: err}
}

d.currentStreamingTable = nil
d.setCurrentStreamingTable(nil)
}

return true
}

func (d *DataSet) parseDatasetHeader(header *DataSetHeader, op errors.Op) bool {
d.DataSetHeader = header
if d.DataSetHeader.Version != version {
if header.Version != version {
d.tables <- TableResult{Table: nil, Err: errors.ES(op, errors.KInternal, "received a DataSetHeader frame that is not version 2")}
return false
}
if !d.DataSetHeader.IsFragmented {
if !header.IsFragmented {
d.tables <- TableResult{Table: nil, Err: errors.ES(op, errors.KInternal, "received a DataSetHeader frame that is not fragmented")}
return false
}
if d.DataSetHeader.IsProgressive {
if header.IsProgressive {
d.tables <- TableResult{Table: nil, Err: errors.ES(op, errors.KInternal, "received a DataSetHeader frame that is progressive")}
return false
}
const EndOfTableErrorPlacement = errorReportingPlacement
if d.DataSetHeader.ErrorReportingPlacement != EndOfTableErrorPlacement {
if header.ErrorReportingPlacement != EndOfTableErrorPlacement {
d.tables <- TableResult{Table: nil, Err: errors.ES(op, errors.KInternal, "received a DataSetHeader frame that does not report errors at the end of the table")}
return false
}
d.setHeader(header)

return true
}
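With the DataSetHeader and Completion fields now private, callers move from field reads to the locked accessor methods. A hypothetical consumer sketch (assumes d is a *DataSet produced by a query and log is imported; error handling abbreviated):

for res := range d.Tables() {
	if res.Err != nil {
		log.Println(res.Err)
		continue
	}
	// consume res.Table here ...
}
// Completion() acquires the read lock, so this no longer races with
// the decoding goroutine that calls setCompletion.
if c := d.Completion(); c != nil {
	log.Println("dataset completed cleanly")
}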
86 changes: 43 additions & 43 deletions azkustodata/query/dataset_test.go
@@ -194,53 +194,53 @@ func TestDataSet_DecodeTables_GetRows(t *testing.T) {
}
}

expectedQueryCompletionInformation := []QueryCompletionInformation{
{
Timestamp: time.Date(2023, 11, 26, 13, 34, 17, 73147800, time.UTC),
ClientRequestId: "blab6",
ActivityId: u,
SubActivityId: u,
ParentActivityId: u,
Level: 4,
LevelName: "Info",
StatusCode: 0,
StatusCodeName: "S_OK (0)",
EventType: 4,
EventTypeName: "QueryInfo",
Payload: "{\"Count\":1,\"Text\":\"Query completed successfully\"}",
},
{
Timestamp: time.Date(2023, 11, 26, 13, 34, 17, 73147800, time.UTC),
ClientRequestId: "blab6",
ActivityId: u,
SubActivityId: u,
ParentActivityId: u,
Level: 4,
LevelName: "Info",
StatusCode: 0,
StatusCodeName: "S_OK (0)",
EventType: 5,
EventTypeName: "WorkloadGroup",
Payload: "{\"Count\":1,\"Text\":\"default\"}",
},
}
/* expectedQueryCompletionInformation := []QueryCompletionInformation{
{
Timestamp: time.Date(2023, 11, 26, 13, 34, 17, 73147800, time.UTC),
ClientRequestId: "blab6",
ActivityId: u,
SubActivityId: u,
ParentActivityId: u,
Level: 4,
LevelName: "Info",
StatusCode: 0,
StatusCodeName: "S_OK (0)",
EventType: 4,
EventTypeName: "QueryInfo",
Payload: "{\"Count\":1,\"Text\":\"Query completed successfully\"}",
},
{
Timestamp: time.Date(2023, 11, 26, 13, 34, 17, 73147800, time.UTC),
ClientRequestId: "blab6",
ActivityId: u,
SubActivityId: u,
ParentActivityId: u,
Level: 4,
LevelName: "Info",
StatusCode: 0,
StatusCodeName: "S_OK (0)",
EventType: 5,
EventTypeName: "WorkloadGroup",
Payload: "{\"Count\":1,\"Text\":\"default\"}",
},
}
information, err := d.QueryCompletionInformation()
assert.NoError(t, err)
information, err := d.QueryCompletionInformation()
assert.NoError(t, err)
assert.Equal(t, expectedQueryCompletionInformation, information)
assert.Equal(t, expectedQueryCompletionInformation, information)
expectedQueryProperties := []QueryProperties{
{
TableId: 1,
Key: "Visualization",
Value: map[string]interface{}{"Visualization": nil, "Title": nil, "XColumn": nil, "Series": nil, "YColumns": nil, "AnomalyColumns": nil, "XTitle": nil, "YTitle": nil, "XAxis": nil, "YAxis": nil, "Legend": nil, "YSplit": nil, "Accumulate": false, "IsQuerySorted": false, "Kind": nil, "Ymin": "NaN", "Ymax": "NaN", "Xmin": nil, "Xmax": nil},
},
}
expectedQueryProperties := []QueryProperties{
{
TableId: 1,
Key: "Visualization",
Value: map[string]interface{}{"Visualization": nil, "Title": nil, "XColumn": nil, "Series": nil, "YColumns": nil, "AnomalyColumns": nil, "XTitle": nil, "YTitle": nil, "XAxis": nil, "YAxis": nil, "Legend": nil, "YSplit": nil, "Accumulate": false, "IsQuerySorted": false, "Kind": nil, "Ymin": "NaN", "Ymax": "NaN", "Xmin": nil, "Xmax": nil},
},
}
properties, err := d.QueryProperties()
assert.NoError(t, err)
assert.Equal(t, expectedQueryProperties, properties)
properties, err := d.QueryProperties()
assert.NoError(t, err)
assert.Equal(t, expectedQueryProperties, properties)*/
}

func TestDataSet_MultiplePrimaryTables(t *testing.T) {
9 changes: 3 additions & 6 deletions azkustodata/query/secondary.go
@@ -1,11 +1,7 @@
package query

import (
"github.com/Azure/azure-kusto-go/azkustodata/errors"
"github.com/google/uuid"
"time"
)

/*
TODO: figure this out
type QueryProperties struct {
TableId int
Key string
@@ -87,3 +83,4 @@ func (d *DataSet) QueryCompletionInformation() ([]QueryCompletionInformation, er
errorTableUninitialized.Op = d.op()
return nil, errorTableUninitialized
}
*/
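With both the QueryProperties/QueryCompletionInformation helpers and the SecondaryResults cache gone, a caller that still needs the non-primary tables can collect them from the table stream itself. A sketch under that assumption (IsPrimaryResult and the Table interface appear in the dataset.go diff above; non-primary tables arrive as fully materialized DataTable frames):

var secondary []query.Table
for res := range d.Tables() {
	if res.Err != nil || res.Table == nil {
		continue
	}
	if !res.Table.IsPrimaryResult() {
		secondary = append(secondary, res.Table)
	}
}

Note that primary streaming tables still need their rows consumed or skipped before the next table arrives; see SkipToEnd in the next file.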
40 changes: 37 additions & 3 deletions azkustodata/query/streaming_table.go
@@ -22,6 +22,30 @@ type streamingTable struct {
closed bool
}

func (t *streamingTable) RowCount() int {
t.lock.RLock()
defer t.lock.RUnlock()
return t.rowCount
}

func (t *streamingTable) setRowCount(rowCount int) {
t.lock.Lock()
defer t.lock.Unlock()
t.rowCount = rowCount
}

func (t *streamingTable) Skip() bool {
t.lock.RLock()
defer t.lock.RUnlock()
return t.skip
}

func (t *streamingTable) setSkip(skip bool) {
t.lock.Lock()
defer t.lock.Unlock()
t.skip = skip
}

func NewStreamingTable(dataset *DataSet, th *TableHeader) (StreamingTable, *errors.Error) {
t := &streamingTable{
baseTable: baseTable{
@@ -56,9 +80,19 @@ func NewStreamingTable(dataset *DataSet, th *TableHeader) (StreamingTable, *errors.Error) {
}

func (t *streamingTable) close(errors []OneApiError) {
if t.closed {
closed := func() bool {
t.lock.RLock()
defer t.lock.RUnlock()
return t.closed
}()

if closed {
return
}

t.lock.Lock()
defer t.lock.Unlock()

t.closed = true

close(t.rawRows)
Expand All @@ -77,7 +111,7 @@ func (t *streamingTable) close(errors []OneApiError) {
func (t *streamingTable) readRows() {
for rows := range t.rawRows {
for _, r := range rows {
if t.skip {
if t.Skip() {
t.rows <- RowResult{Row: Row{}, Err: errors.ES(t.op(), errors.KInternal, "skipping row")}
} else {
values := make(value.Values, len(r))
@@ -109,7 +143,7 @@ func (t *streamingTable) Rows() <-chan RowResult {
}

func (t *streamingTable) SkipToEnd() []error {
t.skip = true
t.setSkip(true)

var errs []error
for r := range t.rows {
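The new Skip()/setSkip() pair makes SkipToEnd safe to call from one goroutine while readRows checks the flag from another. A hypothetical use, abandoning a primary streaming table without decoding its remaining rows (assumes res came from d.Tables() as in the earlier sketch):

if st, ok := res.Table.(query.StreamingTable); ok {
	// SkipToEnd sets the skip flag and drains the row channel,
	// collecting any errors the drain produces.
	if errs := st.SkipToEnd(); len(errs) > 0 {
		log.Printf("table skipped with %d errors", len(errs))
	}
}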
