Skip to content

Commit

Permalink
fix flaky actions/tests, improve install-action
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon committed Dec 25, 2022
1 parent bb8f56b commit 7247e46
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 40 deletions.
30 changes: 24 additions & 6 deletions .github/workflows/install.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,36 @@ on:

jobs:
install_go_get:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Set up Go
uses: actions/setup-go@v2
uses: actions/setup-go@v3
with:
go-version: 1.18

- name: Install
shell: bash
shell: bash
run: |
set +x
go mod init test
export GOSUMDB=off
go get -t github.com/lovoo/goka@master
go test -v github.com/lovoo/goka
go get github.com/lovoo/goka@master
cat > main.go << EndOfMessage
package main
import (
"context"
"github.com/lovoo/goka"
)
func main() {
proc, _ := goka.NewProcessor(nil, goka.DefineGroup("test"))
proc.Run(context.Background())
}
EndOfMessage
cat main.go
go mod tidy
cat go.mod
echo "Compiling module"
go build -v .
echo "... done"
12 changes: 6 additions & 6 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ on:

jobs:
unit:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v2
uses: actions/setup-go@v3
with:
go-version: 1.18

Expand All @@ -23,12 +23,12 @@ jobs:
go test -v -race ./...
system-test:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v2
uses: actions/setup-go@v3
with:
go-version: 1.18

Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/runexamples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ on:

jobs:
run-examples:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v2
uses: actions/setup-go@v3
with:
go-version: 1.18

Expand Down
14 changes: 8 additions & 6 deletions systemtest/commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ import (
"github.com/stretchr/testify/require"
)

const (
pollWaitSecs = 15.0
)

// TestAutoCommit tests/demonstrates the behavior of disabling the auto-commit functionality.
// The autocommiter sends the offsets of the marked messages to the broker regularily. If the processor shuts down
// (or the group rebalances), the offsets are sent one last time, so just turning it of is not enough.
// To get a processor to not commit any offsets, we're using a fault-injecting proxy
// and cut the connections before shutdown, so the last-commit is failing.
func TestAutoCommit(t *testing.T) {
t.Parallel()
var (
group goka.Group = goka.Group(fmt.Sprintf("%s-%d", "goka-commit-test", time.Now().Unix()))
inputStream = goka.Stream(group) + "-input"
Expand Down Expand Up @@ -75,7 +78,7 @@ func TestAutoCommit(t *testing.T) {

// run the first processor
_, cancel, done := runProc(createProc())
pollTimed(t, "all-received1", 10, func() bool {
pollTimed(t, "all-received1", pollWaitSecs, func() bool {
return len(offsets) == 10 && offsets[0] == 0
})

Expand All @@ -93,7 +96,7 @@ func TestAutoCommit(t *testing.T) {

// --> we'll receive all messages again
// --> i.e., no offsets were committed
pollTimed(t, "all-received2", 10, func() bool {
pollTimed(t, "all-received2", pollWaitSecs, func() bool {
return len(offsets) == 10 && offsets[0] == 0
})

Expand All @@ -105,7 +108,6 @@ func TestAutoCommit(t *testing.T) {
// Two messages (1, 2) are emitted, after consuming (2), it crashes.
// Starting it a second time will reconsume it.
func TestUnmarkedMessages(t *testing.T) {
t.Parallel()
var (
group goka.Group = goka.Group(fmt.Sprintf("%s-%d", "goka-mark-test", time.Now().Unix()))
inputStream = goka.Stream(group) + "-input"
Expand Down Expand Up @@ -151,7 +153,7 @@ func TestUnmarkedMessages(t *testing.T) {

// run the first processor
runProc(createProc())
pollTimed(t, "all-received1", 10, func() bool {
pollTimed(t, "all-received1", pollWaitSecs, func() bool {
return len(values) == 2 && values[0] == 1
})

Expand All @@ -160,7 +162,7 @@ func TestUnmarkedMessages(t *testing.T) {

// restart -> we'll only receive the second message
runProc(createProc())
pollTimed(t, "all-received2", 10, func() bool {
pollTimed(t, "all-received2", pollWaitSecs, func() bool {
return len(values) == 1 && values[0] == 2
})
}
8 changes: 1 addition & 7 deletions systemtest/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestHotStandby(t *testing.T) {
errg.Go(func() error {
// simulatenously stopping multiple processors sometimes fails the processors, so this one gets delayed
// see issue #376 for details
return proc1.Run(DelayedCtxCloser(ctx, 5*time.Second))
return proc1.Run(DelayedCtxCloser(ctx, 10*time.Second))
})

errg.Go(func() error {
Expand Down Expand Up @@ -300,7 +300,6 @@ func TestRecoverAhead(t *testing.T) {
// TestRebalance runs some processors to test rebalance. It's merely a
// runs-without-errors test, not a real functional test.
func TestRebalance(t *testing.T) {
t.Parallel()
brokers := initSystemTest(t)

var (
Expand Down Expand Up @@ -370,7 +369,6 @@ func TestRebalance(t *testing.T) {
// TestRebalanceSharePartitions runs two processors one after each other
// and asserts that they rebalance partitions appropriately
func TestRebalanceSharePartitions(t *testing.T) {
t.Parallel()
brokers := initSystemTest(t)

var (
Expand Down Expand Up @@ -480,7 +478,6 @@ func TestRebalanceSharePartitions(t *testing.T) {
}

func TestCallbackFail(t *testing.T) {
t.Parallel()
brokers := initSystemTest(t)

var (
Expand Down Expand Up @@ -546,7 +543,6 @@ func TestCallbackFail(t *testing.T) {
}

func TestProcessorSlowStuck(t *testing.T) {
t.Parallel()
brokers := initSystemTest(t)

var (
Expand Down Expand Up @@ -615,7 +611,6 @@ func TestProcessorSlowStuck(t *testing.T) {
// * restart this processor a couple of times and check whether it stays 10.
//
func TestMessageCommit(t *testing.T) {
t.Parallel()
brokers := initSystemTest(t)

var (
Expand Down Expand Up @@ -690,7 +685,6 @@ func TestMessageCommit(t *testing.T) {
}

func TestProcessorGracefulShutdownContinue(t *testing.T) {
t.Parallel()
brokers := initSystemTest(t)

var (
Expand Down
15 changes: 3 additions & 12 deletions systemtest/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func (fip *FIProxy) Dial(network, addr string) (c net.Conn, err error) {
defer fip.m.Unlock()

conn, err := net.Dial(network, addr)
if err != nil {
return nil, err
}

wrappedConn := &Conn{
Conn: conn,
Expand All @@ -74,18 +77,6 @@ func (fip *FIProxy) removeConn(c net.Conn) {
delete(fip.conns, c.LocalAddr().String())
}

func (fip *FIProxy) getConns() []string {
fip.m.Lock()
defer fip.m.Unlock()
var conns []string

for c := range fip.conns {
conns = append(conns, c)
}

return conns
}

func (fip *FIProxy) SetReadError(err error) {
fip.m.Lock()
defer fip.m.Unlock()
Expand Down

0 comments on commit 7247e46

Please sign in to comment.