Merge branch 'master' into linasn/bootstrap-profiler
* master:
  [dbnode] Remove unused Shard.ScanData method (#3148)
  [tests] Add option to skip setup for docker integration tests (#3146)
  [dbnode] Add source propagation to aggregate query (#3153)
  [aggregator] Prevent tcp client panic on nil placement (#3139)
  [dtest] endpoint to fetch tagged (#3138)
soundvibe committed Feb 2, 2021
2 parents a44993e + f23e2d1 commit 2f2ba51
Showing 14 changed files with 90 additions and 222 deletions.
4 changes: 3 additions & 1 deletion scripts/docker-integration-tests/run.sh
@@ -42,7 +42,9 @@ if ! command -v nc && [[ "$BUILDKITE" == "true" ]]; then
trap cleanup_nc EXIT
fi

-scripts/docker-integration-tests/setup.sh
+if [[ -z "$SKIP_SETUP" ]] || [[ "$SKIP_SETUP" == "false" ]]; then
+  scripts/docker-integration-tests/setup.sh
+fi

NUM_TESTS=${#TESTS[@]}
MIN_IDX=$((NUM_TESTS*BUILDKITE_PARALLEL_JOB/BUILDKITE_PARALLEL_JOB_COUNT))
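Note: with this change, setup runs when SKIP_SETUP is unset, empty, or set to "false", and is skipped for any other value. A minimal Go rendering of that gating condition, for illustration only (not part of this commit):

package main

import (
	"fmt"
	"os"
)

// shouldRunSetup mirrors the shell test:
// [[ -z "$SKIP_SETUP" ]] || [[ "$SKIP_SETUP" == "false" ]]
func shouldRunSetup() bool {
	v := os.Getenv("SKIP_SETUP")
	return v == "" || v == "false"
}

func main() {
	// Prints true unless SKIP_SETUP is set to something other than "false".
	fmt.Println("run setup:", shouldRunSetup())
}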
30 changes: 12 additions & 18 deletions scripts/docker-integration-tests/simple/test.sh
@@ -143,24 +143,18 @@ curl -vvvsS -X POST 0.0.0.0:9003/writetagged -d '{
}'

echo "Read data"
-queryResult=$(curl -sSf -X POST 0.0.0.0:9003/query -d '{
-  "namespace": "unagg",
-  "query": {
-    "regexp": {
-      "field": "city",
-      "regexp": ".*"
-    }
-  },
-  "rangeStart": 0,
-  "rangeEnd":'"$(date +"%s")"'
-}' | jq '.results | length')
-
-if [ "$queryResult" -lt 1 ]; then
-  echo "Result not found"
-  exit 1
-else
-  echo "Result found"
-fi
+ATTEMPTS=3 TIMEOUT=1 retry_with_backoff \
+  '[ "$(curl -sSf -X POST 0.0.0.0:9003/query -d "{
+    \"namespace\": \"unagg\",
+    \"query\": {
+      \"regexp\": {
+        \"field\": \"city\",
+        \"regexp\": \".*\"
+      }
+    },
+    \"rangeStart\": 0,
+    \"rangeEnd\":'\"$(date +\"%s\")\"'
+  }" | jq ".results | length")" == "1" ]'

echo "Deleting placement"
curl -vvvsSf -X DELETE 0.0.0.0:7201/api/v1/services/m3db/placement
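The retry change above replaces a single-shot query check with a polled assertion: the query is re-run until it returns exactly one result or the attempts run out, so a transiently empty result no longer fails the test. A self-contained Go sketch of that retry pattern; the ATTEMPTS/TIMEOUT semantics (N tries with a fixed sleep between them) are assumed from the script's usage, not taken from this commit:

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff retries fn up to attempts times, sleeping timeout between
// tries, and returns the last error if every attempt fails.
func retryWithBackoff(attempts int, timeout time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		time.Sleep(timeout)
	}
	return err
}

func main() {
	calls := 0
	err := retryWithBackoff(3, time.Second, func() error {
		calls++
		if calls < 2 {
			return errors.New("result not found yet") // e.g. data not queryable yet
		}
		return nil
	})
	fmt.Printf("succeeded after %d calls (err=%v)\n", calls, err)
}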
17 changes: 16 additions & 1 deletion src/aggregator/client/tcp_client.go
@@ -21,6 +21,7 @@
package client

import (
+	"errors"
"fmt"
"math"
"time"
@@ -39,7 +40,11 @@ import (
xerrors "github.com/m3db/m3/src/x/errors"
)

-var _ AdminClient = (*TCPClient)(nil)
+var (
+	_ AdminClient = (*TCPClient)(nil)
+
+	errNilPlacement = errors.New("placement is nil")
+)

// TCPClient sends metrics to M3 Aggregator over a custom TCP protocol.
type TCPClient struct {
@@ -229,6 +234,9 @@ func (c *TCPClient) ActivePlacement() (placement.Placement, int, error) {
return nil, 0, err
}
defer onStagedPlacementDoneFn()
+	if stagedPlacement == nil {
+		return nil, 0, errNilPlacement
+	}

placement, onPlacementDoneFn, err := stagedPlacement.ActivePlacement()
if err != nil {
Expand All @@ -247,6 +255,9 @@ func (c *TCPClient) ActivePlacementVersion() (int, error) {
return 0, err
}
defer onStagedPlacementDoneFn()
+	if stagedPlacement == nil {
+		return 0, errNilPlacement
+	}

return stagedPlacement.Version(), nil
}
@@ -274,6 +285,10 @@ func (c *TCPClient) write(
if err != nil {
return err
}
+	if stagedPlacement == nil {
+		onStagedPlacementDoneFn()
+		return errNilPlacement
+	}
placement, onPlacementDoneFn, err := stagedPlacement.ActivePlacement()
if err != nil {
onStagedPlacementDoneFn()
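These three guards address the panic from #3139: ActiveStagedPlacement can return a nil placement together with a nil error, and the subsequent method calls then dereferenced nil. A minimal, self-contained sketch of the failure mode and the guard, using simplified stand-in types rather than the real m3 ones:

package main

import (
	"errors"
	"fmt"
)

var errNilPlacement = errors.New("placement is nil")

// stagedPlacement is a stand-in for the watcher's placement type.
type stagedPlacement struct{ version int }

// Version dereferences its receiver, so calling it on a nil pointer panics.
func (s *stagedPlacement) Version() int { return s.version }

// activeStagedPlacement mimics a watcher call that may return
// (nil, doneFn, nil) before any placement has been received.
func activeStagedPlacement() (*stagedPlacement, func(), error) {
	return nil, func() {}, nil
}

func activePlacementVersion() (int, error) {
	sp, done, err := activeStagedPlacement()
	if err != nil {
		return 0, err
	}
	defer done()
	if sp == nil {
		// Without this guard, sp.Version() below would panic.
		return 0, errNilPlacement
	}
	return sp.Version(), nil
}

func main() {
	if _, err := activePlacementVersion(); err != nil {
		fmt.Println("error:", err) // error: placement is nil
	}
}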
25 changes: 25 additions & 0 deletions src/aggregator/client/tcp_client_test.go
@@ -241,6 +241,31 @@ func TestTCPClientWriteUntimedMetricActiveStagedPlacementError(t *testing.T) {
}
}

+func TestTCPClientWriteUntimedMetricActiveStagedPlacementNil(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	watcher := placement.NewMockStagedPlacementWatcher(ctrl)
+	watcher.EXPECT().ActiveStagedPlacement().
+		Return(nil, func() {}, nil).
+		MinTimes(1)
+	c := mustNewTestTCPClient(t, testOptions())
+	c.placementWatcher = watcher
+
+	for _, input := range []unaggregated.MetricUnion{testCounter, testBatchTimer, testGauge} {
+		var err error
+		switch input.Type {
+		case metric.CounterType:
+			err = c.WriteUntimedCounter(input.Counter(), testStagedMetadatas)
+		case metric.TimerType:
+			err = c.WriteUntimedBatchTimer(input.BatchTimer(), testStagedMetadatas)
+		case metric.GaugeType:
+			err = c.WriteUntimedGauge(input.Gauge(), testStagedMetadatas)
+		}
+		require.Equal(t, errNilPlacement, err)
+	}
+}

func TestTCPClientWriteUntimedMetricActivePlacementError(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
18 changes: 18 additions & 0 deletions src/cmd/tools/dtest/docker/harness/resources/dbnode.go
@@ -101,6 +101,8 @@ type Node interface {
AggregateTiles(req *rpc.AggregateTilesRequest) (int64, error)
// Fetch fetches datapoints.
Fetch(req *rpc.FetchRequest) (*rpc.FetchResult_, error)
+	// FetchTagged fetches datapoints by tag.
+	FetchTagged(req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error)
// Exec executes the given commands on the node container, returning
// stdout and stderr from the container.
Exec(commands ...string) (string, error)
@@ -267,6 +269,22 @@ func (c *dbNode) Fetch(req *rpc.FetchRequest) (*rpc.FetchResult_, error) {
return dps, nil
}

+func (c *dbNode) FetchTagged(req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) {
+	if c.resource.closed {
+		return nil, errClosed
+	}
+
+	logger := c.resource.logger.With(zapMethod("fetchtagged"))
+	result, err := c.tchanClient.TChannelClientFetchTagged(timeout, req)
+	if err != nil {
+		logger.Error("could not fetch", zap.Error(err))
+		return nil, err
+	}
+
+	logger.Info("fetched", zap.Int("series_count", len(result.GetElements())))
+	return result, nil
+}

func (c *dbNode) Restart() error {
if c.resource.closed {
return errClosed
2 changes: 1 addition & 1 deletion src/dbnode/generated/mocks/generate.go
@@ -20,7 +20,7 @@

// mockgen rules for generating mocks for exported interfaces (reflection mode)

-//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter,DataEntryProcessor | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
+//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
//go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go"
//go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest"
//go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go"
8 changes: 8 additions & 0 deletions src/dbnode/integration/client.go
@@ -129,6 +129,14 @@ func (client *TestTChannelClient) TChannelClientFetch(
return client.node.Fetch(ctx, req)
}

+// TChannelClientFetchTagged fulfills a fetch by tag request using a tchannel client.
+func (client *TestTChannelClient) TChannelClientFetchTagged(
+	timeout time.Duration, req *rpc.FetchTaggedRequest,
+) (*rpc.FetchTaggedResult_, error) {
+	ctx, _ := thrift.NewContext(timeout)
+	return client.node.FetchTagged(ctx, req)
+}

// TChannelClientAggregateTiles runs a request for AggregateTiles.
func (client *TestTChannelClient) TChannelClientAggregateTiles(
timeout time.Duration, req *rpc.AggregateTilesRequest,
5 changes: 5 additions & 0 deletions src/dbnode/network/server/tchannelthrift/cluster/service.go
@@ -166,6 +166,7 @@ func (s *service) Query(tctx thrift.Context, req *rpc.QueryRequest) (*rpc.QueryR
if err := results.Err(); err != nil {
return nil, convert.ToRPCError(err)
}
+
return result, nil
}

@@ -281,6 +282,10 @@ func (s *service) Aggregate(ctx thrift.Context, req *rpc.AggregateQueryRequest)
return nil, tterrors.NewBadRequestError(err)
}

+	if len(req.Source) > 0 {
+		opts.Source = req.Source
+	}

iter, metadata, err := session.Aggregate(ns, query, opts)
if err != nil {
return nil, convert.ToRPCError(err)
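The Aggregate handler above now forwards the request's optional Source bytes into the index query options (#3153), presumably matching what the other query paths already do. A simplified sketch of the pattern with stand-in types, not the real m3 ones:

package main

import "fmt"

// Stand-ins for the thrift request and the index query options.
type aggregateQueryRequest struct{ Source []byte }
type queryOptions struct{ Source []byte }

// propagateSource copies a caller-supplied source identifier, when present,
// so it travels with the query.
func propagateSource(req *aggregateQueryRequest, opts *queryOptions) {
	if len(req.Source) > 0 {
		opts.Source = req.Source
	}
}

func main() {
	req := &aggregateQueryRequest{Source: []byte("my-service")} // hypothetical source value
	var opts queryOptions
	propagateSource(req, &opts)
	fmt.Printf("opts.Source = %q\n", opts.Source)
}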
53 changes: 2 additions & 51 deletions src/dbnode/persist/fs/fs_mock.go

Some generated files are not rendered by default.

8 changes: 0 additions & 8 deletions src/dbnode/persist/fs/types.go
@@ -704,11 +704,3 @@ type StreamedMetadataEntry struct {

// NewReaderFn creates a new DataFileSetReader.
type NewReaderFn func(bytesPool pool.CheckedBytesPool, opts Options) (DataFileSetReader, error)
-
-// DataEntryProcessor processes StreamedDataEntries.
-type DataEntryProcessor interface {
-	// SetEntriesCount sets the number of entries to be processed.
-	SetEntriesCount(int)
-	// ProcessEntry processes a single StreamedDataEntry.
-	ProcessEntry(StreamedDataEntry) error
-}
60 changes: 0 additions & 60 deletions src/dbnode/storage/shard.go
@@ -2839,66 +2839,6 @@ func (s *dbShard) OpenStreamingReader(blockStart time.Time) (fs.DataFileSetReade
return reader, nil
}

-func (s *dbShard) ScanData(
-	blockStart time.Time,
-	processor fs.DataEntryProcessor,
-) error {
-	latestVolume, err := s.LatestVolume(blockStart)
-	if err != nil {
-		return err
-	}
-
-	reader, err := s.newReaderFn(s.opts.BytesPool(), s.opts.CommitLogOptions().FilesystemOptions())
-	if err != nil {
-		return err
-	}
-
-	openOpts := fs.DataReaderOpenOptions{
-		Identifier: fs.FileSetFileIdentifier{
-			Namespace:   s.namespace.ID(),
-			Shard:       s.ID(),
-			BlockStart:  blockStart,
-			VolumeIndex: latestVolume,
-		},
-		FileSetType:      persist.FileSetFlushType,
-		StreamingEnabled: true,
-	}
-
-	if err := reader.Open(openOpts); err != nil {
-		return err
-	}
-
-	readEntriesErr := s.scanDataWithReader(reader, processor)
-	// Always close the reader regardless of if failed, but
-	// make sure to propagate if an error occurred closing the reader too.
-	readCloseErr := reader.Close()
-	if err := readEntriesErr; err != nil {
-		return readEntriesErr
-	}
-	return readCloseErr
-}
-
-func (s *dbShard) scanDataWithReader(
-	reader fs.DataFileSetReader,
-	processor fs.DataEntryProcessor,
-) error {
-	processor.SetEntriesCount(reader.Entries())
-
-	for {
-		entry, err := reader.StreamingRead()
-		if err != nil {
-			if errors.Is(err, io.EOF) {
-				return nil
-			}
-			return err
-		}
-
-		if err := processor.ProcessEntry(entry); err != nil {
-			return err
-		}
-	}
-}
-
func (s *dbShard) logFlushResult(r dbShardFlushResult) {
s.logger.Debug("shard flush outcome",
zap.Uint32("shard", s.ID()),