Skip to content

Commit

Permalink
Merge branch 'main' into lossless
Browse files Browse the repository at this point in the history
  • Loading branch information
shollyman committed Feb 5, 2024
2 parents 2fcbc74 + 261c8d9 commit d305a05
Show file tree
Hide file tree
Showing 43 changed files with 2,364 additions and 250 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
/logging/ @googleapis/api-logging @googleapis/api-logging-partners @googleapis/yoshi-go-admins
/profiler/ @googleapis/api-profiler @googleapis/yoshi-go-admins
/vertexai/ @googleapis/go-vertexai @googleapis/yoshi-go-admins
/internal/protoveneer/ @googleapis/yoshi-go-admins @jba @eliben

# individual release versions manifest is unowned (to avoid notifying every team)
.release-please-manifest-individual.json
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/action_syntax.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ jobs:
permissions:
pull-requests: write
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: reviewdog/action-actionlint@v1
18 changes: 9 additions & 9 deletions .github/workflows/apidiff.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ jobs:
scan_changes:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
ref: main
- name: Get main commit
id: main
run: echo "hash=$(git rev-parse HEAD)" >> $GITHUB_OUTPUT
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.21.x'
- name: Get changed directories
Expand Down Expand Up @@ -48,12 +48,12 @@ jobs:
strategy:
matrix: ${{ fromJson(needs.scan_changes.outputs.changed_dirs) }}
steps:
- uses: actions/setup-go@v4
- uses: actions/setup-go@v5
with:
go-version: '1.21.x'
- name: Install latest apidiff
run: go install golang.org/x/exp/cmd/apidiff@latest
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
ref: main
- name: Create baseline
Expand All @@ -64,14 +64,14 @@ jobs:
- name: Create Go package baseline
run: cd ${{ matrix.changed }} && apidiff -m -w ${{ steps.baseline.outputs.pkg }} .
- name: Upload baseline package data
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: ${{ steps.baseline.outputs.pkg }}
path: ${{ matrix.changed }}/${{ steps.baseline.outputs.pkg }}
retention-days: 1
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Download baseline package data
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
name: ${{ steps.baseline.outputs.pkg }}
path: ${{ matrix.changed }}
Expand All @@ -89,7 +89,7 @@ jobs:
cat diff.txt && ! [ -s diff.txt ]
- name: Add breaking change label
if: ${{ failure() && !github.event.pull_request.head.repo.fork }}
uses: actions/github-script@v6
uses: actions/github-script@v7
with:
script: |
github.rest.issues.addLabels({
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/conformance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ jobs:
go: [ '1.19', '1.20', '1.21' ]
folders: ['bigtable']
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
path: google-cloud-go
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
repository: googleapis/cloud-bigtable-clients-test
ref: main
path: cloud-bigtable-clients-test
- uses: actions/setup-go@v4
- uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go }}
- run: go version
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/new_client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ jobs:
new_versions:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
fetch-depth: 2
- uses: actions/setup-go@v4
- uses: actions/setup-go@v5
with:
go-version: 1.21.x
- name: Find new version files
Expand Down Expand Up @@ -48,7 +48,7 @@ jobs:
strategy:
matrix: ${{ fromJson(needs.new_versions.outputs.versions) }}
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- run: echo >> ${{ matrix.new }}/CHANGES.md
- uses: googleapis/code-suggester@v4
id: code_suggester
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/third_party_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ jobs:
changed_gomods:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
fetch-depth: 2
- uses: actions/setup-go@v4
- uses: actions/setup-go@v5
with:
go-version: 1.21.x
- name: Find modified go.mod files
Expand All @@ -42,5 +42,5 @@ jobs:
strategy:
matrix: ${{ fromJson(needs.changed_gomods.outputs.mods) }}
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- run: go run ./internal/actions/cmd/thirdpartycheck -q -mod ${{ matrix.mods }}/go.mod
4 changes: 2 additions & 2 deletions .github/workflows/vet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ jobs:
vet:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.20.x'
- name: Install tools
Expand Down
72 changes: 72 additions & 0 deletions bigquery/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2762,6 +2762,78 @@ func TestIntegration_ExtractExternal(t *testing.T) {
}
}

