Tracing Kafka messages in JVM apps using OpenTelemetry
Matt D'Agostino – August 8th, 2023 – ☕ 12 min read
Tracing messages both within Kafka and between Kafka applications is no trivial task, and there are few real "out of the box" solutions. For a long time, if you wanted to trace messages, it was up to you as the developer to manually instrument your applications to generate and export tracing information.
I don't know about you, but I have better things to be doing. I'd rather be thinking about my actual core application and delivering features instead of developing, testing, rolling out and continually updating/fixing my own tracing implementation 😕. If I had infinite resources I would love to sit there optimizing my applications to the ends of the earth, tracing solution included, but that's not realistic or necessary in many situations 🤷‍♂️.
Introducing the OpenTelemetry Java Agent
The OpenTelemetry Java Agent is an extremely simple agent that is attached to your JVM application at runtime. It provides tracing through automatic instrumentation for any Java 8+ application. Out of the box, the OpenTelemetry Java Agent supports a large number of popular Java frameworks and libraries allowing automatic instrumentation of HTTP calls, database calls and even the Kafka APIs!
Furthermore, because this is an OpenTelemetry project you can expect it to use open standards and work with a wide range of monitoring tools. Out of the box, the agent can export traces in OTLP, Jaeger and Zipkin formats! 🤩
In short, the OpenTelemetry Java Agent can do the following:
- Automatic application instrumentation
- Supports SpringBoot, Kafka Consumer/Producer API & Kafka Streams
- Export traces via HTTP/gRPC
- Supports OTLP, Jaeger and Zipkin tracing protocols
- Export Prometheus metrics
- Supports trace context propagation
- Automatic Logging MDC Injection
- Provides an ability to add extensions to the agent
- Supports OpenTelemetry Annotation library for custom AOP implemented traces
What about Micrometer?
Simply put, at the time of writing Micrometer (even with SpringBoot 3) has no support for automatic tracing for Kafka. Micrometer provides APIs that allow manual instrumentation; however, the OpenTelemetry agent provides the same functionality with zero code changes.
Why OpenTelemetry? 🔎
Chances are, if you clicked on this, you already know what OpenTelemetry is. For those who don't, OpenTelemetry is a collection of APIs, SDKs and tools that allow you to instrument your applications for metrics, logs and traces.
The OpenTelemetry organisation and open-source community do all the hard things like defining a standard protocol for telemetry data, as well as creating tools like the OpenTelemetry Collector or, in our case, the OpenTelemetry Java agent. The OpenTelemetry framework and toolkit are designed to be vendor and tool-agnostic, meaning you can easily integrate with other open-source tools like Jaeger and Prometheus.
Now if you are like me, you might be a bit sceptical about implementing an entire observability suite if the rug could be pulled out at any moment. However, the great part about OpenTelemetry is its backing and broad industry adoption.
In fact, OpenTelemetry is a Cloud Native Computing Foundation project similar to other large Open-Source technologies like Kubernetes. To add to this, many observability platforms have already started to add support for OpenTelemetry with platforms like Grafana making OpenTelemetry and similar open-source technologies like Prometheus the default. Even Confluent provide OpenTelemetry metrics in their cloud Kafka service and have engineering teams working on improving the OpenTelemetry Java Agent!
If you would like to read more about OpenTelemetry I would highly encourage checking out the OpenTelemetry Documentation.
How does the agent work?
The OpenTelemetry Java Agent dynamically injects bytecode into the application at runtime, allowing it to integrate with popular frameworks and technologies such as SpringBoot and Kafka. This process involves inspecting the application at startup and adding hooks that allow it to intercept particular method calls. From my experience, this does impact the startup time of your application, so make sure to do any performance testing you require before implementing it.
In the case of Kafka, the agent injects tracing information directly into the headers of the Kafka messages. This allows us to trace that message between applications as it traverses Kafka topics.
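To make this concrete: with the agent's default W3C Trace Context propagator, each produced record carries a traceparent header. Here's a minimal Kotlin sketch (topic and value types are assumptions) that just logs that header from a consumed record:

import org.apache.kafka.clients.consumer.ConsumerRecord

// The agent injects a W3C `traceparent` header into each produced record;
// this helper simply logs it so you can see the propagated context.
fun logTraceContext(record: ConsumerRecord<String, String>) {
    val traceparent = record.headers().lastHeader("traceparent")?.let { String(it.value()) }
    println("Received record with traceparent=[$traceparent]")
}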
What's nice about using the OpenTelemetry agent injection is that it means we don't need to write any extra code! On top of that, because the agent is designed to work with a large number of popular technologies, it will also integrate with many popular JVM logging frameworks to inject tracing information into the MDC, allowing you to link logs to traces within your monitoring visualisation software.
Grafana Log
l=INFO h=MATT-XPS c=c.m.e.c.KafkaListener t=org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 trace_id=9ca450e7b1fbd5f1e04b31289937de70, trace_flags=01, span_id=bbed7b3951995dcf | Consumed message: [ExampleTransformedMessage(userId=ee9ccb50-c755-4aea-a09e-5b92485cdeee, name=Rolland Macejkovic, age=49)]
Standard Output Log
2023-08-08 19:35:59 - c.m.e.component.KafkaListener - trace_id=9ca450e7b1fbd5f1e04b31289937de70, trace_flags=01, span_id=bbed7b3951995dcf - Consumed message: [ExampleTransformedMessage(userId=ee9ccb50-c755-4aea-a09e-5b92485cdeee, name=Rolland Macejkovic, age=49)]
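If you're wondering how those trace fields end up in the log lines: once the agent has injected trace_id, span_id and trace_flags into the MDC, you just reference them from your logging pattern. A rough logback sketch (the exact pattern here is an assumption, adjust to taste):

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
        <pattern>%d{yyyy-MM-dd HH:mm:ss} - %logger{36} - trace_id=%X{trace_id}, trace_flags=%X{trace_flags}, span_id=%X{span_id} - %msg%n</pattern>
    </encoder>
</appender>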
Example Project
To demonstrate how to set up and run the Java Agent, I have put together a repository that contains three basic Kafka SpringBoot applications, all auto-instrumented with OpenTelemetry and feeding into a Grafana visualisation layer. For the most part, the important code will be in this post, but if you want to follow along or run it locally it's available here on GitHub.
Environment Architecture
This example repository is made up of some key components. To show off the full flexibility of the OpenTelemetry agent, I have created Kafka Producer, Streams Processor and Consumer applications, all auto-instrumented using the Otel Agent.
The repository also contains a Docker Compose project that spins up a single-broker Kafka cluster and a Grafana stack. If you are keen on trying this on your local computer, I would highly appreciate any feedback. But I will preface it by saying 16GB of RAM or more is recommended due to the number of containers.
For this post I will keep the focus on showcasing the agent, but if you'd like to run the example you can follow the README.md in the repository.
Application Architecture
The applications for this example are purposefully kept extremely simple. SpringBoot is used to abstract away a lot of configuration. Messages are serialised/deserialised using JsonSerde (provided by Spring Kafka, backed by Jackson) before being put on Kafka topics.
The Producer application will produce a message containing fake user data generated using datafaker onto the USERS--SRC Kafka topic.
logger.info { "Starting message producing with repetitions=[$repetitions]" }
repeat(repetitions) {
    val message = createMessage()
    producer.send(message)
    Thread.sleep(sendDelayMs)
}
logger.info { "Completed producing messages" }
The Streams Processor consumes from the USERS--SRC topic, concatenating the first and last names into one name field and calculating the user's age, after which it publishes the transformed message onto the USERS--TRANSFORMED--STRM topic. This transformation is shown below:
streamsBuilder.stream<String, ExampleProducerMessage>(inboundTopic)
    .peek { k, _ -> logger.info { "Consumed and processing message with key=[$k]" } }
    .mapValues { value -> mapToTransformedMessage(value) }
    .filter { _, value -> value != null }
    .peek { k, _ -> logger.info { "Forwarding message with key=[$k]" } }
    .to(outboundTopic)
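The mapToTransformedMessage() function does the actual field mapping. A hypothetical sketch (field names assumed to match the producer message above):

import java.time.LocalDate
import java.time.Period

// Concatenates first/last name and derives age from the date of birth.
// Returns null when the incoming message can't be mapped, which the
// topology's filter step then drops (message shapes are assumptions).
private fun mapToTransformedMessage(value: ExampleProducerMessage?): ExampleTransformedMessage? {
    if (value == null) return null
    return ExampleTransformedMessage(
        userId = value.userId,
        name = "${value.firstName} ${value.lastName}",
        age = Period.between(value.dateOfBirth, LocalDate.now()).years
    )
}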
Lastly, the consumer consumes messages from the USERS--TRANSFORMED--STRM topic, writing the messages to the console.
@KafkaListener(topics = ["\${app.consumer.topic}"])
fun listenMessages(message: ExampleTransformedMessage) {
    logger.info { "Consumed message: [$message]" }
    // Here is where you would then do something with the message, such as saving it to a DB
}
Throughout this whole process, the OpenTelemetry Java agent exports trace information such as trace IDs and span IDs to Tempo over gRPC. The code above is included to illustrate that zero extra code is needed to support tracing. It's really that simple 😌
Configuring the OpenTelemetry Agent
To add the agent to your application the only setup that is needed is to download the OpenTelemetry Java agent jar and attach it to your application at startup. That's all you need to start exporting traces. It's seriously that easy.
java -javaagent:path/to/opentelemetry-javaagent.jar \
-Dotel.service.name=your-service-name \
-jar myapp.jar
The service name above is what will allow you to identify which application the traces belong to.
OpenTelemetry has a few different ways you can configure the agent properties:
- System Properties (shown above)
- Environment variables
- Configuration file
However, in most situations, I would suggest a configuration file or environment variables. See Java Agent configuration docs.
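For example, here is the same setup as the system-property command above expressed as environment variables (the endpoint value is just an example, point it at your own collector or Tempo instance):

export OTEL_SERVICE_NAME=your-service-name
export OTEL_TRACES_EXPORTER=otlp
export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
java -javaagent:path/to/opentelemetry-javaagent.jar -jar myapp.jar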
Visualizing in Grafana
I chose Grafana for this demonstration because it provides a generous free tier and, in my opinion, as an organisation it does a lot for the open-source monitoring community.
After starting the environment, including the docker containers and Kafka applications, we should start to see monitoring data appearing in Grafana. From here we can jump into the Tempo tab and immediately start viewing traces for Kafka messages!
Out of the box, you will immediately notice that the agent provides a very in-depth view of the application, including the journey time as well as application-specific information like the JVM version. Not only do we get app info, but we also get information related to what the application is doing, which in our case includes the message offset, partition, key and topic.
For this demonstration, I have also configured the applications to export logging information to Grafana (Loki). The implementation for log exporting here isn't the important part; what's cool is the fact that the agent automatically injects tracing information into the Logging MDC, allowing you to create links between tracing information and logs! 🤯 Then within Grafana, it's as simple as modifying your Loki datasource config to create a derived field that links to Tempo, and Grafana will automatically start displaying a link you can use to navigate from logs to traces!
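For reference, that derived field is just a regex over the log line that extracts the trace ID and points it at the Tempo datasource. A rough provisioning sketch (the datasource UIDs here are assumptions, match them to your own stack):

datasources:
  - name: Loki
    type: loki
    uid: loki
    jsonData:
      derivedFields:
        - name: TraceID
          matcherRegex: "trace_id=(\\w+)"
          url: "$${__value.raw}"
          datasourceUid: tempo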
Bonuses
OpenTelemetry Annotations Library
One of the great things about using the OpenTelemetry agent is that it does not limit you to just its automated spans! Using the OpenTelemetry Annotations framework you can add custom spans to your methods. Realistically, most operations you would want a span on in a typical Kafka/CRUD API application, such as database calls, will automatically have spans generated for them. However, if you are doing any custom workloads the agent doesn't detect, it's as simple as an annotation.
import io.opentelemetry.instrumentation.annotations.WithSpan

class MyClass {
    @WithSpan
    fun myMethod() {
        // ...
    }
}
You can learn more about annotations with OpenTelemetry here, or you can view my configuration here.
Metrics Exporting with OpenTelemetry Java Agent
You read that right! Using the OpenTelemetry Java Agent not only can you export tracing information, but you can also export Prometheus metrics directly from the agent. This is super handy if your application is not using a framework like SpringBoot which has support for exporting Prometheus metrics out of the box.
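Enabling this is, again, just agent configuration. A minimal sketch (9464 is the Prometheus exporter's default port):

java -javaagent:path/to/opentelemetry-javaagent.jar \
  -Dotel.service.name=your-service-name \
  -Dotel.metrics.exporter=prometheus \
  -Dotel.exporter.prometheus.port=9464 \
  -jar myapp.jar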
I have found this to be great if you are after in-depth metrics for your application. I will add that, from my experience, most of the metrics exported by the OpenTelemetry Agent also appear in the Spring Actuator. So it's up to you to experiment and decide which suits you better. My example repository has both Spring Actuator Prometheus and OpenTelemetry Agent Prometheus being ingested into Grafana if you want to compare.
You can learn more about configuring the agent to export Prometheus metrics here. Or you can take a look at my example repository where it is configured, working and ready to clone 😉
Extensions
What if you want to customise the output of tracing data from the agent? Well, OpenTelemetry has thought of that too! In a plugin-like fashion, you can develop and attach your own extensions to the OpenTelemetry Java agent that let you alter tracing information before it's exported.
Want to get rid of a span? No problem. Want to edit attributes on the span? No problem. You can take a look here for some example extensions.
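As a taste, an extension is just a jar attached to the agent that implements one of the SDK's autoconfigure SPIs. A bare-bones sketch (the SPI registration file and attaching the jar via -Dotel.javaagent.extensions are left out):

import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizerProvider

// Discovered by the agent via the Java ServiceLoader mechanism
class MyAgentExtension : AutoConfigurationCustomizerProvider {
    override fun customize(autoConfiguration: AutoConfigurationCustomizer) {
        autoConfiguration.addTracerProviderCustomizer { tracerProvider, _ ->
            // e.g. register a custom SpanProcessor here to mutate or drop spans before export
            tracerProvider
        }
    }
}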
Limitations
At the moment, there are some limitations to using the agent to do all your tracing, specifically with Kafka Streams. Currently, any operations that use a state store are not traceable, meaning any traces going into the application will end at the state store and a new trace ID will be generated for the message leaving the Streams processor. This is because state stores in Kafka Streams do not store the header information, so any tracing here would need to be implemented manually. Having said that, Confluent has been spending a lot of time with the OpenTelemetry agent and is experimenting with magic bytes as a means of storing header information alongside messages in state stores.
Should you use it?
This is something you will have to answer for yourself. As great as the OpenTelemetry agent is, it does add extra overhead to your applications, especially during application startup. For example, if your application does not have much business logic, you may find it easier to just add Confluent interceptors. However, I would say for most situations the agent will be a good fit due to its ease of adoption, wide compatibility, open-source status and in-depth spans that can reach into the business-logic level of the application.
This post is my opinion and thoughts, but I'm interested to hear yours! Feel free to reach out with any questions or comments. 🙂