Distributed message queue overview

Exploring the Inner Workings Behind Message Queue Systems

Yat Man, Wong
8 min read · Apr 1, 2024
Another way of transferring data

Disclaimer: This article is largely a summary of System Design Interview Volume 2, Chapter 4: Distributed Message Queue. It references some images from the chapter purely for educational purposes.

What is it?

Producers send messages to a queue, and consumers consume messages from it

Benefits

The main benefit of a message queue is decoupling components:

  • they can be updated and scaled independently
  • if one part of the system is offline, the other components can still interact with the queue (availability)
  • it makes asynchronous communication easy; components don't have to wait for each other (performance)
  • it can notify multiple downstream consumers subscribing to the same event

Message queue vs event streaming platforms

  • Nowadays these platforms provide so many additional features that the distinction starts to blur

Topics, partitions, and brokers

  • messages are categorized into topics
  • messages inside a topic are evenly distributed across partitions
  • data across partitions isn't necessarily ordered by time; a message in one partition can have an earlier offset than a message in another partition even though it was produced later
  • the servers that hold partitions are called brokers
  • a broker can hold partitions from any topic, as well as their replicas
  • replicas are clones of partitions stored on different brokers in case of hardware failure
  • each partition is a queue
  • the position of a message in a partition is called its offset
  • producers usually send messages with a partition key

Consumer group

  • consumers can be organized into groups; each group can subscribe to multiple topics
Figure 4.7
  • consumer group 1 subscribes to topic A
  • consumer group 2 subscribes to topics A and B
  • there is a constraint: a partition can only be consumed by one consumer within a group
  • this is to maintain the consumption ordering in the partition
  • otherwise it would be extremely inefficient; the partition would need to wait for all consumers to receive a message before sending the next one

High-level architecture

Figure 4.8

Components:

Storage:

  • Data storage: stores messages in partitions
  • State storage: stores consumer states
  • Metadata storage: stores configuration and properties of topics

Coordination service:

  • service discovery: knowing which brokers are alive
  • leader election: one of the brokers is selected as the active controller. There is only one active controller in the cluster, and it is responsible for assigning partitions

Design Deep Dive

This section is not a comprehensive list of all the problems the chapter covered.

Data Storage

Message queue traffic patterns:

  • write-heavy, read-heavy
  • no update or delete operations
  • mainly sequential read/write access

Option 1: Relational or NoSQL database

  • databases can handle the storage requirement
  • but they are not ideal for write- and read-heavy use at large scale

Option 2: Write-ahead log (WAL)

  • a WAL is just a plain file where new entries are appended to the end
  • persist messages as WAL log files on disk
  • WAL has a pure sequential read/write access pattern
  • the disk performance of sequential access is very good
  • rotational disks have large capacity and are affordable
  • a file cannot grow infinitely, so data is broken into segments
  • when the log grows too big, old non-active segments can be deleted (a minimal sketch follows this list)
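A minimal sketch of such a segmented append-only log in Python. The file layout, naming scheme, and length-prefixed records are assumptions for illustration, not the book's design:

```python
import os

class SegmentedLog:
    """Sketch of a write-ahead log split into fixed-size segments."""

    def __init__(self, directory: str, segment_bytes: int = 1024 * 1024):
        self.directory = directory
        self.segment_bytes = segment_bytes
        self.active_segment = None
        self.active_size = 0
        self.next_offset = 0
        os.makedirs(directory, exist_ok=True)
        self._roll_segment()

    def _roll_segment(self) -> None:
        # Name each segment after the first offset it holds, so old
        # non-active segments can be located and deleted later.
        if self.active_segment:
            self.active_segment.close()
        path = os.path.join(self.directory, f"{self.next_offset:020d}.log")
        self.active_segment = open(path, "ab")
        self.active_size = 0

    def append(self, message: bytes) -> int:
        """Sequentially append one length-prefixed message; return its offset."""
        if self.active_size >= self.segment_bytes:
            self._roll_segment()
        record = len(message).to_bytes(4, "big") + message
        self.active_segment.write(record)
        self.active_size += len(record)
        offset = self.next_offset
        self.next_offset += 1
        return offset
```

Every write lands at the end of the active file, which is exactly the sequential access pattern rotational disks handle well.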

Message data structure

  • try to avoid unnecessary data copying while messages are in transit
  • sample schema:
Table 4.1
  • crc stands for cyclic redundancy check, used to ensure the integrity of the raw data (see the sketch below)
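To make the schema concrete, here is a rough sketch of such a record with an integrity check, using Python's zlib.crc32. The field names loosely follow Table 4.1 and are partly assumptions:

```python
import time
import zlib
from dataclasses import dataclass, field

@dataclass
class Message:
    key: bytes
    value: bytes
    topic: str
    partition: int
    offset: int = -1                      # assigned by the broker on append
    timestamp: float = field(default_factory=time.time)
    crc: int = 0

    def seal(self) -> "Message":
        # Compute the CRC over the raw payload so consumers can verify integrity.
        self.crc = zlib.crc32(self.key + self.value)
        return self

    def is_valid(self) -> bool:
        return zlib.crc32(self.key + self.value) == self.crc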

Message key

  • also known as the partition key, it is hashed into a partition number to determine which partition the message belongs to
  • the partition key usually carries some business info, and the producer has a mapping algorithm to hash it into a partition number (see the sketch below)
  • a proper mapping algorithm can evenly distribute messages to all partitions even if the number of partitions changes
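One common mapping (not necessarily the one the book uses) is to hash the key and take it modulo the number of partitions:

```python
import hashlib

def choose_partition(partition_key: str, num_partitions: int) -> int:
    # Use a stable hash (Python's built-in hash() is salted per process),
    # so the same key always maps to the same partition across producers.
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big") % num_partitions

# Messages with the same key (e.g. the same user ID) land in the same
# partition, which preserves their relative ordering.
assert choose_partition("user-42", 8) == choose_partition("user-42", 8)
```

Note that plain modulo hashing remaps many keys when the partition count changes; schemes such as consistent hashing reduce that reshuffling.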

Message value

  • the value can be plain text or binary, to support whatever format the project needs

Batching

Batching is critical to improving performance

  • it groups messages together in a single network request
  • the broker writes messages to the append log in large chunks, leading to much greater sequential disk throughput

But there is a trade-off between throughput and latency. The larger the request, the longer the processing time:

  • longer wait time to accumulate the batch (see the buffering sketch after this list)
  • reducing the batch size can shorten latency
  • increasing the number of partitions per topic can improve throughput
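A rough sketch of a producer-side buffer that expresses this trade-off: flush either when the batch is full (throughput) or when a short linger timeout expires (latency). Both thresholds are assumed defaults for illustration:

```python
import time

class BatchBuffer:
    """Accumulate messages and flush them to the broker as one request."""

    def __init__(self, send_batch, max_batch: int = 100, linger_seconds: float = 0.005):
        self.send_batch = send_batch          # callback that makes one network request
        self.max_batch = max_batch            # larger batch -> higher throughput
        self.linger_seconds = linger_seconds  # smaller linger -> lower latency
        self.pending = []
        self.first_enqueued_at = None

    def add(self, message) -> None:
        if not self.pending:
            self.first_enqueued_at = time.monotonic()
        self.pending.append(message)
        self._maybe_flush()

    def _maybe_flush(self) -> None:
        # A real client would also flush from a background thread when the
        # linger timeout expires; here we only check on each add() for brevity.
        waited = time.monotonic() - self.first_enqueued_at
        if len(self.pending) >= self.max_batch or waited >= self.linger_seconds:
            self.send_batch(self.pending)
            self.pending = []
            self.first_enqueued_at = None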

Producer flow

Simple Design

Explanation

  • there are multiple replicas of the same partition stored on different brokers, for fault tolerance
  • the producer only sends new messages to the leader replica; the follower replicas copy from the leader

Steps:

  1. the producer sends messages to the routing layer
  2. the routing layer finds the leader replica from the metadata storage
  3. the leader replica receives the message, and follower replicas pull data from the leader
  4. when “enough” replicas have synchronized the message, the leader commits the data to disk
  5. after the data is persisted on disk, the leader responds to the producer

Drawback:

  • the routing layer adds an extra network hop, and therefore additional latency
  • this design doesn't take batching into consideration

Improved design

Figure 4.12

Explanation:

  • both the routing layer and the buffer are wrapped inside the producer
  • fewer network hops means lower latency
  • producers can have their own logic to determine which partition a message should be sent to
  • batching buffers messages in memory and sends out larger batches in a single request, increasing throughput (a sketch follows this list)
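Reusing the choose_partition and BatchBuffer sketches from earlier, a producer in this improved design might look roughly like the following; broker_client.send is a hypothetical network call:

```python
class Producer:
    """Sketch: routing and buffering live inside the producer itself."""

    def __init__(self, broker_client, num_partitions: int):
        self.client = broker_client
        self.num_partitions = num_partitions
        # One buffer per partition, so each flush is a single large request
        # to that partition's leader replica.
        self.buffers = {
            p: BatchBuffer(lambda batch, p=p: self.client.send(p, batch))
            for p in range(num_partitions)
        }

    def send(self, key: str, value: bytes) -> None:
        partition = choose_partition(key, self.num_partitions)  # routing, no extra hop
        self.buffers[partition].add((key, value))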

Consumer flow

  • multiple consumers can read from the same partition from different positions
Figure 4.14

Push vs pull

An important design decision is whether brokers should push data to consumers, or consumers should pull data from brokers.

Most message queues choose the pull model.

Push model

Pros:

  • low latency: the broker can push messages to consumers immediately upon receiving them

Cons:

  • if the rate of consumption falls below the rate of production, consumers could be overwhelmed
  • it is difficult for consumers to adjust their processing capacity because the broker controls the data transfer rate

Pull model

Pros:

  • consumers control the consumption rate
  • some consumers can process messages in real time while others process them in batches
  • if the rate of consumption falls below the rate of production, we can scale out the consumers
  • better for batch processing: a consumer can pull all available messages after its current position in the log

Con:

  • when there is no message in the broker, a consumer might still keep pulling data (see the pull-loop sketch below)
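A minimal pull-model consumer loop; broker.fetch is a hypothetical API that returns the (possibly empty) list of messages after a given offset:

```python
import time

def consume(broker, partition: int, start_offset: int, handle) -> None:
    """Pull messages at the consumer's own pace, starting from start_offset."""
    offset = start_offset
    while True:
        messages = broker.fetch(partition, offset, max_messages=500)
        if not messages:
            # The con above: we may poll even when there is nothing new,
            # so back off briefly before the next pull.
            time.sleep(0.1)
            continue
        for message in messages:
            handle(message)
        offset += len(messages)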

Consumer rebalancing

  • done when a new consumer subscribes or an existing consumer drops out
  • multiple consumers can subscribe to the same topic
  • remember the constraint that a partition can only be consumed by one consumer in the group
  • the rebalancing process decides which consumer is responsible for which partition

Pages 108 to 110 show the flow charts for when a consumer joins, leaves, or crashes

Basic idea:

  • each consumer group has a broker acting as its coordinator
  • the coordinator monitors heartbeats from all consumers in the group
  • whenever a new consumer wants to join, an existing consumer leaves, or a consumer stops sending heartbeats, the coordinator starts a rebalance
  • the coordinator assigns one of the consumers as the leader
  • the leader then produces the partition dispatch plan, which describes which consumer handles which partition (a sketch of one possible plan follows)
  • the coordinator syncs the plan to all consumers
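The book doesn't prescribe an assignment algorithm, but one simple dispatch plan the group leader could compute is round-robin, which respects the one-consumer-per-partition constraint:

```python
def dispatch_plan(partitions, consumers):
    """Assign each partition to exactly one consumer in the group, round-robin."""
    plan = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(sorted(partitions)):
        plan[consumers[i % len(consumers)]].append(partition)
    return plan

# Example: 6 partitions spread over 3 consumers -> 2 partitions each.
print(dispatch_plan(range(6), ["c1", "c2", "c3"]))
# {'c1': [0, 3], 'c2': [1, 4], 'c3': [2, 5]}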

State storage

The state includes:

  1. the mapping between partitions and consumers
  2. the last consumed offsets of consumer groups for each partition

Data access patterns for consumer states:

  1. frequent read and write operations, but the volume is low
  2. data is updated frequently and rarely deleted
  3. random read and write access
  4. data consistency is important
  • a KV store like ZooKeeper is a great choice for state storage (see the sketch below)
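A sketch of that state as a small key-value dataset, with a plain dict standing in for a store like ZooKeeper:

```python
class StateStorage:
    """Stand-in KV store for consumer group state."""

    def __init__(self):
        self.kv = {}  # in reality: ZooKeeper or another consistent KV store

    def commit_offset(self, group: str, topic: str, partition: int, offset: int) -> None:
        self.kv[("offset", group, topic, partition)] = offset

    def last_offset(self, group: str, topic: str, partition: int) -> int:
        # A group that has never committed an offset starts from the beginning.
        return self.kv.get(("offset", group, topic, partition), 0)

state = StateStorage()
state.commit_offset("group-1", "topic-A", partition=0, offset=42)
print(state.last_offset("group-1", "topic-A", partition=0))  # 42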

Replication

  • to make sure data is not lost in a distributed system, the same partition is replicated to multiple brokers
  • the producer only sends messages to the leader replica
  • the follower replicas keep pulling new messages from the leader
  • the coordination service makes the replica distribution plan and persists it in the metadata storage

In-sync replicas

  • in-sync replicas (ISR) are the set of replicas that are in sync with the leader
  • ISR reflects the trade-off between performance and durability
  • producers can choose not to receive an acknowledgment until k of the ISRs have received the message
  • k can range from 0 to all; a larger k means stronger message durability, but it takes longer to send a message because we need to wait for the slowest ISR
  • when k = 1, the leader replica sends the ACK as soon as it persists the message. BUT the data is lost if the leader replica goes down before the message is replicated to the other followers
  • when k = 0, the producer just keeps pushing data to the broker without waiting for any ACK (a sketch of the ACK behaviour follows)
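A leader-side sketch of how k might affect the acknowledgment; append_to_log, in_sync_copies, and wait_for_follower_fetch are hypothetical helpers used only to show the control flow:

```python
def handle_produce(leader, message, k: int):
    """Sketch of ACK behaviour for different values of k (ISR count to wait for)."""
    offset = leader.append_to_log(message)  # leader persists the message locally

    if k == 0:
        # Fire-and-forget: no ACK at all, highest throughput, weakest durability.
        return None
    if k == 1:
        # ACK as soon as the leader has persisted the message.
        # Risk: the message is lost if the leader dies before followers copy it.
        return offset

    # k > 1: block until at least k in-sync replicas (leader included) have it.
    while leader.in_sync_copies(offset) < k:
        leader.wait_for_follower_fetch()
    return offset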

Scalability

Producer

  • producers don't need group coordination; we can easily add or remove producer instances

Consumer

  • consumer groups are isolated from each other, so we can add or remove groups freely
  • inside a consumer group, the consumer rebalancing mechanism handles consumers being added or removed

Broker

  • recall that partitions are replicated over multiple brokers for durability
  • in case of a broker crash, a new distribution plan is created to evenly distribute all replicas over the remaining brokers
  • scaling brokers doesn't just mean adding a new instance; a new broker will take over replicas from the existing brokers
  • increasing the minimum number of replicas clones the same partition to more brokers, which makes the data more fault tolerant but increases latency
  • replicas of a partition should not be on the same node, because all of them would be lost if that broker crashed
  • it is safer to distribute replicas across data centers, but this incurs more latency and cost
  • to add a new broker, the system temporarily allows both the new and old brokers to hold the same replica; once the new broker finishes copying all the data from the leader and is up to date, the system removes the replica from the old broker

Partition

  • when the data size of a topic increases, we may add more partitions (scale the topic)
  • producers are notified, and consumers trigger consumer rebalancing
  • adding a partition does not require data migration because the persisted data is already in the old partitions
  • a removed partition is not deleted immediately because consumers might still be reading from it; it is removed after the configured retention period.