// TestIntegration_ExportDataStatistics runs an EXPORT DATA statement that
// extracts a small inline query result to GCS as CSV, then verifies that the
// job's QueryStatistics carry the expected ExportDataStatistics counts.
func TestIntegration_ExportDataStatistics(t *testing.T) {
	// Create a table, extract it to GCS using EXPORT DATA statement.
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	schema := Schema{
		{Name: "name", Type: StringFieldType},
		{Name: "num", Type: IntegerFieldType},
	}
	table := newTable(t, schema)
	defer table.Delete(ctx)

	// Extract to a GCS object as CSV.
	bucketName := testutil.ProjID()
	uri := fmt.Sprintf("gs://%s/bq-export-test-*.csv", bucketName)
	// Best-effort cleanup of the exported objects; failures are logged, not fatal.
	defer func() {
		it := storageClient.Bucket(bucketName).Objects(ctx, &storage.Query{
			MatchGlob: "bq-export-test-*.csv",
		})
		for {
			obj, err := it.Next()
			if err == iterator.Done {
				break
			}
			if err != nil {
				// A non-Done iterator error persists on subsequent Next calls,
				// so stop instead of spinning. The listing failed here, not a
				// bucket deletion.
				t.Logf("failed to list exported objects: %v", err)
				break
			}
			if err := storageClient.Bucket(bucketName).Object(obj.Name).Delete(ctx); err != nil {
				t.Logf("failed to delete object %s: %v", obj.Name, err)
			} else {
				t.Logf("deleted object %s", obj.Name)
			}
		}
	}()

	// EXPORT DATA to GCS object.
	sql := fmt.Sprintf(`EXPORT DATA
		OPTIONS (
			uri = '%s',
			format = 'CSV',
			overwrite = true,
			header = true,
			field_delimiter = ';'
		)
		AS (
			SELECT 'a' as name, 1 as num
			UNION ALL
			SELECT 'b' as name, 2 as num
			UNION ALL
			SELECT 'c' as name, 3 as num
		);`,
		uri)
	stats, _, err := runQuerySQL(ctx, sql)
	if err != nil {
		t.Fatal(err)
	}

	qStats, ok := stats.Details.(*QueryStatistics)
	if !ok {
		t.Fatalf("expected query statistics not present")
	}

	if qStats.ExportDataStatistics == nil {
		t.Fatal("jobStatus missing ExportDataStatistics")
	}
	if qStats.ExportDataStatistics.FileCount != 1 {
		t.Fatalf("expected ExportDataStatistics to have 1 file, but got %d files", qStats.ExportDataStatistics.FileCount)
	}
	if qStats.ExportDataStatistics.RowCount != 3 {
		t.Fatalf("expected ExportDataStatistics to have 3 rows, got %d rows", qStats.ExportDataStatistics.RowCount)
	}
}

