Skip to content

Commit

Permalink
Merge pull request #172 from dekart-xyz/error-logging
Browse files Browse the repository at this point in the history
Add job package and EmptyResultError
  • Loading branch information
delfrrr authored Apr 7, 2024
2 parents 9a3780e + 7fe41f9 commit e3b5a56
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 12 deletions.
23 changes: 12 additions & 11 deletions src/server/bqjob/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bqjob

import (
"context"
"dekart/src/server/job"
"dekart/src/server/user"
"fmt"
"io"
Expand Down Expand Up @@ -93,8 +94,8 @@ func (r *Reader) getTableFields() []string {
func (r *Reader) getStreams() ([]*bqStoragePb.ReadStream, error) {
readStreams := r.session.GetStreams()
if len(readStreams) == 0 {
err := fmt.Errorf("no streams in read session")
r.logger.Error().Err(err).Send()
err := &job.EmptyResultError{}
r.logger.Debug().Err(err).Send()
return readStreams, err
}
r.logger.Debug().Int32("maxReadStreamsCount", r.maxReadStreamsCount).Msgf("Number of Streams %d", len(readStreams))
Expand All @@ -110,8 +111,8 @@ type StreamReader struct {
logger zerolog.Logger
}

func (r *StreamReader) read(proccessWaitGroup *sync.WaitGroup) {
go r.proccessStreamResponse(proccessWaitGroup)
func (r *StreamReader) read(processWaitGroup *sync.WaitGroup) {
go r.processStreamResponse(processWaitGroup)
go r.readStream()
}

Expand Down Expand Up @@ -152,13 +153,13 @@ func Read(
return
}

var proccessWaitGroup sync.WaitGroup
var processWaitGroup sync.WaitGroup
for _, stream := range readStreams {
proccessWaitGroup.Add(1)
r.newStreamReader(stream.Name, csvRows, errors, logger).read(&proccessWaitGroup)
processWaitGroup.Add(1)
r.newStreamReader(stream.Name, csvRows, errors, logger).read(&processWaitGroup)
}

proccessWaitGroup.Wait() // to close channels and client, see defer statements
processWaitGroup.Wait() // to close channels and client, see defer statements
r.logger.Debug().Msg("All Reading Streams Done")
}

Expand Down Expand Up @@ -202,9 +203,9 @@ func (r *StreamReader) readStream() {
}
}

func (r *StreamReader) proccessStreamResponse(proccessWaitGroup *sync.WaitGroup) {
defer proccessWaitGroup.Done()
defer r.logger.Debug().Str("readStream", r.streamName).Msg("proccessStreamResponse Done")
func (r *StreamReader) processStreamResponse(processWaitGroup *sync.WaitGroup) {
defer processWaitGroup.Done()
defer r.logger.Debug().Str("readStream", r.streamName).Msg("processStreamResponse Done")
var err error
for {
select {
Expand Down
6 changes: 6 additions & 0 deletions src/server/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ import (
"github.com/rs/zerolog/log"
)

type EmptyResultError struct{}

func (e *EmptyResultError) Error() string {
return "Empty result"
}

// Store is the interface for the job storage
type Store interface {
Create(reportID string, queryID string, queryText string, userCtx context.Context) (Job, chan int32, error)
Expand Down
2 changes: 1 addition & 1 deletion src/server/user/claims.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (c ClaimsCheck) validateAuthToken(ctx context.Context, header string) *Clai
})

if err != nil {
log.Error().Err(err).Msg("Error getting token info")
log.Debug().Err(err).Msg("Error getting token info")
return nil
}
missingSensitiveScope := checkMissingScope(sensitiveScope, tokenInfo.Scope)
Expand Down

0 comments on commit e3b5a56

Please sign in to comment.