Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add input plugin for McRouter #4077

Merged
merged 1 commit into from
May 1, 2018
Merged

Conversation

cthayer
Copy link
Contributor

@cthayer cthayer commented Apr 25, 2018

McRouter is a smart proxy for Memcached

https://github.com/facebook/mcrouter

Code is based on the memcached input plugin

Required for all PRs:

  • Signed CLA.
  • Associated README.md updated.
  • Has appropriate unit tests.

Copy link
Contributor

@danielnelson danielnelson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is based heavily on the memcached input, but this is one of the oldest plugins we have and it could stand to be brought up to the current style/standards. Would you be interested in doing some of the updates on this plugin and then we can bring memcached up to match afterwards?

# with optional port. ie localhost, 10.0.0.1:11211, etc.
servers = ["localhost:11211"]
# An array of unix mcrouter sockets to gather stats about.
# unix_sockets = ["/var/run/mcrouter.sock"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use a single list of this style:

servers = ["tcp://localhost:11211", "unix:///var/run/mcrouter.sock", "udp://localhost:11211"]

We don't need to add support for UDP at first, assuming it is even available with mcrouter, but this will give us a nice way to add it later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated to use a single array of servers and pull the protocol from the connection string.

(mcrouter does not support UDP connections)

var defaultTimeout = 5 * time.Second

// The list of metrics that should be sent
var sendMetrics = []string{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love to have these locked to specific types, since it would be a problem if a field changed values. Maybe we can turn this into a map[string]statType where statType is an enum being int64, float64 or string? Then when we parse we try the expected value and log an error if it doesn't match.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switched to map with types and only attempt a convert for the expected type

# unix_sockets = ["/var/run/mcrouter.sock"]
`

var defaultTimeout = 5 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this configurable and have it apply to all servers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed the zookeeper example here

address = address + ":11211"
}

conn, err = net.DialTimeout("tcp", address, defaultTimeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be a nice touch to create a context.Context at the beginning of Gather, and use DialContext, I recently updated the zookeeper input to do something like this:

https://github.com/influxdata/telegraf/blob/master/plugins/inputs/zookeeper/zookeeper.go#L81

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I followed the zookeeper example here

conn.SetDeadline(time.Now().Add(defaultTimeout))

// Read and write buffer
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is necessary, just use conn.Write( and you won't need to flush.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed the zookeeper example and just do fmt.Fprint(conn

return nil
}

func parseResponse(r *bufio.Reader) (map[string]string, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use a bufio.Scanner to read lines, the ReadLine() function can return partial lines.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using bufio.Scanner now

// Read values
s := bytes.SplitN(line, []byte(" "), 3)
if len(s) != 3 || !bytes.Equal(s[0], []byte("STAT")) {
return values, fmt.Errorf("unexpected line in stats response: %q", line)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return nil as the map.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the change. I assume that the standard is to return no stats on error rather than partial stats?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, returning an error is probably best here.

@cthayer cthayer force-pushed the master branch 3 times, most recently from 2801c28 to d79b823 Compare April 27, 2018 14:55
@cthayer
Copy link
Contributor Author

cthayer commented Apr 27, 2018

I think I could update the memcached plugin afterwards.

It seems like a breaking change for the memcached plugin to remove the UnixSockets member from the struct?

var protocol string
var dialer net.Dialer

s := strings.SplitN(address, "://", 2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use url.Parse here, I think it will help simplify things and prevent errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the code to use url.Parse

I feel like it makes it a bit stricter on the server connection string format, but I think that's ok.

I make some attempt to tease partial connection strings into a proper format, but ultimately default to tcp://localhost:11211 if the server connection string does not parse properly.

@danielnelson
Copy link
Contributor

Yeah, for memcached we will need to keep backwards compatibility. I usually try to translate the old format to the new one on the first call to Gather.

}

if u.Scheme != "tcp" && u.Scheme != "unix" {
protocol = defaultServerURL.Scheme
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

returning error

address = u.Path
} else {
if u.Host == "" {
u.Host = defaultServerURL.Host
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

returning error


host, port, err = net.SplitHostPort(u.Host)

if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a few quick tests and unfortunately I think we will need to handle the case when either host or port are empty strings:

func Check(input string) {
	host, port, err := net.SplitHostPort(input)
	fmt.Printf("%q %q %q: %v\n", input, host, port, err)
}

func TestSplitHostPort(t *testing.T) {
	Check("localhost:8086")
	Check("localhost") # we should accept this and use default port
	Check("localhost:") # use default port
	Check(":8086") # use default hostname
	Check(":") # default host:port or reject
	Check("") # reject, but we already do that above
}
"localhost:8086" "localhost" "8086": <nil>
"localhost" "" "": address localhost: missing port in address
"localhost:" "localhost" "": <nil>
":8086" "" "8086": <nil>
":" "" "": <nil>
"" "" "": missing port in address

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test for parsing addresses and separated the address parsing into its own function.

I'm just using the url.URL functions to get the hostname and port parts and not using net.SplitHostPort any more.

@danielnelson danielnelson added this to the 1.7.0 milestone May 1, 2018
@danielnelson danielnelson merged commit 83345ec into influxdata:master May 1, 2018
@danielnelson
Copy link
Contributor

Great job, merged for 1.7.0

@cthayer
Copy link
Contributor Author

cthayer commented May 2, 2018

Thanks!

arkady-emelyanov pushed a commit to arkady-emelyanov/telegraf that referenced this pull request May 18, 2018
maxunt pushed a commit that referenced this pull request Jun 26, 2018
otherpirate pushed a commit to otherpirate/telegraf that referenced this pull request Mar 15, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants