-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
Copy pathintegration.go
360 lines (343 loc) · 12.1 KB
/
integration.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package integration provides functionality that needs to be shared between all
// integration tests.
//
// Integration tests are implemented through Go's test framework, as test
// functions that create and execute pipelines using the ptest package. Tests
// should be placed in smaller sub-packages for organizational purposes and
// parallelism (tests are only run in parallel across different packages).
// Integration tests should always begin with a call to CheckFilters to ensure
// test filters can be applied, and each package containing integration tests
// should call ptest.Main in a TestMain function if it uses ptest.
//
// Running integration tests can be done with a go test call with any flags that
// are required by the test pipelines, such as --runner or --endpoint.
// Example:
//
// go test -v ./sdks/go/test/integration/... --runner=portable --endpoint=localhost:8099
//
// Alternatively, tests can be executed by running the
// run_validatesrunner_tests.sh script, which also performs much of the
// environment setup, or by calling gradle commands in :sdks:go:test.
package integration
import (
"fmt"
"math/rand"
"regexp"
"strings"
"testing"
"time"
// common runner flag.
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)
// Filters for temporarily skipping integration tests. All filters are regex
// matchers that must match the full name of a test at the point where
// CheckFilters is called. Multiple tests can be skipped by using regex
// wildcards. (ex. "TestXLang_.*" filters all tests starting with TestXLang_)
//
// It is strongly recommended to include TODOs, GitHub issues, or at least a
// comment describing why tests are being skipped.
// sickbay lists filters for tests that fail because of Go SDK errors. Any test
// matching an entry here is skipped on every runner, before runner-specific
// filters are consulted.
var sickbay = []string{
	// Add entries here, each accompanied by a comment or issue link
	// explaining why the test is sickbayed.
}
// Runner-specific test filters, for features that are not yet supported on
// specific runners.
// directFilters lists the tests skipped when running on the direct runner.
var directFilters = []string{
	// Cross-language is not yet supported by the direct runner.
	"TestXLang.*",
	"TestKafkaIO.*",
	"TestBigQueryIO.*",
	"TestBigtableIO.*",
	"TestSpannerIO.*",
	"TestDebeziumIO_BasicRead",
	"TestJDBCIO_BasicReadWrite",
	"TestJDBCIO_PostgresReadWrite",
	"TestDataframe",
	// Triggers and panes are not yet supported.
	"TestTrigger.*",
	"TestPanes",
	// The TestStream primitive is not supported.
	"TestTestStream.*",
	// Windowed side inputs are not supported
	// (https://github.com/apache/beam/issues/21130).
	"TestValidateWindowedSideInputs",
	// Multimap side inputs are not currently supported
	// (https://github.com/apache/beam/issues/21130).
	"TestParDoMultiMapSideInput",
	"TestLargeWordcount_Loopback",
	// Self-checkpointing is not supported.
	"TestCheckpointing",
	// Pipeline drain for SDFs is not supported.
	"TestDrain",
	// FhirIO is currently supported only on the Dataflow runner.
	"TestFhirIO.*",
	// OOMs currently produce heap dumps only on the Dataflow runner.
	"TestOomParDo",
	// User state is not supported.
	"TestValueState",
	"TestValueStateWindowed",
	"TestValueStateClear",
	"TestBagState",
	"TestBagStateClear",
	"TestCombiningState",
	"TestMapState",
	"TestMapStateClear",
	"TestSetState",
	"TestSetStateClear",
}
// portableFilters lists the tests skipped when running on the portable runner.
var portableFilters = []string{
	// The TestStream primitive is not supported.
	"TestTestStream.*",
	// The trigger and pane tests use TestStream.
	"TestTrigger.*",
	"TestPanes",
	// TODO(https://github.com/apache/beam/issues/21058): Kafka reads time out
	// on the Python portable runner.
	"TestKafkaIO.*",
	// TODO(BEAM-13215): GCP IOs do not currently work on non-Dataflow
	// portable runners.
	"TestBigQueryIO.*",
	"TestBigtableIO.*",
	"TestSpannerIO.*",
	// Self-checkpointing is not supported.
	"TestCheckpointing",
	// Pipeline drain for SDFs is not supported.
	"TestDrain",
	// FhirIO is currently supported only on the Dataflow runner.
	"TestFhirIO.*",
	// OOMs currently produce heap dumps only on the Dataflow runner.
	"TestOomParDo",
	// User state is not supported.
	"TestValueState",
	"TestValueStateWindowed",
	"TestValueStateClear",
	"TestBagState",
	"TestBagStateClear",
	"TestCombiningState",
	"TestMapState",
	"TestMapStateClear",
	"TestSetState",
	"TestSetStateClear",
}
// prismFilters lists the tests skipped when running on the prism runner.
var prismFilters = []string{
	// Java's CoGBK is not yet supported.
	"TestXLang_CoGroupBy",
	// The TestStream primitive is not supported.
	"TestTestStream.*",
	// The trigger and pane tests use TestStream.
	"TestTrigger.*",
	"TestPanes",
	// TODO(https://github.com/apache/beam/issues/21058): Xlang IOs don't yet
	// work on prism.
	"TestKafkaIO.*",
	// TODO(BEAM-13215): GCP IOs do not currently work on non-Dataflow
	// portable runners.
	"TestBigQueryIO.*",
	"TestSpannerIO.*",
	// Pipeline drain for SDFs is not supported.
	"TestDrain",
	// FhirIO is currently supported only on the Dataflow runner.
	"TestFhirIO.*",
	// OOMs currently produce heap dumps only on the Dataflow runner.
	"TestOomParDo",
	// User state is not supported.
	"TestValueState",
	"TestValueStateWindowed",
	"TestValueStateClear",
	"TestBagState",
	"TestBagStateClear",
	"TestCombiningState",
	"TestMapState",
	"TestMapStateClear",
	"TestSetState",
	"TestSetStateClear",
}
// flinkFilters lists the tests skipped when running on the Flink runner.
var flinkFilters = []string{
	// TODO(https://github.com/apache/beam/issues/20723): Flink tests time out
	// on reads.
	"TestXLang_Combine.*",
	"TestDebeziumIO_BasicRead",
	// TODO(BEAM-13215): GCP IOs do not currently work on non-Dataflow
	// portable runners.
	"TestBigQueryIO.*",
	"TestBigtableIO.*",
	"TestSpannerIO.*",
	// The number of outputs produced by AfterSynchronizedProcessingTime
	// varies between runs.
	"TestTriggerAfterSynchronizedProcessingTime",
	// Pipeline drain for SDFs is not supported.
	"TestDrain",
	// FhirIO is currently supported only on the Dataflow runner.
	"TestFhirIO.*",
	// OOMs currently produce heap dumps only on the Dataflow runner.
	"TestOomParDo",
	// Map-based state types are not supported.
	"TestMapState",
	"TestMapStateClear",
	"TestSetStateClear",
	"TestSetState",
}
// samzaFilters lists the tests skipped when running on the Samza runner.
var samzaFilters = []string{
	// TODO(https://github.com/apache/beam/issues/20987): Samza tests hit an
	// invalid encoding.
	"TestReshuffle",
	"TestReshuffleKV",
	// The TestStream primitive is not supported.
	"TestTestStream.*",
	// The trigger and pane tests use TestStream.
	"TestTrigger.*",
	"TestPanes",
	// TODO(https://github.com/apache/beam/issues/21244): Samza doesn't yet
	// support post-job metrics, which WordCount uses.
	"TestWordCount.*",
	// TODO(BEAM-13215): GCP IOs do not currently work on non-Dataflow
	// portable runners.
	"TestBigQueryIO.*",
	"TestBigtableIO.*",
	"TestSpannerIO.*",
	// Self-checkpointing is not supported.
	"TestCheckpointing",
	// Pipeline drain for SDFs is not supported.
	"TestDrain",
	// FhirIO is currently supported only on the Dataflow runner.
	"TestFhirIO.*",
	// OOMs currently produce heap dumps only on the Dataflow runner.
	"TestOomParDo",
	// User state is not supported.
	"TestValueState",
	"TestValueStateWindowed",
	"TestValueStateClear",
	"TestBagState",
	"TestBagStateClear",
	"TestCombiningState",
	"TestMapState",
	"TestMapStateClear",
	"TestSetState",
	"TestSetStateClear",
	// TODO(https://github.com/apache/beam/issues/26126): Java runner issue
	// (ActiveBundle has no registered handler).
	"TestDebeziumIO_BasicRead",
}
// sparkFilters lists the tests skipped when running on the Spark runner.
var sparkFilters = []string{
	// TODO(BEAM-11498): XLang tests are broken with the Spark runner.
	"TestXLang.*",
	"TestParDoSideInput",
	"TestParDoKVSideInput",
	// The TestStream primitive is not supported.
	"TestTestStream.*",
	// The trigger and pane tests use TestStream.
	"TestTrigger.*",
	"TestPanes",
	// [BEAM-13921]: Spark doesn't support side inputs to executable stages.
	"TestDebeziumIO_BasicRead",
	// TODO(BEAM-13215): GCP IOs do not currently work on non-Dataflow
	// portable runners.
	"TestBigQueryIO.*",
	"TestBigtableIO.*",
	"TestSpannerIO.*",
	// Self-checkpointing is not supported.
	"TestCheckpointing",
	// Pipeline drain for SDFs is not supported.
	"TestDrain",
	// FhirIO is currently supported only on the Dataflow runner.
	"TestFhirIO.*",
	// OOMs currently produce heap dumps only on the Dataflow runner.
	"TestOomParDo",
	// Map-based state types are not supported.
	"TestMapState",
	"TestMapStateClear",
	"TestSetStateClear",
	"TestSetState",
}
// dataflowFilters lists the tests skipped when running on the Dataflow runner.
// Each filter appears only once; the Spanner filter lives under the comment
// that explains it.
var dataflowFilters = []string{
	// The Dataflow runner doesn't work with tests using testcontainers locally.
	"TestJDBCIO_BasicReadWrite",
	"TestJDBCIO_PostgresReadWrite",
	"TestDebeziumIO_BasicRead",
	"TestMongoDBIO.*",
	// TODO(BEAM-11576): TestFlattenDup failing on this runner.
	"TestFlattenDup",
	// The Dataflow runner does not support the TestStream primitive.
	"TestTestStream.*",
	// The trigger and pane tests use TestStream.
	"TestTrigger.*",
	"TestPanes",
	// There is no infrastructure for running KafkaIO tests with Dataflow.
	"TestKafkaIO.*",
	// Dataflow doesn't support any test that requires loopback.
	// E.g. the FileIO examples.
	".*Loopback.*",
	// Dataflow does not automatically terminate the TestCheckpointing pipeline
	// when complete.
	"TestCheckpointing",
	// TODO(21761): This test needs to provide GCP project to expansion service.
	"TestBigQueryIO_BasicWriteQueryRead",
	// Can't handle the test spanner container or access a local spanner.
	"TestSpannerIO.*",
	// Dataflow does not drain jobs by itself.
	"TestDrain",
}
// CheckFilters checks if an integration test is filtered to be skipped, either
// because the intended runner does not support it, or the test is sickbayed.
// This method should be called at the beginning of any integration test. If
// t.Run is used, CheckFilters should be called within the t.Run callback, so
// that sub-tests can be skipped individually.
//
// CheckFilters panics if ptest.Main has not been called. For a test that is
// not sickbayed, it overwrites *jobopts.JobName with
// "go-<lower-cased test name>-<random int in [0, 1000)>" before applying the
// runner-specific filters. If the runner name (from *ptest.Runner, falling
// back to ptest.DefaultRunner) is not one of the known runners, no
// runner-specific filters are applied.
func CheckFilters(t *testing.T) {
	if !ptest.MainCalled() {
		panic("ptest.Main() has not been called: please override TestMain to ensure that the integration test runs properly.")
	}

	// Check for sickbaying first.
	n := t.Name()
	if matchesAnyFilter(t, sickbay, n) {
		t.Skipf("Test %v is currently sickbayed on all runners", n)
	}

	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	*jobopts.JobName = fmt.Sprintf("go-%v-%v", strings.ToLower(n), r.Intn(1000))

	// Test for runner-specific skipping second.
	runner := *ptest.Runner
	if runner == "" {
		runner = ptest.DefaultRunner()
	}
	var filters []string
	switch runner {
	case "direct", "DirectRunner":
		filters = directFilters
	case "prism", "PrismRunner":
		filters = prismFilters
	case "portable", "PortableRunner":
		filters = portableFilters
	case "flink", "FlinkRunner":
		filters = flinkFilters
	case "samza", "SamzaRunner":
		filters = samzaFilters
	case "spark", "SparkRunner":
		filters = sparkFilters
	case "dataflow", "DataflowRunner":
		filters = dataflowFilters
	default:
		return
	}
	if matchesAnyFilter(t, filters, n) {
		t.Skipf("Test %v is currently filtered for runner %v", n, runner)
	}
}

// matchesAnyFilter reports whether name fully matches at least one of the
// regular expressions in filters.
//
// Each filter is wrapped in a non-capturing group before anchoring. Without
// the group, a filter using alternation such as "TestA|TestB" would become
// "^TestA|TestB$", which matches any name starting with TestA or ending with
// TestB rather than requiring a full match. A filter that fails to compile is
// reported via t.Errorf and treated as non-matching.
func matchesAnyFilter(t *testing.T, filters []string, name string) bool {
	t.Helper()
	for _, f := range filters {
		match, err := regexp.MatchString(fmt.Sprintf("^(?:%v)$", f), name)
		if err != nil {
			t.Errorf("Matching of regex '%v' with test '%v' failed: %v", f, name, err)
			continue
		}
		if match {
			return true
		}
	}
	return false
}