Sending JSON message natively to RabbitMQ for NServiceBus subscribers

I was working on putting together some sample code to demonstrate what data synchronization services may look like. I will write a post about the details and purposes of this in a later post. The TLDR; version for that is, I have a trigger which gets fired on inserting a new row in a table, it inserts a row in the NServiceBus queue in the same database. The handler then picks up that message and puts in the RabbitMQ directly. For further illustration, please see the diagram below:

I followed NServiceBus documentation for sending the message natively to the RabbitMQ. In their example they used XML. In my case, I was working with JSON. After pushing the message on to the queue, I ran into a problem on the subscriber side. Here's the error:

 2017-08-29 15:45:48.269 ERROR NServiceBus.RecoverabilityExecutor Moving message 'd38b59e7-6680-416e-a679-d9a016972240' to the error queue 'error' because processing failed due to an exception:
NServiceBus.MessageDeserializationException: An error occurred while attempting to extract logical messages from transport message d38b59e7-6680-416e-a679-d9a016972240 ---> System.Exception: Could not find metadata for 'Newtonsoft.Json.Linq.JObject'.
Ensure the following:
1. 'Newtonsoft.Json.Linq.JObject' is included in initial scanning. 
2. 'Newtonsoft.Json.Linq.JObject' implements either 'IMessage', 'IEvent' or 'ICommand' or alternatively, if you don't want to implement an interface, you can use 'Unobtrusive Mode'.
 at NServiceBus.Unicast.Messages.MessageMetadataRegistry.GetMessageMetadata(Type messageType)
at NServiceBus.Pipeline.LogicalMessageFactory.Create(Type messageType, Object message)
at NServiceBus.DeserializeLogicalMessagesConnector.Extract(IncomingMessage physicalMessage)
at NServiceBus.DeserializeLogicalMessagesConnector.ExtractWithExceptionHandling(IncomingMessage message)
--- End of inner exception stack trace ---
at NServiceBus.DeserializeLogicalMessagesConnector.ExtractWithExceptionHandling(IncomingMessage message)
at NServiceBus.DeserializeLogicalMessagesConnector.<Invoke>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---

This was happening even before my breakpoint in the message handler was hit. I had Newtonsoft.Json referenced correctly. It was getting scanned as well. So, what was happening?

After feverishly scratching my head(and losing a little more hair) and some StackOverflow searches, I started to look at my message on the queue. It looked like below:

{
   "FirstName":"Jane",
   "LastName":"Doe",
    "Id":"44ECBBFD-85B6-47C8-9E08-3DCF2633862C"
}

It looks like a perfectly valid JSON. When NServiceBus picked it up off the queue, it left it as a JObject. Why? Because NServiceBus couldn't figure out the type. The type information was not part of the message. This type property has to be on the top. So, the JSON should look like below:

{
   "$type":"Messages.NewUser"
   "FirstName":"Jane",
   "LastName":"Doe",
    "Id":"44ECBBFD-85B6-47C8-9E08-3DCF2633862C"
}

There are multiple options such as using JsonConverter, adding the type property in the message, etc. I went to a simpler route as below:

 var jObjectFromMessage = JObject.FromObject(message);
 var typeName = typeof(NewUser).FullName;
 jObjectFromMessage.AddFirst(new JProperty("$type", typeName));
 var serializedMessage = jObjectFromMessage.ToString();

And that worked! The complete handler code looks like below:

public async Task Handle(NewUser message, IMessageHandlerContext context)
{
     var factory = new ConnectionFactory
     {
          HostName = "localhost",
          UserName = "guest",
          Password = "guest"
     };

     await Task.Run(() =>
     {
         using (var connection = factory.CreateConnection())
         {
             using (var channel = connection.CreateModel())
             {
                  var properties = channel.CreateBasicProperties();
                  properties.MessageId = Guid.NewGuid().ToString();

                  var jObjectFromMessage = JObject.FromObject(message);
                  var typeName = typeof(NewUser).FullName;
                  jObjectFromMessage.AddFirst(new JProperty("$type", typeName));
                  var serializedMessage = jObjectFromMessage.ToString();                        
                  var messageBytes = Encoding.UTF8.GetBytes(serializedMessage);

                  channel.QueueDeclare("SyncUsers.RabbitMqEndpoint", true, autoDelete: false, exclusive: false);
                  channel.BasicPublish(string.Empty, "SyncUsers.RabbitMqEndpoint", false, properties,  messageBytes);

                }
            }
     });
}

That's it. I hope this helps if you are trying to send JSON messages directly to RabbitMQ for NServiceBus subscribers.

UPDATE 09/04/2017

Simon Cropp on Twitter sent me a note about enclosed message types serialization headers. I tried them out and that worked too. I removed jObjectFromMessage.AddFirst(new JProperty("$type", typeName)); and added NServiceBus.EnclosedMessageTypes header to my message like below:

   var typeName = typeof(NewUser).FullName;
   var properties = channel.CreateBasicProperties();
   properties.MessageId = Guid.NewGuid().ToString();
   properties.Headers =  new Dictionary<string, object> {{"NServiceBus.EnclosedMessageTypes", typeName}};

This could be useful if you don't have control over the serialization.



comments powered by Disqus