OpenTelemetry Context Propagation with Rust

In this article we'll dive into distributed tracing with Rust. For a project implementing cryptographic algorithms in multiple distributed state machines, we were looking for a way to analyze what is going on during the execution of the algorithms. I suggested OpenTelemetry, since I had seen it used for distributed tracing in the past; however, I didn't have any experience with OpenTelemetry in Rust. It was quite easy to set up an application based on the various tracing and opentelemetry crates, but neither the documentation of those crates nor any articles online¹ described how to make tracing work across multiple distributed services. With this article I hope to contribute to that.

This article assumes basic knowledge about tracing and OpenTelemetry.

The Application

The example application that we'll consider in this article is a simple Ping-Pong application that uses Apache Kafka to exchange messages. The Ping Client sends a Ping message to the ping queue; the Ping Server responds by sending a corresponding Pong message to the pong queue. The Ping Client terminates after receiving the Pong message.

article-architecture.png

The source code for this example application can be found at https://github.com/peterpaul/kafka-ping-stm/tree/docs/article. This article contains extracts and simplified snippets from that repository, but it will not give the full code needed to create this application from scratch. In addition, the source code listed in this article concentrates on the Ping Client and only highlights the concepts used in the Ping Server. If you want to make the code from this article compile, some blanks will need to be filled in from the source code repository. Also note that, since the application was developed as a proof of concept, the code is not production quality, especially w.r.t. error handling.

Oblivious State Machine

The Ping Client and Ping Server components are implemented as state machines (STMs) using the oblivious-state-machine crate. Every state can:

  • emit outgoing messages upon initialization
  • receive incoming messages which can modify the internal state
  • advance to the next state, based on the internal state

The advantage of this approach is that the network layer is disconnected from the state logic.

The relevant API that is needed to implement a State is included below.

    /// Message and error types
    pub trait StateTypes {
        type In;
        type Out;
        type Err: Debug;
    }

    pub enum DeliveryStatus<U, E: Debug> {
        Delivered,
        Unexpected(U),
        Error(E),
    }

    pub type BoxedState<Types> = Box<dyn State<Types> + Send>;

    pub enum Transition<Types: StateTypes> {
        Same,
        Next(BoxedState<Types>),
        Terminal,
    }

    pub trait State<Types: StateTypes>: Downcast {
        fn desc(&self) -> String;
        fn initialize(&self) -> Vec<Types::Out> { Vec::new() }
        fn deliver(&mut self, message: Types::In) -> DeliveryStatus<Types::In, Types::Err> {
            DeliveryStatus::Unexpected(message)
        }
        fn advance(&self) -> Result<Transition<Types>, Types::Err>;
    }

Ping Client

The Ping Client application is implemented as a simple command-line application. It will send a Ping, wait for the corresponding Pong and terminate.

The Ping Client STM is implemented using only a single state PingState.

article-ping.svg

The plain arrows show the state transitions, and the dotted arrows show the incoming and outgoing messages.

The PingState contains the ping_to_send. While the STM is running, the PingState performs the following logic:

  1. Emits the ping_to_send upon initialization
  2. Waits until it receives the corresponding Pong
  3. Terminates the STM

Defining the State Machine

We start by defining the types we use in our Ping Client STM.

    use oblivious_state_machine::state::*;
    use serde::{Deserialize, Serialize};
    use uuid::Uuid;

    #[derive(Clone, Debug, Deserialize, Serialize)]
    struct Ping { session_id: Uuid }

    #[derive(Clone, Debug, Deserialize, Serialize)]
    struct Pong { session_id: Uuid }

    struct Types;
    impl StateTypes for Types {
        type In = Pong;
        type Out = Ping;
        type Err = String;
    }

The Ping and Pong message types contain a session_id so that corresponding messages can be matched. We do not actually produce any errors; however, since tracing requires the error type to implement Display, we use String for it.

With these types we can implement the PingState state as follows.

    #[derive(Clone)]
    struct PingState {
        ping_to_send: Ping,
        received_pong: Option<Pong>,
    }

    impl PingState {
        fn new(ping_to_send: Ping) -> Self {
            Self { ping_to_send, received_pong: None }
        }
    }

    impl State<Types> for PingState {
        fn desc(&self) -> String { "PingState".to_owned() }

        fn initialize(&self) -> Vec<Ping> {
            vec![self.ping_to_send.clone()]
        }

        fn deliver(&mut self, message: Pong) -> DeliveryStatus<Pong, String> {
            self.received_pong = Some(message);
            DeliveryStatus::Delivered
        }

        fn advance(&self) -> Result<Transition<Types>, String> {
            Ok(match &self.received_pong {
                Some(_pong) => Transition::Terminal,
                None => Transition::Same,
            })
        }
    }

We emit the ping_to_send on initialization and wait until a Pong has been delivered via the deliver function. When a Pong is delivered, the received_pong field is set to the received Pong. The advance method checks this field: as long as it is not set, we remain in the same state; once the Pong has been received, we advance to the terminal state.

Running the State Machine

The STM can be executed using TimeBoundStateMachineRunner, which internally uses tokio channels to exchange messages. The following code snippet is an almost complete implementation of the main application; the only thing missing is the networking part.

     1  use tokio::{select, sync::mpsc};
     2  use kafka::producer::Producer;
     3  #[tokio::main]
     4  async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
     5      let session_id = Uuid::new_v4();
     6      let ping_to_send = Ping::new(session_id);
     7      let initial_state: BoxedState<Types> = Box::new(PingState::new(ping_to_send));
     8      let mut state_machine_runner = TimeBoundStateMachineRunner::new("Ping".into(), initial_state, Duration::from_secs(15));
     9      let (state_machine_tx, mut state_machine_rx) = mpsc::unbounded_channel();
    10      state_machine_runner.run(state_machine_tx);
    11  
    12      // Kafka producer and channel of incoming pongs (see the Networking section)
    13      let (mut producer, mut incoming): (Producer, mpsc::UnboundedReceiver<Pong>) = todo!();
    14  
    15      let res = loop {
    16          select! {
    17              Some(pong) = incoming.recv() => {
    18                  if pong.session_id() == session_id {
    19                      state_machine_runner.deliver(pong).unwrap();
    20                  }
    21              }
    22              Some(stm_event) = state_machine_rx.recv() => {
    23                  match stm_event {
    24                      Either::Messages { messages, .. } => {
    25                          for ping in messages {
    26                              kafka_send_message(&mut producer, "ping", ping)?;
    27                          }
    28                      }
    29                      Either::Result { result, .. } => {
    30                          break result;
    31                      }
    32                  }
    33              }
    34          }
    35      };
    36  
    37      Ok(())
    38  }

Let us go through this piece of code step by step.

  • In line 8 the TimeBoundStateMachineRunner is instantiated. The returned handle can be used to interact with the STM.
  • In line 10 the STM is started, using the state_machine_tx sender to communicate back messages and the final result to the main loop.
  • Line 13 obtains the Kafka producer and opens the channel via which incoming messages are received from the network. Both will be explained in the following paragraph about Networking.
  • Line 15 is the beginning of the main application loop. The incoming and state_machine_rx channels are being polled for updates.
  • In line 17 incoming Pongs are delivered to the STM if they belong to this session.
  • Line 22 handles STM events, which can be either messages or a final result.
  • In line 24 outgoing Pings are received from the STM and sent via Kafka. The definition of kafka_send_message will be given in the following paragraph about Networking.
  • In line 29 the final result of the STM is received, and the main application loop is exited.

Networking

In our application we use Apache Kafka to exchange the messages. The library that we use is the kafka crate, a pure-Rust client for Apache Kafka.
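
The construction of the Kafka Consumer and Producer is not shown in the snippets in this article. A minimal sketch of what it could look like for the Ping Client is given below; the broker address, topic names and group name are illustrative assumptions, not taken from the repository.

    use std::time::Duration;

    use kafka::consumer::{Consumer, FetchOffset};
    use kafka::producer::{Producer, RequiredAcks};

    // The Ping Client consumes Pong messages from the "pong" topic...
    let consumer = Consumer::from_hosts(vec!["localhost:9092".to_owned()])
        .with_topic("pong".to_owned())
        .with_group("ping-client".to_owned())
        .with_fallback_offset(FetchOffset::Earliest)
        .create()?;

    // ...and produces Ping messages to the "ping" topic via kafka_send_message (shown below).
    let mut producer = Producer::from_hosts(vec!["localhost:9092".to_owned()])
        .with_ack_timeout(Duration::from_secs(1))
        .with_required_acks(RequiredAcks::One)
        .create()?;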

As discussed in the previous section, incoming messages are received via a channel that is fed by a separate tokio task. The function below is used to set up that task. The code is generic, so that we can also use it for the Ping Server.

     1  /// Spawns a tokio task that polls `consumer`.
     2  /// Returns the task handle, a message receiver and a shutdown sender.
     3  pub fn spawn_kafka_consumer_task<T>(
     4      mut consumer: Consumer,
     5  ) -> (
     6      tokio::task::JoinHandle<()>,
     7      mpsc::UnboundedReceiver<T>,
     8      oneshot::Sender<()>,
     9  )
    10  where
    11      T: DeserializeOwned + Debug + Send + Sync + 'static,
    12  {
    13      let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
    14      let (message_tx, message_rx) = mpsc::unbounded_channel::<T>();
    15  
    16      // Polls `consumer`, and sends all incoming messages to `message_tx`.
    17      // Exits when an event is received on `shutdown_rx`.
    18      let kafka_consumer_task = tokio::task::spawn_blocking(move || {
    19          while shutdown_rx.try_recv().is_err() {
    20              for msg_result in consumer.poll().unwrap().iter() {
    21                  for msg in msg_result.messages() {
    22                      let message: T = serde_json::from_slice(msg.value).unwrap();
    23                      message_tx.send(message).unwrap();
    24                  }
    25                  consumer.consume_messageset(msg_result).unwrap();
    26              }
    27              consumer.commit_consumed().unwrap();
    28          }
    29      });
    30  
    31      (kafka_consumer_task, message_rx, shutdown_tx)
    32  }

We open two channels: a oneshot channel that is used to receive a termination request, and an mpsc channel that is used to transmit the incoming messages. In line 18 a new tokio task is spawned that continuously polls the Kafka consumer for messages. Any incoming message is deserialized and sent via the message_tx channel.
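
For the Ping Client, the producer from line 13 of the main function comes from the Producer setup sketched earlier, and the incoming channel can then be obtained roughly like this (a sketch; the repository wires this up in its own way):

    // `consumer` is the kafka::consumer::Consumer subscribed to the "pong" topic.
    let (_consumer_task, mut incoming, shutdown_tx) =
        spawn_kafka_consumer_task::<Pong>(consumer);

    // ... run the main application loop ...

    // Once the STM has terminated, ask the consumer task to stop polling.
    let _ = shutdown_tx.send(());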

The code that we use to send messages to a Kafka queue is the following. Again this code is generic so that we can also use it for the Ping Server.

    pub fn kafka_send_message<T>(
        producer: &mut Producer,       // Kafka Producer
        queue: &str,                   // Name of the Kafka queue
        message: T,                    // Message to send
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>
    where
        T: serde::Serialize,           // Message must be serializable
    {
        let json_message = serde_json::to_string_pretty(&message)?;
        let record = Record::from_value(queue, json_message);
        producer.send(&record).map_err(|e| e.into())
    }

Messages are serialized to JSON, and sent using the Kafka producer.

Ping Server

The Ping Server is implemented as a long-running server application. It continuously polls the Kafka consumer for Ping messages, and for each Ping message it spawns a new TimeBoundStateMachineRunner. All active STMs are stored in a HashMap.

The Ping Server STM is implemented using two states, ListeningForPing and SendingPong.

article-pong.svg

After emitting the Pong, the SendingPong state waits until it gets a PongSent event with the confirmation that the Pong has been sent. This ensures that the Pong has actually been forwarded to Apache Kafka, and is not still in the outgoing channel when the STM is closed.

We will not include the source code for these states in this article, but refer to the GitHub repository instead.

Distributed Tracing

We want our application to emit tracing events so that we can understand what exactly happens in our distributed application. The repository contains a try_init_tracing function that sets up tracing and OpenTelemetry with Jaeger. Jaeger itself can be started by running docker-compose up.
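
The contents of try_init_tracing are not reproduced here; a minimal sketch of what such a setup can look like is shown below, assuming the opentelemetry-jaeger and tracing-opentelemetry crates (the repository's version and the exact builder names may differ between crate versions). Setting the global text map propagator is what later makes the inject and extract calls produce the uber-trace-id entries shown further down.

    use opentelemetry::global;
    use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

    fn try_init_tracing(service_name: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Use the Jaeger propagation format (uber-trace-id) for inject/extract.
        global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());

        // Export spans to the Jaeger agent started via `docker-compose up`.
        // On older opentelemetry-jaeger versions this builder is `new_pipeline()`.
        let tracer = opentelemetry_jaeger::new_agent_pipeline()
            .with_service_name(service_name)
            .install_simple()?;

        // Bridge `tracing` spans into OpenTelemetry, and also log events to stdout.
        tracing_subscriber::registry()
            .with(tracing_opentelemetry::layer().with_tracer(tracer))
            .with(tracing_subscriber::fmt::layer())
            .try_init()?;

        Ok(())
    }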

To emit the tracing events, we must instrument our application. When adding tracing to these services, we face two main challenges.

  1. The STMs are running concurrently as different processes, potentially even on different machines, so we don't have a shared Span which lives in memory.
  2. We want to see a combined trace for both the Client and Server.

Tracing an asynchronous application

We are interested in the progression through the STM and want to instrument all State operations. Luckily the oblivious_state_machine library has a tracing feature² that enables just this and ensures that the instrumented spans fall under one common span for the whole STM.

In the following code block we instrument all the state operations. We omit self from the trace using the skip directive, and add a new field state that contains the state's description.

    struct PingState {
        ping_to_send: Ping,
        received_pong: Option<Pong>,
    }

    impl State<Types> for PingState {
        fn desc(&self) -> String { "PingState".to_owned() }

        #[tracing::instrument(skip(self), fields(state = self.desc()))]
        fn initialize(&self) -> Vec<Ping> { todo!() }

        #[tracing::instrument(skip(self), fields(state = self.desc()))]
        fn deliver(&mut self, message: Pong) -> DeliveryStatus<Pong, String> { todo!() }

        #[tracing::instrument(skip(self), fields(state = self.desc()))]
        fn advance(&self) -> Result<Transition<Types>, String> { todo!() }
    }

When the tracing feature is enabled, the TimeBoundStateMachineRunner must be constructed with a Span:

    let ping_span = tracing::info_span!("ping span");
    let session_id = Uuid::new_v4();
    let ping_to_send = Ping::new(session_id);
    let initial_state: BoxedState<Types> = Box::new(PingState::new(ping_to_send));
    let mut state_machine_runner = TimeBoundStateMachineRunner::new(
        format!("Ping:{}", session_id).into(),
        initial_state,
        Duration::from_secs(15),
        ping_span,
    );

The tracing feature also adds a span field to the emitted messages and the final result. This field is used in the next section, where we need to communicate the span from the Ping Client to the Ping Server. We can obtain the span field when we deconstruct Either::Messages like this:

    Some(stm_event) = state_machine_rx.recv() => {
        match stm_event {
            Either::Messages { messages, span, .. } => {
                for ping in messages {
                    // include `span` when sending `ping`
                    todo!();
                }
            }
            // ...
        }
    }

Context Propagation

In order to nest the Ping Server spans under the Ping Client spans, we have to use context propagation. Context propagation works in two steps: the client injects the span context into messages sent to the server, and the server extracts the span context from messages received from clients. The server then creates a new span and sets its parent to the context extracted from the incoming message.

The opentelemetry crate provides the TextMapPropagator trait, which allows us to inject and extract context.

Context Injection

The TextMapPropagator can inject the context into any type that implements the Injector trait. The context is a collection of key-value pairs, so a HashMap is a good fit to hold it. We define our own PropagationContext struct, so that we can add an inject function that performs the actual injection.

    use std::collections::HashMap;

    use opentelemetry::{global, propagation::Injector};
    use serde::{Deserialize, Serialize};

    /// Serializable datastructure to hold the opentelemetry propagation context.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct PropagationContext(HashMap<String, String>);

    impl PropagationContext {
        fn empty() -> Self {
            Self(HashMap::new())
        }

        pub fn inject(context: &opentelemetry::Context) -> Self {
            global::get_text_map_propagator(|propagator| {
                let mut propagation_context = PropagationContext::empty();
                propagator.inject_context(context, &mut propagation_context);
                propagation_context
            })
        }
    }

    impl Injector for PropagationContext {
        fn set(&mut self, key: &str, value: String) {
            self.0.insert(key.to_owned(), value);
        }
    }

Since the PropagationContext must be serialized over the network, we wrap the message body and the propagation context together in a SpannedMessage.

    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct SpannedMessage<T: Debug + Clone> {
        context: PropagationContext,
        body: T,
    }

    impl<T: Debug + Clone> SpannedMessage<T> {
        pub fn new(context: PropagationContext, body: T) -> Self {
            Self { context, body }
        }

        pub fn unwrap(self) -> T {
            self.body
        }

        pub fn context(&self) -> &PropagationContext {
            &self.context
        }
    }

With all this in place we can tie everything together in the message-sending code. We instrument kafka_send_message and introduce a new variant that takes an additional Span, wraps the message in a SpannedMessage, and sends it to Kafka.

    use tracing_opentelemetry::OpenTelemetrySpanExt; // provides `Span::context()`

    #[tracing::instrument(skip(producer), err)]
    pub fn kafka_send_message<T>(
        producer: &mut Producer,
        queue: &str,
        message: T,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>
    where
        T: serde::Serialize + Debug,
    {
        let json_message = serde_json::to_string_pretty(&message)?;
        let record = Record::from_value(queue, json_message);
        producer.send(&record).map_err(|e| e.into())
    }

    pub fn kafka_send_message_with_span<T>(
        producer: &mut Producer,
        queue: &str,
        message: T,
        span: tracing::Span,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>
    where
        T: serde::Serialize + Debug + Clone,
    {
        span.in_scope(|| {
            // Trace kafka_send_message within `span`, and inject the span's
            // OpenTelemetry context into the message before sending it.
            let propagation_context = PropagationContext::inject(&span.context());
            let spanned_message = SpannedMessage::new(propagation_context, message);
            kafka_send_message(producer, queue, spanned_message)
        })
    }
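
Back in the main loop of the Ping Client, the todo!() from the Either::Messages snippet above can now be filled in. A sketch, assuming producer is the Kafka Producer from the Networking section:

    Either::Messages { messages, span, .. } => {
        for ping in messages {
            // Propagate the STM's span together with each outgoing Ping.
            kafka_send_message_with_span(&mut producer, "ping", ping, span.clone())?;
        }
    }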

When we run the application with the above modifications, we can observe a new context field in the Ping messages. This field includes an uber-trace-id entry that contains the trace id, the span id, and some flags.

    {
        "context": {
            "uber-trace-id": "a22cd50e4943b37770c4363d91d2a68e:a9391a3307c15cc7:0:1"
        },
        "body": {
            "session_id": "0ff45015-0618-41f1-bc2f-d1b960588f36"
        }
    }

Context Extraction

In the Ping Server we have to extract the span context from the incoming message and instantiate a new span with it. The TextMapPropagator can extract the span context from anything that implements the Extractor trait, so we implement Extractor for our PropagationContext and add an extract function that performs the actual extraction.

    use opentelemetry::{global, propagation::Extractor};

    /// Serializable datastructure to hold the opentelemetry propagation context.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct PropagationContext(HashMap<String, String>);

    impl PropagationContext {
        // ...

        pub fn extract(&self) -> opentelemetry::Context {
            global::get_text_map_propagator(|propagator| propagator.extract(self))
        }
    }

    impl Extractor for PropagationContext {
        fn get(&self, key: &str) -> Option<&str> {
            self.0.get(key).map(|v| v.as_ref())
        }

        fn keys(&self) -> Vec<&str> {
            self.0.keys().map(|k| k.as_ref()).collect()
        }
    }

Now there is only one thing left for us to do.

In the Ping Server we have to extract the context from the incoming message. This is done in the following manner:

    use tracing::info_span;
    use tracing_opentelemetry::OpenTelemetrySpanExt; // provides `Span::set_parent()`

    let ping: SpannedMessage<Ping> = todo!(); // Incoming message

    // Create a new span and set its parent to the extracted context
    let pong_span = info_span!("pong span");
    let parent_context = ping.context().extract();
    pong_span.set_parent(parent_context);

    // Unwrap the message
    let ping: Ping = ping.unwrap();

    let initial_state: BoxedState<Types> = Box::new(ListeningForPing::new());

    // Construct the STM runner with the pong_span
    let mut state_machine_runner = TimeBoundStateMachineRunner::new(
        "pong".into(),
        initial_state,
        Duration::from_secs(5),
        pong_span,
    );

This code sets up a new pong_span whose parent is the context extracted from the incoming message. The pong_span is then passed to the TimeBoundStateMachineRunner constructor.

Now the span of the Ping Server application is nicely nested under the span of the Ping Client, as we can see in the following screenshot.

jaeger.png

Note that in the screenshot above all state machine operations are nested under the run span, which is created by the tracing feature of the oblivious_state_machine crate. kafka_send_message is performed from the main thread/task when it receives the message from the state_machine_rx receiver, and is therefore not nested under the run span. This also explains why, looking at the time progression, the kafka_send_message span overlaps with the advance span: these spans run in two different concurrent tokio tasks.

Conclusion

In this article you have seen how distributed tracing can be added to a distributed application that uses Apache Kafka for message exchange. We have implemented this application with the Oblivious State Machine library.

The principles that we have used are:

  • We implemented our own serializable PropagationContext that implements Injector and Extractor, to inject and extract the span context.
  • The PropagationContext is included in the JSON message that is sent to Kafka; it is filled by injecting the context of the span field taken from the emitted state machine messages.
  • The Ping Server extracts the parent context from the PropagationContext and creates a new span on top of it.

If there is one thing that I want you to remember from this article, it is this:

Whenever you need a trace that includes multiple services, think about context propagation.

Footnotes

¹ In the meantime some articles have been published, e.g. Distributed Tracing in Rust

² Implemented by yours truly