Skip to content

Commit

Permalink
spanner: improve transaction test coverage
Browse files Browse the repository at this point in the history
integration test: transaction runner and transaction aborts and retries
transaction unit test: acquire and error handling

mockclient: add function to stall processing requests

key,session,session_test: lint

Change-Id: I4b2a9392a3e9a8d89b89e8e4af439745cbcbbc15
Reviewed-on: https://code-review.googlesource.com/14731
Reviewed-by: kokoro <[email protected]>
Reviewed-by: Jonathan Amsterdam <[email protected]>
  • Loading branch information
larryxiao authored and fruit-steamers committed Jul 25, 2017
1 parent 9a38b47 commit c6036dd
Show file tree
Hide file tree
Showing 7 changed files with 441 additions and 21 deletions.
8 changes: 4 additions & 4 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

// Test validDatabaseName()
func TestValidDatabaseName(t *testing.T) {
validDbUri := "projects/spanner-cloud-test/instances/foo/databases/foodb"
validDbURI := "projects/spanner-cloud-test/instances/foo/databases/foodb"
invalidDbUris := []string{
// Completely wrong DB URI.
"foobarDB",
Expand All @@ -32,12 +32,12 @@ func TestValidDatabaseName(t *testing.T) {
// No instance ID.
"projects/spanner-cloud-test/instances//databases/foodb",
}
if err := validDatabaseName(validDbUri); err != nil {
t.Errorf("validateDatabaseName(%q) = %v, want nil", validDbUri, err)
if err := validDatabaseName(validDbURI); err != nil {
t.Errorf("validateDatabaseName(%q) = %v, want nil", validDbURI, err)
}
for _, d := range invalidDbUris {
if err, wantErr := validDatabaseName(d), "should conform to pattern"; !strings.Contains(err.Error(), wantErr) {
t.Errorf("validateDatabaseName(%q) = %q, want error pattern %q", validDbUri, err, wantErr)
t.Errorf("validateDatabaseName(%q) = %q, want error pattern %q", validDbURI, err, wantErr)
}
}
}
Expand Down
38 changes: 35 additions & 3 deletions spanner/internal/testutil/mockclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,23 @@ type MockCloudSpannerClient struct {
sessions map[string]bool
// Expected set of actions that will be executed by the client.
actions []Action
// Session ping history
// Session ping history.
pings []string
// Injected error, will be returned by all APIs
// Injected error, will be returned by all APIs.
injErr map[string]error
// nice client will not fail on any request
// Client will not fail on any request.
nice bool
// Client will stall on any requests.
freezed chan struct{}
}

// NewMockCloudSpannerClient creates new MockCloudSpannerClient instance.
func NewMockCloudSpannerClient(t *testing.T, acts ...Action) *MockCloudSpannerClient {
mc := &MockCloudSpannerClient{t: t, sessions: map[string]bool{}, injErr: map[string]error{}}
mc.SetActions(acts...)
// Produce a closed channel, so the default action of ready is to not block.
mc.Freeze()
mc.Unfreeze()
return mc
}

Expand Down Expand Up @@ -121,6 +126,7 @@ func (m *MockCloudSpannerClient) DumpSessions() map[string]bool {

// CreateSession is a placeholder for SpannerClient.CreateSession.
func (m *MockCloudSpannerClient) CreateSession(c context.Context, r *sppb.CreateSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
m.ready()
m.mu.Lock()
defer m.mu.Unlock()
if err := m.injErr["CreateSession"]; err != nil {
Expand All @@ -139,6 +145,7 @@ func (m *MockCloudSpannerClient) CreateSession(c context.Context, r *sppb.Create

// GetSession is a placeholder for SpannerClient.GetSession.
func (m *MockCloudSpannerClient) GetSession(c context.Context, r *sppb.GetSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
m.ready()
m.mu.Lock()
defer m.mu.Unlock()
if err := m.injErr["GetSession"]; err != nil {
Expand All @@ -153,6 +160,7 @@ func (m *MockCloudSpannerClient) GetSession(c context.Context, r *sppb.GetSessio

// DeleteSession is a placeholder for SpannerClient.DeleteSession.
func (m *MockCloudSpannerClient) DeleteSession(c context.Context, r *sppb.DeleteSessionRequest, opts ...grpc.CallOption) (*empty.Empty, error) {
m.ready()
m.mu.Lock()
defer m.mu.Unlock()
if err := m.injErr["DeleteSession"]; err != nil {
Expand All @@ -169,11 +177,13 @@ func (m *MockCloudSpannerClient) DeleteSession(c context.Context, r *sppb.Delete

// ExecuteSql is a placeholder for SpannerClient.ExecuteSql.
func (m *MockCloudSpannerClient) ExecuteSql(c context.Context, r *sppb.ExecuteSqlRequest, opts ...grpc.CallOption) (*sppb.ResultSet, error) {
m.ready()
return nil, errors.New("Unimplemented")
}

// ExecuteStreamingSql is a mock implementation of SpannerClient.ExecuteStreamingSql.
func (m *MockCloudSpannerClient) ExecuteStreamingSql(c context.Context, r *sppb.ExecuteSqlRequest, opts ...grpc.CallOption) (sppb.Spanner_ExecuteStreamingSqlClient, error) {
m.ready()
m.mu.Lock()
defer m.mu.Unlock()
if err := m.injErr["ExecuteStreamingSql"]; err != nil {
Expand Down Expand Up @@ -220,12 +230,14 @@ func (m *MockCloudSpannerClient) ExecuteStreamingSql(c context.Context, r *sppb.

// Read is a placeholder for SpannerClient.Read.
func (m *MockCloudSpannerClient) Read(c context.Context, r *sppb.ReadRequest, opts ...grpc.CallOption) (*sppb.ResultSet, error) {
m.ready()
m.t.Fatalf("Read is unimplemented")
return nil, errors.New("Unimplemented")
}

// StreamingRead is a placeholder for SpannerClient.StreamingRead.
func (m *MockCloudSpannerClient) StreamingRead(c context.Context, r *sppb.ReadRequest, opts ...grpc.CallOption) (sppb.Spanner_StreamingReadClient, error) {
m.ready()
m.mu.Lock()
defer m.mu.Unlock()
if err := m.injErr["StreamingRead"]; err != nil {
Expand Down Expand Up @@ -283,6 +295,7 @@ func (m *MockCloudSpannerClient) StreamingRead(c context.Context, r *sppb.ReadRe

// BeginTransaction is a placeholder for SpannerClient.BeginTransaction.
func (m *MockCloudSpannerClient) BeginTransaction(c context.Context, r *sppb.BeginTransactionRequest, opts ...grpc.CallOption) (*sppb.Transaction, error) {
m.ready()
m.mu.Lock()
defer m.mu.Unlock()
if !m.nice {
Expand Down Expand Up @@ -310,6 +323,7 @@ func (m *MockCloudSpannerClient) BeginTransaction(c context.Context, r *sppb.Beg

// Commit is a placeholder for SpannerClient.Commit.
func (m *MockCloudSpannerClient) Commit(c context.Context, r *sppb.CommitRequest, opts ...grpc.CallOption) (*sppb.CommitResponse, error) {
m.ready()
m.mu.Lock()
defer m.mu.Unlock()
if !m.nice {
Expand All @@ -333,6 +347,7 @@ func (m *MockCloudSpannerClient) Commit(c context.Context, r *sppb.CommitRequest

// Rollback is a placeholder for SpannerClient.Rollback.
func (m *MockCloudSpannerClient) Rollback(c context.Context, r *sppb.RollbackRequest, opts ...grpc.CallOption) (*empty.Empty, error) {
m.ready()
m.mu.Lock()
defer m.mu.Unlock()
if !m.nice {
Expand All @@ -353,3 +368,20 @@ func (m *MockCloudSpannerClient) Rollback(c context.Context, r *sppb.RollbackReq
}
return nil, nil
}

// Freeze stalls all requests.
func (m *MockCloudSpannerClient) Freeze() {
m.freezed = make(chan struct{})
}

// Unfreeze restores processing requests.
func (m *MockCloudSpannerClient) Unfreeze() {
close(m.freezed)
}

// ready checks conditions before executing requests
// TODO: also check injected errors, actions
func (m *MockCloudSpannerClient) ready() {
// check if client should be freezed
<-m.freezed
}
6 changes: 3 additions & 3 deletions spanner/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ func (key Key) String() string {
}

// AsPrefix returns a KeyRange for all keys where k is the prefix.
func (k Key) AsPrefix() KeyRange {
func (key Key) AsPrefix() KeyRange {
return KeyRange{
Start: k,
End: k,
Start: key,
End: key,
Kind: ClosedClosed,
}
}
Expand Down
8 changes: 4 additions & 4 deletions spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,9 +985,9 @@ func (hc *healthChecker) maintainer() {
<-hc.ready

var (
kWindowSize uint64 = 10
iteration uint64 = 0
timeout <-chan time.Time
windowSize uint64 = 10
iteration uint64
timeout <-chan time.Time
)

// replenishPool is run if numOpened is less than sessionsToKeep, timeouts on sampleInterval.
Expand Down Expand Up @@ -1080,7 +1080,7 @@ func (hc *healthChecker) maintainer() {
hc.pool.mu.Unlock()

hc.mu.Lock()
if iteration%kWindowSize == 0 || maxSessionsInUse < currSessionsInUse {
if iteration%windowSize == 0 || maxSessionsInUse < currSessionsInUse {
maxSessionsInUse = currSessionsInUse
}
sessionsToKeep := maxUint64(hc.pool.MinOpened,
Expand Down
10 changes: 5 additions & 5 deletions spanner/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,10 +844,10 @@ func TestMaintainer(t *testing.T) {
t.SkipNow()
}
var (
kMinOpened uint64 = 5
kMaxIdle uint64 = 4
minOpened uint64 = 5
maxIdle uint64 = 4
)
sp, _, cancel := setup(t, SessionPoolConfig{MinOpened: kMinOpened, MaxIdle: kMaxIdle})
sp, _, cancel := setup(t, SessionPoolConfig{MinOpened: minOpened, MaxIdle: maxIdle})
sampleInterval := sp.SessionPoolConfig.healthCheckSampleInterval
hcInterval := sp.SessionPoolConfig.HealthCheckInterval
defer cancel()
Expand Down Expand Up @@ -889,8 +889,8 @@ func TestMaintainer(t *testing.T) {
}
<-time.After(sampleInterval*10 + hcInterval)
sp.mu.Lock()
if sp.numOpened != kMinOpened {
t.Errorf("Scale down. Expect %d open, got %d", kMinOpened, sp.numOpened)
if sp.numOpened != minOpened {
t.Errorf("Scale down. Expect %d open, got %d", minOpened, sp.numOpened)
}
sp.mu.Unlock()
}
Loading

0 comments on commit c6036dd

Please sign in to comment.