RabbitMQ
RabbitMQ is message broker, a software that allows a producer of messages (arbitrary sequences of bytes) to send for such bitstreams to queues and which can then be read by consumers in decoupled fashion.
Message creation
A producer creates a message and send it to an specific exchange with an specific binding key.
The exchange will then route this message to zero or more queues, this process takes place "immediately", i.e.: the exchange will check its rules, the queues that it knows about, the binding key of the message and will send the message to the applicable queues (each queue that receives an independent copy of the message). After that, the exchange will forget about the existence of this message (i.e.: if you change rules afterwards, previous messages will not be affected).
Consuming messages
A consumer is on the other hand, a piece code subscribing to an specific queue, listing for new messages that might come (or a backlog of messages before it started running). It takes the oldest message on the queue, processes it (this is an application level definition, e.g.: maybe processing means storing the message at some database, sending an email message, etc) and the message is removed from queue (and not retained, unlike Kafka streams for instance).
Exchange types and routing
There are four types of exchanges:
-
Default exchange: this one will route the message to the queue which has the same name as the binding key, .e.g.: messages with binding key
example
will be routed to the queueexample
if and only if the queueexample
exists, otherwise the message if discarded. -
Fanout type: this exchange will deliver the message to all queues it has been bound to, ignoring the binding key of the message. E.g.: create a exchange
my-exchange
of typefanout
, then create queuesqueue1
andqueue2
and bind each of those queues tomy-exchange
. When a new message is sent tomy-exchange
, it will route the message to bothqueue1
andqueue2
regardless of the routing queue -
Direct exchange: this exchange will deliver the message to the queues that were binding
-
Topic: a superset of the direct example, allows usage of
*
and#
for matching criteria.
Channels, connections and durability of messages and queues
Starting a RabbitMQ server using Docker
docker run --rm -d --net host --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=myuser -e RABBITMQ_DEFAULT_PASS=mypassword rabbitmq:3
docker exec -it some-rabbit rabbitmqctl list_bindings
docker exec -it some-rabbit rabbitmqctl list_queues
Rust Code examples
1 [package]
2 name = "rabbitmq-example"
3 version = "0.1.0"
4 edition = "2021"
5
6 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7
8 [dependencies]
9
10 lapin = "2"
11 tokio = { version = "1", features = ["full"] }
12 futures-lite = "2"
13 rand = "0.8"
1 use futures_lite::stream::StreamExt;
2 use lapin::{
3 options::*, types::FieldTable, BasicProperties, Channel, Connection,
4 ConnectionProperties,
5 };
6 use rand::{distributions::Uniform, thread_rng, Rng};
7 use std::time::SystemTime;
8 use tokio::time::{sleep, Duration};
9
10 async fn open_channel() -> Result<Channel, Box<dyn std::error::Error>> {
11 let addr = std::env::var("AMQP_ADDR")
12 .unwrap_or_else(|_| "amqp://127.0.0.1:5672".into());
13 let conn = Connection::connect(&addr, ConnectionProperties::default())
14 .await?;
15 let channel = conn.create_channel().await?;
16 Ok(channel)
17 }
18
19 async fn producer() -> Result<(), Box<dyn std::error::Error>> {
20 let channel = open_channel().await?;
21
22 // Enable publisher confirms on the channel
23 // For more details, check
24 // https://www.rabbitmq.com/confirms.html#publisher-confirms
25 channel
26 .confirm_select(ConfirmSelectOptions::default())
27 .await?;
28
29 channel
30 .queue_declare(
31 "my_queue",
32 QueueDeclareOptions::default(),
33 FieldTable::default(),
34 )
35 .await?;
36
37 let base_message = b"Some message at ";
38
39 let mut rng = thread_rng();
40 loop {
41 // Simulate some random latency for producing messages
42 // e.g. maybe the process needs to wait some time for
43 // new events that generate message to happen again
44 sleep(Duration::from_millis(rng.sample(Uniform::new(200, 2000))))
45 .await;
46
47 // let add some timestamp to the message to helps us
48 // visualize the dynamic of producing and consuming messages
49 let now = SystemTime::now()
50 .duration_since(SystemTime::UNIX_EPOCH)
51 .map_err(|e| format!("Error: {:?}", e))?
52 .as_micros();
53 let message =
54 &[base_message, now.to_string().as_bytes()].concat()[..];
55 while !channel
56 .basic_publish(
57 "",
58 "my_queue",
59 BasicPublishOptions::default(),
60 message,
61 BasicProperties::default(),
62 )
63 .await?
64 .await?
65 .is_ack()
66 {
67 println!("Did not get ack message, will retry sending it");
68 sleep(Duration::from_millis(100)).await;
69 }
70 println!("Sent message: {}", std::str::from_utf8(message)?);
71 }
72 }
73
74 async fn consumer() -> Result<(), Box<dyn std::error::Error>> {
75 let channel = open_channel().await?;
76
77 let mut consumer = channel
78 .basic_consume(
79 "my_queue",
80 "my_consumer",
81 BasicConsumeOptions::default(),
82 FieldTable::default(),
83 )
84 .await?;
85 let mut rng = thread_rng();
86 while let Some(delivery) = consumer.next().await {
87 let delivery = delivery?;
88 println!(
89 "Received message: {}",
90 std::str::from_utf8(&delivery.data[..])?
91 );
92
93 // Simulate some random latency of consuming messages
94 // e.g. after obtaining the message, maybe the process needs
95 // to do some heavy computation, or maybe it needs to wait
96 // a network call to finish, etc.
97 sleep(Duration::from_millis(rng.sample(Uniform::new(200, 2000))))
98 .await;
99
100 delivery.ack(BasicAckOptions::default()).await?;
101 }
102
103 Ok(())
104 }
105
106 #[tokio::main]
107 async fn main() -> Result<(), Box<dyn std::error::Error>> {
108 // In practice, the producer and consumer could run
109 // on distinct proccesses/code, potentially distinct
110 // machines, maybe even distinct parts of the world
111 tokio::join!(
112 async {
113 loop {
114 if let Err(e) = producer().await {
115 println!("Error in producer: {e}.");
116 }
117 println!("Restarting producer in 1 second.");
118 sleep(Duration::from_secs(1)).await;
119 }
120 },
121 async {
122 loop {
123 if let Err(e) = consumer().await {
124 println!("Error in consumer: {e}.");
125 }
126 println!("Restarting consumer in 1 second.");
127 sleep(Duration::from_secs(1)).await;
128 }
129 },
130 );
131
132 Ok(())
133 }
Note that you need to pass your previously defined username and password as environment variable, e.g.:
AMQP_ADDR="amqp://myuser:mypassword@127.0.0.1:5672" cargo watch -c -x r
RPC pattern
It's also possible to have an RPC pattern in RabbitMQ, for a more theoretical explanation, see this. For a Rust code example, see this.
If you found this project helpful, please consider making a donation.