1. 16

I have come across this problem at work and I want to know how it is generally done/implemented. I don’t know where else to ask/discuss this, so I would also be grateful for pointers to other communities where I could post it.

This is a chat application with multiple rooms. The end users are typically on platforms like WhatsApp, Instagram etc., and there is one room per user. These platforms have an API which lets me queue messages. Once the message is sent to the user’s account/devices, they make a callback to our webhook APIs. So the only way to send messages and preserve their sequence is to (see the sketch after the list):

  1. queue the message by calling their APIs
  2. wait for the ack which says the message is sent
  3. repeat
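
A minimal sketch of this loop for a single room, in Python; `platform_api.queue_message` and the ack plumbing are hypothetical stand-ins for the real platform API and our webhook handler:

    import queue

    ACK_TIMEOUT_MS = 500  # placeholder value; see the timer requirement below

    def send_loop(room_messages, platform_api, acks):
        """Send one room's messages strictly in order.

        acks is a queue.Queue that our webhook handler feeds with ack events.
        """
        for msg in room_messages:
            platform_api.queue_message(msg)  # 1. queue via the platform's API (hypothetical client)
            try:
                acks.get(timeout=ACK_TIMEOUT_MS / 1000)  # 2. block until the webhook reports "sent"
            except queue.Empty:
                pass  # no ack within the window: stop waiting and move on
            # 3. repeat with the next message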

Now how do I design a system which does this, at scale?


To give more details:

  1. The messages to be sent are written to a queue (GCP PubSub), and they are ordered within the context of a user/room. However, messages aren’t batched. For example, the queue could look like this:

             +----------------------------------------------------------+
               +----+  +----+  +----+  +----+  +----+  +----+  +----+
     ------->  | A7 |  | B3 |  | A6 |  | A5 |  | C5 |  | C4 |  | B2 |  ------->
               +----+  +----+  +----+  +----+  +----+  +----+  +----+
             +----------------------------------------------------------+
    

If A, B, C are different users, you can see that within this queue each user’s messages are ordered.

  2. I would like to have multiple workers that consume from this queue.
  3. I would like to have multiple webhook servers that listen for the acks from the external APIs.
  4. Sometimes I never get the ack from the external APIs. So I would like to have a timer: if I haven’t received the ack within X milliseconds, mark the current message as sent and proceed to the next.

Problems:

  1. Sharing state between the workers and the webhook servers. The webhook server somehow needs to communicate to the worker that a given message has been sent to the user.
  2. Processing messages for different rooms in parallel. If for some reason it is taking longer to send messages to one user, that should not slow things down for the other users.
  1. 10

    For 20-25k rooms and 100ms latencies, this should be easily doable with one process, unless your message count is very, very high, which you don’t mention.

    Scaling state across processes, especially distributed state, is a very hard problem. See all of the Jepsen testing for examples of how hard a problem it is.

    The easiest way is to just have one process that manages the state and handles the queue(s). So I’d recommend avoiding trying to scale it out until you MUST.

    Your time, effort and $$’s will probably be better spent throwing hardware at the problem before it’s worth going down the software-scalability path for high-throughput distributed state.

    You can:

    1. Shard it, and have each process take ownership of a particular room/set of rooms.

    2. Let RabbitMQ fan out the queue(s) for you (see @algesten’s comment). This is only part of the solution though; see 1.

    3. Try to actually solve the distributed-state problem, which, again, nobody has managed to solve very well. I’d consider hiring an expert like aphyr (the Jepsen person) to help get you on the right track.

    But seriously, avoid scaling it across processes for as long as possible; your uptime and mental health will thank you.
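
    A rough illustration of option 1, assuming a stable room id and a fixed worker count (both assumptions on my part):

        import hashlib

        NUM_WORKERS = 4  # assumed fixed; changing it means re-sharding ownership

        def owner_of(room_id: str) -> int:
            """Map each room to exactly one worker, so its messages stay ordered."""
            digest = hashlib.sha256(room_id.encode()).digest()
            return int.from_bytes(digest[:8], "big") % NUM_WORKERS

        # Worker w consumes only the rooms where owner_of(room_id) == w.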

    1. 4

      If the choice is between handling ordering at the application level or offloading the complexity to the queue, by all means offload it to the queue. Ordering is apparently a constraint you need to respect, so let’s drop the almost-ordered version for this theoretical exercise.

      The simplest implementation I can think of is:

      queue <=> worker <=> [API <=> client]
      

      The part of the pipeline you don’t control is in brackets.

      The idea is to have one queue per channel, per worker. Profiling the application will show you the bottlenecks and you can start optimising from there.

      NOTE: If the existing message publisher cannot handle message separation, you can offload message routing to a system like RabbitMQ.

      Using back-of-the-envelope calculations and assuming your payload is text (e.g. 32 KB + metadata), I expect each message to take up to 10ms from queue to worker. Now the worker needs to communicate with the API and receive an ACK; I expect this operation to take ~100ms or less at the 99th percentile. Last, the worker should return an ACK to RabbitMQ. If no response has come back from the remote API within 250ms, we should let RabbitMQ know and retry sending the same message until it’s properly delivered.

      NOTE: The pipeline roundtrip for our system ends when RabbitMQ receives an ACK from the worker.
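
      A sketch of that worker loop with pika; `send_and_wait_for_ack` is a hypothetical helper wrapping the remote API call and the 250ms wait:

          import pika

          def on_message(ch, method, properties, body):
              # Hypothetical helper: returns True iff the remote API acked within 250ms.
              if send_and_wait_for_ack(body, timeout_ms=250):
                  ch.basic_ack(delivery_tag=method.delivery_tag)  # pipeline roundtrip ends here
              else:
                  # No ack in time: nack so RabbitMQ redelivers the message.
                  ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

          connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
          channel = connection.channel()
          channel.basic_qos(prefetch_count=1)  # one in-flight message preserves ordering
          channel.basic_consume(queue="channel-A", on_message_callback=on_message)
          channel.start_consuming()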

      Your pipeline SLI at the 99th percentile should be less than 150ms per message from the time the message enters your infrastructure. That means up to ~7 messages per second, per channel, at the 99th percentile. I am not sure if that’s good enough; you should figure that out by looking at your existing load.

      I bet the bottleneck will be the remote API, especially if it doesn’t handle batches and ordering on its end; my understanding is that it doesn’t.

      Very interesting problem, enjoy :-)

      1. 2

        If you have a billion users with trillions of messages the first thing you need to do is design a feed topology that allows you to shard your input events and your output events independently. A single process can easily handle hundreds of thousands of messages per second (and top tier stuff like q is millions) so a “feed” can really be quite thick – maybe you will do this regionally, or maybe you’ll take a few letters from the user’s name. Experiment (you know your data set better than we do) and find a way to get each feed into the 100-400k/sec range.

        This means you might have 1000 feeds for input, and 1000 feeds for output, and in-between is a piece of middleware which consumes the input feeds, makes a routing decision, and emits them to the appropriate output feeds. If your routing policy is static this will be trivial, so let us consider dynamic routing: You can have a single “routing feed” whose consumer is responsible for putting the event onto each of those other 1000 feeds (so the routing topology will propagate). If routing changes too frequently, you may shard this as well into tiers using the same strategy as your main message bus, since (hopefully) the routing topology of your cluster is static; if it’s not, repeat with another set of tiers until it is.

        Now, your question specifically asks what to do if you want multiple consumers for a single feed. One strategy is to set a stride: if you want four workers, each worker takes every fourth element and discards the other three, trusting the other workers to handle those.
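
        A stride consumer might look like this, assuming each event carries a monotonically increasing sequence number (my assumption, not something the feed necessarily provides):

            NUM_WORKERS = 4

            def wants(event_seq: int, worker_index: int) -> bool:
                """Worker `worker_index` keeps every NUM_WORKERS-th event."""
                return event_seq % NUM_WORKERS == worker_index

            # With 4 workers, worker 2 handles events 2, 6, 10, ... and discards
            # the rest, trusting workers 0, 1 and 3 to cover them.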

        1. 2

          How many users? What’s the latency requirement for messaging?

          1. 1
            1. The rooms are short-lived; the maximum duration does not exceed an hour. However, at any point there will be 20k to 25k rooms.

            2. Right now, the messages are sent from the queue without considering ordering. The latency is sub-100ms. I don’t mind an increase in latency if the order is guaranteed.

          2. 2

            Random anecdote: Google PubSub is marketed as a general-purpose queue service, but note that it has some rather sharp edges if you don’t match its original use case: a queue for indexing the web, i.e. extremely high throughput where per-message latency is not important.

            We routinely saw multi-hour delays in message deliveries for low throughput queues before we abandoned it at Neo4j Aura.

            1. 1

              Thanks for the anecdote! We have had a few hiccups with it too.

              btw, the queue I mentioned in the OP can be replaced with any other that supports a pub/sub mechanism and provides ordering guarantees

            2. 2

              One solution I am considering is creating one queue per room. I could use Apache Pulsar, which is supposed to handle millions of queues. However, just about everyone who has used it in production advises me not to: in their experience it is not easy to maintain infrastructure-wise and is much more complex than Kafka. So I am exploring alternative solutions too.

              1. 3

                If you’re looking into different queueing solutions you might find Tim Bray’s Eventing Facets series helpful.

                1. 1

                  I’ve been courting Pulsar quite a bit lately. I’d like to use it for an observability stack; I think I’m very much in love with the idea of Pulsar functions. The main problem I have is that while everything can speak to Kafka, there’s not much around for Pulsar yet.

                2. 2

                  I’ve had success modelling problems in RabbitMQ. You might want to consider one queue per user, which would guarantee the ordering. The webhooks could potentially be modelled the way one does RPC over RabbitMQ, i.e. posting the “response” back to a specific reply queue.
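
                  The reply-queue half of that pattern could look roughly like this with pika (queue names are made up):

                      import pika, uuid

                      connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
                      channel = connection.channel()

                      # Exclusive, server-named queue the webhook side posts "responses" to.
                      reply_queue = channel.queue_declare(queue="", exclusive=True).method.queue

                      channel.basic_publish(
                          exchange="",
                          routing_key="user-42",                 # one queue per user keeps ordering
                          properties=pika.BasicProperties(
                              reply_to=reply_queue,              # where the ack comes back
                              correlation_id=str(uuid.uuid4()),  # ties the ack to this message
                          ),
                          body=b"hello",
                      )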

                  1. 1

                    I’m not an expert for this problem but had this idea:

                    • maintain a map binding users to workers, to enforce ordered delivery through that worker
                    • when no binding exists as a message enters the queue, create one
                    • release the binding when no more messages for that user are in the queue, so that a new one will be created next time (a rough sketch of this bookkeeping follows)
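
                    In Python-flavoured pseudocode, with an invented `pick_worker`:

                        bindings = {}  # user_id -> worker_id
                        pending = {}   # user_id -> messages still in the queue

                        def on_enqueue(user_id):
                            if user_id not in bindings:
                                bindings[user_id] = pick_worker(user_id)  # hypothetical assignment
                            pending[user_id] = pending.get(user_id, 0) + 1
                            return bindings[user_id]

                        def on_delivered(user_id):
                            pending[user_id] -= 1
                            if pending[user_id] == 0:   # queue drained for this user
                                del pending[user_id]
                                del bindings[user_id]   # release; next message rebinds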
                    1. 1

                      A couple of clarifying questions:

                      1. You state that if you haven’t received an ack within X milliseconds, to mark the current message as sent and proceed. If you don’t care about retries, why not remove the requirement to listen to acks in the first place?
                      2. How important is event ordering to you? For most event architectures, it’s worth quashing that requirement because of the complexity it adds.
                      3. What’s worse: a user not receiving a message, or a user receiving more than one copy of a message?
                      1. 2
                        1. I get acks 85-90% of the time. So I would like to optimise so that delivery is ordered for the maximum number of users, and let it go out of order for a few. Also, by adding this X amount of delay, the messages are usually sent to the user in order; they go out of order when I send them instantly.

                        2. The current system is unordered and works really well (scale, maintainability). However, a lot of messages are sent out of order, and ordering is very important. My naive solution is to add a delay of X ms after every message, which should solve most cases. However, that would slow everything down unnecessarily, and I don’t want that.

                        3. A user not receiving a message is worse. But I would try not to send it multiple times either.

                        1. 4

                          Have you considered enabling PubSub ordering, with the ordering key being the user/room? Among the tradeoffs: you will be limited in throughput (1 MB/s) per ordering key, and you will be vulnerable to hot-shard issues.
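
                          For reference, the publisher side looks roughly like this with the google-cloud-pubsub client (project/topic names are placeholders, and the subscription must also have ordering enabled):

                              from google.cloud import pubsub_v1

                              publisher = pubsub_v1.PublisherClient(
                                  publisher_options=pubsub_v1.types.PublisherOptions(
                                      enable_message_ordering=True,
                                  )
                              )
                              topic = publisher.topic_path("my-project", "outbound-messages")

                              # Messages sharing an ordering_key are delivered in publish order.
                              publisher.publish(topic, b"hello", ordering_key="room-1234")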

                          After enabling ordering, if the ordering issue still impacts a greater fraction of users than you would like, then the problem is most likely on the upstream side (Insta/WhatsApp). AFAIK there is no ordering guarantee for those services, even if you wait for acks.

                          My advice: if the current solution is working great without ordering, I would strongly suggest sticking with it.

                          1. 2

                            Once I enable ordering on the queue, it becomes difficult to distribute the work across multiple workers, right?

                            if the current solution is working great without ordering, I would strongly suggest sticking with it.

                            I so wish to do this, but seems I can’t :(

                            1. 3

                              Has someone actually quantified how impactful out-of-order messages are to the business? This is the kind of bug that a less-technical manager or PM can prioritize highly without doing due diligence.

                              Another suggestion is to make a design and be sure to highlight which infrastructure costs are changing (increasing, most likely), as well as calling out the risks of increasing the system’s complexity. You have the agency to advocate for what you think is right. If they decide to proceed with the design, then go ahead, get the experience, and you’ll find out over time whether the warnings were warranted.

                              Quantifying the impact is a good exercise for you to do anyway, since if you build the system you can then put an estimate of the value you created on your resume.

                              1. 2

                                Correct; you will only be able to have one worker per ordering key, or you lose your ordering guarantee.

                            2. 2

                              If you want to avoid both duplicates and lost messages, the only solution is to use idempotent APIs to send messages. If you do not receive an ack within some time, resend the message idempotently; lost sends/acks are repeated, and the API provider filters the duplicates. Only proceed to sending message N+1 once you eventually succeed in sending message N.
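
                              A sketch under the assumption that the provider accepts an idempotency key (the `send`/`wait_for_ack` API here is invented):

                                  import uuid

                                  def send_ordered(messages, api, ack_timeout=0.25):
                                      for msg in messages:
                                          key = str(uuid.uuid4())  # stable per message, NOT per attempt
                                          while True:
                                              api.send(msg, idempotency_key=key)  # provider dedupes on key
                                              if api.wait_for_ack(key, timeout=ack_timeout):
                                                  break  # message N confirmed; only now move to N+1
                                              # lost send or lost ack: resending is safe, it's idempotent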

                              If your API provider does not offer an idempotent API, then you could try querying their API for your last sent message and comparing it with the one you plan to send. But this is slow and, since it’s not atomic/transactional, very racy.

                          2. 1
                            1. Get a data store that supports compare-and-set on a single row/document. Any ACID database will do, and so will many non-ACID databases (DynamoDB; I don’t know what Google Cloud offers).
                            2. Define a mapping (old_state, input) => new_state, i.e. a state machine, where the inputs are the messages on the queue and the webhook responses.
                            3. Every user gets a row in the data store holding their state value.
                            4. On the queue side, processors pull from the queue, look up the user’s state, find the new state for the given input and state, and compare-and-set the user’s state to the new state.
                            5. On the webhook side, processors receive a webhook request, look up the user’s state, find the new state for the given input and state, and compare-and-set the user’s state to the new state.

                            The only tricky thing in defining the state machine is that almost every state will need to accept a webhook response, because they come in asynchronously.

                            edit: and of course, if a compare-and-set fails, do the usual thing and loop: read the new state and CAS again
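
                            A sketch of steps 4-5 and that retry loop, assuming the store exposes a `compare_and_set(key, expected, new)` primitive (a conditional write, in DynamoDB terms):

                                def apply(store, user_id, event, transition):
                                    """Run one input through the per-user state machine."""
                                    while True:
                                        old = store.get(user_id)      # current state row
                                        new = transition(old, event)  # (old_state, input) -> new_state
                                        if store.compare_and_set(user_id, old, new):
                                            return new
                                        # CAS lost a race: re-read and try again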