Sending JSON messages natively to RabbitMQ for NServiceBus subscribers

I was putting together some sample code to demonstrate what data synchronization services might look like. I will cover the details and purpose of this in a later post. The TL;DR version: a trigger fires when a new row is inserted into a table, and it inserts a row into the NServiceBus queue in the same database. The handler then picks up that message and puts it on RabbitMQ directly. For further illustration, please see the diagram below:

I followed the NServiceBus documentation for sending messages natively to RabbitMQ. Their example uses XML; in my case, I was working with JSON. After pushing the message onto the queue, I ran into a problem on the subscriber side. Here's the error:

 2017-08-29 15:45:48.269 ERROR NServiceBus.RecoverabilityExecutor Moving message 'd38b59e7-6680-416e-a679-d9a016972240' to the error queue 'error' because processing failed due to an exception:
NServiceBus.MessageDeserializationException: An error occurred while attempting to extract logical messages from transport message d38b59e7-6680-416e-a679-d9a016972240 ---> System.Exception: Could not find metadata for 'Newtonsoft.Json.Linq.JObject'.
Ensure the following:
1. 'Newtonsoft.Json.Linq.JObject' is included in initial scanning. 
2. 'Newtonsoft.Json.Linq.JObject' implements either 'IMessage', 'IEvent' or 'ICommand' or alternatively, if you don't want to implement an interface, you can use 'Unobtrusive Mode'.
 at NServiceBus.Unicast.Messages.MessageMetadataRegistry.GetMessageMetadata(Type messageType)
at NServiceBus.Pipeline.LogicalMessageFactory.Create(Type messageType, Object message)
at NServiceBus.DeserializeLogicalMessagesConnector.Extract(IncomingMessage physicalMessage)
at NServiceBus.DeserializeLogicalMessagesConnector.ExtractWithExceptionHandling(IncomingMessage message)
--- End of inner exception stack trace ---
at NServiceBus.DeserializeLogicalMessagesConnector.ExtractWithExceptionHandling(IncomingMessage message)
at NServiceBus.DeserializeLogicalMessagesConnector.<Invoke>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---

This was happening even before my breakpoint in the message handler was hit. I had Newtonsoft.Json referenced correctly. It was getting scanned as well. So, what was happening?

After feverishly scratching my head (and losing a little more hair) and some Stack Overflow searches, I started to look at my message on the queue. It looked like this:

{
   "FirstName":"Jane",
   "LastName":"Doe",
   "Id":"44ECBBFD-85B6-47C8-9E08-3DCF2633862C"
}

It looks like perfectly valid JSON. When NServiceBus picked it up off the queue, it left it as a JObject. Why? Because NServiceBus couldn't figure out the type. The type information was not part of the message. The $type property has to be the first property in the object. So, the JSON should look like this:

{
   "$type":"Messages.NewUser",
   "FirstName":"Jane",
   "LastName":"Doe",
   "Id":"44ECBBFD-85B6-47C8-9E08-3DCF2633862C"
}

There are multiple options, such as using a JsonConverter or adding the type property to the message class itself. I took a simpler route:

 var jObjectFromMessage = JObject.FromObject(message);
 var typeName = typeof(NewUser).FullName;
 jObjectFromMessage.AddFirst(new JProperty("$type", typeName));
 var serializedMessage = jObjectFromMessage.ToString();

And that worked! The complete handler code looks like below:

using System;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;
using NServiceBus;
using RabbitMQ.Client;

public async Task Handle(NewUser message, IMessageHandlerContext context)
{
    var factory = new ConnectionFactory
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest"
    };

    // The RabbitMQ client calls used here are synchronous, so they run inside Task.Run.
    await Task.Run(() =>
    {
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            var properties = channel.CreateBasicProperties();
            properties.MessageId = Guid.NewGuid().ToString();

            // Add "$type" as the first property so the subscriber can resolve the message type.
            var jObjectFromMessage = JObject.FromObject(message);
            var typeName = typeof(NewUser).FullName;
            jObjectFromMessage.AddFirst(new JProperty("$type", typeName));
            var serializedMessage = jObjectFromMessage.ToString();
            var messageBytes = Encoding.UTF8.GetBytes(serializedMessage);

            // Publish through the default exchange, using the queue name as the routing key.
            channel.QueueDeclare("SyncUsers.RabbitMqEndpoint", true, autoDelete: false, exclusive: false);
            channel.BasicPublish(string.Empty, "SyncUsers.RabbitMqEndpoint", false, properties, messageBytes);
        }
    });
}
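
The type-hint step in the handler is tied to NewUser. As a minimal sketch, it could be pulled into a small helper that works for any message object. TypeHint and Serialize are hypothetical names of mine, not part of NServiceBus or Json.NET; the sketch assumes the message class does not already define a "$type" property.

```csharp
using Newtonsoft.Json.Linq;

public static class TypeHint
{
    // Hypothetical helper: serializes 'message' to JSON with "$type" as the first property.
    public static string Serialize(object message)
    {
        var json = JObject.FromObject(message);

        // AddFirst (rather than Add) keeps the type hint ahead of the message's own properties.
        json.AddFirst(new JProperty("$type", message.GetType().FullName));

        return json.ToString();
    }
}
```

In the handler, the four serialization lines would then shrink to var serializedMessage = TypeHint.Serialize(message);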

That's it. I hope this helps if you are trying to send JSON messages directly to RabbitMQ for NServiceBus subscribers.

UPDATE 09/04/2017

Simon Cropp on Twitter sent me a note about enclosed message types serialization headers. I tried them out and that worked too. I removed jObjectFromMessage.AddFirst(new JProperty("$type", typeName)); and added NServiceBus.EnclosedMessageTypes header to my message like below:

   var typeName = typeof(NewUser).FullName;
   var properties = channel.CreateBasicProperties();
   properties.MessageId = Guid.NewGuid().ToString();
   properties.Headers =  new Dictionary<string, object> {{"NServiceBus.EnclosedMessageTypes", typeName}};

This could be useful if you don't have control over the serialization.
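
To show how the header approach separates type information from the body, here is a rough sketch of the two pieces a native sender needs. NativeMessage, BuildHeaders and BuildBody are hypothetical names I made up for illustration; only the header key comes from the snippet above. The body is serialized with plain JsonConvert and carries no "$type" property.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Newtonsoft.Json;

public static class NativeMessage
{
    // Hypothetical helper: builds the header that tells NServiceBus which message type is enclosed.
    public static Dictionary<string, object> BuildHeaders(Type messageType)
    {
        return new Dictionary<string, object>
        {
            { "NServiceBus.EnclosedMessageTypes", messageType.FullName }
        };
    }

    // Hypothetical helper: serializes the message as plain JSON, with no type hint in the body.
    public static byte[] BuildBody(object message)
    {
        return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
    }
}
```

Inside the handler, this would be used as properties.Headers = NativeMessage.BuildHeaders(typeof(NewUser)); with NativeMessage.BuildBody(message) passed as the body argument to BasicPublish.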


Upgrading NServiceBus to V5 from V4 - Part2

This is a continuation of my previous NServiceBus upgrade post.

Logging

The logging functionality that used to be in NServiceBus.Core has moved to a separate set of NuGet packages such as NServiceBus.CommonLogging, NServiceBus.Log4Net and NServiceBus.NLog.

SetLoggingLibrary from V4 has been removed in V5. LogManager.Use<Log4NetFactory>() from the NServiceBus.Log4Net package will get the job done for Log4Net implementations. The obsolete error message clearly states that.
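
A minimal sketch of what that could look like at endpoint startup. LoggingSetup is a hypothetical class name, and I am assuming that Log4NetFactory lives in the NServiceBus.Log4Net namespace and that log4net is configured from the application's config file; adjust both to your setup.

```csharp
using NServiceBus.Log4Net;   // assumed namespace of Log4NetFactory (NServiceBus.Log4Net package)
using NServiceBus.Logging;

public static class LoggingSetup
{
    // Hypothetical helper: call this before the bus is configured and started.
    public static void Configure()
    {
        // Assumption: log4net appenders are defined in app.config / web.config.
        log4net.Config.XmlConfigurator.Configure();

        // Route NServiceBus logging through log4net.
        LogManager.Use<Log4NetFactory>();
    }
}
```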

EndpointName

In order to stop the machine name from being appended, I thought the line below was sufficient.

configuration.ScaleOut().UseSingleBrokerQueue();

It didn't work. Then, I stumbled on this Stack Overflow post and this GitHub issue. It looks like the RabbitMQ, SQL Server and ActiveMQ transports override that setting and try to create a queue with the machine name at the end of it, even though one without the machine name exists.

To disable this behavior, you can do something like the following. I am using SqlServerTransport for the sake of an example; the same should work for RabbitMQ and ActiveMQ.

configuration.UseTransport<SqlServerTransport>().DisableCallbackReceiver();

StructureMap

It is a good idea to upgrade any dependent packages that are in use. For StructureMap, after upgrading the NServiceBus.StructureMap package, the configuration that looked like this in V4

Configure.StructureMapBuilder(ObjectFactory.Container)

looks like this in V5

configuration.UseContainer<StructureMapBuilder>(b=>b.ExistingContainer([container]));

This is clearly stated in the error message from the StructureMapBuilder extension method. This is an example of a good message.

Assembly Scanning

NServiceBus scans the directory where the assembly with the class that implements IConfigureThisEndpoint resides. This has to be handled with care because it is very easy to fall into dependency hell: the endpoint will not come up because it may start scanning too many assemblies and their dependencies. Scanning can be restricted to a finite set of assemblies.

In V4, Configure.With([ListOfAssemblies]) was the way to pass the list of assemblies. In V5, configuration.AssembliesToScan(listOfAssemblies) will get the job done, where configuration is an instance of BusConfiguration. The name makes more sense in V5. It can take an IEnumerable<Assembly>, an IIncludesBuilder or an IExcludesBuilder.

I think the IIncludesBuilder approach is handy because the list of assemblies is finite and the rest of the assemblies are excluded when the endpoint comes up. You can also do some pattern matching.

var listOfAssemblies = AllAssemblies.Matching("YourNameSpace.").And("SomethingElse");
configuration.AssembliesToScan(listOfAssemblies);

More on assembly scanning can be found at this Particular documentation link. This page makes me optimistic that the documentation will eventually catch up.

If too many assemblies are included, the dependency check can spiral out of control. If too few are included, you may see errors like "No handlers found for the message type" or "Could not determine type for node", as in this Google Groups discussion.

MSMQ utilities

The MsmqUtilities class is not public anymore in V5. I don't think it was ever meant to be. You are, however, allowed to copy it if needed. The V4 source is here and the latest is here.

Persistence and features

Particular provides three persistence implementations. InMemory persistence comes with the core NuGet package. NHibernate persistence has its own NServiceBus.NHibernate package, and RavenDB persistence has NServiceBus.RavenDB.

When configuring persistence, the order is important: the last option wins. It is highly recommended to take a look at this documentation link.

If you want to roll your own, you can't do it as a BusConfiguration extension because it won't work. It can be implemented as features instead. The endpoint below uses MyFancyPersistence:

public class MyEndpointConfig :
    IConfigureThisEndpoint,
    AsA_Server,
    IWantToRunWhenConfigurationIsComplete
{
    public void Customize(BusConfiguration configuration)
    {
        configuration.UsePersistence<MyFancyPersistence>().For(Storage.Subscriptions, Storage.Timeouts);
    }

    public void Run(Configure config)
    {
        //read settings here.
        //var settings = config.Settings;
    }
}

We are requiring MyFancyPersistence to provide implementations for at least subscriptions and timeouts. As of today, the Storage enum looks like this in the NSB codebase:

public enum Storage
{
    Timeouts = 1,
    Subscriptions = 2,
    Sagas = 3,
    GatewayDeduplication = 4,
    Outbox = 5,
}

(source - NSB Storage enum (subject to change))

Besides inheriting from PersistenceDefinition, MyFancyPersistence declares its default features with the Defaults method and the storage types it supports with the Supports method.

public class MyFancyPersistence : PersistenceDefinition
{
    public MyFancyPersistence()
    {
        Defaults(s => s.EnableFeatureByDefault<MyDefaultFeature>());
        Supports(Storage.Timeouts, s => s.EnableFeatureByDefault<MyTimeoutsFeature>());
        Supports(Storage.Subscriptions, s => s.EnableFeatureByDefault<MySubscriptionFeature>());
    }
}

The individual features look like below. Again, this is a simplistic implementation.

public class MyDefaultFeature :Feature
{
    protected override void Setup(FeatureConfigurationContext context)
    {
        var settings = context.Settings; //instance of ReadOnlySettings to get endpointname,etc
        var pipeline = context.Pipeline; //instance of PipelineSettings to register steps in NSB pipeline
        var items = context.Container; //instance of IConfigureComponents to ConfigureComponents
    }
}

public class MyTimeoutsFeature :Feature
{
    protected override void Setup(FeatureConfigurationContext context)
    {
        //Configure components that implement timeouts implementation into fav storage
    }
}
public class MySubscriptionFeature :Feature
{
    protected override void Setup(FeatureConfigurationContext context)
    {
        //Configure components that implement subscription implementation into fav storage
    }
}

For more detailed implementations, please take a look at NHibernate, InMemory or RavenDb.

Conclusion

In my opinion, most of these changes are good and make sense. They provide more flexibility, as in this Stack Overflow post. If you are an early adopter, you will have to deal with documentation that is still catching up and a lot of changes in the public API. The open source nature of the project makes up for all of this, even if it can be a little time consuming to dig for small changes. I hope these posts help and save some time and grief.


Upgrading NServiceBus to V5 from V4 - Part1

Sometimes upgrading a framework can be a pain. Upgrading NServiceBus from 4.X.X to 5.X.X is no different. There are a lot of changes in the public API and not everything is documented or obvious. Sometimes the error messages can be misleading, too. This was a surprise to me, coming from a mature open source project. So, I decided to document the missing links and save some time for anyone who is facing similar issues.

Documentation

On Particular's site, under the migration tag, you will find only two posts which are fairly long.

The API-diff document only shows what was removed, like below:

[Image: The API difference doc]

So, if you were using one of the removed types or methods, you would be lucky to find its V5 counterpart straight away. In my opinion, users would much appreciate a document that shows how a method looked in V4 and how it looks in V5 now. The complete API differences can be found here. It looks like a work in progress and there are signs of improvement.

The upgrade guide from V4 to V5 is a little better but does not cover everything that is changed in V5.

Upgrade Guide

The best form of documentation is the code base. For V4.6.7, you can go here and for the latest here. This is the beauty of open source software. Good tests are always more helpful for showing usage than exhaustive documentation. Still, when this many changes land in one big-bang release, catching up can be a time consuming affair for the user.

To be fair, very few open source projects have awesome documentation like Knockoutjs.

After upgrading the version, depending on your usage, you may see a ton of compile time errors. What is the reason behind this?

The biggest change in V5

The Configure class has lost most of its useful static methods, such as Configure.EndpointName. BusConfiguration has been introduced instead, and an instance of it is passed in to the endpoint. To illustrate, suppose your endpoint looks like this in V4:

public class MyEndpointConfig : IConfigureThisEndpoint, 
                              AsA_Server, 
                              IWantCustomInitialization
{
    public void Init() 
    {
    }
 }

The Init() is a part of IWantCustomInitialization.

In V5, it looks like this:

public class MyEndpointConfig : IConfigureThisEndpoint,
                              AsA_Server
{
    public void Customize(BusConfiguration configuration)
    {
    }
}

IWantCustomInitialization is removed; IConfigureThisEndpoint is sufficient.

This means you will have to change your endpoints and rebuild them if you are implementing the NServiceBus endpoint interfaces.

Configuration Changes

If you had extensions added on to Configure class, they are pretty much useless. You will have to re-implement those on BusConfiguration instead.

Some of the methods that are available on BusConfiguration are:

[Image: methods available on BusConfiguration]

Backwards compatibility and obsolete errors

The way I understand backwards compatibility is, you keep old and new implementations marking the old one with obsolete attribute. It gives users the opportunity to make gradual changes.
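
As a small sketch of that idea in plain C#: the old entry point stays callable, forwards to the new implementation, and only produces a compiler warning. LegacySettings, GetSetting and ReadSetting are hypothetical names made up for illustration and have nothing to do with the NSB codebase.

```csharp
using System;

public static class LegacySettings
{
    // New implementation that callers should migrate to.
    public static string ReadSetting(string key)
    {
        return "value-for-" + key;
    }

    // Old entry point kept alive. Passing 'false' makes its use a compiler warning
    // rather than an error, and the body forwards to the new implementation.
    [Obsolete("Use ReadSetting instead. Will be removed in the next major version.", false)]
    public static string GetSetting(string key)
    {
        return ReadSetting(key);
    }
}
```

Existing callers keep compiling and running, and the warning tells them where to go next.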

V5 of NSB has methods like the one below (source: NSB GitHub repo):

[Obsolete("Please use `ReadOnlySettings.GetConfigSection<T>` instead. Will be removed in version 6.0.0.", true)]
[EditorBrowsable(EditorBrowsableState.Advanced)]
public static T GetConfigSection<T>()
{
  throw new NotImplementedException();
}

I can imagine the reasons behind not keeping two implementations (probably too much of a hassle), but that error message is not helpful at all. Will be removed in version 6.0.0? It is effectively removed in this version too, right? The second argument of the Obsolete attribute is true, so any use of the method is a compile error, and the body only throws. Moreover, ReadOnlySettings is an interface and does not itself have a GetConfigSection<T> method; it is an extension method on ReadOnlySettings. The extension code file can be found here. The SettingsHolder class implements the ReadOnlySettings interface, so you can access it from the Settings property (an instance of SettingsHolder) like below:

public class MyEndpointConfig : IWantToRunWhenConfigurationIsComplete
{
    public void Run(Configure config)
    {
       var configSection = config.Settings.GetConfigSection<TransportConfig>();
    }
}

If you need to access EndpointName then:

public class MyEndpointConfig : IWantToRunWhenConfigurationIsComplete
{
    public void Run(Configure config)
    {
       var endpointName = config.Settings.EndpointName(); // readonly
    }
}

However, it is a readonly property.

Now, why do we need to implement IWantToRunWhenConfigurationIsComplete to access this type of information? IConfigureThisEndpoint has Customize method with instance of BusConfiguration parameter. I tried to do

configuration.GetSettings().GetConfigSection<TransportConfig>();

for config section and

configuration.GetSettings().EndpointName(); 

to get the name.

The former throws a KeyNotFoundException with the message "The given key (TypesToScan) was not present in the dictionary." The endpoint name call also throws a KeyNotFoundException, with the message "The given key (EndpointName) was not present in the dictionary.", but you can definitely set it there, like

configuration.GetSettings().EndpointName("MyEndpointName");

Perhaps, this could be a write-only property.

The ConfigurationComplete event that used to be in V4 has disappeared. I took it to Stack Overflow. The community and open source nature of the project helped. Again, IWantToRunWhenConfigurationIsComplete comes to the rescue, but it was not obvious. So, the endpoint class may start looking like below in order to make it work:

public class MyEndpointConfig :
    IConfigureThisEndpoint,
    AsA_Server,
    IWantToRunWhenConfigurationIsComplete
{
    public void Customize(BusConfiguration configuration)
    {
        configuration.Transactions().Disable();
    }

    public void Run(Configure config)
    {
        // Settings are readable here, once configuration is complete.
        var configSection = config.Settings.GetConfigSection<TransportConfig>();
        var endpointName = config.Settings.EndpointName();
    }
}

The follow up post for more changes such as persistence is coming.

Update

The part2 of this series is now online.
