Idempotent receivers using a message store
In microservices and distributed systems, messaging is a preferred form of integration. I covered some of the reasons in one of my previous posts. In these types of systems, whether we are doing pub-sub, event notification, or sending commands, it is good practice to make the receiving endpoints idempotent. It simply means the endpoint can receive the same message multiple times and end up in the same state as if it had received it once.
One way of ensuring this is through the design of the message body itself. For example, in an accounting system, instead of sending a message to deduct the withdrawn amount, we can send the resulting total. To explain further, if I had $100 in my account and took $20 out, a message that sets the total to $80 is naturally idempotent, while a message that deducts $20 is not: processing it twice would leave me with $60.
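To make the difference concrete, here is a minimal sketch. The `Account` class and its two handler methods are hypothetical and exist only for this illustration; they are not part of my system. Each message is delivered twice to show which style tolerates a duplicate.

```csharp
using System;

// Top-level statements (C# 9 or later): deliver each message twice and compare.
var setAccount = new Account(100m);
setAccount.HandleSetBalance(80m);
setAccount.HandleSetBalance(80m);          // duplicate delivery
Console.WriteLine(setAccount.Balance);     // 80: the duplicate changed nothing

var deductAccount = new Account(100m);
deductAccount.HandleDeduct(20m);
deductAccount.HandleDeduct(20m);           // duplicate delivery
Console.WriteLine(deductAccount.Balance);  // 60: the duplicate was applied again

// Hypothetical account, used only for this illustration.
public class Account
{
    public decimal Balance { get; private set; }

    public Account(decimal openingBalance) => Balance = openingBalance;

    // Idempotent: applying the same total again leaves the balance unchanged.
    public void HandleSetBalance(decimal newTotal) => Balance = newTotal;

    // Not idempotent: every delivery subtracts the amount again.
    public void HandleDeduct(decimal amount) => Balance -= amount;
}
```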
However, there are times when this is not possible. In the simple accounting system described above, to send a message that sets the total to $80, the sender needs to know the current state of the destination. This can bring in a whole new set of complexities. What if this is a pub-sub/event notification system? Are we going to keep track of everything happening in the subscribers? Even if this was a command, we need to be sure of the current reality of the destination. This means increased coupling between the source and the destination. We are also increasing the importance of message ordering, and the bugs around this are never fun to resolve.
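The ordering concern can be illustrated with a small sketch. The `Replay` helper and the amounts are hypothetical: a $100 balance, a $20 withdrawal, then a $30 withdrawal. Set-the-total messages give a different result when they arrive out of order, while deductions do not.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical scenario: $100 balance, withdraw $20, then withdraw $30 more.
var setTotalsInOrder  = new[] { 80m, 50m };
var setTotalsSwapped  = new[] { 50m, 80m };
var deductionsInOrder = new[] { 20m, 30m };
var deductionsSwapped = new[] { 30m, 20m };

Console.WriteLine(Replay.ApplySetTotals(100m, setTotalsInOrder));    // 50
Console.WriteLine(Replay.ApplySetTotals(100m, setTotalsSwapped));    // 80: the stale total wins
Console.WriteLine(Replay.ApplyDeductions(100m, deductionsInOrder));  // 50
Console.WriteLine(Replay.ApplyDeductions(100m, deductionsSwapped));  // 50: order does not matter

// Hypothetical helper, used only for this illustration.
public static class Replay
{
    // Each message overwrites the balance, so the last one to arrive wins.
    public static decimal ApplySetTotals(decimal balance, IEnumerable<decimal> totals)
    {
        foreach (var total in totals) balance = total;
        return balance;
    }

    // Each message subtracts an amount, so arrival order does not change the result.
    public static decimal ApplyDeductions(decimal balance, IEnumerable<decimal> amounts)
    {
        foreach (var amount in amounts) balance -= amount;
        return balance;
    }
}
```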
Inadvertently, we could be pushing this distributed system to be more consistent, and that is always challenging given the various explanations of the CAP theorem. I highly recommend reading everything on this topic in Designing Data-Intensive Applications.
So, it sounds easy to say, "just send a set-the-total-to-$80 message," but the reality is often a little more complex than that, especially when we need the system to be more available than consistent, or when we are dealing with an event notification system with many subscribers. In these situations, it could be more practical to send a deduct-by-$20 message. Now what?
In my system, I am working with NServiceBus with RabbitMQ as a transport. NServiceBus provides Outbox functionality. It looks good, but it comes with some caveats.
"Because the Outbox uses a single (non-distributed) database transaction to store all data, the business data and Outbox storage must exist in the same database."
I can't guarantee this in my system. Also, what if I want to use a NoSQL store for this?
"The Outbox feature works only for messages sent from NServiceBus message handlers."
This is very limiting. RabbitMQ is a very capable queuing system. It comes with easy-to-use clients, and enqueuing a message directly, without going through NServiceBus, is common.
This is where the simple message store approach comes in. We can store a record of each processed message. In my case, the model looks like this:
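public class MessageStore
{
    public Guid MessageId { get; set; }          // Id of the processed message
    public Guid GenericIdentifier { get; set; }  // Id of the record the message relates to
    public DateTime TimeStamp { get; set; }      // used to order messages for the same record
    public string MessageType { get; set; }      // message type name, e.g. nameof(MyMessage)
}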
I can use any type of data store for this. I used SQL Server.
Now, in my NServiceBus message handlers, I can access this store as needed. To check if a message was already processed, I can do a simple lookup based on the message Id. The same message can be delivered more than once for several reasons. For example, we can fetch a message from a RabbitMQ queue and process it, but the positive ack never reaches the cluster because of a network blip.
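A minimal sketch of the message Id check follows. It makes several assumptions: an in-memory `List<MessageStore>` stands in for the database table, and the `DeductAmount` message and `DeductHandler` class are hypothetical plain classes, not NServiceBus types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var store = new List<MessageStore>();  // stands in for the database table
var handler = new DeductHandler(store, openingBalance: 100m);
var message = new DeductAmount(Guid.NewGuid(), Guid.NewGuid(), 20m, DateTime.UtcNow);

handler.Handle(message);
handler.Handle(message);               // redelivered because the ack was lost
Console.WriteLine(handler.Balance);    // 80: the second delivery was skipped

// Hypothetical message, used only for this illustration.
public record DeductAmount(Guid MessageId, Guid AccountId, decimal Amount, DateTime TimeStamp);

public class MessageStore
{
    public Guid MessageId { get; set; }
    public Guid GenericIdentifier { get; set; }
    public DateTime TimeStamp { get; set; }
    public string MessageType { get; set; } = "";
}

// Hypothetical handler: a plain class, not an NServiceBus handler.
public class DeductHandler
{
    private readonly List<MessageStore> _store;
    public decimal Balance { get; private set; }

    public DeductHandler(List<MessageStore> store, decimal openingBalance)
    {
        _store = store;
        Balance = openingBalance;
    }

    public void Handle(DeductAmount message)
    {
        // A stored record with the same Id means this is a duplicate delivery.
        if (_store.Any(m => m.MessageId == message.MessageId))
            return;

        Balance -= message.Amount;

        // Remember the message so a later redelivery is discarded.
        _store.Add(new MessageStore
        {
            MessageId = message.MessageId,
            GenericIdentifier = message.AccountId,
            TimeStamp = message.TimeStamp,
            MessageType = nameof(DeductAmount)
        });
    }
}
```

In a real handler, the business update and the insert into the store would need to commit together; this sketch leaves that out.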
Often, especially in data update scenarios, we want to discard a message if we have already processed a more recent update. This can happen if the message is going through retry logic and, before it comes up again, a more recent update arrives and gets processed. In this situation, I can do a check like this:
var topProcessedMessage = dbContext.MessageStore
    .Where(m => m.MessageType.Equals(nameof(MyMessage)) && m.GenericIdentifier.Equals(message.MyRecordIdentifier))
    .OrderByDescending(m => m.TimeStamp)
    .FirstOrDefault();
I can compare the TimeStamp of that stored message against the incoming message and discard the incoming message if its timestamp is earlier than the stored one. Otherwise, I process the message and store the necessary values in the MessageStore.
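The stale message check can be sketched end to end as follows. As before, this rests on assumptions: an in-memory `List<MessageStore>` replaces `dbContext.MessageStore`, the `UpdateRecord` message and `UpdateRecordHandler` are hypothetical, and the handler tracks a single record to keep the example short.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var store = new List<MessageStore>();  // stands in for dbContext.MessageStore
var recordId = Guid.NewGuid();
var t0 = new DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Utc);  // arbitrary sample time

var older = new UpdateRecord(Guid.NewGuid(), recordId, "old value", t0);
var newer = new UpdateRecord(Guid.NewGuid(), recordId, "new value", t0.AddMinutes(5));

var handler = new UpdateRecordHandler(store);
handler.Handle(newer);  // the more recent update is processed first
handler.Handle(older);  // the retried, older update arrives afterwards
Console.WriteLine(handler.CurrentValue);  // new value

// Hypothetical message, used only for this illustration.
public record UpdateRecord(Guid MessageId, Guid RecordId, string Value, DateTime TimeStamp);

public class MessageStore
{
    public Guid MessageId { get; set; }
    public Guid GenericIdentifier { get; set; }
    public DateTime TimeStamp { get; set; }
    public string MessageType { get; set; } = "";
}

// Hypothetical handler: a plain class, not an NServiceBus handler.
public class UpdateRecordHandler
{
    private readonly List<MessageStore> _store;
    public string CurrentValue { get; private set; } = "";

    public UpdateRecordHandler(List<MessageStore> store) => _store = store;

    public void Handle(UpdateRecord message)
    {
        // Most recent processed message of this type for the same record.
        var topProcessedMessage = _store
            .Where(m => m.MessageType == nameof(UpdateRecord) && m.GenericIdentifier == message.RecordId)
            .OrderByDescending(m => m.TimeStamp)
            .FirstOrDefault();

        // Discard the incoming message if a more recent update was already processed.
        if (topProcessedMessage != null && message.TimeStamp < topProcessedMessage.TimeStamp)
            return;

        CurrentValue = message.Value;

        _store.Add(new MessageStore
        {
            MessageId = message.MessageId,
            GenericIdentifier = message.RecordId,
            TimeStamp = message.TimeStamp,
            MessageType = nameof(UpdateRecord)
        });
    }
}
```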
This way I can focus on what's relevant for my subscriber/handler. I can easily discard processed and stale messages. If multiple message types update the same entity, I won’t have to clutter the entity with a separate LastUpdated timestamp column for each of them. This store can be easily expanded to store serialized messages, their checksums, etc. As we are not adding this to the pipeline, this behavior stays optional and is not applied by default to all handlers.
The downside of this approach is, of course, that the store can grow out of hand quickly in terms of the number of records. We will see how to handle this situation in the next post.