From 66950451cf1dda21dbee2d20a026bd5bef092211 Mon Sep 17 00:00:00 2001 From: Tommy Ludwig Date: Wed, 30 Aug 2017 22:16:56 +0900 Subject: [PATCH] RabbitMQ collector This adds a RabbitMQ collector module along with its corresponding auto-configuration. --- pom.xml | 12 + .../collector-rabbitmq/pom.xml | 40 +++ ...kinRabbitMqCollectorAutoConfiguration.java | 62 ++++ .../ZipkinRabbitMqCollectorProperties.java | 113 +++++++ .../collector/rabbitmq/package-info.java | 15 + .../main/resources/META-INF/spring.factories | 2 + ...abbitMqCollectorAutoConfigurationTest.java | 99 ++++++ ...bbitMqCollectorPropertiesOverrideTest.java | 124 +++++++ zipkin-autoconfigure/pom.xml | 1 + zipkin-collector/pom.xml | 1 + zipkin-collector/rabbitmq/README.md | 57 ++++ zipkin-collector/rabbitmq/pom.xml | 77 +++++ .../collector/rabbitmq/RabbitMqCollector.java | 213 ++++++++++++ .../rabbitmq/RabbitMqSpanConsumer.java | 67 ++++ .../collector/rabbitmq/RabbitMqWorker.java | 47 +++ .../collector/rabbitmq/package-info.java | 15 + .../rabbitmq/RabbitMqCollectorTest.java | 303 ++++++++++++++++++ .../rabbitmq/src/test/resources/logback.xml | 15 + zipkin-server/README.md | 9 + zipkin-server/pom.xml | 7 + .../main/resources/zipkin-server-shared.yml | 10 + 21 files changed, 1289 insertions(+) create mode 100644 zipkin-autoconfigure/collector-rabbitmq/pom.xml create mode 100644 zipkin-autoconfigure/collector-rabbitmq/src/main/java/zipkin/autoconfigure/collector/rabbitmq/ZipkinRabbitMqCollectorAutoConfiguration.java create mode 100644 zipkin-autoconfigure/collector-rabbitmq/src/main/java/zipkin/autoconfigure/collector/rabbitmq/ZipkinRabbitMqCollectorProperties.java create mode 100644 zipkin-autoconfigure/collector-rabbitmq/src/main/java/zipkin/autoconfigure/collector/rabbitmq/package-info.java create mode 100644 zipkin-autoconfigure/collector-rabbitmq/src/main/resources/META-INF/spring.factories create mode 100644 zipkin-autoconfigure/collector-rabbitmq/src/test/java/zipkin/collector/rabbitmq/ZipkinRabbitMqCollectorAutoConfigurationTest.java create mode 100644 zipkin-autoconfigure/collector-rabbitmq/src/test/java/zipkin/collector/rabbitmq/ZipkinRabbitMqCollectorPropertiesOverrideTest.java create mode 100644 zipkin-collector/rabbitmq/README.md create mode 100644 zipkin-collector/rabbitmq/pom.xml create mode 100644 zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/RabbitMqCollector.java create mode 100644 zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/RabbitMqSpanConsumer.java create mode 100644 zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/RabbitMqWorker.java create mode 100644 zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/package-info.java create mode 100644 zipkin-collector/rabbitmq/src/test/java/zipkin/collector/rabbitmq/RabbitMqCollectorTest.java create mode 100644 zipkin-collector/rabbitmq/src/test/resources/logback.xml diff --git a/pom.xml b/pom.xml index ad78af218de..e9d704e08e9 100755 --- a/pom.xml +++ b/pom.xml @@ -282,6 +282,18 @@ ${project.version} + + ${project.groupId} + zipkin-collector-rabbitmq + ${project.version} + + + + ${project.groupId} + zipkin-autoconfigure-collector-rabbitmq + ${project.version} + + ${project.groupId} zipkin-collector-scribe diff --git a/zipkin-autoconfigure/collector-rabbitmq/pom.xml b/zipkin-autoconfigure/collector-rabbitmq/pom.xml new file mode 100644 index 00000000000..01d4add6173 --- /dev/null +++ b/zipkin-autoconfigure/collector-rabbitmq/pom.xml @@ -0,0 +1,40 @@ + + + + 4.0.0 + + + io.zipkin.java + zipkin-autoconfigure + 2.0.2-SNAPSHOT + + + zipkin-autoconfigure-collector-rabbitmq + Auto Configuration: RabbitMQ Collector + + + ${project.basedir}/../.. + + + + + ${project.groupId} + zipkin-collector-rabbitmq + + + + diff --git a/zipkin-autoconfigure/collector-rabbitmq/src/main/java/zipkin/autoconfigure/collector/rabbitmq/ZipkinRabbitMqCollectorAutoConfiguration.java b/zipkin-autoconfigure/collector-rabbitmq/src/main/java/zipkin/autoconfigure/collector/rabbitmq/ZipkinRabbitMqCollectorAutoConfiguration.java new file mode 100644 index 00000000000..47248a8ec2f --- /dev/null +++ b/zipkin-autoconfigure/collector-rabbitmq/src/main/java/zipkin/autoconfigure/collector/rabbitmq/ZipkinRabbitMqCollectorAutoConfiguration.java @@ -0,0 +1,62 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.autoconfigure.collector.rabbitmq; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.type.AnnotatedTypeMetadata; +import zipkin.collector.CollectorMetrics; +import zipkin.collector.CollectorSampler; +import zipkin.collector.rabbitmq.RabbitMqCollector; +import zipkin.storage.StorageComponent; + +/** + * Auto-configuration for {@link RabbitMqCollector}. + */ +@Configuration +@Conditional(ZipkinRabbitMqCollectorAutoConfiguration.RabbitMqAddressesSet.class) +@EnableConfigurationProperties(ZipkinRabbitMqCollectorProperties.class) +public class ZipkinRabbitMqCollectorAutoConfiguration { + + @Bean(initMethod = "start") RabbitMqCollector rabbitMq( + ZipkinRabbitMqCollectorProperties properties, + CollectorSampler sampler, CollectorMetrics metrics, StorageComponent storage) { + return properties.toBuilder().sampler(sampler).metrics(metrics).storage(storage).build(); + } + + /** + * This condition passes when {@link ZipkinRabbitMqCollectorProperties#getAddresses()} is set to + * non-empty. + * + *

This is here because the yaml defaults this property to empty like this, and Spring Boot + * doesn't have an option to treat empty properties as unset. + * + *

{@code
+   * addresses: ${RABBIT_ADDRESSES:}
+   * }
+ */ + static final class RabbitMqAddressesSet implements Condition { + @Override public boolean matches(ConditionContext context, AnnotatedTypeMetadata a) { + return !isEmpty(context.getEnvironment().getProperty("zipkin.collector.rabbitmq.addresses")); + } + + private static boolean isEmpty(String s) { + return s == null || s.isEmpty(); + } + } +} diff --git a/zipkin-autoconfigure/collector-rabbitmq/src/main/java/zipkin/autoconfigure/collector/rabbitmq/ZipkinRabbitMqCollectorProperties.java b/zipkin-autoconfigure/collector-rabbitmq/src/main/java/zipkin/autoconfigure/collector/rabbitmq/ZipkinRabbitMqCollectorProperties.java new file mode 100644 index 00000000000..26038441cbb --- /dev/null +++ b/zipkin-autoconfigure/collector-rabbitmq/src/main/java/zipkin/autoconfigure/collector/rabbitmq/ZipkinRabbitMqCollectorProperties.java @@ -0,0 +1,113 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.autoconfigure.collector.rabbitmq; + +import com.rabbitmq.client.Address; +import com.rabbitmq.client.ConnectionFactory; +import java.util.List; +import org.springframework.boot.context.properties.ConfigurationProperties; +import zipkin.collector.rabbitmq.RabbitMqCollector; + +/** + * Properties for configuring and building a {@link RabbitMqCollector}. + */ +@ConfigurationProperties("zipkin.collector.rabbitmq") +public class ZipkinRabbitMqCollectorProperties { + + /** RabbitMQ server addresses in the form of a (comma-separated) list of host:port pairs */ + private List addresses; + /** Number of concurrent consumers */ + private Integer concurrency = 1; + /** TCP connection timeout in milliseconds */ + private Integer connectionTimeout; + /** RabbitMQ user password */ + private String password; + /** RabbitMQ queue from which to collect the Zipkin spans */ + private String queue; + /** RabbitMQ username */ + private String username; + /** RabbitMQ virtual host */ + private String virtualHost; + + public List getAddresses() { + return addresses; + } + + public void setAddresses(List addresses) { + this.addresses = addresses; + } + + public int getConcurrency() { + return concurrency; + } + + public void setConcurrency(int concurrency) { + this.concurrency = concurrency; + } + + public Integer getConnectionTimeout() { + return connectionTimeout; + } + + public void setConnectionTimeout(Integer connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getVirtualHost() { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) { + this.virtualHost = virtualHost; + } + + public RabbitMqCollector.Builder toBuilder() { + final RabbitMqCollector.Builder result = RabbitMqCollector.builder(); + ConnectionFactory connectionFactory = new ConnectionFactory(); + if (addresses != null) result.addresses(addresses); + if (concurrency != null) result.concurrency(concurrency); + if (connectionTimeout != null) connectionFactory.setConnectionTimeout(connectionTimeout); + if (password != null) connectionFactory.setPassword(password); + if (queue != null) result.queue(queue); + if (username != null) connectionFactory.setUsername(username); + if (virtualHost != null) connectionFactory.setVirtualHost(virtualHost); + result.connectionFactory(connectionFactory); + return result; + } + +} diff --git a/zipkin-autoconfigure/collector-rabbitmq/src/main/java/zipkin/autoconfigure/collector/rabbitmq/package-info.java b/zipkin-autoconfigure/collector-rabbitmq/src/main/java/zipkin/autoconfigure/collector/rabbitmq/package-info.java new file mode 100644 index 00000000000..8f64a721f67 --- /dev/null +++ b/zipkin-autoconfigure/collector-rabbitmq/src/main/java/zipkin/autoconfigure/collector/rabbitmq/package-info.java @@ -0,0 +1,15 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +@javax.annotation.ParametersAreNonnullByDefault +package zipkin.autoconfigure.collector.rabbitmq; diff --git a/zipkin-autoconfigure/collector-rabbitmq/src/main/resources/META-INF/spring.factories b/zipkin-autoconfigure/collector-rabbitmq/src/main/resources/META-INF/spring.factories new file mode 100644 index 00000000000..203ce9a870f --- /dev/null +++ b/zipkin-autoconfigure/collector-rabbitmq/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + zipkin.autoconfigure.collector.rabbitmq.ZipkinRabbitMqCollectorAutoConfiguration diff --git a/zipkin-autoconfigure/collector-rabbitmq/src/test/java/zipkin/collector/rabbitmq/ZipkinRabbitMqCollectorAutoConfigurationTest.java b/zipkin-autoconfigure/collector-rabbitmq/src/test/java/zipkin/collector/rabbitmq/ZipkinRabbitMqCollectorAutoConfigurationTest.java new file mode 100644 index 00000000000..fd21097e48a --- /dev/null +++ b/zipkin-autoconfigure/collector-rabbitmq/src/test/java/zipkin/collector/rabbitmq/ZipkinRabbitMqCollectorAutoConfigurationTest.java @@ -0,0 +1,99 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.collector.rabbitmq; + +import org.junit.After; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import zipkin.autoconfigure.collector.rabbitmq.ZipkinRabbitMqCollectorAutoConfiguration; +import zipkin.collector.CollectorMetrics; +import zipkin.collector.CollectorSampler; +import zipkin.collector.rabbitmq.RabbitMqCollector; +import zipkin.storage.InMemoryStorage; +import zipkin.storage.StorageComponent; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.boot.test.util.EnvironmentTestUtils.addEnvironment; + +public class ZipkinRabbitMqCollectorAutoConfigurationTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private AnnotationConfigApplicationContext context; + + @After + public void close() { + if (context != null) { + context.close(); + } + } + + @Test + public void doesNotProvideCollectorComponent_whenAddressNotSet() { + context = new AnnotationConfigApplicationContext(); + context.register(PropertyPlaceholderAutoConfiguration.class, + ZipkinRabbitMqCollectorAutoConfiguration.class, InMemoryConfiguration.class); + context.refresh(); + + thrown.expect(NoSuchBeanDefinitionException.class); + context.getBean(RabbitMqCollector.class); + } + + @Test + public void doesNotProvideCollectorComponent_whenAddressesIsEmptyString() { + context = new AnnotationConfigApplicationContext(); + addEnvironment(context, "zipkin.collector.rabbitmq.addresses:"); + context.register(PropertyPlaceholderAutoConfiguration.class, + ZipkinRabbitMqCollectorAutoConfiguration.class, InMemoryConfiguration.class); + context.refresh(); + + thrown.expect(NoSuchBeanDefinitionException.class); + context.getBean(RabbitMqCollector.class); + } + + @Test + @Ignore + public void providesCollectorComponent_whenAddressesSet() { + context = new AnnotationConfigApplicationContext(); + addEnvironment(context, "zipkin.collector.rabbitmq.addresses=localhost:5672"); + context.register(PropertyPlaceholderAutoConfiguration.class, + ZipkinRabbitMqCollectorAutoConfiguration.class, InMemoryConfiguration.class); + context.refresh(); + + assertThat(context.getBean(RabbitMqCollector.class)).isNotNull(); + } + + @Configuration + static class InMemoryConfiguration { + @Bean CollectorSampler sampler() { + return CollectorSampler.ALWAYS_SAMPLE; + } + + @Bean CollectorMetrics metrics() { + return CollectorMetrics.NOOP_METRICS; + } + + @Bean StorageComponent storage() { + return new InMemoryStorage(); + } + } +} diff --git a/zipkin-autoconfigure/collector-rabbitmq/src/test/java/zipkin/collector/rabbitmq/ZipkinRabbitMqCollectorPropertiesOverrideTest.java b/zipkin-autoconfigure/collector-rabbitmq/src/test/java/zipkin/collector/rabbitmq/ZipkinRabbitMqCollectorPropertiesOverrideTest.java new file mode 100644 index 00000000000..cedff0cfa9c --- /dev/null +++ b/zipkin-autoconfigure/collector-rabbitmq/src/test/java/zipkin/collector/rabbitmq/ZipkinRabbitMqCollectorPropertiesOverrideTest.java @@ -0,0 +1,124 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.collector.rabbitmq; + +import com.rabbitmq.client.Address; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Configuration; +import zipkin.autoconfigure.collector.rabbitmq.ZipkinRabbitMqCollectorProperties; +import zipkin.collector.rabbitmq.RabbitMqCollector; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.boot.test.util.EnvironmentTestUtils.addEnvironment; + +@RunWith(Parameterized.class) +public class ZipkinRabbitMqCollectorPropertiesOverrideTest { + + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + + @After + public void close() { + if (context != null) context.close(); + } + + @Parameterized.Parameter(0) public String property; + @Parameterized.Parameter(1) public Object value; + @Parameterized.Parameter(2) public Function + propertiesExtractor; + @Parameterized.Parameter(3) public Function builderExtractor; + + @Parameterized.Parameters(name = "{0}") + public static Iterable data() { + return Arrays.asList( + parameters("addresses", "localhost:5671,localhost:5673", + properties -> convertToStringWithoutListBrackets(properties.getAddresses()), + builder -> convertToStringWithoutListBrackets(builder.addresses)), + parameters("concurrency", 2, + ZipkinRabbitMqCollectorProperties::getConcurrency, + builder -> builder.concurrency), + parameters("connectionTimeout", 30_000, + ZipkinRabbitMqCollectorProperties::getConnectionTimeout, + builder -> builder.connectionFactory.getConnectionTimeout()), + parameters("password", "admin", + ZipkinRabbitMqCollectorProperties::getPassword, + builder -> builder.connectionFactory.getPassword()), + parameters("queue", "zapkin", + ZipkinRabbitMqCollectorProperties::getQueue, + builder -> builder.queue), + parameters("username", "admin", + ZipkinRabbitMqCollectorProperties::getUsername, + builder -> builder.connectionFactory.getUsername()), + parameters("virtualHost", "/hello", + ZipkinRabbitMqCollectorProperties::getVirtualHost, + builder -> builder.connectionFactory.getVirtualHost()) + ); + } + + /** to allow us to define with a lambda */ + static Object[] parameters(String propertySuffix, T value, + Function propertiesExtractor, + Function builderExtractor) { + return new Object[] {"zipkin.collector.rabbitmq." + propertySuffix, value, propertiesExtractor, + builderExtractor}; + } + + @Test + public void canOverrideValueOf() { + addEnvironment(context, property + ":" + value); + + context.register( + PropertyPlaceholderAutoConfiguration.class, + EnableRabbitMqCollectorProperties.class + ); + context.refresh(); + + assertThat(context.getBean(ZipkinRabbitMqCollectorProperties.class)) + .extracting(propertiesExtractor) + .containsExactly(value); + } + + @Test + public void propertyTransferredToCollectorBuilder() { + addEnvironment(context, property + ":" + value); + + context.register( + PropertyPlaceholderAutoConfiguration.class, + EnableRabbitMqCollectorProperties.class + ); + context.refresh(); + + assertThat(context.getBean(ZipkinRabbitMqCollectorProperties.class).toBuilder()) + .extracting(builderExtractor) + .containsExactly(value); + } + + @Configuration + @EnableConfigurationProperties(ZipkinRabbitMqCollectorProperties.class) + static class EnableRabbitMqCollectorProperties { + } + + private static String convertToStringWithoutListBrackets(List list) { + return list.toString().substring(1, list.toString().length() - 1).replaceAll(" ", ""); + } +} diff --git a/zipkin-autoconfigure/pom.xml b/zipkin-autoconfigure/pom.xml index c55a048c8c3..05857a77293 100644 --- a/zipkin-autoconfigure/pom.xml +++ b/zipkin-autoconfigure/pom.xml @@ -35,6 +35,7 @@ ui collector-kafka collector-kafka10 + collector-rabbitmq collector-scribe storage-cassandra storage-cassandra3 diff --git a/zipkin-collector/pom.xml b/zipkin-collector/pom.xml index 4dff3d6a0b2..1b432709174 100644 --- a/zipkin-collector/pom.xml +++ b/zipkin-collector/pom.xml @@ -35,5 +35,6 @@ scribe kafka kafka10 + rabbitmq diff --git a/zipkin-collector/rabbitmq/README.md b/zipkin-collector/rabbitmq/README.md new file mode 100644 index 00000000000..81e4fd6979b --- /dev/null +++ b/zipkin-collector/rabbitmq/README.md @@ -0,0 +1,57 @@ +# collector-rabbitmq + +## RabbitMqCollector +This collector consumes a RabbitMQ queue for messages that contain a list of spans. +Its only dependencies besides Zipkin core are the `slf4j-api` and the [RabbitMQ Java Client](https://github.com/rabbitmq/rabbitmq-java-client). + +### Configuration + +The following configuration can be set for the RabbitMQ Collector. + +Property | Environment Variable | Description +--- | --- | --- +`zipkin.collector.rabbitmq.addresses` | `RABBIT_ADDRESSES` | Comma-separated list of RabbitMQ addresses, ex. `localhost:5672,localhost:5673` +`zipkin.collector.rabbitmq.concurrency` | `RABBIT_CONCURRENCY` | Number of concurrent consumers. Defaults to `1` +`zipkin.collector.rabbitmq.connectionTimeout` | `RABBIT_CONNECTION_TIMEOUT` | Milliseconds to wait establishing a connection. Defaults to `60000` (1 minute) +`zipkin.collector.rabbitmq.password` | `RABBIT_PASSWORD`| Password to use when connecting to RabbitMQ. Defaults to `guest` +`zipkin.collector.rabbitmq.queue` | `RABBIT_QUEUE` | Queue from which to collect span messages. Defaults to `zipkin` +`zipkin.collector.rabbitmq.username` | `RABBIT_USER` | Username to use when connecting to RabbitMQ. Defaults to `guest` +`zipkin.collector.rabbitmq.virtualHost` | `RABBIT_VIRTUAL_HOST` | RabbitMQ virtual host to use. Defaults to `/` + +### Caveats + +The configured queue will be idempotently declared as a durable queue. + +This collector uses one connection to RabbitMQ, with the configured `concurrency` number of threads +each using one channel to consume messages. + +Consumption is done with `autoAck` on, so messages that fail to process successfully are not retried. + +## Encoding spans into RabbitMQ messages +The message's body should be the bytes of an encoded list of spans. + +### JSON +A list of Spans in JSON. The first character must be '[' (decimal 91). + +`SpanBytesEncoder.JSON_V2.encodeList(spans)` performs the correct JSON encoding. + +## Local testing + +The following assumes you are running an instance of RabbitMQ locally on the default port (5672). +You can download and install RabbitMQ following [instructions available here](https://www.rabbitmq.com/download.html). +With the [RabbitMQ Management CLI](https://www.rabbitmq.com/management-cli.html) you can easily publish +one-off spans to RabbitMQ to be collected by this collector. + +1. Start RabbitMQ server +2. Start Zipkin server +```bash +$ RABBIT_ADDRESSES=localhost java -jar zipkin.jar +``` +3. Save an array of spans to a file like `sample-spans.json` +```json +[{"traceId":"9032b04972e475c5","id":"9032b04972e475c5","kind":"SERVER","name":"get","timestamp":1505990621526000,"duration":612898,"localEndpoint":{"serviceName":"brave-webmvc-example","ipv4":"192.168.1.113"},"remoteEndpoint":{"serviceName":"","ipv4":"127.0.0.1","port":60149},"tags":{"error":"500 Internal Server Error","http.path":"/a"}}] +``` +4. Publish them using the CLI +```bash +$ rabbitmqadmin publish exchange=amq.default routing_key=zipkin < sample-spans.json +``` diff --git a/zipkin-collector/rabbitmq/pom.xml b/zipkin-collector/rabbitmq/pom.xml new file mode 100644 index 00000000000..5eaa41eacd0 --- /dev/null +++ b/zipkin-collector/rabbitmq/pom.xml @@ -0,0 +1,77 @@ + + + + 4.0.0 + + + io.zipkin.java + zipkin-collector + 2.0.2-SNAPSHOT + + + zipkin-collector-rabbitmq + Collector: RabbitMQ + Zipkin span collector for RabbitMQ transport + + + ${project.basedir}/../.. + + 4.2.0 + + + + + ${project.groupId} + zipkin + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + com.rabbitmq + amqp-client + ${rabbitmq-client.version} + + + + ch.qos.logback + logback-classic + ${logback.version} + test + + + + org.testcontainers + testcontainers + ${testcontainers.version} + test + + + + ${project.groupId} + zipkin + test-jar + test + + + + diff --git a/zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/RabbitMqCollector.java b/zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/RabbitMqCollector.java new file mode 100644 index 00000000000..42ced1f3783 --- /dev/null +++ b/zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/RabbitMqCollector.java @@ -0,0 +1,213 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.collector.rabbitmq; + +import com.rabbitmq.client.Address; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import zipkin.collector.Collector; +import zipkin.collector.CollectorComponent; +import zipkin.collector.CollectorMetrics; +import zipkin.collector.CollectorSampler; +import zipkin.internal.LazyCloseable; +import zipkin.storage.StorageComponent; + +import static zipkin.internal.Util.checkNotNull; + +/** + * This collector consumes encoded binary messages from a RabbitMQ queue. + */ +public final class RabbitMqCollector implements CollectorComponent { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMqCollector.class); + + public static Builder builder() { + return new Builder(); + } + + /** Configuration including defaults needed to consume spans from a RabbitMQ queue. */ + public static final class Builder implements CollectorComponent.Builder { + Collector.Builder delegate = Collector.builder(RabbitMqCollector.class); + CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS; + String queue = "zipkin"; + ConnectionFactory connectionFactory; + List addresses; + int concurrency = 1; + + @Override public Builder storage(StorageComponent storage) { + this.delegate.storage(storage); + return this; + } + + @Override public Builder sampler(CollectorSampler sampler) { + this.delegate.sampler(sampler); + return this; + } + + @Override public Builder metrics(CollectorMetrics metrics) { + this.metrics = checkNotNull(metrics, "metrics").forTransport("rabbitmq"); + this.delegate.metrics(this.metrics); + return this; + } + + public Builder addresses(List addresses) { + this.addresses = addresses; + return this; + } + + public Builder concurrency(int concurrency) { + this.concurrency = concurrency; + return this; + } + + public Builder connectionFactory(ConnectionFactory connectionFactory) { + this.connectionFactory = checkNotNull(connectionFactory, "connectionFactory"); + return this; + } + + /** + * Queue zipkin spans will be consumed from. Defaults to "zipkin-spans". + */ + public Builder queue(String queue) { + this.queue = checkNotNull(queue, "queue"); + return this; + } + + @Override public RabbitMqCollector build() { + return new RabbitMqCollector(this); + } + } + + private final LazyRabbitWorkers rabbitWorkers; + + RabbitMqCollector(Builder builder) { + this.rabbitWorkers = new LazyRabbitWorkers(builder); + } + + @Override + public RabbitMqCollector start() { + this.rabbitWorkers.get(); + return this; + } + + @Override + public CheckResult check() { + try { + CheckResult failure = this.rabbitWorkers.failure.get(); + if (failure != null) return failure; + return CheckResult.OK; + } catch (RuntimeException e) { + return CheckResult.failed(e); + } + } + + @Override + public void close() throws IOException { + this.rabbitWorkers.close(); + } + + static final class LazyRabbitWorkers extends LazyCloseable { + + final Builder builder; + final int concurrency; + final AtomicReference failure = new AtomicReference<>(); + private Connection connection; + + LazyRabbitWorkers(Builder builder) { + this.builder = builder; + this.concurrency = builder.concurrency; + } + + @Override + protected ExecutorService compute() { + ExecutorService pool = concurrency == 1 + ? Executors.newSingleThreadExecutor() + : Executors.newFixedThreadPool(concurrency); + + try { + this.connection = + this.builder.connectionFactory.newConnection(convertAddresses(this.builder.addresses)); + } catch (IOException | TimeoutException e) { + throw new RabbitCollectorStartupException( + "Unable to establish connection to RabbitMQ server", e); + } + + for (int i = 0; i < concurrency; i++) { + final RabbitMqWorker worker = + new RabbitMqWorker(this.builder, this.connection, RabbitMqWorker.class.getName() + i); + pool.execute(guardFailures(worker)); + } + + return pool; + } + + Runnable guardFailures(final Runnable delegate) { + return () -> { + try { + delegate.run(); + } catch (RuntimeException e) { + LOG.error("RabbitMQ collector consumer exited with exception", e); + this.failure.set(CheckResult.failed(e)); + } + }; + } + + @Override + public void close() { + ExecutorService maybeNull = maybeNull(); + if (maybeNull != null) { + maybeNull.shutdownNow(); + try { + maybeNull.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // at least we tried + } finally { + try { + this.connection.close(); + } catch (IOException ignore) { + LOG.info("Failed to close RabbitMQ connection when stopping the collector."); + } + } + } + } + + static class RabbitCollectorStartupException extends RuntimeException { + RabbitCollectorStartupException(String message, Throwable cause) { + super(message, cause); + } + } + } + + static Address[] convertAddresses(List addresses) { + Address[] addressArray = new Address[addresses.size()]; + for (int i = 0; i < addresses.size(); i++) { + String[] splitAddress = addresses.get(i).split(":"); + String host = splitAddress[0]; + Integer port = null; + try { + if (splitAddress.length == 2) port = Integer.parseInt(splitAddress[1]); + } catch (NumberFormatException ignore) { } + addressArray[i] = (port != null) ? new Address(host, port) : new Address(host); + } + return addressArray; + } +} diff --git a/zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/RabbitMqSpanConsumer.java b/zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/RabbitMqSpanConsumer.java new file mode 100644 index 00000000000..3391166b23d --- /dev/null +++ b/zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/RabbitMqSpanConsumer.java @@ -0,0 +1,67 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.collector.rabbitmq; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import java.io.IOException; +import java.util.Collections; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import zipkin.Span; +import zipkin.SpanDecoder; +import zipkin.collector.Collector; +import zipkin.collector.CollectorMetrics; + +import static zipkin.SpanDecoder.DETECTING_DECODER; +import static zipkin.storage.Callback.NOOP; + +/** + * Consumes spans from messages on a RabbitMQ queue. Malformed messages will be discarded. Errors in + * the storage component will similarly be ignored, with no retry of the message. + */ +class RabbitMqSpanConsumer extends DefaultConsumer { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMqSpanConsumer.class); + + private final Collector collector; + private final CollectorMetrics metrics; + + RabbitMqSpanConsumer(RabbitMqCollector.Builder builder, Channel channel) { + super(channel); + + this.collector = builder.delegate.build(); + this.metrics = builder.metrics; + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, + byte[] body) throws IOException { + try { + this.metrics.incrementMessages(); + // If we received legacy single-span encoding, decode it into a singleton list + if (body[0] <= 16 && body[0] != 12 /* thrift, but not a list */) { + this.metrics.incrementBytes(body.length); + Span span = SpanDecoder.THRIFT_DECODER.readSpan(body); + this.collector.accept(Collections.singletonList(span), NOOP); + } else { + this.collector.acceptSpans(body, DETECTING_DECODER, NOOP); + } + } catch (RuntimeException e) { + this.metrics.incrementMessagesDropped(); + LOG.debug("Exception while collecting message from RabbitMQ", e); + } + } +} diff --git a/zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/RabbitMqWorker.java b/zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/RabbitMqWorker.java new file mode 100644 index 00000000000..fdededd2c56 --- /dev/null +++ b/zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/RabbitMqWorker.java @@ -0,0 +1,47 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.collector.rabbitmq; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import java.io.IOException; + +/** + * A worker thread for concurrent processing of a RabbitMQ queue. This will idempotently declare the + * queue from which it will consume. Auto-acknowledge is enabled so there will be no retrying of + * messages. + */ +class RabbitMqWorker implements Runnable { + private final Connection connection; + private final RabbitMqCollector.Builder builder; + private final String name; + + RabbitMqWorker(RabbitMqCollector.Builder builder, Connection connection, String name) { + this.builder = builder; + this.connection = connection; + this.name = name; + } + + @Override + public void run() { + try { + Channel channel = this.connection.createChannel(); + channel.queueDeclare(this.builder.queue, true, false, false, null); + final RabbitMqSpanConsumer consumer = new RabbitMqSpanConsumer(this.builder, channel); + channel.basicConsume(this.builder.queue, true, this.name, consumer); + } catch (IOException e) { + throw new IllegalStateException("Failed to start RabbitMQ consumer " + this.name, e); + } + } +} diff --git a/zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/package-info.java b/zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/package-info.java new file mode 100644 index 00000000000..db17a9b17e9 --- /dev/null +++ b/zipkin-collector/rabbitmq/src/main/java/zipkin/collector/rabbitmq/package-info.java @@ -0,0 +1,15 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +@javax.annotation.ParametersAreNonnullByDefault +package zipkin.collector.rabbitmq; diff --git a/zipkin-collector/rabbitmq/src/test/java/zipkin/collector/rabbitmq/RabbitMqCollectorTest.java b/zipkin-collector/rabbitmq/src/test/java/zipkin/collector/rabbitmq/RabbitMqCollectorTest.java new file mode 100644 index 00000000000..b4e57eaf09f --- /dev/null +++ b/zipkin-collector/rabbitmq/src/test/java/zipkin/collector/rabbitmq/RabbitMqCollectorTest.java @@ -0,0 +1,303 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.collector.rabbitmq; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.Timeout; +import org.testcontainers.containers.GenericContainer; +import zipkin.Codec; +import zipkin.Span; +import zipkin.collector.InMemoryCollectorMetrics; +import zipkin.collector.rabbitmq.RabbitMqCollector.Builder; +import zipkin.collector.rabbitmq.RabbitMqCollector.LazyRabbitWorkers.RabbitCollectorStartupException; +import zipkin.internal.ApplyTimestampAndDuration; +import zipkin.internal.V2SpanConverter; +import zipkin.storage.AsyncSpanConsumer; +import zipkin.storage.AsyncSpanStore; +import zipkin.storage.SpanStore; +import zipkin.storage.StorageComponent; +import zipkin2.codec.SpanBytesEncoder; + +import static org.assertj.core.api.Assertions.assertThat; +import static zipkin.TestObjects.LOTS_OF_SPANS; +import static zipkin.TestObjects.TRACE; +import static zipkin.collector.rabbitmq.RabbitMqCollector.convertAddresses; + +public class RabbitMqCollectorTest { + + private static final int RABBIT_PORT = 5672; + private static final String RABBIT_DOCKER_IMAGE = "rabbitmq:3.6-alpine"; + + @ClassRule public static GenericContainer rabbitmq = + new GenericContainer(RABBIT_DOCKER_IMAGE) + .withExposedPorts(RABBIT_PORT); + @ClassRule public static Timeout globalTimeout = Timeout.seconds(180); + + @Rule public ExpectedException thrown = ExpectedException.none(); + + private List dockerRabbitAddress = Collections.singletonList( + rabbitmq.getContainerIpAddress() + ":" + rabbitmq.getMappedPort(RABBIT_PORT)); + + private InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics(); + private InMemoryCollectorMetrics rabbitMetrics = metrics.forTransport("rabbitmq"); + + private CopyOnWriteArraySet threadsProvidingSpans = new CopyOnWriteArraySet<>(); + private LinkedBlockingQueue> receivedSpans = new LinkedBlockingQueue<>(); + private AsyncSpanConsumer consumer = (spans, callback) -> { + threadsProvidingSpans.add(Thread.currentThread()); + receivedSpans.add(spans); + callback.onSuccess(null); + }; + + @Test + public void checkPasses() throws Exception { + try (RabbitMqCollector collector = builder().build()) { + assertThat(collector.check().ok).isTrue(); + } + } + + @Test + public void startFailsWithInvalidRabbitMqServer() throws Exception { + // we can be pretty certain RabbitMQ isn't running on localhost port 80 + String notRabbitMqAddress = "localhost:80"; + try (RabbitMqCollector collector = builder() + .addresses(Collections.singletonList(notRabbitMqAddress)).build()) { + thrown.expect(RabbitCollectorStartupException.class); + thrown.expectMessage("Unable to establish connection to RabbitMQ server"); + collector.start(); + } + } + + /** Ensures legacy encoding works: a single TBinaryProtocol encoded span */ + @Test + public void messageWithSingleThriftSpan() throws Exception { + Builder builder = builder(); + + byte[] bytes = Codec.THRIFT.writeSpan(TRACE.get(0)); + produceSpans(bytes, builder); + + try (RabbitMqCollector collector = builder.build()) { + collector.start(); + assertThat(receivedSpans.take()).containsExactly(TRACE.get(0)); + } + + assertThat(rabbitMetrics.messages()).isEqualTo(1); + assertThat(rabbitMetrics.bytes()).isEqualTo(bytes.length); + assertThat(rabbitMetrics.spans()).isEqualTo(1); + } + + /** Ensures list encoding works: a TBinaryProtocol encoded list of spans */ + @Test + public void messageWithMultipleSpans_thrift() throws Exception { + Builder builder = builder(); + + byte[] bytes = Codec.THRIFT.writeSpans(TRACE); + produceSpans(bytes, builder); + + try (RabbitMqCollector collector = builder.build()) { + collector.start(); + assertThat(receivedSpans.take()).containsExactlyElementsOf(TRACE); + } + + assertThat(rabbitMetrics.messages()).isEqualTo(1); + assertThat(rabbitMetrics.bytes()).isEqualTo(bytes.length); + assertThat(rabbitMetrics.spans()).isEqualTo(TRACE.size()); + } + + /** Ensures list encoding works: a json encoded list of spans */ + @Test + public void messageWithMultipleSpans_json() throws Exception { + Builder builder = builder(); + + byte[] bytes = Codec.JSON.writeSpans(TRACE); + produceSpans(bytes, builder); + + try (RabbitMqCollector collector = builder.build()) { + collector.start(); + assertThat(receivedSpans.take()).containsExactlyElementsOf(TRACE); + } + + assertThat(rabbitMetrics.messages()).isEqualTo(1); + assertThat(rabbitMetrics.bytes()).isEqualTo(bytes.length); + assertThat(rabbitMetrics.spans()).isEqualTo(TRACE.size()); + } + + /** Ensures list encoding works: a version 2 json encoded list of spans */ + @Test + public void messageWithMultipleSpans_json2() throws Exception { + Builder builder = builder(); + + List spans = Arrays.asList( + ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]), + ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1]) + ); + + byte[] message = SpanBytesEncoder.JSON_V2.encodeList(V2SpanConverter.fromSpans(spans)); + + produceSpans(message, builder); + + try (RabbitMqCollector collector = builder.build()) { + collector.start(); + // don't wait forever if no messages are in the queue + assertThat(receivedSpans.poll(1, TimeUnit.SECONDS)).containsAll(spans); + } + + assertThat(rabbitMetrics.messages()).isEqualTo(1); + assertThat(rabbitMetrics.messagesDropped()).isZero(); + assertThat(rabbitMetrics.bytes()).isEqualTo(message.length); + assertThat(rabbitMetrics.spans()).isEqualTo(spans.size()); + } + + /** Ensures malformed spans don't hang the collector */ + @Test + public void skipsMalformedData() throws Exception { + Builder builder = builder(); + + produceSpans(Codec.THRIFT.writeSpans(TRACE), builder); + produceSpans(new byte[0], builder); + produceSpans("[\"='".getBytes(), builder); // screwed up json + produceSpans("malformed".getBytes(), builder); + produceSpans(Codec.THRIFT.writeSpans(TRACE), builder); + + try (RabbitMqCollector collector = builder.build()) { + collector.start(); + assertThat(receivedSpans.poll(1, TimeUnit.SECONDS)).containsExactlyElementsOf(TRACE); + // the only way we could read this is if the malformed spans were skipped. + assertThat(receivedSpans.poll(1, TimeUnit.SECONDS)).containsExactlyElementsOf(TRACE); + } + + assertThat(rabbitMetrics.messagesDropped()).isEqualTo(3); + } + + /** Guards against errors that leak from storage, such as InvalidQueryException */ + @Test + public void skipsOnSpanConsumerException() throws Exception { + AtomicInteger counter = new AtomicInteger(); + final StorageComponent storage = buildStorage((spans, callback) -> { + if (counter.getAndIncrement() == 1) { + callback.onError(new RuntimeException("storage fell over")); + } else { + receivedSpans.add(spans); + callback.onSuccess(null); + } + }); + Builder builder = builder().storage(storage); + + produceSpans(Codec.THRIFT.writeSpans(TRACE), builder); + produceSpans(Codec.THRIFT.writeSpans(TRACE), builder); // tossed on error + produceSpans(Codec.THRIFT.writeSpans(TRACE), builder); + + try (RabbitMqCollector collector = builder.build()) { + collector.start(); + assertThat(receivedSpans.poll(1, TimeUnit.SECONDS)).containsExactlyElementsOf(TRACE); + // the only way we could read this, is if the malformed span was skipped. + assertThat(receivedSpans.poll(1, TimeUnit.SECONDS)).containsExactlyElementsOf(TRACE); + } + + assertThat(rabbitMetrics.spansDropped()).isEqualTo(TRACE.size()); + } + + @Test + public void messagesDistributedAcrossMultipleThreadsSuccessfully() throws Exception { + Builder builder = builder().concurrency(2); + + final byte[] traceBytes = Codec.THRIFT.writeSpans(TRACE); + try (RabbitMqCollector collector = builder.build()) { + collector.start(); + produceSpans(traceBytes, builder); + assertThat(receivedSpans.poll(1, TimeUnit.SECONDS)).containsExactlyElementsOf(TRACE); + produceSpans(traceBytes, builder); + assertThat(receivedSpans.poll(1, TimeUnit.SECONDS)).containsExactlyElementsOf(TRACE); + } + + assertThat(threadsProvidingSpans.size()).isEqualTo(2); + + assertThat(rabbitMetrics.messages()).isEqualTo(2); + assertThat(rabbitMetrics.bytes()).isEqualTo(traceBytes.length * 2); + assertThat(rabbitMetrics.spans()).isEqualTo(TRACE.size() * 2); + } + + private void produceSpans(byte[] message, Builder builder) throws IOException, TimeoutException { + new RabbitMqProducer(builder).publishMessage(message, builder.queue).close(); + } + + private Builder builder() { + return new Builder() + .addresses(dockerRabbitAddress) + .connectionFactory(new ConnectionFactory()) + .metrics(metrics) + .storage(buildStorage(consumer)); + } + + private StorageComponent buildStorage(final AsyncSpanConsumer spanConsumer) { + return new StorageComponent() { + @Override public SpanStore spanStore() { + throw new AssertionError(); + } + + @Override public AsyncSpanStore asyncSpanStore() { + throw new AssertionError(); + } + + @Override public AsyncSpanConsumer asyncSpanConsumer() { + return spanConsumer; + } + + @Override public CheckResult check() { + return CheckResult.OK; + } + + @Override public void close() { + throw new AssertionError(); + } + }; + } + + private class RabbitMqProducer { + private final Connection connection; + private final Channel channel; + + RabbitMqProducer(Builder builder) throws IOException, TimeoutException { + this.connection = + builder.connectionFactory.newConnection(convertAddresses(builder.addresses)); + this.channel = this.connection.createChannel(); + // without a durable queue existing, messages published before the collector is started are lost + this.channel.queueDeclare(builder.queue, true, false, false, null); + } + + RabbitMqProducer publishMessage(byte[] message, String queue) throws IOException { + this.channel.basicPublish("", queue, null, message); + return this; + } + + void close() throws IOException { + this.connection.close(); + } + } +} diff --git a/zipkin-collector/rabbitmq/src/test/resources/logback.xml b/zipkin-collector/rabbitmq/src/test/resources/logback.xml new file mode 100644 index 00000000000..cb6af65c481 --- /dev/null +++ b/zipkin-collector/rabbitmq/src/test/resources/logback.xml @@ -0,0 +1,15 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + diff --git a/zipkin-server/README.md b/zipkin-server/README.md index 578768f6af0..c8f79b4f9a0 100644 --- a/zipkin-server/README.md +++ b/zipkin-server/README.md @@ -293,6 +293,15 @@ prefixed system property: $ KAFKA_ZOOKEEPER=127.0.0.1:2181 java -Dzipkin.collector.kafka.overrides.auto.offset.reset=largest -jar zipkin.jar ``` +### RabbitMQ collector +The [RabbitMQ collector](../zipkin-collector/rabbitmq) will be enabled when the `addresses` for the RabbitMQ server(s) is set. + +Example usage: + +```bash +$ RABBIT_ADDRESSES=localhost java -jar zipkin.jar +``` + ### 128-bit trace IDs Zipkin supports 64 and 128-bit trace identifiers, typically serialized diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml index 3289756131e..7154bf20d74 100644 --- a/zipkin-server/pom.xml +++ b/zipkin-server/pom.xml @@ -124,6 +124,13 @@ true
+ + + ${project.groupId} + zipkin-autoconfigure-collector-rabbitmq + true + + ${project.groupId} diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml index 4f5c9133ce0..d64bc930811 100644 --- a/zipkin-server/src/main/resources/zipkin-server-shared.yml +++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml @@ -27,6 +27,16 @@ zipkin: enabled: ${SCRIBE_ENABLED:false} category: zipkin port: ${COLLECTOR_PORT:9410} + rabbitmq: + # RabbitMQ server address list (comma-separated list of host:port) + addresses: ${RABBIT_ADDRESSES:} + concurrency: ${RABBIT_CONCURRENCY:1} + # TCP connection timeout in milliseconds + connection-timeout: ${RABBIT_CONNECTION_TIMEOUT:60000} + password: ${RABBIT_PASSWORD:guest} + queue: ${RABBIT_QUEUE:zipkin} + username: ${RABBIT_USER:guest} + virtual-host: ${RABBIT_VIRTUAL_HOST:/} query: enabled: ${QUERY_ENABLED:true} # 1 day in millis