
[aggregator] Add compression for data sent to aggregator #2082

Open · wants to merge 4 commits into master

Conversation

robskillington (Collaborator)

What this PR does / why we need it:

Fixes #

Special notes for your reviewer:

Does this PR introduce a user-facing and/or backwards incompatible change?:


Does this PR require updating code package or user-facing documentation?:


codecov bot commented Dec 27, 2019

Codecov Report

Merging #2082 (78867a8) into master (01958a4) will decrease coverage by 0.2%.
The diff coverage is 53.6%.


@@           Coverage Diff            @@
##           master   #2082     +/-   ##
========================================
- Coverage    72.3%   72.2%   -0.2%     
========================================
  Files        1015    1014      -1     
  Lines       87961   87492    -469     
========================================
- Hits        63683   63252    -431     
+ Misses      20029   20003     -26     
+ Partials     4249    4237     -12     
Flag         Coverage Δ
aggregator   81.7% <42.8%> (-0.4%) ⬇️
cluster      85.6% <ø> (ø)
collector    64.8% <ø> (ø)
dbnode       79.7% <ø> (+<0.1%) ⬆️
m3em         73.2% <ø> (ø)
m3ninx       73.9% <ø> (ø)
m3nsch       51.1% <ø> (ø)
metrics      17.7% <ø> (ø)
msg          74.3% <65.2%> (-0.7%) ⬇️
query        68.2% <ø> (-0.5%) ⬇️
x            83.3% <ø> (ø)

Flags with carried forward coverage won't be shown.


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@@ -141,6 +141,7 @@ func (c *Configuration) newClientOptions(

// ConnectionConfiguration contains the connection configuration.
type ConnectionConfiguration struct {
Compress CompressType `yaml:"compress"`
@arnikola (Collaborator) commented Dec 30, 2019:

nit: CompressionType? The other names seem to be fairly verbose

Also compression as the yaml key reads a little better imo

Collaborator:

+1, compress would be passable if it was a bool (but not future-proof), but for enums this doesn't work at all

// immediately write once it should be written to connection
// so always flush immediately.
if err := c.conn.Flush(); err != nil {
c.metrics.writeError.Inc(1)
Collaborator:

Worth logging too? Or is it likely to be super spammy if it fails?
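One way to address the spam concern, sketched below with hypothetical names (the PR itself only increments the writeError counter), is to sample the log line so a sustained failure emits only every Nth message:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// errorSampler logs the first failure and then every Nth one, so a
// persistently broken connection doesn't flood the logs. The name and
// policy are illustrative, not taken from the PR.
type errorSampler struct {
	every uint64 // log every Nth error after the first
	count uint64 // total errors seen; updated atomically
}

func (s *errorSampler) ShouldLog() bool {
	n := atomic.AddUint64(&s.count, 1)
	return n == 1 || n%s.every == 0
}

func main() {
	s := &errorSampler{every: 100}
	logged := 0
	for i := 0; i < 250; i++ {
		if s.ShouldLog() {
			logged++
		}
	}
	// first error plus errors 100 and 200
	fmt.Println(logged) // prints 3
}
```

The metric still counts every failure; only the log line is sampled.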

@@ -228,9 +286,10 @@ func (c *connection) resetWithLock() {
}

func (c *connection) closeWithLock() {
if c.conn != nil {
c.conn.Close() // nolint: errcheck
if c.tcpConn != nil {
@arnikola (Collaborator) commented Dec 30, 2019:

Should this also close c.conn, or is tcpConn guaranteed to propagate the Close()?

@@ -44,6 +45,55 @@ const (
defaultWriteRetryJitterEnabled = true
)

// CompressType is a compression type.
type CompressType uint
Collaborator:

nit: CompressionType reads a little better? Up to you

FlateCompressType
)

var (
@arnikola (Collaborator) commented Dec 30, 2019:

meganit: instead of the var(...) block, var validCompressTypes = ... since there's only the one?

if err := unmarshal(&str); err != nil {
return err
}
for _, validType := range validCompressTypes {
Collaborator:

Should we do a default case?

if len(validCompressTypes) == 0 {
 *t = NoCompressType
 return nil
}

}
}

return fmt.Errorf("invalid compress type: %s, valid types are: %v",
Collaborator:

nit: invalid compression type

@@ -85,14 +90,19 @@ func newConsumerMetrics(scope tally.Scope) metrics {
}
}

type connWriter interface {
@arnikola (Collaborator) commented Dec 30, 2019:

nit: may be better to expose the client/conn interface from src/aggregator/client/conn.go here?

var writer connWriter
switch connOpts.CompressType() {
case client.SnappyCompressType:
writer = snappy.NewBufferedWriter(u)
@arnikola (Collaborator) commented Dec 30, 2019:

Can probably include these in the switch statement above if not using a common builder function?

connReader = snappy.NewReader(connReader)
case client.FlateCompressType:
connReader = flate.NewReader(connReader)
}
Collaborator:

nit: might be worth adding a default: //noop case just for clarity (also in case the linter complains)?

Comment on lines +43 to +44
Compress client.CompressType `yaml:"compress"`

@arnikola (Collaborator) commented Jan 2, 2020:

nit: similar case re: naming here, compression over compress. Also, comments in this file don't seem to have correct formatting, but happy to punt on that. May also be worth making it a pointer so it's not mandatory.

@@ -37,6 +38,7 @@ type Configuration struct {
AckBufferSize *int `yaml:"ackBufferSize"`
ConnectionWriteBufferSize *int `yaml:"connectionWriteBufferSize"`
ConnectionReadBufferSize *int `yaml:"connectionReadBufferSize"`
Compress client.CompressType `yaml:"compress"`
@arnikola (Collaborator) commented Jan 2, 2020:

nit: same name comment re: compression vs compress, also perhaps make it a pointer so it's not mandatory?

var writer connWriter
switch opts.CompressType() {
case client.SnappyCompressType:
writer = snappy.NewBufferedWriter(conn)
@arnikola (Collaborator) commented Jan 2, 2020:

If not building a common reader/writer builder that creates these, looks like we can roll both of these into a single switch statement, i.e.

case client.SnappyCompressType:
  reader = snappy.NewReader(conn)
  writer = snappy.NewBufferedWriter(conn)
case ...

writer = c.snappyWriter
case FlateCompressType:
c.flateWriter.Reset(tcpConn)
writer = c.flateWriter
Collaborator:

Looks like we do this and the connReader init in a couple of different places; might be worth adding a function that takes in a CompressType and a *net.TCPConn and returns the compression-wrapped connection?

FlushInterval *time.Duration `yaml:"flushInterval"`
WriteBufferSize *int `yaml:"writeBufferSize"`
ReadBufferSize *int `yaml:"readBufferSize"`
Compress aggclient.CompressType `yaml:"compress"`
Collaborator:

Should this be a pointer so it's not mandatory?

Comment on lines +53 to +72
type connWriter interface {
io.Writer
Flush() error
}

type rawConnWriter struct {
writer io.Writer
}

func newRawConnWriter(w io.Writer) connWriter {
return &rawConnWriter{writer: w}
}

func (w *rawConnWriter) Write(p []byte) (int, error) {
return w.writer.Write(p)
}

func (w *rawConnWriter) Flush() error {
return nil
}
Collaborator:

Maybe we should pull these into a common file, as well as flateReader and flateConnReader, then we can add a method that will build the correct readers and writers? Otherwise looks like there's a few places that are doing the same thing

Comment on lines +122 to +124
compressType: opts.CompressType(),
snappyWriter: snappy.NewBufferedWriter(nil),
flateWriter: flateWriter,
Collaborator:

Do these all need to be created here? Doesn't seem to be a way to change the compressType, so maybe better to add a Reset(c *net.TCPConn) method to connWriter interface and just create the appropriate writer here, then call Reset(tcpConn) in connectWithLock()?


case client.FlateCompressType:
reader = newFlateConnReader()
default:
reader = bufio.NewReaderSize(u, connOpts.ReadBufferSize())
Collaborator:

Should document that ReadBufferSize has no effect if using compression.

connOpts := opts.ConnectionOptions()

var reader connReader
switch connOpts.CompressType() {
Collaborator:

It would be really nice, in terms of ease of migration, to prepend a header, e.g. {0xFF, 0xF0+compressType}, and fall back to uncompressed en/decoders in case the header is wrong.

@jorgelbg (Contributor) commented Mar 11, 2020:

Hi team 👋

I know that this PR is not yet merged, but we have started testing it and the results look quite promising.

After we deployed the docker images built with this PR we've seen ~25 write errors/s. From the m3aggregator logs:

{"level":"error","ts":1583968033.1887946,"msg":"decode error","remoteAddress":"10.2.10.207:34966","error":"snappy: corrupt input"}

I thought to provide some feedback here and also check if you have any advice on how to track down the underlying issue. Some time ago we had a problem with some particular metrics that had a super long value for a label (enough to trigger the id too large error). Wondering if something similar might be affecting the compression?

}

func (w *rawConnWriter) Flush() error {
return nil
Contributor:

What if the underlying writer is in some kind of an error state, for example, the connection is closed?

@pavelnikolov (Contributor):

This change is something we need as we are hitting network congestion limits too in many of our m3coordinator pods.

@jorgelbg I believe that the errors you are facing occur because the connections from the coordinator are timing out and getting closed immediately. This in turn causes issues when decoding on the m3aggregator side. To test this hypothesis you can increase the connection timeout in your m3coordinator:

downsample:
  remoteAggregator:
    client:
      ...
      connection:
          writeTimeout: 15000ms   # set this value to something higher and monitor in the e2e dashboard

I was having the same issue and after I increased this timeout the errors almost disappeared. See screenshot attached:
[screenshot: write error rate dropping after the writeTimeout increase]

@pavelnikolov (Contributor):

Do you need help with this PR? I would be happy to resolve the conflicts and action the feedback comments.
