OpenTelemetry Context Propagation with Rust
In this article we'll dive into distributed tracing with Rust.
For a project implementing cryptographic algorithms in multiple distributed state machines, we were looking for a way to analyze what is going on during the execution of the algorithms. I suggested OpenTelemetry, since I had seen it used for distributed tracing in the past; however, I didn't have any experience with OpenTelemetry in Rust.
It was quite easy to set up an application based on the various tracing and opentelemetry crates, but neither the documentation of those crates nor any articles online1 described how to make tracing work across multiple distributed services. With this article I hope to fill that gap.
This article assumes basic knowledge about tracing and OpenTelemetry.
The Application
The example application that we'll consider in this article is a simple Ping-Pong application that uses Apache Kafka to exchange messages. The Ping Client will send a Ping message to the ping queue, and the Ping Server will respond by sending a corresponding Pong message to the pong queue. The Ping Client will terminate after receiving the Pong message.
The source code for this example application can be found at https://github.com/peterpaul/kafka-ping-stm/tree/docs/article. This article contains extracts and simplified snippets from that repository, but it does not give the full code needed to create this application from scratch. In addition, the source code listed in this article concentrates on the Ping Client, and only highlights the concepts used in the Ping Server. If you want to make the code from this article compile, some blanks will need to be filled in from the source code repository. Also note that, since the application was developed as a proof of concept, the code is not production quality, especially w.r.t. error handling.
Oblivious State Machine
The Ping client and server components are implemented as state machines (STM) using the oblivious-state-machine crate. Every state can:
- emit outgoing messages upon initialization
- receive incoming messages which can modify the internal state
- advance to the next state, based on the internal state
The advantage of this approach is that the network layer is disconnected from the state logic.
The relevant API that is needed to implement a State is included below.
/// Message and error types
pub trait StateTypes {
    type In;
    type Out;
    type Err: Debug;
}

pub enum DeliveryStatus<U, E: Debug> {
    Delivered,
    Unexpected(U),
    Error(E),
}

pub type BoxedState<Types> = Box<dyn State<Types> + Send>;

pub enum Transition<Types: StateTypes> {
    Same,
    Next(BoxedState<Types>),
    Terminal,
}

pub trait State<Types: StateTypes>: Downcast {
    fn desc(&self) -> String;
    fn initialize(&self) -> Vec<Types::Out> { Vec::new() }
    fn deliver(&mut self, message: Types::In) -> DeliveryStatus<Types::In, Types::Err> {
        DeliveryStatus::Unexpected(message)
    }
    fn advance(&self) -> Result<Transition<Types>, Types::Err>;
}
Ping Client
The Ping Client application is implemented as a simple command-line application. It will send a Ping, wait for the corresponding Pong, and terminate. The Ping Client STM is implemented using only a single state, PingState.
The plain arrows show the state transitions, and the dotted arrows show the incoming and outgoing messages.
The PingState contains the ping_to_send. When the STM is running, the PingState performs the following logic:
- Emits the ping_to_send upon initialization
- Waits until it receives the corresponding Pong
- Terminates the STM
Defining the State Machine
We start by defining the types we use in our Ping Client STM.
use oblivious_state_machine::state::*;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

#[derive(Clone, Deserialize, Serialize)]
struct Ping { session_id: Uuid }
#[derive(Clone, Deserialize, Serialize)]
struct Pong { session_id: Uuid }

struct Types;
impl StateTypes for Types {
    type In = Pong;
    type Out = Ping;
    type Err = String;
}
The Ping and Pong message types contain a session_id in order to recognize corresponding messages. We do not use any errors; however, because tracing requires that the error type implements Display, we use the String type for errors.
With these types we can implement the PingState state as follows.
#[derive(Clone)]
struct PingState {
    ping_to_send: Ping,
    received_pong: Option<Pong>,
}

impl PingState {
    fn new(ping_to_send: Ping) -> Self {
        Self { ping_to_send, received_pong: None }
    }
}

impl State<Types> for PingState {
    fn desc(&self) -> String { "PingState".to_owned() }

    fn initialize(&self) -> Vec<Ping> {
        vec![self.ping_to_send.clone()]
    }

    fn deliver(&mut self, message: Pong) -> DeliveryStatus<Pong, String> {
        self.received_pong = Some(message);
        DeliveryStatus::Delivered
    }

    fn advance(&self) -> Result<Transition<Types>, String> {
        Ok(match &self.received_pong {
            Some(_pong) => Transition::Terminal,
            None => Transition::Same,
        })
    }
}
We emit the ping_to_send on initialization, and wait until a Pong has been delivered via the deliver function. When a Pong has been delivered, the received_pong field is set to the received Pong. In the advance method this field is checked: as long as it is not set, we remain in the same state; once the Pong has been received, we advance to the terminal state.
Running the State Machine
The STM can be executed using TimeBoundStateMachineRunner, which internally uses tokio channels to exchange messages. The following code snippet is an almost complete implementation of the main application; the only thing that is missing is the networking part.
1 use tokio::{select, sync::{mpsc, oneshot}};
2 // plus `std::error::Error`, `Duration`, `Uuid`, and the oblivious_state_machine types used below
3 #[tokio::main]
4 async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
5     let session_id = Uuid::new_v4();
6     let ping_to_send = Ping::new(session_id);
7     let initial_state: BoxedState<Types> = Box::new(PingState::new(ping_to_send));
8     let mut state_machine_runner = TimeBoundStateMachineRunner::new("Ping".into(), initial_state, Duration::from_secs(15));
9     let (state_machine_tx, mut state_machine_rx) = mpsc::unbounded_channel();
10     state_machine_runner.run(state_machine_tx);
11
12     // Open channel to receive incoming pongs from Kafka
13     let mut incoming: mpsc::UnboundedReceiver<Pong> = todo!();
14
15     let res = loop {
16         select! {
17             Some(pong) = incoming.recv() => {
18                 if pong.session_id() == session_id {
19                     state_machine_runner.deliver(pong).unwrap();
20                 }
21             }
22             Some(stm_event) = state_machine_rx.recv() => {
23                 match stm_event {
24                     Either::Messages { messages, .. } => {
25                         for ping in messages {
26                             kafka_send_message(&mut producer, "ping", ping)?;
27                         }
28                     }
29                     Either::Result { result, .. } => {
30                         break result;
31                     }
32                 }
33             }
34         }
35     };
36     // `res` holds the final outcome of the STM; see the repository for how it is checked.
37     Ok(())
38 }
Let us go through this piece of code step by step.
- In line 8 the TimeBoundStateMachineRunner is instantiated. The returned handle can be used to interact with the STM.
- In line 10 the STM is started, using the state_machine_tx sender to communicate messages and the final result back to the main loop.
- Line 13 opens a channel via which incoming messages are received from the network. This will be explained in the following paragraph about Networking.
- Line 15 is the beginning of the main application loop. The incoming and state_machine_rx channels are being polled for updates.
- In line 17 incoming Pongs are delivered to the STM if they belong to this session.
- Line 22 handles STM events, which can be either messages or a final result.
- In line 24 outgoing Pings are received from the STM and sent via Kafka. The definition of kafka_send_message will be given in the following paragraph about Networking.
- In line 29 the final result of the STM is received, and the main application loop is exited.
Networking
In our application we use Apache Kafka to exchange the messages. The library that we use is the kafka crate, which is a pure Rust library for Apache Kafka.
As discussed in the previous section, incoming messages are received via a channel that is fed by a separate tokio task. The function below is used to set up that task. The code is generic, so that we can also use this for the Ping Server.
1 /// Spawns a tokio task that polls `consumer`.
2 /// Returns the task handle, a message receiver, and a shutdown transmitter.
3 pub fn spawn_kafka_consumer_task<T>(
4     mut consumer: Consumer,
5 ) -> (
6     tokio::task::JoinHandle<()>,
7     mpsc::UnboundedReceiver<T>,
8     oneshot::Sender<()>,
9 )
10 where
11     T: DeserializeOwned + Debug + Send + Sync + 'static,
12 {
13     let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
14     let (message_tx, message_rx) = mpsc::unbounded_channel::<T>();
15
16     // Polls `consumer`, and sends all incoming messages to `message_tx`.
17     // Exits when an event is received on `shutdown_rx`.
18     let kafka_consumer_task = tokio::task::spawn_blocking(move || {
19         while shutdown_rx.try_recv().is_err() {
20             for msg_result in consumer.poll().unwrap().iter() {
21                 for msg in msg_result.messages() {
22                     let message: T = serde_json::from_slice(msg.value).unwrap();
23                     message_tx.send(message).unwrap();
24                 }
25                 consumer.consume_messageset(msg_result).unwrap();
26             }
27             consumer.commit_consumed().unwrap();
28         }
29     });
30
31     (kafka_consumer_task, message_rx, shutdown_tx)
32 }
We open two channels: a oneshot channel which is used to receive a termination request, and an mpsc channel which is used to transmit the incoming messages. In line 18 a new tokio task is spawned that continuously polls the Kafka consumer for messages. Any incoming message is deserialized and sent via the message_tx channel.
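For completeness, here is a rough sketch of how such a Consumer could be created with the kafka crate and wired into the Ping Client; the broker address, topic, and group name are assumptions, and the real setup lives in the repository.

use kafka::consumer::{Consumer, FetchOffset};

// Hypothetical setup: connect to a local broker and subscribe to the `pong` topic.
let consumer = Consumer::from_hosts(vec!["localhost:9092".to_owned()])
    .with_topic("pong".to_owned())
    .with_group("ping-client".to_owned())
    .with_fallback_offset(FetchOffset::Latest)
    .create()?;

// `incoming` is the channel polled in the main loop; `shutdown_tx` stops the task.
let (_task, mut incoming, shutdown_tx) = spawn_kafka_consumer_task::<Pong>(consumer);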
The code that we use to send messages to a Kafka queue is the following. Again this code is generic so that we can also use it for the Ping Server.
pub fn kafka_send_message<T>(
    producer: &mut Producer, // Kafka Producer
    queue: &str,             // Name of Kafka queue
    message: T,              // Message to send
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>
where
    T: serde::Serialize, // Message must be Serializable
{
    let json_message = serde_json::to_string_pretty(&message)?;
    let record = Record::from_value(queue, json_message);
    producer.send(&record).map_err(|e| e.into())
}
Messages are serialized to JSON, and sent using the Kafka producer.
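Similarly, the Producer can be created with the kafka crate's builder API. A minimal sketch, assuming a local broker and one acknowledgement per send:

use kafka::producer::{Producer, RequiredAcks};
use std::time::Duration;

// Hypothetical setup: a producer against a local broker.
let mut producer = Producer::from_hosts(vec!["localhost:9092".to_owned()])
    .with_ack_timeout(Duration::from_secs(1))
    .with_required_acks(RequiredAcks::One)
    .create()?;

kafka_send_message(&mut producer, "ping", Ping::new(session_id))?;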
Ping Server
The Ping Server is implemented as a server application. It continuously polls the Kafka consumer for Ping messages, and for each Ping message it spawns a new TimeBoundStateMachineRunner. All active STMs are stored in a HashMap.
The Ping Server STM is implemented using two states, ListeningForPing and SendingPong. After emitting the Pong, the SendingPong state waits until it gets a PongSent event with the confirmation that the Pong has been sent. This ensures that the Pong has actually been forwarded to Apache Kafka, and is not still in the outgoing channel when the STM is closed.
We will not include the source code for these states in this article, but refer to the GitHub repository instead.
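As a rough illustration of that description (not the repository's actual code), the server-side message types could be modelled along the following lines; ServerIn and ServerTypes are hypothetical names.

// Hypothetical sketch based on the states described above; see the repository
// for the real definitions.
enum ServerIn {
    Ping(Ping),
    PongSent, // confirmation that the Pong has been forwarded to Kafka
}

struct ServerTypes;
impl StateTypes for ServerTypes {
    type In = ServerIn; // the server receives Pings and PongSent confirmations
    type Out = Pong;    // and emits Pongs
    type Err = String;
}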
Distributed Tracing
We want our application to emit tracing events so that we can understand what exactly happens in our distributed application.
The repository contains a try_init_tracing function that sets up tracing and OpenTelemetry with Jaeger. Jaeger can be started by running docker-compose up.
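The exact contents of try_init_tracing can be found in the repository; as a rough sketch (assuming the opentelemetry-jaeger crate, whose builder names vary between versions), it could look roughly like this:

use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

// Sketch only; the real function lives in the repository.
pub fn try_init_tracing(service_name: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Register the Jaeger propagator globally; the PropagationContext introduced
    // below relies on the globally registered TextMapPropagator.
    opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());

    // Export spans to the Jaeger agent started via `docker-compose up`.
    let tracer = opentelemetry_jaeger::new_agent_pipeline()
        .with_service_name(service_name)
        .install_simple()?;

    // Bridge `tracing` spans to OpenTelemetry, and also log to stdout.
    tracing_subscriber::registry()
        .with(tracing_opentelemetry::layer().with_tracer(tracer))
        .with(tracing_subscriber::fmt::layer())
        .try_init()?;
    Ok(())
}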
To emit the tracing events, we must instrument our application to do so. When adding tracing to these services, we face two main challenges:
- The STMs are running concurrently as different processes, potentially even on different machines, so we don't have a shared Span which lives in memory.
- We want to see a combined trace for both the Client and the Server.
Tracing an asynchronous application
We are interested in the progression through the STM, and want to instrument all State operations. Luckily the oblivious_state_machine library has a tracing feature2 that enables just this and ensures that the instrumented spans fall under the same span for the whole STM.
In the following code block we instrument all the state operations. We omit self from the trace using the skip directive, and add a new field state that contains the state's description.
struct PingState {
    ping_to_send: Ping,
    received_pong: Option<Pong>,
}

impl State<Types> for PingState {
    #[tracing::instrument(skip(self), fields(state = self.desc()))]
    fn initialize(&self) -> Vec<Ping> { todo!() }

    #[tracing::instrument(skip(self), fields(state = self.desc()))]
    fn deliver(&mut self, message: Pong) -> DeliveryStatus<Pong, String> { todo!() }

    #[tracing::instrument(skip(self), fields(state = self.desc()))]
    fn advance(&self) -> Result<Transition<Types>, String> { todo!() }
}
When the tracing feature is enabled, the TimeBoundStateMachineRunner must be constructed with a Span:
let ping_span = tracing::info_span!("ping span");
let session_id = Uuid::new_v4();
let ping_to_send = Ping::new(session_id);
let initial_state: BoxedState<Types> = Box::new(PingState::new(ping_to_send));
let mut state_machine_runner = TimeBoundStateMachineRunner::new(
    format!("Ping:{}", session_id).into(),
    initial_state,
    Duration::from_secs(15),
    ping_span,
);
The tracing feature also adds a span field to the emitted messages and final result. This field is used in the next section, where we will need to communicate the span from the Ping Client to the Ping Server. We can obtain the span field when we deconstruct Either::Messages like this:
Some(stm_event) = state_machine_rx.recv() => {
    match stm_event {
        Either::Messages { messages, span, .. } => {
            for ping in messages {
                // include `span` when sending `ping`
                todo!();
            }
        }
        // ...
    }
}
Context Propagation
In order to nest the Ping server spans under the Ping client spans, we have to use the concept of context propagators. Context propagation works in two steps, the client injects the span context into messages that are sent to the server, and the server extracts the span context from messages received from clients. The server then creates a new span and sets its parent context to the extracted context from the incoming message.
The opentelemetry crate provides a TextMapPropagator that allows us to inject and extract such a context.
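As a minimal illustration of that API (assuming a propagator has been registered globally, for example in try_init_tracing), a context can be injected into and extracted from a plain HashMap, for which the opentelemetry crate ships Injector and Extractor implementations:

use std::collections::HashMap;
use opentelemetry::{global, Context};

// A plain HashMap<String, String> can act as the carrier.
let mut carrier: HashMap<String, String> = HashMap::new();
global::get_text_map_propagator(|propagator| {
    propagator.inject_context(&Context::current(), &mut carrier)
});

// On the receiving side, the same propagator restores the context from the carrier.
let _restored: Context = global::get_text_map_propagator(|propagator| propagator.extract(&carrier));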
Context Injection
The TextMapPropagator can inject the context into any type that implements the Injector trait. The context is a collection of key-value pairs, so a HashMap is a good fit to hold the context. We define our own PropagationContext struct, so that we can define the inject function, which performs the actual injection.
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use opentelemetry::{
    global,
    propagation::Injector,
};

/// Serializable datastructure to hold the opentelemetry propagation context.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PropagationContext(HashMap<String, String>);

impl PropagationContext {
    fn empty() -> Self {
        Self(HashMap::new())
    }

    pub fn inject(context: &opentelemetry::Context) -> Self {
        global::get_text_map_propagator(|propagator| {
            let mut propagation_context = PropagationContext::empty();
            propagator.inject_context(context, &mut propagation_context);
            propagation_context
        })
    }
}

impl Injector for PropagationContext {
    fn set(&mut self, key: &str, value: String) {
        self.0.insert(key.to_owned(), value);
    }
}
Since the PropagationContext must be serialized over the network, we wrap the message body and the propagation context together in a SpannedMessage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpannedMessage<T: Debug + Clone> {
    context: PropagationContext,
    body: T,
}

impl<T: Debug + Clone> SpannedMessage<T> {
    pub fn new(context: PropagationContext, body: T) -> Self {
        Self { context, body }
    }

    pub fn unwrap(self) -> T {
        self.body
    }

    pub fn context(&self) -> &PropagationContext {
        &self.context
    }
}
With all this in place, we can tie everything together in the message-sending code. We instrument kafka_send_message, and introduce a new variant that takes an additional Span and creates and sends a SpannedMessage to Kafka.
use tracing_opentelemetry::OpenTelemetrySpanExt; // provides `Span::context()`

#[tracing::instrument(skip(producer), err)]
pub fn kafka_send_message<T>(
    producer: &mut Producer,
    queue: &str,
    message: T,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>
where
    T: serde::Serialize + Debug,
{
    let json_message = serde_json::to_string_pretty(&message)?;
    let record = Record::from_value(queue, json_message);
    producer.send(&record).map_err(|e| e.into())
}

pub fn kafka_send_message_with_span<T>(
    producer: &mut Producer,
    queue: &str,
    message: T,
    span: tracing::Span,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>
where
    T: serde::Serialize + Debug + Clone,
{
    span.in_scope(|| { // trace kafka_send_message in this span
        let propagation_context = PropagationContext::inject(&span.context());
        let spanned_message = SpannedMessage::new(propagation_context, message);
        kafka_send_message(producer, queue, spanned_message)
    })
}
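Back in the client's main loop, the todo!() from the earlier Either::Messages arm can now be filled in by passing the span along with each outgoing ping (a small sketch):

Either::Messages { messages, span, .. } => {
    for ping in messages {
        // Wrap each outgoing ping in a SpannedMessage carrying the STM span's context.
        kafka_send_message_with_span(&mut producer, "ping", ping, span.clone())?;
    }
}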
When we run the application with the above modifications, we can observe a new context field in the Ping messages. This field includes an uber-trace-id that contains the trace id, the span id, and some flags.
{
  "context": {
    "uber-trace-id": "a22cd50e4943b37770c4363d91d2a68e:a9391a3307c15cc7:0:1"
  },
  "body": {
    "session_id": "0ff45015-0618-41f1-bc2f-d1b960588f36"
  }
}
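Which key shows up in the context depends on the globally registered propagator: the uber-trace-id key comes from the Jaeger propagator. Switching to the W3C TraceContext propagator, for example, would produce a traceparent entry instead (sketch; the module path depends on the opentelemetry version in use):

use opentelemetry::{global, sdk::propagation::TraceContextPropagator};

// With the W3C propagator registered, PropagationContext::inject emits a
// `traceparent` key instead of Jaeger's `uber-trace-id`.
global::set_text_map_propagator(TraceContextPropagator::new());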
Context Extraction
In the Ping Server we will have to extract the span context from the incoming message, and instantiate a new span with it. The TextMapPropagator can extract the span context from anything that implements the Extractor trait. So we implement Extractor for our PropagationContext and add the extract function to it, which performs the actual extraction.
use opentelemetry::{
    global,
    propagation::Extractor,
};

/// Serializable datastructure to hold the opentelemetry propagation context.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PropagationContext(HashMap<String, String>);

impl PropagationContext {
    // ...
    pub fn extract(&self) -> opentelemetry::Context {
        global::get_text_map_propagator(|propagator| propagator.extract(self))
    }
}

impl Extractor for PropagationContext {
    fn get(&self, key: &str) -> Option<&str> {
        let key = key.to_owned();
        self.0.get(&key).map(|v| v.as_ref())
    }

    fn keys(&self) -> Vec<&str> {
        self.0.keys().map(|k| k.as_ref()).collect()
    }
}
Now there is only one thing left for us to do.
In the Ping Server we have to extract the context from the incoming message. This is done in the following manner:
// `info_span!` comes from the tracing crate; `set_parent` is provided by
// tracing_opentelemetry::OpenTelemetrySpanExt.
let ping: SpannedMessage<Ping> = todo!(); // Incoming message

// Create new span, and set the parent to the extracted context
let pong_span = info_span!("pong span");
let parent_context = ping.context().extract();
pong_span.set_parent(parent_context);

// Unwrap the message
let ping: Ping = ping.unwrap();

let initial_state: BoxedState<Types> = Box::new(ListeningForPing::new());

// Construct the STM runner with the pong_span
let mut state_machine_runner = TimeBoundStateMachineRunner::new(
    "pong".into(),
    initial_state,
    Duration::from_secs(5),
    pong_span,
);
This code sets up a new pong_span whose parent is the context extracted from the incoming message. This pong_span is then passed to the TimeBoundStateMachineRunner in the constructor.
Now the span of the Ping server application is nicely nested under the span of the Ping client as we can see in the following screenshot.
Note that in the screenshot above all state machine operations are nested under the run span, which is created by the tracing feature of the oblivious_state_machine crate. kafka_send_message is performed from the main thread/task when receiving the message from the state_machine_rx receiver, and is thus not nested under the run span. This also explains why, when looking at the time progression, the kafka_send_message span overlaps with the advance span: these spans are running in two different concurrent tokio tasks.
Conclusion
In this article you have seen how distributed tracing can be added to a distributed application that uses Apache Kafka for message exchange. We have implemented this application with the Oblivious State Machine library.
The principles that we have used are:
- We implemented our own serializable PropagationContext that implements Injector and Extractor, to inject and extract the span context.
- The PropagationContext is included in the JSON message that is sent to Kafka, by injecting the context of the span field from the emitted state machine messages.
- The Ping Server extracts the parent span from the PropagationContext and creates a new span on top of that.
If there is one thing that I want you to remember from this article, it is this:
Whenever you need a trace that includes multiple services, think about context propagation.
Footnotes
1 In the meantime some articles have been published, e.g. Distributed Tracing in Rust
2 Implemented by yours truly