Minimalistic blocking bounded queue in C++

While working on speeding up a Python extension written in C++, I needed a queue to distribute work among threads, and possibly to gather their results. What I had in mind was something akin to Python’s queue module – more specifically:

  • a blocking MPMC queue with fixed capacity;
  • that supports push, try_push, pop, and try_pop;
  • that depends only on the C++11 standard library;
  • and has a simple and robust header-only implementation.

Since I could choose the size of the unit of work arbitrarily, the performance of the queue as measured by microbenchmarks wasn’t a priority, as long as it wasn’t abysmal. Likewise, the queue capacity would be fairly small, on the order of the number of cores on the system, so the queue wouldn’t need to be particularly efficient at allocation – while a ring buffer would be optimal for storage, a linked list or deque would work as well. Other non-requirements included:

  • strong exception safety – while certainly an indicator of quality, the work queue did not require it because its payload would consist of smart pointers, either std::unique_ptr<T> or std::shared_ptr<T>, neither of which throws on move/copy.
  • lock-free/wait-free mutation – again, most definitely a plus (so long as the queue can fall back to blocking when necessary), but not a requirement because the workers would spend the majority of time doing actual work.
  • timed versions of blocking operations – useful in general, but I didn’t need them.

Googling C++ bounded blocking MPMC queue gives quite a few results, but surprisingly, most of them don’t really meet the requirements set out at the beginning.

For example, this queue is written by Dmitry Vyukov and endorsed by Martin Thompson, so it’s definitely of the highest quality, and way beyond something I (or most people) would be able to come up with on my own. However, it’s a lock-free queue that doesn’t expose any kind of blocking operation by design. According to the author, waiting for events such as the queue no longer being empty or full is a concern that should be handled outside the queue. This position has a lot of merit, especially in more complex scenarios where you might need to wait on multiple queues at once, but it complicates the use in many simple cases, such as the work distribution I was implementing. In these use cases a thread communicates with only one queue at a time. When the queue is unavailable due to being empty or full, that is a signal of backpressure and there is nothing else the thread can do but sleep until the situation changes. If done right, this sleep should neither hog the CPU nor introduce unnecessary latency, so it is at least convenient that it be implemented as part of the queue.

The next implementation that looks really promising is Erik Rigtorp’s MPMCQueue. It has a small implementation, supports both blocking and non-blocking variants of enqueue and dequeue operations, and covers inserting elements by move, copy, and emplace. The author claims that it’s battle-tested in several high-profile EA games, as well as in a low-latency trading platform. However, a closer look at push() and pop() betrays a problem with the blocking operations. For example, pop() contains the following loop:

    while (turn(tail) * 2 + 1 != slot.turn.load(std::memory_order_acquire))
      ;

Waiting is implemented with a busy loop, so that the waiting thread is not put to sleep when unable to pop, but it instead loops until this is possible. In usage that means that if the producers stall for any reason, such as reading new data from a slow network disk, consumers will angrily pounce on the CPU waiting for new items to appear in the queue. And if the producers are faster than the consumers, then it’s them who will spend CPU to wait for some free space to appear in the queue. In the latter case, the busy-looping producers will actually take the CPU cycles away from the workers doing useful work, prolonging the wait. This is not to say that Erik’s queue is not of high quality, just that it doesn’t fit this use case. I suspect that applications the queue was designed for very rarely invoke the blocking versions of these operations, and when they do, they do so in scenarios where they are confident that the blockage is about to clear up.
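To make the difference concrete, here is a minimal sketch that contrasts the two waiting strategies on a single flag. It is not taken from MPMCQueue; the names spin_until_set, flag_with_signal, and demo_both_waits are made up for illustration, and the 50 ms delay merely stands in for a stalled producer.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Busy wait: the thread stays runnable and keeps polling until the flag flips.
inline void spin_until_set(const std::atomic<bool> &flag) {
  while (!flag.load(std::memory_order_acquire))
    ;
}

// Blocking wait: the thread sleeps inside wait() until it is notified.
struct flag_with_signal {
  std::mutex mutex;
  std::condition_variable cv;
  bool is_set = false;

  void block_until_set() {
    std::unique_lock<std::mutex> lk(mutex);
    cv.wait(lk, [this]() { return is_set; });
  }

  void set_flag() {
    {
      std::lock_guard<std::mutex> lk(mutex);
      is_set = true;
    }
    cv.notify_all();
  }
};

// Hypothetical demo: a slow "producer" sets both flags after a delay
// while the calling thread waits on each of them in turn.
inline bool demo_both_waits() {
  std::atomic<bool> spin_flag(false);
  flag_with_signal sleepy;
  std::thread producer([&]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    spin_flag.store(true, std::memory_order_release);
    sleepy.set_flag();
  });
  spin_until_set(spin_flag);  // keeps one core busy for the whole delay
  sleepy.block_until_set();   // returns once the flag has been set
  producer.join();
  return spin_flag.load() && sleepy.is_set;
}
```

Both calls return once the producer has run, but only the first one occupies a core while it waits, which is the behavior that makes a spinning pop() a poor fit for long stalls.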

Then there are a lot of queue implementations on stackoverflow and various personal blogs that seem like they could be used, but don’t stand up to scrutiny. For example, this queue comes pretty high in search results, definitely supports blocking, and appears to achieve the stated requirements. Compiling it does produce some nasty warnings, though:

warning: operation on 'm_popIndex' may be undefined [-Wsequence-point]

Looking at the source, the assignment m_popIndex = ++m_popIndex % m_size is an infamous case of undefined behavior in C and C++. The author probably thought he found a way to avoid parentheses in m_popIndex = (m_popIndex + 1) % m_size, but C++ doesn’t work like that. The problem is easily fixed, but it leaves a bad overall impression. Another issue is that it requires a semaphore implementation, using boost’s inter-process semaphore by default. The page does provide a replacement semaphore, but the two in combination become unreasonably heavy-weight: the queue contains no fewer than three mutexes, two condition variables, and two counters. The simple queues implemented with mutexes and condition variables tend to have a single mutex and one or two condition variables, depending on whether the queue is bounded.
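The parenthesized form can be wrapped in a small helper so that the index is read once and written once. This is only a sketch; the name advance_index and its parameters are hypothetical and do not come from the queue in question.

```cpp
#include <cstddef>

// Hypothetical helper: returns the slot that follows `index` in a ring
// buffer of `size` slots, wrapping around to 0 after the last slot.
// The argument is only read here; the caller performs the single write.
inline std::size_t advance_index(std::size_t index, std::size_t size) {
  return (index + 1) % size;
}
```

With such a helper the offending line would become m_popIndex = advance_index(m_popIndex, m_size), which compiles without the sequence-point warning.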

It was at this point that it occurred to me that it would actually be more productive to just take a simple and well-tested classic queue and port it to C++ than to review and correct examples found by googling. I started from Python’s queue implementation and ended up with the following:

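#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

template<typename T>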
class queue {
  std::deque<T> content;
  size_t capacity;

  std::mutex mutex;
  std::condition_variable not_empty;
  std::condition_variable not_full;

  queue(const queue &) = delete;
  queue(queue &&) = delete;
  queue &operator = (const queue &) = delete;
  queue &operator = (queue &&) = delete;

 public:
  queue(size_t capacity): capacity(capacity) {}

  void push(T &&item) {
    {
      std::unique_lock<std::mutex> lk(mutex);
      not_full.wait(lk, [this]() { return content.size() < capacity; });
      content.push_back(std::move(item));
    }
    not_empty.notify_one();
  }

  bool try_push(T &&item) {
    {
      std::unique_lock<std::mutex> lk(mutex);
      if (content.size() == capacity)
        return false;
      content.push_back(std::move(item));
    }
    not_empty.notify_one();
    return true;
  }

  void pop(T &item) {
    {
      std::unique_lock<std::mutex> lk(mutex);
      not_empty.wait(lk, [this]() { return !content.empty(); });
      item = std::move(content.front());
      content.pop_front();
    }
    not_full.notify_one();
  }

  bool try_pop(T &item) {
    {
      std::unique_lock<std::mutex> lk(mutex);
      if (content.empty())
        return false;
      item = std::move(content.front());
      content.pop_front();
    }
    not_full.notify_one();
    return true;
  }
};
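As a rough illustration of the work-distribution use case, the sketch below feeds a small pool of worker threads through such a queue. The queue is repeated in condensed form (push and pop only) so that the example compiles on its own. The function sum_of_squares, the workload, and the use of a null pointer as a stop marker are assumptions made for the example, not part of the original project.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Condensed copy of the queue above (push and pop only).
template<typename T>
class queue {
  std::deque<T> content;
  std::size_t capacity;
  std::mutex mutex;
  std::condition_variable not_empty;
  std::condition_variable not_full;

 public:
  explicit queue(std::size_t capacity): capacity(capacity) {}

  void push(T &&item) {
    {
      std::unique_lock<std::mutex> lk(mutex);
      not_full.wait(lk, [this]() { return content.size() < capacity; });
      content.push_back(std::move(item));
    }
    not_empty.notify_one();
  }

  void pop(T &item) {
    {
      std::unique_lock<std::mutex> lk(mutex);
      not_empty.wait(lk, [this]() { return !content.empty(); });
      item = std::move(content.front());
      content.pop_front();
    }
    not_full.notify_one();
  }
};

// Hypothetical workload: sum the squares of 1..n using `workers` threads.
// Assumes workers >= 1; a queue of capacity 0 would block forever.
inline long sum_of_squares(int n, unsigned workers) {
  queue<std::unique_ptr<int>> tasks(workers);  // small capacity gives backpressure
  std::atomic<long> total(0);

  std::vector<std::thread> pool;
  for (unsigned i = 0; i < workers; ++i) {
    pool.emplace_back([&]() {
      for (;;) {
        std::unique_ptr<int> task;
        tasks.pop(task);  // sleeps while the queue is empty
        if (!task)
          break;          // null pointer is the stop marker
        total += static_cast<long>(*task) * *task;
      }
    });
  }

  for (int i = 1; i <= n; ++i)
    tasks.push(std::unique_ptr<int>(new int(i)));  // sleeps while the queue is full
  for (unsigned i = 0; i < workers; ++i)
    tasks.push(std::unique_ptr<int>());            // one stop marker per worker

  for (std::thread &t : pool)
    t.join();
  return total.load();
}
```

Sending one stop marker per worker is just one way to shut the pool down; it works here because each worker exits after consuming exactly one marker.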

Notes on the implementation:

  • push() and try_push() only accept rvalue references, so elements get moved into the queue, and if you have them in a variable, you might need to use queue.push(std::move(el)). This property allows the use of std::unique_ptr<T> as the queue element. If you have copyable types that you want to copy into the queue, you can always use queue.push(T(el)).

  • pop() and try_pop() accept a reference to the item rather than returning the item. This provides the strong exception guarantee – if moving the element from the front of the queue to item throws, the queue remains unchanged. Yes, I know I said I didn’t care for exception guarantees in my use case, but this design is shared by most C++ queues, and the exception guarantee follows from it quite naturally, so it’s a good idea to embrace it. I would have preferred pop() to just return an item, but that would require the C++17 std::optional (or equivalent) for the return type of try_pop(), and I couldn’t use C++17 in the project. The downside of accepting a reference is that to pop an item you must first be able to construct an empty one. Of course, that’s not a problem if you use the smart pointers which default-construct to hold nullptr, but it’s worth mentioning. Again, the same design is used by other C++ queues, so it’s apparently an acceptable choice.

  • The use of a mutex and condition variables makes this queue utterly boring to people excited about parallel and lock-free programming. Still, those primitives are implemented very efficiently in modern C++, without the need to use system calls in the non-contended case. In benchmarks against fancier queues I was never able to measure the difference on the workload of the application I was using.
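For comparison, here is a sketch of what the value-returning try_pop() mentioned in the notes could look like if C++17 were available. It is a hypothetical variant, not the queue from this article: the class name optional_queue is made up, only the non-blocking operations are shown, and it needs a compiler in C++17 mode.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical C++17 variant with a value-returning try_pop().
template<typename T>
class optional_queue {
  std::deque<T> content;
  std::size_t capacity;
  std::mutex mutex;

 public:
  explicit optional_queue(std::size_t capacity): capacity(capacity) {}

  bool try_push(T &&item) {
    std::lock_guard<std::mutex> lk(mutex);
    if (content.size() == capacity)
      return false;
    content.push_back(std::move(item));
    return true;
  }

  // Returns std::nullopt when the queue is empty, so the caller does not
  // have to construct an empty T up front.
  std::optional<T> try_pop() {
    std::lock_guard<std::mutex> lk(mutex);
    if (content.empty())
      return std::nullopt;
    std::optional<T> item(std::move(content.front()));
    content.pop_front();
    return item;
  }
};

// Small demo: fills a queue of capacity 2, then drains it.
// Returns the number of items popped, or -1 if the overflow push succeeded.
inline int demo_optional_pop() {
  optional_queue<int> q(2);
  q.try_push(1);
  q.try_push(2);
  bool overflow_accepted = q.try_push(3);  // queue is full at this point
  int popped = 0;
  while (auto item = q.try_pop())
    ++popped;
  return overflow_accepted ? -1 : popped;
}
```

The trade-off is the one described above: the element is removed from the queue before the result reaches the caller, so if the compiler does not elide the final move and that move throws, the item is lost.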

Is this queue as battle-tested as the ones by Mr Vyukov and Mr Rigtorp? Certainly not! But it does work fine in production, it has passed code review by in-house experts, and most importantly, the code is easy to follow and understand.

The remaining question is – is this a classic example of NIH? Is there really no other way than to implement your own queue? How do others do it? To shed some light on this, I invite the reader to read Dmitry Vyukov’s classification of queues. In short, he categorizes queues across several dimensions: by whether they support multiple producers, consumers, or both, by the underlying data structure, by maximum size, overflow behavior, support for priorities, ordering guarantees, and many, many others! Differences in these choices vastly affect the implementation, and there is no one class that fits all use cases. If you need extremely low latency, definitely look into the lock-free queues like the ones from Vyukov or Rigtorp. If your needs are matched by the “classic” blocking queues, then a simple implementation like the one shown in this article might be the right choice. I would still prefer to have one good implementation written by experts that tries to cover the middle ground, for example a C++ equivalent of Rust’s crossbeam channels. But that kind of thing doesn’t appear to exist for C++ yet, at least not as a free-standing class.

EDIT
As pointed out on Reddit, the last paragraph is not completely correct. There is a queue implementation that satisfies the requirements and has most of the crossbeam channels functionality: the moodycamel queue. I did stumble on that queue when originally researching the queues, but missed it when writing the article later, since it didn’t come up on the first page of results when googling for C++ bounded blocking MPMC queue. This queue was too complex to drop into the project I was working on, since its size would have dwarfed the rest of the code. But if size doesn’t scare you and you’re looking for a full-featured, high-quality queue implementation, do consider that one.

4 thoughts on “Minimalistic blocking bounded queue in C++”

  1. I think you should use notify_all. In the push scenario, notify_one can notify a thread that is waiting on push, while we want to notify a thread waiting on pop. Likewise, think about the pop scenario, where we want to notify a thread that is waiting on push. To achieve this you need to use notify_all.

  2. Thanks for the post! Just posting another version similar to yours, using a ring buffer (it uses int as an example, but could easily be changed to T):

    #include <condition_variable>
    #include <mutex>
    #include <vector>

    class BoundedBlockingQueue {
    public:
        // One extra slot stays unused so that "full" and "empty" differ.
        BoundedBlockingQueue(int capacity) {
            data_.resize(capacity + 1);
            front_ = back_ = 0;
        }

        // Blocks while the queue is full.
        void enqueue(int element) {
            {
                std::unique_lock<std::mutex> lock(guard_);

                not_full_.wait(lock, [this]{
                    return !this->isFull();
                });

                data_[back_] = element;
                back_ = next(back_);
            }

            not_empty_.notify_one();
        }

        // Blocks while the queue is empty.
        int dequeue() {
            int payload = 0;

            {
                std::unique_lock<std::mutex> lock(guard_);

                not_empty_.wait(lock, [this]{
                    return !this->isEmpty();
                });

                payload = data_[front_];
                front_ = next(front_);
            }

            not_full_.notify_one();
            return payload;
        }

        // Not synchronized: reads the indices without taking the lock.
        int size() const noexcept {
            if (back_ >= front_) {
                return back_ - front_;
            } else {
                return static_cast<int>(data_.size()) - (front_ - back_);
            }
        }

    private:
        bool isFull() const noexcept {
            return next(back_) == front_;
        }

        bool isEmpty() const noexcept {
            return back_ == front_;
        }

        // Index of the slot after `index`, wrapping around at the end.
        int next(int index) const noexcept {
            int next = index + 1;
            if (next == static_cast<int>(data_.size())) {
                next = 0;
            }

            return next;
        }

        std::mutex guard_;
        std::condition_variable not_empty_;
        std::condition_variable not_full_;
        std::vector<int> data_;
        int front_;
        int back_;
    };
