M3DB client should not reject multiple blocks from a peer with the same blockstart #1640

richardartoul opened this issue May 16, 2019 · 3 comments · Fixed by #1707

richardartoul commented May 16, 2019

Right now streamBlocksBatchFromPeer rejects responses from peers that return more than one block for a given blockStart. I assume this check exists for historical reasons, but it means peer bootstrapping can get stuck, unable to make progress until its peers stop returning multiple blocks. A peer can legitimately return multiple blocks for the same blockStart, for example right after a flush, before the tick has evicted the flushed blocks from memory, so the data exists both in the buffer and on disk; this will also happen more often in the near future once out-of-order writes land.

It seems to me that we can simply delete the following check from streamBlocksBatchFromPeer in session.go:

// We only ever fetch a single block for a series
		if len(result.Elements[i].Blocks) != 1 {
			errMsg := "stream blocks returned more blocks than expected"
			blocksErr := fmt.Errorf(errMsg+": expected=%d, actual=%d",
				1, len(result.Elements[i].Blocks))
			failed := []receivedBlockMetadata{batch[i]}
			s.reattemptStreamBlocksFromPeersFn(failed, enqueueCh, blocksErr,
				respErrReason, nextRetryReattemptType, m)
			m.fetchBlockError.Inc(int64(len(req.Elements[i].Starts)))
			s.log.Error(errMsg,
				zap.Stringer("id", id),
				zap.Times("expectedStarts", newTimesByUnixNanos(req.Elements[i].Starts)),
				zap.Times("actualStarts", newTimesByRPCBlocks(result.Elements[i].Blocks)),
				zap.Stringer("peer", peer.Host()),
			)
			continue
		}
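
For illustration, here is a rough sketch of what the receive path could look like with that guard removed: every block returned for an element, possibly several per blockStart, simply gets handed to addBlockFromPeer. The names blocksResult, id, and encodedTags are my assumption of what is in scope at that point in streamBlocksBatchFromPeer and may not match exactly.

for _, b := range result.Elements[i].Blocks {
	// Hand every returned block to addBlockFromPeer; duplicates for the
	// same blockStart get merged there (see the merging logic below).
	if err := blocksResult.addBlockFromPeer(id, encodedTags, peer.Host(), b); err != nil {
		// Re-enqueue the batch element for retry the same way the existing
		// error paths do (reattemptStreamBlocksFromPeersFn); details omitted.
	}
}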

The merging logic in addBlockFromPeer should then work itself out: the first block received for a given blockStart is added to the result, and any later block for the same blockStart is merged with the stored one via mergeReaders and reinserted:

func (r *bulkBlocksResult) addBlockFromPeer(
	id ident.ID,
	encodedTags checked.Bytes,
	peer topology.Host,
	block *rpc.Block,
) error {
	start := time.Unix(0, block.Start)
	result, err := r.newDatabaseBlock(block)
	if err != nil {
		return err
	}

	var (
		tags                ident.Tags
		attemptedDecodeTags bool
	)
	for {
		r.Lock()
		currBlock, exists := r.result.BlockAt(id, start)
		if !exists {
			if encodedTags == nil || attemptedDecodeTags {
				r.result.AddBlock(id, tags, result)
				r.Unlock()
				break
			}
			r.Unlock()

			// Tags not decoded yet, attempt decode and then reinsert
			attemptedDecodeTags = true
			tagDecoder := r.tagDecoderPool.Get()
			tags, err = newTagsFromEncodedTags(id, encodedTags,
				tagDecoder, r.idPool)
			tagDecoder.Close()
			if err != nil {
				return err
			}
			continue
		}

		// Remove the existing block from the result so it doesn't get
		// merged again
		r.result.RemoveBlockAt(id, start)
		r.Unlock()

		// If we've already received data for this block, merge them
		// with the new block if possible
		tmpCtx := r.contextPool.Get()
		currReader, err := currBlock.Stream(tmpCtx)
		if err != nil {
			return err
		}

		// If there are no data in the current block, there is no
		// need to merge
		if currReader.IsEmpty() {
			continue
		}

		resultReader, err := result.Stream(tmpCtx)
		if err != nil {
			return err
		}
		if resultReader.IsEmpty() {
			return nil
		}

		readers := []xio.SegmentReader{currReader.SegmentReader, resultReader.SegmentReader}
		blockSize := currReader.BlockSize

		encoder, err := r.mergeReaders(start, blockSize, readers)

		if err != nil {
			return err
		}

		result.Close()

		result = r.blockOpts.DatabaseBlockPool().Get()
		result.Reset(start, blockSize, encoder.Discard(), r.nsCtx)

		tmpCtx.Close()
	}

	return nil
}
@richardartoul

@robskillington This is the issue we were talking about. Let me know if this makes sense; it seems like an easy fix if we're right.

@robskillington

Correct, yeah. This defensive code did not take into account that we would have multiple buffers.

The easy fix would be to let it actually merge, yes.

@richardartoul

@robskillington @prateek

I was looking through some of the server-side code while working on the repair stuff, and now I'm wondering whether we need a change there as well. The code below is from reader.go in the series package:

func (r Reader) fetchBlocksWithBlocksMapAndBuffer(
	ctx context.Context,
	starts []time.Time,
	seriesBlocks block.DatabaseSeriesBlocks,
	seriesBuffer databaseBuffer,
	nsCtx namespace.Context,
) ([]block.FetchBlockResult, error) {
	var (
		// TODO(r): pool these results arrays
		res         = make([]block.FetchBlockResult, 0, len(starts))
		cachePolicy = r.opts.CachePolicy()
		// NB(r): Always use nil for OnRetrieveBlock so we don't cache the
		// series after fetching it from disk, the fetch blocks API is called
		// during streaming so to cache it in memory would mean we would
		// eventually cache all series in memory when we stream results to a
		// peer.
		onRetrieve block.OnRetrieveBlock
	)
	for _, start := range starts {
		if seriesBlocks != nil {
			if b, exists := seriesBlocks.BlockAt(start); exists {
				streamedBlock, err := b.Stream(ctx)
				if err != nil {
					r := block.NewFetchBlockResult(start, nil,
						fmt.Errorf("unable to retrieve block stream for series %s time %v: %v",
							r.id.String(), start, err))
					res = append(res, r)
				}
				if streamedBlock.IsNotEmpty() {
					b := []xio.BlockReader{streamedBlock}
					r := block.NewFetchBlockResult(start, b, nil)
					res = append(res, r)
				}
				continue
			}
		}
		switch {
		case cachePolicy == CacheAll:
			// No-op, block metadata should have been in-memory
		case r.retriever != nil:
			// Try to stream from disk
			if r.retriever.IsBlockRetrievable(start) {
				streamedBlock, err := r.retriever.Stream(ctx, r.id, start, onRetrieve, nsCtx)
				if err != nil {
					r := block.NewFetchBlockResult(start, nil,
						fmt.Errorf("unable to retrieve block stream for series %s time %v: %v",
							r.id.String(), start, err))
					res = append(res, r)
				}
				if streamedBlock.IsNotEmpty() {
					b := []xio.BlockReader{streamedBlock}
					r := block.NewFetchBlockResult(start, b, nil)
					res = append(res, r)
				}
			}
		}
	}

	if seriesBuffer != nil && !seriesBuffer.IsEmpty() {
		bufferResults := seriesBuffer.FetchBlocks(ctx, starts, nsCtx)
		res = append(res, bufferResults...)
	}

	block.SortFetchBlockResultByTimeAscending(res)

	return res, nil
}

Now that I've read through this code path on both the client and server side more carefully, I wonder whether the real issue is that only one block should be returned per blockStart, but that block should contain multiple streams.

For example, if the code above finds data both in the buffer and on disk (which can happen right now after a flush but before the eviction triggered by the tick, and will happen more often once out-of-order writes land), we return two separate block.FetchBlockResults for the same blockStart. It seems like it should instead return a single block.FetchBlockResult containing two xio.BlockReaders.
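
To make that concrete, here is a rough sketch of the grouping I have in mind for fetchBlocksWithBlocksMapAndBuffer. It is only an illustration, and it assumes block.FetchBlockResult exposes Start/Blocks/Err fields, which I have not double-checked:

// Gather every reader found for a blockStart, regardless of whether it came
// from the in-memory blocks map, the disk retriever, or the buffer.
readersByStart := make(map[int64][]xio.BlockReader) // keyed by start.UnixNano()

// Inside the existing loops, wherever a non-empty streamedBlock is found for
// `start`, accumulate it instead of appending a result directly:
//	readersByStart[start.UnixNano()] = append(readersByStart[start.UnixNano()], streamedBlock)

// Fold the buffer results in the same way, passing error results through.
if seriesBuffer != nil && !seriesBuffer.IsEmpty() {
	for _, br := range seriesBuffer.FetchBlocks(ctx, starts, nsCtx) {
		if br.Err != nil {
			res = append(res, br)
			continue
		}
		key := br.Start.UnixNano()
		readersByStart[key] = append(readersByStart[key], br.Blocks...)
	}
}

// Emit exactly one FetchBlockResult per blockStart, with however many
// readers were found for it.
for nanos, readers := range readersByStart {
	res = append(res, block.NewFetchBlockResult(time.Unix(0, nanos), readers, nil))
}
block.SortFetchBlockResultByTimeAscending(res)

That way the client would see a single block per blockStart with multiple streams inside it, instead of multiple blocks with the same blockStart.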

Can one of you look through that and tell me if I'm right, and if not, explain why? I just want to make sure I understand this correctly.
