Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support MySQL Compressed Protocol #787

Merged
merged 4 commits into from
May 23, 2023

Conversation

dveeden
Copy link
Collaborator

@dveeden dveeden commented May 5, 2023

Closes #645, #647

Protocol docs: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_basic_compression.html

This supports both zlib (CLIENT_COMPRESS) and zstandard (CLIENT_ZSTD_COMPRESSION_ALGORITHM).

This can help with cross-AZ data transfer cost in the cloud and with slower network links.

This probably needs some unittests.

To test:

package main

import (
	"fmt"
	"strings"

	"github.com/go-mysql-org/go-mysql/client"
	"github.com/go-mysql-org/go-mysql/mysql"
)

func run(url, user, password string, flag uint32) {
	println("======================================")
	conn, _ := client.Connect(url, user, password, "test", func(c *client.Conn) {
		c.SetCapability(flag)
	})

	// Trigger a small, uncompresed return packet
	_, err := conn.Execute(`DO 1`)
	if err != nil {
		panic(err)
	}

	for i := 0; i < 2; i++ {
		// With and without a 30 char comment to trigger compressed
		// packets with and without compressed payload
		comment := "/* " + strings.Repeat("x", 30*i) + " */"
		r, err := conn.Execute(`SELECT VERSION(), NOW(6)` + comment)
		if err != nil {
			panic(err)
		}

		for _, row := range r.Values {
			fmt.Printf("%s, %s\n", row[0].AsString(), row[1].AsString())
		}
	}

	// conn.Quit()

	println("======================================")
}

func main() {
	run("127.0.0.1:4000", "root", "", mysql.CLIENT_COMPRESS)
	run("127.0.0.1:3306", "root", "root", mysql.CLIENT_COMPRESS)

	run("127.0.0.1:4000", "root", "", mysql.CLIENT_ZSTD_COMPRESSION_ALGORITHM)
	run("127.0.0.1:3306", "root", "root", mysql.CLIENT_ZSTD_COMPRESSION_ALGORITHM)

	run("127.0.0.1:4000", "root", "", mysql.CLIENT_ZSTD_COMPRESSION_ALGORITHM|mysql.CLIENT_COMPRESS)
	run("127.0.0.1:3306", "root", "root", mysql.CLIENT_ZSTD_COMPRESSION_ALGORITHM|mysql.CLIENT_COMPRESS)
}

When inspecting the traffic with Wireshark I would recommend a recent build from the master branch of wireshark.

@dveeden
Copy link
Collaborator Author

dveeden commented May 5, 2023

cc @lance6716

Copy link
Collaborator

@lance6716 lance6716 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll review later, quite busy these days. Do you have spare time to review @atercattus

@@ -112,6 +112,12 @@ func ConnectWithDialer(ctx context.Context, network string, addr string, user st
return nil, errors.Trace(err)
}

if c.ccaps&CLIENT_COMPRESS > 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should notify the user about their priority at the comment of SetCapability

Copy link
Collaborator

@lance6716 lance6716 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rest lgtm

var err error
switch c.Compression {
case MYSQL_COMPRESS_ZLIB:
c.compressedReader, err = zlib.NewReader(c.reader)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we unify the c.compressedReader with c.reader by interface?

packet/conn.go Outdated Show resolved Hide resolved
} else if n != len(data) {
return errors.Wrapf(ErrBadConn, "Write failed. only %v bytes written, while %v expected", n, len(data))
}
c.compressedReader = nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to set it to nil?

Copy link
Collaborator Author

@dveeden dveeden May 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On line 138:

        if c.compressedReader != nil {
                if err := c.ReadPacketTo(buf, c.compressedReader); err != nil {
                        return nil, errors.Trace(err)
                }
        } else {

I could move the c.compressedReader = nil line to here (line 122) if that would make things more obvious:

                        if uncompressedLength > 0 {
                                var err error
                                switch c.Compression {
                                case MYSQL_COMPRESS_ZLIB:
                                        c.compressedReader, err = zlib.NewReader(c.reader)
                                case MYSQL_COMPRESS_ZSTD:
                                        c.compressedReader = zstd.NewReader(c.reader)
                                }
                                if err != nil {
                                        return nil, err
                                }
                        }

The payload of a compressed packet is not always actually compressed. Payloads of less than 50 bytes are often uncompressed as the compression overhead isn't worth it. If the uncompressedLength is 0 then the payload isn't actually compressed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

does replication support mysql compress protocol?
3 participants