func TestIntegration_ReadNullIntoStruct(t *testing.T) {
// Reading a null into a struct field should return an error (not panic).
if client == nil {
Expand Down
25 changes: 25 additions & 0 deletions bigquery/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,30 @@ type QueryStatistics struct {

// The DDL target table, present only for CREATE/DROP FUNCTION/PROCEDURE queries.
DDLTargetRoutine *Routine

// Statistics for the EXPORT DATA statement as part of Query Job.
ExportDataStatistics *ExportDataStatistics
}

// ExportDataStatistics represents statistics for
// an EXPORT DATA statement executed as part of a query job.
type ExportDataStatistics struct {
	// FileCount is the number of destination files generated.
	FileCount int64

	// RowCount is the number of destination rows generated.
	RowCount int64
}

// bqToExportDataStatistics converts the raw BigQuery API representation of
// EXPORT DATA statement statistics into the client's ExportDataStatistics.
// A nil input yields a nil result.
func bqToExportDataStatistics(in *bq.ExportDataStatistics) *ExportDataStatistics {
	if in == nil {
		return nil
	}
	return &ExportDataStatistics{
		FileCount: in.FileCount,
		RowCount:  in.RowCount,
	}
}

// BIEngineStatistics contains query statistics specific to the use of BI Engine.
Expand Down Expand Up @@ -1029,6 +1053,7 @@ func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) {
DDLTargetTable: bqToTable(s.Query.DdlTargetTable, c),
DDLOperationPerformed: s.Query.DdlOperationPerformed,
DDLTargetRoutine: bqToRoutine(s.Query.DdlTargetRoutine, c),
ExportDataStatistics: bqToExportDataStatistics(s.Query.ExportDataStatistics),
StatementType: s.Query.StatementType,
TotalBytesBilled: s.Query.TotalBytesBilled,
TotalBytesProcessed: s.Query.TotalBytesProcessed,
Expand Down
15 changes: 8 additions & 7 deletions bigquery/storage/managedwriter/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (co *connection) lockingAppend(pw *pendingWrite) error {
return err
}

var statsOnExit func()
var statsOnExit func(ctx context.Context)

// critical section: Things that need to happen inside the critical section:
//
Expand All @@ -362,9 +362,10 @@ func (co *connection) lockingAppend(pw *pendingWrite) error {
// * add the pending write to the channel for the connection (ordering for the response)
co.mu.Lock()
defer func() {
sCtx := co.ctx
co.mu.Unlock()
if statsOnExit != nil {
statsOnExit()
if statsOnExit != nil && sCtx != nil {
statsOnExit(sCtx)
}
}()

Expand Down Expand Up @@ -441,12 +442,12 @@ func (co *connection) lockingAppend(pw *pendingWrite) error {
numRows = int64(len(pr.GetSerializedRows()))
}
}
statsOnExit = func() {
statsOnExit = func(ctx context.Context) {
// these will get recorded once we exit the critical section.
// TODO: resolve open questions around what labels should be attached (connection, streamID, etc)
recordStat(co.ctx, AppendRequestRows, numRows)
recordStat(co.ctx, AppendRequests, 1)
recordStat(co.ctx, AppendRequestBytes, int64(pw.reqSize))
recordStat(ctx, AppendRequestRows, numRows)
recordStat(ctx, AppendRequests, 1)
recordStat(ctx, AppendRequestBytes, int64(pw.reqSize))
}
ch <- pw
return nil
Expand Down
82 changes: 82 additions & 0 deletions bigquery/storage/managedwriter/managed_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"io"
"runtime"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -643,3 +644,84 @@ func TestManagedStream_Closure(t *testing.T) {
t.Errorf("expected writer ctx to be dead, is alive")
}
}

// This test exists to try to surface data races by sharing
// a single writer with multiple goroutines. It doesn't assert
// anything about the behavior of the system; run it with -race
// to get value from it.
func TestManagedStream_RaceFinder(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())

	// Counters guarded by totalsMu; they drive deterministic fault
	// injection in the stubbed send/recv paths below.
	var totalsMu sync.Mutex
	totalSends := 0
	totalRecvs := 0
	pool := &connectionPool{
		ctx:                ctx,
		cancel:             cancel,
		baseFlowController: newFlowController(0, 0),
		open: openTestArc(&testAppendRowsClient{},
			func(req *storagepb.AppendRowsRequest) error {
				// Fail every 25th send with io.EOF to force reconnect/retry
				// activity while appends are in flight.
				totalsMu.Lock()
				totalSends = totalSends + 1
				curSends := totalSends
				totalsMu.Unlock()
				if curSends%25 == 0 {
					//time.Sleep(10 * time.Millisecond)
					return io.EOF
				}
				return nil
			},
			func() (*storagepb.AppendRowsResponse, error) {
				// Fail every 15th receive with io.EOF so send and recv
				// failures interleave at different cadences.
				totalsMu.Lock()
				totalRecvs = totalRecvs + 1
				curRecvs := totalRecvs
				totalsMu.Unlock()
				if curRecvs%15 == 0 {
					return nil, io.EOF
				}
				return &storagepb.AppendRowsResponse{}, nil
			}),
	}
	router := newSimpleRouter("")
	if err := pool.activateRouter(router); err != nil {
		t.Errorf("activateRouter: %v", err)
	}

	ms := &ManagedStream{
		id:             "foo",
		streamSettings: defaultStreamSettings(),
		retry:          newStatelessRetryer(),
	}
	// Allow a few retries so injected io.EOF failures get retried rather
	// than immediately surfacing to the writers.
	ms.retry.maxAttempts = 4
	ms.ctx, ms.cancel = context.WithCancel(pool.ctx)
	ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
	if err := pool.addWriter(ms); err != nil {
		t.Errorf("addWriter A: %v", err)
	}

	if router.conn == nil {
		t.Errorf("expected non-nil connection")
	}

	numWriters := 5
	numWrites := 50

	// Hammer the single shared writer from several goroutines. Errors are
	// intentionally ignored: this test only hunts for data races, not for
	// correctness of individual appends.
	var wg sync.WaitGroup
	wg.Add(numWriters)
	for i := 0; i < numWriters; i++ {
		go func() {
			for j := 0; j < numWrites; j++ {
				result, err := ms.AppendRows(ctx, [][]byte{[]byte("foo")})
				if err != nil {
					continue
				}
				_, err = result.GetResult(ctx)
				if err != nil {
					continue
				}
			}
			wg.Done()
		}()
	}
	wg.Wait()
	cancel()
}
Loading

0 comments on commit d305a05

Please sign in to comment.