Skip to content

Commit

Permalink
[issue 195] terminate streams gracefully (#207)
Browse files Browse the repository at this point in the history
* config

* add timeout

* wip

* not working

* uncomment stream restart

* remove default and fix unit tests

* validate that user sets maxstreamlifetime

* fix tests

* fix format

* comment out closesend

* add timer

* tidy and fix mock api issue

* not updated?

* fix failed test

* check status message is exact match with EOF

* update config

* broken unit test

* fix test?

* call streamCall once

* address review feedback

---------

Co-authored-by: Laurent Quérel <[email protected]>
  • Loading branch information
moh-osman3 and lquerel authored Aug 7, 2023
1 parent ebd4ad4 commit 9d6c208
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 25 deletions.
36 changes: 33 additions & 3 deletions collector/examples/synthesize/record.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,34 @@
receivers:
otlp/loopback:
protocols:
grpc:
endpoint: 127.0.0.1:4000
keepalive:
server_parameters:
max_connection_age: 10s
max_connection_age_grace: 10s
arrow:
disabled: false
otlp:
protocols:
grpc:
generator:
path: ./generator/hipster_shop.yaml

exporters:
otlp/arrow:
endpoint: 127.0.0.1:4000
wait_for_ready: true
tls:
insecure: true
arrow:
disabled: false
num_streams: 1
max_stream_lifetime: 5s
retry_on_failure:
enabled: false
sending_queue:
enabled: false
file/traces:
path: ./recorded_traces.json
compression: zstd
Expand All @@ -29,11 +52,18 @@ processors:
# list of processors below.
service:
pipelines:
traces:
# This pipeline generates trace data, obfuscates + batches the data, and
# send it to the otlp receiver of this same collector
traces/generate:
receivers: [generator]
processors: [obfuscation, batch]
exporters: [file/traces, logging]
exporters: [otlp/arrow, logging]

traces/loop:
receivers: [otlp/loopback]
exporters: [file/traces]

metrics:
receivers: [generator]
processors: [obfuscation, batch]
exporters: [file/metrics, logging]
exporters: [file/metrics, logging]
14 changes: 10 additions & 4 deletions collector/gen/exporter/otlpexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package otlpexporter // import "github.com/f5/otel-arrow-adapter/collector/gen/e

import (
"fmt"
"time"

"google.golang.org/grpc"

Expand Down Expand Up @@ -33,10 +34,11 @@ type Config struct {
// ArrowSettings includes whether Arrow is enabled and the number of
// concurrent Arrow streams.
type ArrowSettings struct {
Disabled bool `mapstructure:"disabled"`
NumStreams int `mapstructure:"num_streams"`
DisableDowngrade bool `mapstructure:"disable_downgrade"`
EnableMixedSignals bool `mapstructure:"enable_mixed_signals"`
Disabled bool `mapstructure:"disabled"`
NumStreams int `mapstructure:"num_streams"`
DisableDowngrade bool `mapstructure:"disable_downgrade"`
EnableMixedSignals bool `mapstructure:"enable_mixed_signals"`
MaxStreamLifetime time.Duration `mapstructure:"max_stream_lifetime"`
}

var _ component.Config = (*Config)(nil)
Expand All @@ -59,5 +61,9 @@ func (cfg *ArrowSettings) Validate() error {
return fmt.Errorf("stream count must be > 0: %d", cfg.NumStreams)
}

if cfg.MaxStreamLifetime.Seconds() < float64(1) {
return fmt.Errorf("max stream life must be > 0: %d", cfg.MaxStreamLifetime)
}

return nil
}
25 changes: 15 additions & 10 deletions collector/gen/exporter/otlpexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,27 @@ func TestUnmarshalConfig(t *testing.T) {
}

func TestArrowSettingsValidate(t *testing.T) {
settings := func(enabled bool, numStreams int) *ArrowSettings {
return &ArrowSettings{Disabled: !enabled, NumStreams: numStreams}
settings := func(enabled bool, numStreams int, maxStreamLifetime time.Duration) *ArrowSettings {
return &ArrowSettings{Disabled: !enabled, NumStreams: numStreams, MaxStreamLifetime: maxStreamLifetime}
}
require.NoError(t, settings(true, 1).Validate())
require.NoError(t, settings(false, 1).Validate())
require.NoError(t, settings(true, 2).Validate())
require.NoError(t, settings(true, math.MaxInt).Validate())
require.NoError(t, settings(true, 1, 10 * time.Second).Validate())
require.NoError(t, settings(false, 1, 10 * time.Second).Validate())
require.NoError(t, settings(true, 2, 1 * time.Second).Validate())
require.NoError(t, settings(true, math.MaxInt, 10 * time.Second).Validate())

require.Error(t, settings(true, 0).Validate())
require.Contains(t, settings(true, 0).Validate().Error(), "stream count must be")
require.Error(t, settings(false, -1).Validate())
require.Error(t, settings(true, math.MinInt).Validate())
require.Error(t, settings(true, 0, 10 * time.Second).Validate())
require.Contains(t, settings(true, 0, 10 * time.Second).Validate().Error(), "stream count must be")
require.Contains(t, settings(true, 1, -1 * time.Second).Validate().Error(), "max stream life must be")
require.Error(t, settings(false, -1, 10 * time.Second).Validate())
require.Error(t, settings(false, 1, -1 * time.Second).Validate())
require.Error(t, settings(true, math.MinInt, 10 * time.Second).Validate())
}

// TestDefaultSettingsValid verifies that the default exporter config
// passes validation once the user-required MaxStreamLifetime is set.
func TestDefaultSettingsValid(t *testing.T) {
cfg := createDefaultConfig()
// MaxStreamLifetime has no default: it must be set by the user, and
// config validation always checks that a value is set.
cfg.(*Config).Arrow.MaxStreamLifetime = 2 * time.Second
require.NoError(t, cfg.(*Config).Validate())

}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type commonTestStream struct {
ctxCall *gomock.Call
sendCall *gomock.Call
recvCall *gomock.Call
closeSendCall *gomock.Call
}

func (ctc *commonTestCase) newMockStream(ctx context.Context) *commonTestStream {
Expand All @@ -109,6 +110,7 @@ func (ctc *commonTestCase) newMockStream(ctx context.Context) *commonTestStream
gomock.Any(), // *arrowpb.BatchArrowRecords
).Times(0),
recvCall: client.EXPECT().Recv().Times(0),
closeSendCall: client.EXPECT().CloseSend().AnyTimes().Return(nil),
}
return testStream
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"sync"
"time"

arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1"
arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record"
Expand All @@ -23,6 +24,8 @@ type Exporter struct {
// numStreams is the number of streams that will be used.
numStreams int

maxStreamLifetime time.Duration

// disableDowngrade prevents downgrade from occurring, supports
// forcing Arrow transport.
disableDowngrade bool
Expand Down Expand Up @@ -90,6 +93,7 @@ func MakeAnyStreamClient[T AnyStreamClient](clientFunc func(ctx context.Context,

// NewExporter configures a new Exporter.
func NewExporter(
maxStreamLifetime time.Duration,
numStreams int,
disableDowngrade bool,
telemetry component.TelemetrySettings,
Expand All @@ -99,6 +103,7 @@ func NewExporter(
perRPCCredentials credentials.PerRPCCredentials,
) *Exporter {
return &Exporter{
maxStreamLifetime: maxStreamLifetime,
numStreams: numStreams,
disableDowngrade: disableDowngrade,
telemetry: telemetry,
Expand Down Expand Up @@ -175,6 +180,7 @@ func (e *Exporter) runArrowStream(ctx context.Context) {
producer := e.newProducer()

stream := newStream(producer, e.ready, e.telemetry, e.perRPCCredentials)
stream.maxStreamLifetime = e.maxStreamLifetime

defer func() {
if err := producer.Close(); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
)

const defaultMaxStreamLifetime = 11 * time.Second
type compareJSONTraces struct{ ptrace.Traces }
type compareJSONMetrics struct{ pmetric.Metrics }
type compareJSONLogs struct{ plog.Logs }
Expand Down Expand Up @@ -123,7 +124,7 @@ func newExporterTestCaseCommon(t *testing.T, noisy noisyTest, numStreams int, di
})
}

exp := NewExporter(numStreams, disableDowngrade, ctc.telset, nil, func() arrowRecord.ProducerAPI {
exp := NewExporter(defaultMaxStreamLifetime, numStreams, disableDowngrade, ctc.telset, nil, func() arrowRecord.ProducerAPI {
// Mock the close function, use a real producer for testing dataflow.
mock := arrowRecordMock.NewMockProducerAPI(ctc.ctrl)
prod := arrowRecord.NewProducer()
Expand Down
35 changes: 33 additions & 2 deletions collector/gen/exporter/otlpexporter/internal/arrow/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"strings"
"sync"
"time"

arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1"
arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record"
Expand All @@ -31,6 +32,12 @@ import (

// Stream is 1:1 with gRPC stream.
type Stream struct {
// maxStreamLifetime is the max timeout before stream
// should be closed on the client side. This ensures a
// graceful shutdown before max_connection_age is reached
// on the server side.
maxStreamLifetime time.Duration

// producer is exclusive to the holder of the stream.
producer arrowRecord.ProducerAPI

Expand Down Expand Up @@ -149,14 +156,17 @@ func (s *Stream) run(bgctx context.Context, streamClient StreamClientFunc, grpcO
ww.Add(1)
go func() {
defer ww.Done()
defer cancel()
writeErr = s.write(ctx)
if writeErr != nil {
cancel()
}
}()

// the result from read() is processed after cancel and wait,
// so we can set s.client = nil in case of a delayed Unimplemented.
err = s.read(ctx)


// Wait for the writer to ensure that all waiters are known.
cancel()
ww.Wait()
Expand Down Expand Up @@ -257,6 +267,8 @@ func (s *Stream) write(ctx context.Context) error {
var hdrsBuf bytes.Buffer
hdrsEnc := hpack.NewEncoder(&hdrsBuf)

timer := time.NewTimer(s.maxStreamLifetime)

for {
// Note: this can't block b/c stream has capacity &
// individual streams shut down synchronously.
Expand All @@ -265,8 +277,17 @@ func (s *Stream) write(ctx context.Context) error {
// this can block, and if the context is canceled we
// wait for the reader to find this stream.
var wri writeItem
var ok bool
select {
case wri = <-s.toWrite:
case <-timer.C:
s.prioritizer.removeReady(s)
s.client.CloseSend()
return nil
case wri, ok = <-s.toWrite:
// channel is closed
if !ok {
return nil
}
case <-ctx.Done():
// Because we did not <-stream.toWrite, there
// is a potential sender race since the stream
Expand Down Expand Up @@ -325,15 +346,25 @@ func (s *Stream) read(_ context.Context) error {
// cancel a call to Recv() but the call to processBatchStatus
// is non-blocking.
for {
// Note: if the client has called CloseSend() and the server then fails for some
// reason, this Recv() blocks until some other condition is met, such as a context
// timeout. TODO: possibly improve this to wait for no outstanding requests and then stop reading.
resp, err := s.client.Recv()
if err != nil {
// Once the send direction of stream is closed the server should return
// an error that mentions an EOF. The expected error code is codes.Unknown.
status, ok := status.FromError(err)
if ok && status.Message() == "EOF" && status.Code() == codes.Unknown {
return nil
}
// Note: do not wrap, contains a Status.
return err
}

if err = s.processBatchStatus(resp); err != nil {
return fmt.Errorf("process: %w", err)
}

}
}

Expand Down
33 changes: 33 additions & 0 deletions collector/gen/exporter/otlpexporter/internal/arrow/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func newStreamTestCase(t *testing.T) *streamTestCase {
ctc.requestMetadataCall.AnyTimes().Return(nil, nil)

stream := newStream(producer, prio, ctc.telset, ctc.perRPCCredentials)
stream.maxStreamLifetime = 10 * time.Second

fromTracesCall := producer.EXPECT().BatchArrowRecordsFromTraces(gomock.Any()).Times(0)
fromMetricsCall := producer.EXPECT().BatchArrowRecordsFromMetrics(gomock.Any()).Times(0)
Expand Down Expand Up @@ -118,6 +119,38 @@ func (tc *streamTestCase) get() *Stream {
return <-tc.prioritizer.readyChannel()
}

// TestStreamEncodeError verifies that exceeding the
// max_stream_lifetime results in shutdown that
// simply restarts the stream.
func TestStreamGracefulShutdown(t *testing.T) {
tc := newStreamTestCase(t)
maxStreamLifetime := 1 * time.Second
tc.stream.maxStreamLifetime = maxStreamLifetime

tc.fromTracesCall.Times(1).Return(oneBatch, nil)
tc.closeSendCall.Times(1).Return(nil)

channel := newHealthyTestChannel()
tc.start(channel)
defer tc.cancelAndWaitForShutdown()
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
go func() {
defer wg.Done()
batch := <-channel.sent
channel.recv <- statusOKFor(batch.BatchId)
}()

err := tc.get().SendAndWait(tc.bgctx, twoTraces)
require.NoError(t, err)
// let stream get closed and send again.
time.Sleep(maxStreamLifetime)
err = tc.get().SendAndWait(tc.bgctx, twoTraces)
require.Error(t, err)
require.True(t, errors.Is(err, ErrStreamRestarting))
}

// TestStreamEncodeError verifies that an encoder error in the sender
// yields a permanent error.
func TestStreamEncodeError(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions collector/gen/exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/f5/otel-arrow-adapter/collector/gen/exporter/otlpexporter/internal/arrow"
"github.com/f5/otel-arrow-adapter/collector/gen/internal/netstats"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"github.com/f5/otel-arrow-adapter/collector/gen/exporter/otlpexporter/internal/arrow"
"github.com/f5/otel-arrow-adapter/collector/gen/internal/netstats"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -131,7 +131,7 @@ func (e *baseExporter) start(ctx context.Context, host component.Host) (err erro
}
}

e.arrow = arrow.NewExporter(e.config.Arrow.NumStreams, e.config.Arrow.DisableDowngrade, e.settings.TelemetrySettings, e.callOptions, func() arrowRecord.ProducerAPI {
e.arrow = arrow.NewExporter(e.config.Arrow.MaxStreamLifetime, e.config.Arrow.NumStreams, e.config.Arrow.DisableDowngrade, e.settings.TelemetrySettings, e.callOptions, func() arrowRecord.ProducerAPI {
return arrowRecord.NewProducer()
}, e.streamClientFactory(e.config, e.clientConn), perRPCCreds)

Expand Down
Loading

0 comments on commit 9d6c208

Please sign in to comment.