-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support TLS for Kafka #1414
Support TLS for Kafka #1414
Conversation
Missing sign-off, and
|
01def27
to
695738c
Compare
sorry i rebased and force pushed; i did not realize that you had a local copy for the review! |
0983dc5
to
7d79203
Compare
Codecov Report
@@ Coverage Diff @@
## master #1414 +/- ##
==========================================
- Coverage 98.21% 98.17% -0.05%
==========================================
Files 195 195
Lines 9602 9602
==========================================
- Hits 9431 9427 -4
- Misses 134 137 +3
- Partials 37 38 +1
Continue to review full report at Codecov.
|
pkg/kafka/config/.nocover
Outdated
@@ -0,0 +1 @@ | |||
requires connection to Kafka |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inaccurate. It seems all errors in config.go can be easily simulated by giving non-existent file names, so this package is easy to test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, unfortunately i was just lazy.
cmd/ingester/app/flags.go
Outdated
// SuffixKey is a suffix for the tls key path flag | ||
SuffixKey = ".tls.key" | ||
// SuffixCA is a suffix for the tls ca path flag | ||
SuffixCA = ".tls.ca" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're adding the same flags in two places. I think it would be better to refactor these common settings into the new kafka/config
package you created, to deal with these flags that are common for produce & consumer:
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixEncoding = ".encoding"
suffixTLS = ".tls"
suffixCert = ".tls.cert"
suffixKey = ".tls.key"
suffixCA = ".tls.ca"
It means creating these functions:
func (c *Configuration) AddFlags(prefix string, flagSet *flag.FlagSet) {}
func (c *Configuration) InitFromViper(v *viper.Viper) {}
and delegating to them from the respective places in ingester and kafka storage impl
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right; i will do that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There seems to be no ZipkinThriftMarshaller
. That means the encoding has different options for consumer and producer applications. Would something like that be okay for you:
// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger
logger.Info("Kafka factory",
zap.Any("producer builder", f.Builder),
zap.Any("topic", f.options.config.Topic))
p, err := f.NewProducer()
if err != nil {
return err
}
f.producer = p
switch f.options.config.Encoding {
case config.EncodingProto:
f.marshaller = newProtobufMarshaller()
case config.EncodingJSON:
f.marshaller = newJSONMarshaller()
case config.EncodingZipkinThrift:
return fmt.Errorf("producers do not support '%s' yet", config.EncodingZipkinThrift)
default:
return fmt.Errorf(`encoding '%s' not recognised, use one of ("%s")`,
f.options.config.Encoding, strings.Join(config.AllEncodings, "\", \""))
}
return nil
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with that. When calling AddFlags on the shared config, you may want to pass a list of allowed encodings so that the help text doesn't say zipkin for producers
@MichaHoffmann any chance to rebase this and finalize? cc) @Demonian |
hey, sorry i was busy last month! i was just now starting to rebase and wanted to finalize it this week! |
3233799
to
89402a8
Compare
Could we add a configuration for disabling hostname verification? |
@MichaHoffmann is there any active work on this? Would love to help take this across the finish if i can help! |
@backjo Hey, sorry! I was busy the last weeks; i will polish what i have done so far up on saturday. Regarding Hostname Verification: the sarama config just takes a reference to a tls.Config (https://golang.org/src/crypto/tls/common.go?s=15521:24369#L403), and this has the field |
9ffac74
to
75fff2e
Compare
@MichaHoffmann Hi, as I can see this PR is almost resolved, maybe you need some help to finish it?=) |
pkg/kafka/config/config.go
Outdated
return nil, errors.Wrapf(err, "error reading ca") | ||
} | ||
|
||
cert, err := tls.LoadX509KeyPair(filepath.Clean(tlsConfig.KeyPath), filepath.Clean(tlsConfig.CertPath)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tested this PR and TLS support works for me expect that I had to switch the kafka.producer.tls.cert
and kafka.producer.tls.key
command line arguments otherwise it couldn't parse the certificate.
According to the documentation the parameter order should be (i.e. swapped):
func LoadX509KeyPair(certFile, keyFile string) (Certificate, error)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, i did miss that!
black-adder, jpkrohling, objectiser, pavolloffay, tiffon, vprithvi, and/or yurishkuro |
hey, sorry i did not get to work on this the last weeks, ill resolve the conflicts asap. |
008da1f
to
d7a2e21
Compare
Signed-off-by: mhoffmann <[email protected]>
d7a2e21
to
a96cd4b
Compare
Signed-off-by: mhoffmann <[email protected]>
a96cd4b
to
93ae5ea
Compare
Signed-off-by: mhoffmann <[email protected]>
3c4a832
to
93a1238
Compare
Signed-off-by: mhoffmann <[email protected]>
@MichaHoffmann would you consider some refactoring as part of this? This really bothers me:
i.e. we already have 3 places in the code that perform essentially identical work of loading TLS keys, and you're adding the 4th place. Similarly, unit tests are repeated, whereas we could've tested loading of TLS files in just one place. |
And it's not just the loading of the files, all the CLI flags are also repeated. We could have a single pkg that takes a prefix and FlagSet and adds all the usual TLS flags. |
I can do that this weekend, would you be okay with adding flags and config to |
Maybe pkg/config/tls, because auth is broader. |
Sorry for the noise, looks like I was using an older version of this PR. I'm still unable to get a working setup, but not sure it has to do with the state of this PR.
{"level":"info","ts":1569506473.3507824,"caller":"kafka/factory.go:62","msg":"Kafka factory","producer builder":{"Brokers":["my-cluster-kafka-brokers.kafka:9092"],"TLS":{"Enabled":false,"CertPath":"/var/run/secrets/kafkauser/user.crt","KeyPath":"/var/run/secrets/kafkauser/user.key","CaPath":"/var/run/secrets/kafkauser/ca.crt"}},"topic":"jaeger-spans"}
EDIT:
{"level":"info","ts":1569508180.688255,"caller":"kafka/factory.go:62","msg":"Kafka factory","producer builder":{"Brokers":["my-cluster-kafka-brokers.kafka:9093"],"TLS":{"Enabled":false,"CertPath":"/var/run/secrets/kafkauser/user.crt","KeyPath":"/var/run/secrets/kafkauser/user.key","CaPath":"/var/run/secrets/kafkauser/ca.crt"}},"topic":"jaeger-spans"}
{"level":"fatal","ts":1569508181.4714146,"caller":"collector/main.go:87","msg":"Failed to init storage factory","error":"kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","stacktrace":"main.main.func1\n\t/Users/kearls/go/src/github.com/jaegertracing/jaeger/cmd/collector/main.go:87\ngithub.com/jaegertracing/jaeger/vendor/github.com/spf13/cobra.(*Command).execute\n\t/Users/kearls/go/src/github.com/jaegertracing/jaeger/vendor/github.com/spf13/cobra/command.go:762\ngithub.com/jaegertracing/jaeger/vendor/github.com/spf13/cobra.(*Command).ExecuteC\n\t/Users/kearls/go/src/github.com/jaegertracing/jaeger/vendor/github.com/spf13/cobra/command.go:852\ngithub.com/jaegertracing/jaeger/vendor/github.com/spf13/cobra.(*Command).Execute\n\t/Users/kearls/go/src/github.com/jaegertracing/jaeger/vendor/github.com/spf13/cobra/command.go:800\nmain.main\n\t/Users/kearls/go/src/github.com/jaegertracing/jaeger/cmd/collector/main.go:177\nruntime.main\n\t/usr/local/Cellar/go/1.12.6/libexec/src/runtime/proc.go:200"}
|
@jpkrohling at the current |
@MichaHoffmann, thanks! I'm still unable to get it working, though: $ git log -1
commit e89911b67177c2b02aa2faee985960eb7d1c4754 (HEAD -> 1401_kafka_storage_tls)
Author: mhoffmann <[email protected]>
Date: Sat Sep 21 18:28:25 2019 +0200
checkout correct commits in submodules
Signed-off-by: mhoffmann <[email protected]>
$ SPAN_STORAGE_TYPE=kafka go run ./cmd/collector/main.go \
--kafka.producer.authentication=tls \
--kafka.producer.brokers=my-cluster-kafka-brokers.kafka:9093 \
--kafka.producer.tls.ca=/tmp/cluster-ca.crt \
--kafka.producer.tls.cert=/tmp/user.crt \
--kafka.producer.tls.key=/tmp/user.key \
--kafka.producer.topic=jaeger-spans In Kafka's logs, I see this:
Running OpenSSL Client with the same certs does work, though: $ openssl s_client \
-connect my-cluster-kafka-brokers.kafka:9093 \
-CAfile /tmp/cluster-ca.crt \
-cert /tmp/user.crt \
-key /tmp/user.key
CONNECTED(00000003)
depth=1 O = io.strimzi, CN = cluster-ca v0
verify return:1
depth=0 O = io.strimzi, CN = my-cluster-kafka
verify return:1
---
Certificate chain
0 s:O = io.strimzi, CN = my-cluster-kafka
i:O = io.strimzi, CN = cluster-ca v0
1 s:O = io.strimzi, CN = cluster-ca v0
i:O = io.strimzi, CN = cluster-ca v0
---
Server certificate
-----BEGIN CERTIFICATE-----
MIIDzzCCAregAwIBAgIJAMAX8YFJZFRrMA0GCSqGSIb3DQEBCwUAMC0xEzARBgNV
BAoMCmlvLnN0cmltemkxFjAUBgNVBAMMDWNsdXN0ZXItY2EgdjAwHhcNMTkwOTI2
MTMyNDM0WhcNMjAwOTI1MTMyNDM0WjAwMRMwEQYDVQQKDAppby5zdHJpbXppMRkw
FwYDVQQDDBBteS1jbHVzdGVyLWthZmthMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEAqGDpx9psr6ZPQMCXYRTe+F+KNKr+AXCCwhX2pT8+aGnEd4kHwwnX
hBuAc86COBN9Uo90HQPYvfHvUOXVw0usVzjNZHGeHGpngkA7QOkr0Nrbq8HOHvqd
USCE+a+ctQq+orv2zQeFBmx9E5uH2COjO5jVcrV+TGtPlx17rLv7sOZxjUXhNs4n
j3EemEKFuFZue8zOZ2AV3a1QzMwlhhomtO2D/o8nL6gb5V1kOq3yWNvEpgNmiH+D
nR6xwhDs/Xo0kolxFOVEmxJiMCDDXTP3dhRiugup6kAXhz21sp1DXiOmfOziTsYr
klHPzZy2B76WmrMfUj09bZLWCg81ORNRnQIDAQABo4HuMIHrMIHoBgNVHREEgeAw
gd2CQ215LWNsdXN0ZXIta2Fma2EtMC5teS1jbHVzdGVyLWthZmthLWJyb2tlcnMu
a2Fma2Euc3ZjLmNsdXN0ZXIubG9jYWyCJG15LWNsdXN0ZXIta2Fma2EtYm9vdHN0
cmFwLmthZmthLnN2Y4IybXktY2x1c3Rlci1rYWZrYS1ib290c3RyYXAua2Fma2Eu
c3ZjLmNsdXN0ZXIubG9jYWyCGm15LWNsdXN0ZXIta2Fma2EtYm9vdHN0cmFwgiBt
eS1jbHVzdGVyLWthZmthLWJvb3RzdHJhcC5rYWZrYTANBgkqhkiG9w0BAQsFAAOC
AQEAqHNDTKSm5HNm5ws1bWVmOFnu0kamTTkg79IQyFfZWHqUxstO0vMLUgHpZfVK
MAYLSaDGOZf9DWirf8slExaLcmwMkxskFHso7b5YD3hqsaUSVo3jzxLzf2IywqfK
vEbUBQFI2QD+OLrsJQmiOwxnmJoOa+6IbquJ1d9TCmWuiKBB1n8edj2mbnPyCl66
3lOfMp9Fl8y8Eh0PWJB7FUThR7vUUKfyUkD8XuZZorgdUamDGOh7kq+xIEEIykgM
IFhmXcbNCNxOfVuIwgaDdqeNa6aX7GHtlHEX3NDSb4h8wPUSxIOYVAWtOVcgAdWe
u0GmC4ER8XOwOd/JEU2aEuyBVw==
-----END CERTIFICATE-----
subject=O = io.strimzi, CN = my-cluster-kafka
issuer=O = io.strimzi, CN = cluster-ca v0
---
Acceptable client certificate CA names
O = io.strimzi, CN = clients-ca v0
Client Certificate Types: RSA sign, DSA sign, ECDSA sign
Requested Signature Algorithms: ECDSA+SHA512:RSA+SHA512:ECDSA+SHA384:RSA+SHA384:ECDSA+SHA256:RSA+SHA256:DSA+SHA256:ECDSA+SHA224:RSA+SHA224:DSA+SHA224:ECDSA+SHA1:RSA+SHA1:DSA+SHA1
Shared Requested Signature Algorithms: ECDSA+SHA512:RSA+SHA512:ECDSA+SHA384:RSA+SHA384:ECDSA+SHA256:RSA+SHA256:DSA+SHA256:ECDSA+SHA224:RSA+SHA224:DSA+SHA224:ECDSA+SHA1:RSA+SHA1:DSA+SHA1
Peer signing digest: SHA256
Peer signature type: RSA
Server Temp Key: ECDH, P-256, 256 bits
---
SSL handshake has read 2374 bytes and written 1455 bytes
Verification: OK
---
New, TLSv1.2, Cipher is ECDHE-RSA-AES256-GCM-SHA384
Server public key is 2048 bit
Secure Renegotiation IS supported
Compression: NONE
Expansion: NONE
No ALPN negotiated
SSL-Session:
Protocol : TLSv1.2
Cipher : ECDHE-RSA-AES256-GCM-SHA384
Session-ID: 5D8DD34C65E88EEED2EFAA0F726D4E286FB82914CC80791EE7BE78B8C1449920
Session-ID-ctx:
Master-Key: E1FA47895E2F758890D0C954D7382720A16827FCF52C9F6210ABE2499D8A2148AD4CB66C0106521F5ACE0E67E9E4CA73
PSK identity: None
PSK identity hint: None
SRP username: None
Start Time: 1569575756
Timeout : 7200 (sec)
Verify return code: 0 (ok)
Extended master secret: yes
--- I'm debugging the Go code now to see where the problem might be. |
Turns out, there's a tool in the $ kafka-console-consumer -brokers my-cluster-kafka-brokers.kafka:9093 -tls-client-cert /tmp/user.crt -tls-client-key /tmp/user.key -topic jaeger-spans -verbose -tls-enabled true
2019/09/27 11:23:51 Initializing new client
2019/09/27 11:23:51 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2019/09/27 11:23:51 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2019/09/27 11:23:51 client/metadata fetching metadata for all topics from broker my-cluster-kafka-brokers.kafka:9093
2019/09/27 11:23:51 Failed to connect to broker my-cluster-kafka-brokers.kafka:9093: x509: certificate is valid for my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc.cluster.local, my-cluster-kafka-bootstrap.kafka.svc, my-cluster-kafka-bootstrap.kafka.svc.cluster.local, my-cluster-kafka-bootstrap, my-cluster-kafka-bootstrap.kafka, not my-cluster-kafka-brokers.kafka
2019/09/27 11:23:51 client/metadata got error from broker -1 while fetching metadata: x509: certificate is valid for my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc.cluster.local, my-cluster-kafka-bootstrap.kafka.svc, my-cluster-kafka-bootstrap.kafka.svc.cluster.local, my-cluster-kafka-bootstrap, my-cluster-kafka-bootstrap.kafka, not my-cluster-kafka-brokers.kafka
2019/09/27 11:23:51 client/metadata no available broker to send metadata request to
2019/09/27 11:23:51 client/brokers resurrecting 1 dead seed brokers
2019/09/27 11:23:51 client/metadata retrying after 250ms... (3 attempts remaining)
2019/09/27 11:23:51 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2019/09/27 11:23:51 client/metadata fetching metadata for all topics from broker my-cluster-kafka-brokers.kafka:9093
2019/09/27 11:23:51 Failed to connect to broker my-cluster-kafka-brokers.kafka:9093: x509: certificate is valid for my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc.cluster.local, my-cluster-kafka-bootstrap.kafka.svc, my-cluster-kafka-bootstrap.kafka.svc.cluster.local, my-cluster-kafka-bootstrap, my-cluster-kafka-bootstrap.kafka, not my-cluster-kafka-brokers.kafka
2019/09/27 11:23:51 client/metadata got error from broker -1 while fetching metadata: x509: certificate is valid for my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc.cluster.local, my-cluster-kafka-bootstrap.kafka.svc, my-cluster-kafka-bootstrap.kafka.svc.cluster.local, my-cluster-kafka-bootstrap, my-cluster-kafka-bootstrap.kafka, not my-cluster-kafka-brokers.kafka
2019/09/27 11:23:51 client/metadata no available broker to send metadata request to
2019/09/27 11:23:51 client/brokers resurrecting 1 dead seed brokers
2019/09/27 11:23:51 client/metadata retrying after 250ms... (2 attempts remaining)
2019/09/27 11:23:51 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2019/09/27 11:23:51 client/metadata fetching metadata for all topics from broker my-cluster-kafka-brokers.kafka:9093
2019/09/27 11:23:51 Failed to connect to broker my-cluster-kafka-brokers.kafka:9093: x509: certificate is valid for my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc.cluster.local, my-cluster-kafka-bootstrap.kafka.svc, my-cluster-kafka-bootstrap.kafka.svc.cluster.local, my-cluster-kafka-bootstrap, my-cluster-kafka-bootstrap.kafka, not my-cluster-kafka-brokers.kafka
2019/09/27 11:23:51 client/metadata got error from broker -1 while fetching metadata: x509: certificate is valid for my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc.cluster.local, my-cluster-kafka-bootstrap.kafka.svc, my-cluster-kafka-bootstrap.kafka.svc.cluster.local, my-cluster-kafka-bootstrap, my-cluster-kafka-bootstrap.kafka, not my-cluster-kafka-brokers.kafka
2019/09/27 11:23:51 client/metadata no available broker to send metadata request to
2019/09/27 11:23:51 client/brokers resurrecting 1 dead seed brokers
2019/09/27 11:23:51 client/metadata retrying after 250ms... (1 attempts remaining)
2019/09/27 11:23:51 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2019/09/27 11:23:51 client/metadata fetching metadata for all topics from broker my-cluster-kafka-brokers.kafka:9093
2019/09/27 11:23:51 Failed to connect to broker my-cluster-kafka-brokers.kafka:9093: x509: certificate is valid for my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc.cluster.local, my-cluster-kafka-bootstrap.kafka.svc, my-cluster-kafka-bootstrap.kafka.svc.cluster.local, my-cluster-kafka-bootstrap, my-cluster-kafka-bootstrap.kafka, not my-cluster-kafka-brokers.kafka
2019/09/27 11:23:51 client/metadata got error from broker -1 while fetching metadata: x509: certificate is valid for my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc.cluster.local, my-cluster-kafka-bootstrap.kafka.svc, my-cluster-kafka-bootstrap.kafka.svc.cluster.local, my-cluster-kafka-bootstrap, my-cluster-kafka-bootstrap.kafka, not my-cluster-kafka-brokers.kafka
2019/09/27 11:23:51 client/metadata no available broker to send metadata request to
2019/09/27 11:23:51 client/brokers resurrecting 1 dead seed brokers
2019/09/27 11:23:51 Closing Client
ERROR: Failed to start consumer: kafka: client has run out of available brokers to talk to (Is your cluster reachable?) Using the hostname |
@jpkrohling thanks for testing! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I booked #1837 to refactor all TLS into a single package
Which problem is this PR solving?
Short description of the changes