-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RabbitMQ collector #1742
RabbitMQ collector #1742
Conversation
} | ||
|
||
@Test | ||
@Ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is ignored because it can't actually connect to a RabbitMQ server at localhost and the application context fails to refresh. This test was taken from the Kafka collector tests, but Kafka doesn't fail to start if it cannot connect to Kafka, I believe. It would still be nice to test that the conditional and rest of the auto-configuration is working. Let me know what you want to do about this.
public static Iterable<Object[]> data() { | ||
return Arrays.asList( | ||
parameters("addresses", "localhost:5671, localhost:5673", | ||
properties -> properties.getAddresses().toString().substring(1, properties.getAddresses().toString().length() - 1), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit wonky due to the mismatch of types: the properties uses a List<String>
, the builder Address[]
, and of course we are passing just a String via the environment/YAML but Spring converts to a list for us. We could add a converter for Address
but this felt like more work than it is worth right now.
public class RabbitMqCollectorTest { | ||
|
||
private static final int RABBIT_PORT = 5672; | ||
private static final String RABBIT_DOCKER_IMAGE = "rabbitmq:3.6-alpine"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This tests against the latest patch version of RabbitMQ 3.6. We could probably fairly easily parameterize this if we wanted to test against multiple versions. A 3.7 release candidate was published recently, for instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I chose the alpine
varient just because it is a smaller image
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure.. we might make a test image at some point to reduce layers needed to be downloaded
final RabbitMqSpanConsumer consumer = new RabbitMqSpanConsumer(this.builder, channel); | ||
channel.basicConsume(this.builder.queue, true, this.name, consumer); | ||
} catch (IOException e) { | ||
throw new RuntimeException("Failed to start RabbitMQ consumer " + this.name, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IllegalStateException?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. nothing major pops out at me. great work!
zipkin-server/README.md
Outdated
### RabbitMQ collector | ||
The [RabbitMQ collector](../zipkin-collector/rabbitmq) will be enabled when the RabbitMQ server(s) `addresses` are set. | ||
|
||
Property | Environment Variable | Description |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might suggest following conventions in the file and/or restructuring them generally
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand. I copied this convention from the corresponding table under "HTTP Collector". There seems to be a lack of consistency throughout the README. The "Self-Tracing" section reverses two of the columns, for the order of: "Variable", "Property", "Description". All the Storage sections and the "Scribe Collector" only give the environment variable and a description. Is this perhaps the convention you meant?
return this; | ||
} | ||
|
||
public Builder addresses(Address[] addresses) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd accept a list, so that we can later use things like AutoValue (which don't allow arrays), and just convert to array at the site you need.
I can see the inconsistency. The reason why http collector stuff is only
listed in the zipkin-server readme is because the code only exists here
(there is no separate module even if maybe we should have one). You'll
notice a difference in how kafka etc are noted.. listing the env variables
here, but linking to a table in the kafka module's readme.
Make sense?
|
d682051
to
59b6f10
Compare
zipkin-collector/rabbitmq/README.md
Outdated
``` | ||
3. Save an array of spans to a file like `sample-spans.json` | ||
```json | ||
[{"traceId":"73321ecb02595c39","id":"73321ecb02595c39","name":"get","annotations":[{"timestamp":1505990621526000,"value":"sr","endpoint":{"serviceName":"service","ipv4":"127.0.0.1","port":8080}},{"timestamp":1505990621527000,"value":"ss","endpoint":{"serviceName":"service","ipv4":"127.0.0.1","port":8080}}],"binaryAnnotations":[{"key":"http.method","value":"GET","endpoint":{"serviceName":"service","ipv4":"127.0.0.1","port":8080}}]}] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about a v2 one!
[{
"traceId": "9032b04972e475c5",
"id": "9032b04972e475c5",
"kind": "SERVER",
"name": "get",
"timestamp": 1505990621526000,
"duration": 612898,
"localEndpoint": {
"serviceName": "brave-webmvc-example",
"ipv4": "192.168.1.113"
},
"remoteEndpoint": {
"serviceName": "",
"ipv4": "127.0.0.1",
"port": 60149
},
"tags": {
"error": "500 Internal Server Error",
"http.path": "/a"
}
}]
zipkin-collector/rabbitmq/README.md
Outdated
``` | ||
4. Publish them using the CLI | ||
```bash | ||
$ rabbitmqadmin publish exchange=amq.default routing_key=zipkin.spans < sample-spans.json |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the topic-like things have always defaulted to "zipkin" If that becomes ambiguous, we can append for whatever else we do. I'd prefer consistency and something obvious to guess.
(you might suggest this consistency is violated in ES, but that's a bit different as we have no dependency link transport, even if we have dependency link storage. if we did, we'd have to append to all the others, too.
public static final class Builder implements CollectorComponent.Builder { | ||
Collector.Builder delegate = Collector.builder(RabbitMqCollector.class); | ||
CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS; | ||
String queue = "zipkin.spans"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer plain old zipkin here, as otherwise folks might accidentally suggest it in support and have that be a lie!
tested locally. good! |
c284fc4
to
6695045
Compare
This adds a RabbitMQ collector module along with its corresponding auto-configuration.
6695045
to
0113c9e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spent an hour or so last night and found some things..
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.slf4j</groupId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this an indirect dep of rabbit? if not, let's not use it (as it is unnecessary). use JUL instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
slf4j-api
is the RabbitMQ client library's only required dependency but we don't need to use it ourselves. I'm not sure why I did... probably just habit.
final Builder builder; | ||
final int concurrency; | ||
final AtomicReference<CheckResult> failure = new AtomicReference<>(); | ||
private Connection connection; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
volatile and start and close are different threads
|
||
static final class LazyRabbitWorkers extends LazyCloseable<ExecutorService> { | ||
|
||
final Builder builder; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
careful about saving builder here as it is mutable and start is different thread than .build(). one way is to make your builder auto-value and make a copy of the builder (or just save off the params). otherwise add a TODO mentioning the edge-case mutability concern
Span span = SpanDecoder.THRIFT_DECODER.readSpan(body); | ||
this.collector.accept(Collections.singletonList(span), NOOP); | ||
} else { | ||
this.collector.acceptSpans(body, DETECTING_DECODER, NOOP); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kill all this for collector.acceptSpans as single-span encoding was a legacy of kafka specifically (we don't need to carry that here)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in latest force pushed commit.
import static zipkin.TestObjects.TRACE; | ||
import static zipkin.collector.rabbitmq.RabbitMqCollector.convertAddresses; | ||
|
||
public class RabbitMqCollectorTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is insulation and also the ability to test locally in helpers like LazyCassandra3Storage might want to do that here
This adds a RabbitMQ collector module along with its corresponding auto-configuration.
3eacdbb
to
2a22de4
Compare
} | ||
|
||
/** | ||
* Queue zipkin spans will be consumed from. Defaults to "zipkin-spans". |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit doc drift here perhaps elsewhere (default)
/** Lazy creates a connection and a queue before starting consumers */ | ||
static final class LazyInit extends LazyCloseable<Connection> { | ||
final Builder builder; | ||
final AtomicReference<CheckResult> failure = new AtomicReference<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops this isn't used anymore (or needed)
@Override | ||
public CheckResult check() { | ||
try { | ||
CheckResult failure = connection.failure.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should check the connection, if there's a way to
import static zipkin.TestObjects.LOTS_OF_SPANS; | ||
import static zipkin.collector.rabbitmq.RabbitMQCollector.builder; | ||
|
||
public class RabbitMQCollectorTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit should be renamed to ITRabbitMQCollector (as it uses docker)
assertThat(collector.get().check().ok).isTrue(); | ||
} | ||
|
||
@Test public void startFailsWithInvalidRabbitMqServer() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this could possibly be moved to a normal unit test or left.. no matter
@bsideup we are stumped as to why this fails in travis.. do you have some time to help? |
Hi Adrian! I'm happy to help, but can take a look only tomorrow, I hope
it's fine :)
…On Sat, 23 Sep 2017 at 10:19, Adrian Cole ***@***.***> wrote:
@bsideup <https://github.com/bsideup> we are stumped as to why this fails
in travis.. do you have some time to help?
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#1742 (comment)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/ABAIip9jVuCLMyhO9nN7r7Vx0yCIkmjJks5slL73gaJpZM4PeAF2>
.
|
sounds good to me! thanks, sergei
On Sat, Sep 23, 2017 at 4:31 PM, Sergei Egorov <[email protected]>
wrote:
… Hi Adrian! I'm happy to help, but can take a look only tomorrow, I hope
it's fine :)
On Sat, 23 Sep 2017 at 10:19, Adrian Cole ***@***.***>
wrote:
> @bsideup <https://github.com/bsideup> we are stumped as to why this
fails
> in travis.. do you have some time to help?
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <#1742 (comment)>,
> or mute the thread
> <https://github.com/notifications/unsubscribe-auth/
ABAIip9jVuCLMyhO9nN7r7Vx0yCIkmjJks5slL73gaJpZM4PeAF2>
> .
>
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub
<#1742 (comment)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AAD616uMeGkkRIncw6TvEHbfK3WmoLMMks5slMHzgaJpZM4PeAF2>
.
|
Connection connection; | ||
try { | ||
connection = builder.connectionFactory.newConnection(builder.addresses); | ||
connection.createChannel().queueDeclare(builder.queue, true, false, false, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Channel
created here will remain open until the connection is closed, which is not a big deal but it isn't necessary and it may be asked about by operators looking at open channels on RabbitMQ.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this is implicitly closed via the connection, ya?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, closing the connection will close any Channels that it opened. Ref. https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Connection.html#close--
for (int i = 0; i < builder.concurrency; i++) { | ||
String name = RabbitMQSpanConsumer.class.getName() + i; | ||
try { | ||
// TODO: Is channel 1-1 with consumer? if not re-use the same channel for each consumer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally speaking, yes. Channels should not be shared among threads. From the Channel JavaDoc:
Channel instances must not be shared between threads. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads. While some operations on channels are safe to invoke concurrently, some are not and will result in incorrect frame interleaving on the wire. Sharing channels between threads will also interfere with Publisher Confirms. As such, applications need to use a Channel per thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thx for the ref
try { | ||
container = new GenericContainer(image) | ||
.withExposedPorts(RABBIT_PORT) | ||
.waitingFor(new HostPortWaitStrategy()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need for it, it's a default value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
k
} finally { | ||
if (container != null) { | ||
System.out.println("Stopping docker image " + image); | ||
container.stop(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
compute
runs once, lazily, but you call close
many times (after every JUnit statement). This is why the tests are failing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm why would this pass locally!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this is a class rule, it shouldn't run for each test, right?
The error looks like it is failing even before tests are run...
Running zipkin.collector.rabbitmq.ITRabbitMQCollector
ℹ︎ Checking the system...
✔ Docker version is newer than 1.6.0
✔ Docker environment has more than 2GB free
✔ File should be mountable
✔ Exposed port is accessible
Starting docker image rabbitmq:3.6-alpine
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 5.925 s <<< FAILURE! - in zipkin.collector.rabbitmq.ITRabbitMQCollector
[ERROR] zipkin.collector.rabbitmq.ITRabbitMQCollector Time elapsed: 5.925 s <<< ERROR!
java.lang.IllegalStateException: Unable to establish connection to RabbitMQ server
Caused by: java.io.IOException
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
Caused by: java.io.EOFException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bsideup
For our other tests run with test containers, I see the following output before the Checking the system...
sequence:
[INFO] Running zipkin.storage.mysql.ITMySQLStorage
07:07:47.617 [main] INFO o.t.d.EnvironmentAndSystemPropertyClientProviderStrategy - Found docker client settings from environment
07:07:47.633 [main] INFO o.t.d.DockerClientProviderStrategy - Found Docker environment with Environment variables, system properties and defaults. Resolved:
dockerHost=unix:///var/run/docker.sock
apiVersion='{UNKNOWN_VERSION}'
registryUrl='https://index.docker.io/v1/'
registryUsername='travis'
registryPassword='null'
registryEmail='null'
dockerConfig='DefaultDockerClientConfig[dockerHost=unix:///var/run/docker.sock,registryUsername=travis,registryPassword=<null>,registryEmail=<null>,registryUrl=https://index.docker.io/v1/,dockerConfig=/home/travis/.docker,sslConfig=<null>,apiVersion={UNKNOWN_VERSION}]'
07:07:47.638 [main] INFO o.testcontainers.DockerClientFactory - Docker host IP address is localhost
07:07:47.780 [main] INFO o.testcontainers.DockerClientFactory - Connected to docker:
Server Version: 17.03.1-ce
API Version: 1.27
Operating System: Ubuntu 14.04.5 LTS
Total Memory: 7479 MB
ℹ︎ Checking the system...
✔ Docker version is newer than 1.6.0
✔ Docker environment has more than 2GB free
✔ File should be mountable
✔ Exposed port is accessible
Note it is entirely missing for the RabbitMQ tests case (as you can see from the output Adrian posted above), which leads me to believe that a different Docker discovery strategy is being used, but looking at the code for the tests I can't figure out why this would be different.
added a commit to undo the lazy stuff (still using a class rule) |
@adriancole does it work locally if you stop your local RabbitMQ and run tests with Docker? I have a feeling that some tests use local RabbitMQ (it explains why it passes locally) |
@adriancole <https://github.com/adriancole> does it work locally if you
stop your local RabbitMQ and run tests with Docker? I have a feeling that
some tests use local RabbitMQ (it explains why it passes locally)
very good guess. sadly after killing my rabbit still passes. I added a log
statement to verify
|
pushed a commit to use travis rabbitmq service (to see if local passes) |
nice work, @shakuzen! |
This adds a RabbitMQ collector module along with its corresponding auto-configuration.
The implementation is largely inspired by the existing collector implementations. In fact, I think some of the logic and tests could be shared between them, but I did not try to tackle that in this PR.
Resolves #1614