Message store cleanup strategy

In the previous post, we saw how to use a message store to make a receiver idempotent. Since we are using a table in SQL Server, it can fill up quickly if multiple types of messages are being stored. How do we keep it from filling up?

Instead of relying on any strategy outside our endpoint, I decided to use NServiceBus scheduling, configured to send a message every 24 hours. This is easy to set up. My send code looks like below:

await endpointInstance.ScheduleEvery(
    timeSpan: TimeSpan.FromHours(24),
    task: context =>
    {
        var message = new CleanupStoreMessage();
        return context.Send(message);
    })
.ConfigureAwait(false);
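
The CleanupStoreMessage itself carries no data; it only triggers the cleanup. A minimal sketch, assuming NServiceBus's ICommand marker interface (since the scheduled task uses context.Send):

// Minimal sketch: an empty trigger message.
// Assumes the NServiceBus ICommand marker interface; message
// conventions could be used instead.
public class CleanupStoreMessage : ICommand
{
}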

The CleanupStoreMessageHandler then picks it up. The handler deletes every record in the store older than the configured number of days. That code looks like below:

var messagesToRemove = dbContext.MessageStore.Where(
    m => m.TimeStamp <= DateTime.UtcNow.AddDays(-10));

dbContext.MessageStore.RemoveRange(messagesToRemove);
dbContext.SaveChanges();
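
For context, here is a sketch of how that logic could sit inside the handler. The MessageStoreContext name is hypothetical (an assumed Entity Framework context, the dbContext in the snippet above):

// Sketch of the cleanup handler wired into NServiceBus.
// Requires: using NServiceBus; using System.Threading.Tasks;
public class CleanupStoreMessageHandler : IHandleMessages<CleanupStoreMessage>
{
    public Task Handle(CleanupStoreMessage message, IMessageHandlerContext context)
    {
        using (var dbContext = new MessageStoreContext())
        {
            // Remove everything older than the retention window (10 days here).
            var messagesToRemove = dbContext.MessageStore.Where(
                m => m.TimeStamp <= DateTime.UtcNow.AddDays(-10));

            dbContext.MessageStore.RemoveRange(messagesToRemove);
            dbContext.SaveChanges();
        }

        return Task.CompletedTask;
    }
}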

The NServiceBus scheduling comes with several caveats. A relevant one is below:

Since the Scheduler is non-durable, if the process restarts, all scheduled tasks (that are created during the endpoint's startup) are recreated and given new identifiers.

To get around this, my strategy is to keep the TimeSpan for the ScheduleEvery method small enough that the task executes at least once a day. Since the query goes back a fixed number of days from the time of execution, if I receive more than one CleanupStoreMessage, the later messages simply won't find any data to clean up, and that's fine for me. My query here goes back 10 days and I am sending the CleanupStoreMessage every 24 hours. If I reduce the TimeSpan to 12 hours, I will get two messages per day, and the second one may not find any data to clean. This way my handler has a greater chance of executing, which mitigates the limitation.

Another advantage here is that I am cleaning up my own data without relying on some clever database archiving strategy or another process to clean it up. I am less likely to forget about it, and it requires less maintenance.


Idempotent receivers using message store

In microservices/distributed systems, messaging is a preferred form of integration. I covered some of the reasons in one of my previous posts. In these types of systems, whether we are doing pub-sub, event notification, or sending commands, it is good practice to make the receiving endpoints idempotent. That simply means an endpoint can receive the same message multiple times without adverse effects.

One way of ensuring this is by designing the message body itself. For example, in an accounting system, instead of sending a message to deduct the withdrawn amount, we can send the resulting total. To put it concretely, if I had $100 in my account and I took $20 out, sending a message that sets the total to $80 is naturally more idempotent than sending a deduct-$20 message.
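To make the contrast concrete, here is a hypothetical sketch of the two message shapes (the names and properties are illustrative, not from a real system):

// Hypothetical message shapes contrasting the two designs.
// Processing SetAccountBalance twice leaves the balance at $80 either
// way; processing DeductFromAccount twice deducts $40 instead of $20.
public class SetAccountBalance
{
    public Guid AccountId { get; set; }
    public decimal NewBalance { get; set; }  // e.g. 80
}

public class DeductFromAccount
{
    public Guid AccountId { get; set; }
    public decimal Amount { get; set; }      // e.g. 20
}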

However, there are times when this is not possible. In the simple accounting system described above, to send a set-the-total-to-$80 message, the sender needs to know the current state of the destination. This can bring in a whole new set of complexities. What if this is a pub-sub/event notification system? Are we going to keep track of everything happening in every subscriber? Even if this was a command, we would need to be sure of the current state of the destination, which means increased coupling between the source and the destination. We are also increasing the importance of message ordering, and bugs around ordering are never fun to resolve. Inadvertently, we could be pushing this distributed system toward stronger consistency, and that is always challenging given the trade-offs the CAP theorem describes. I highly recommend reading everything around this topic in Designing Data-Intensive Applications.

So, it sounds easy to say "just send a set-the-total-to-$80 message", but the reality is often a little more complex than that, especially when we need the system to be more available than consistent, or when we are dealing with an event notification system with many subscribers. In these situations, it can be more practical to send a deduct-$20 message. Now what?

In my system, I am working with NServiceBus with RabbitMQ as a transport. NServiceBus provides Outbox functionality. It looks good, but it comes with some caveats.

Because the Outbox uses a single (non-distributed) database transaction to store all data, the business data and Outbox storage must exist in the same database.

I can't guarantee this in my system. Also, what if I want to use a NoSQL store for this?

The Outbox feature works only for messages sent from NServiceBus message handlers.

This is very limiting. RabbitMQ is a very capable queuing system; it comes with easy-to-use clients, and enqueuing a message directly, outside of NServiceBus, is common.
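
For example, here is a sketch of enqueuing directly with the RabbitMQ .NET client (the queue name and payload are hypothetical); a message published this way never passes through an NServiceBus handler, so the Outbox cannot cover it:

// Sketch using the classic RabbitMQ.Client API.
// Requires: using RabbitMQ.Client; using System.Text;
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    var body = Encoding.UTF8.GetBytes("{ \"myRecordIdentifier\": \"...\" }");
    channel.BasicPublish(exchange: "",
                         routingKey: "my-queue",
                         basicProperties: null,
                         body: body);
}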

In comes the simple message store approach: we store the messages we have processed. In my case, my model looks like below:

public class MessageStore
{
    public Guid MessageId { get; set; }
    public Guid GenericIdentifier { get; set; }
    public DateTime TimeStamp { get; set; }
    public string MessageType { get; set; }
}  

I can use any type of data store for this. I used SQL Server.
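
With SQL Server behind Entity Framework, the backing context can be a thin wrapper. A sketch, assuming EF Core and a hypothetical MessageStoreContext name (the dbContext in the snippets that follow):

// Hypothetical EF Core context exposing the store.
public class MessageStoreContext : DbContext
{
    public DbSet<MessageStore> MessageStore { get; set; }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // MessageId uniquely identifies a processed message.
        modelBuilder.Entity<MessageStore>().HasKey(m => m.MessageId);
    }
}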

Now, in my NServiceBus message handlers, I can access this store as needed. To check whether a message was already processed, I can do a simple lookup based on the message Id. Duplicate deliveries can happen for several reasons; for example, we can fetch a message from a RabbitMQ queue and process it, but the positive ack never reaches the cluster because of a network blip, so the message is delivered again.
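
That lookup can be as small as the following sketch (the handler's IMessageHandlerContext is assumed, as is a message id that parses as a GUID):

// Sketch: skip the message if its id is already in the store.
var incomingId = Guid.Parse(context.MessageId); // assumes a GUID message id
var alreadyProcessed = dbContext.MessageStore.Any(m => m.MessageId == incomingId);

if (alreadyProcessed)
{
    return; // already handled once; safe to drop the duplicate
}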

Often, especially in data-update scenarios, we want to discard a message if we have already processed a more recent update. This can happen when a message is going through retry logic and, before it comes up again, another more recent update arrives and gets processed. In this situation, I can do a check like below:

var topProcessedMessage = dbContext.MessageStore
    .Where(m => m.MessageType.Equals(nameof(MyMessage))
        && m.GenericIdentifier.Equals(message.MyRecordIdentifier))
    .OrderByDescending(m => m.TimeStamp)
    .FirstOrDefault();

I can compare the TimeStamp of that stored message against the incoming message and discard the incoming message if its TimeStamp is earlier than the stored one. Otherwise, I process the message and save the necessary values to the MessageStore.
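
Putting the pieces together inside the handler, that might look like the following sketch (message.TimeStamp and the GUID message id are assumptions about the message contract):

// Sketch: discard stale updates, otherwise process and record the message.
if (topProcessedMessage != null && message.TimeStamp <= topProcessedMessage.TimeStamp)
{
    return; // a more recent update has already been processed
}

// ... apply the business update here ...

dbContext.MessageStore.Add(new MessageStore
{
    MessageId = Guid.Parse(context.MessageId), // assumes a GUID message id
    GenericIdentifier = message.MyRecordIdentifier,
    TimeStamp = message.TimeStamp,
    MessageType = nameof(MyMessage)
});
dbContext.SaveChanges();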

This way I can worry about only what's relevant for my subscriber/handler. I can easily discard processed and stale messages. If multiple messages make updates to the same entity, I won't have to clutter that entity with many different LastUpdated timestamp columns. The store can easily be expanded to hold serialized messages, their checksums, and so on. And since we are not adding this to the pipeline, the behavior stays optional instead of applying to every handler by default.

The downside of this approach is, of course, that the store can grow out of hand quickly in terms of the number of records. We will see how to handle that in the next post.
