How to Use Message Selectors for Conditional Delivery in Pub/Sub Messaging

Published on September 1, 2019

In an earlier article, we introduced Backendless pub/sub messaging and included a sample for broadcasting and receiving messages. Today we are going to show how to use Backendless messaging for conditional message delivery.

As you will see in the example, the publisher can attach headers to a published message. Headers are a collection of arbitrary key/value pairs. A subscriber can specify a selector, which is a condition written in SQL syntax that a message's headers must satisfy for the message to be delivered to that subscriber. Each subscriber on the same messaging channel can specify its own selector, and Backendless evaluates each selector individually.
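
To make the per-subscriber behavior concrete, here is a minimal local simulation in plain Java. It does not use the Backendless SDK: the class name, the subscriber names (`jeansFan`, `bargainHunter`), and their conditions are all hypothetical, and each selector is modeled as a Java predicate over the header map rather than as an SQL string.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Local illustration only: this is NOT Backendless code, and every name here is hypothetical.
class SelectorFanOutSketch
{
    // Returns the names of the subscribers whose selector accepts the given headers.
    static List<String> recipients( Map<String, String> headers,
                                    Map<String, Predicate<Map<String, String>>> subscribers )
    {
        List<String> result = new ArrayList<>();
        for( Map.Entry<String, Predicate<Map<String, String>>> entry : subscribers.entrySet() )
        {
            if( entry.getValue().test( headers ) )
                result.add( entry.getKey() );
        }
        return result;
    }

    // Two hypothetical subscribers on the same channel, each with its own condition.
    // The second one assumes a numeric "discount" header is always present.
    static Map<String, Predicate<Map<String, String>>> sampleSubscribers()
    {
        Map<String, Predicate<Map<String, String>>> subscribers = new LinkedHashMap<>();
        subscribers.put( "jeansFan", h -> "Jeans-R-Us".equals( h.get( "retailer" ) ) );
        subscribers.put( "bargainHunter", h -> Integer.parseInt( h.get( "discount" ) ) >= 50 );
        return subscribers;
    }

    public static void main( String[] args )
    {
        Map<String, String> headers = Map.of( "retailer", "Jeans-R-Us", "merchandise", "Jacket", "discount", "97" );
        // Both conditions hold for this message, so both subscribers receive it
        System.out.println( recipients( headers, sampleSubscribers() ) ); // prints [jeansFan, bargainHunter]
    }
}
```

The same message is checked against every subscriber's condition separately, so one subscriber's selector never affects what another subscriber receives.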

In the example below, the publisher sends a message announcing a discount available at a retailer. The code randomly picks the retailer and the amount of the discount given. The name of the retailer, the merchandise, and the amount of the discount are added as message headers:

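Non-blocking sample (Android and Plain Java):

    // Retailers to pick from (the names match the sample output)
    String[] retailers = { "HomeOffice Depot", "Jeans-R-Us", "Childs Play" };
    Random random = new Random( System.currentTimeMillis() );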
    while( true )
    {
        int i = random.nextInt( retailers.length );
        final PublishOptions publishOptions = new PublishOptions();
        publishOptions.putHeader( "retailer", retailers[ i ] );
        publishOptions.putHeader( "merchandise", "Jacket" );
        publishOptions.putHeader( "discount", random.nextInt( 100 ) + "" );
        Backendless.Messaging.publish( "SaleNews", "Discount Announcement", publishOptions, new AsyncCallback<MessageStatus>()
        {
            @Override
            public void handleResponse( MessageStatus messageStatus )
            {
                System.out.println( "Message published - " + messageStatus.getMessageId() + ", headers - " + publishOptions.getHeaders() );
            }
            @Override
            public void handleFault( BackendlessFault backendlessFault )
            {
                System.out.println( "Server reported an error " + backendlessFault.getMessage() );
            }
        } );
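        // Pause between messages; Thread.sleep throws InterruptedException, which the enclosing method must declare or handle
        Thread.sleep( 500 );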
    }

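Blocking sample (Plain Java only):

    // Retailers to pick from (the names match the sample output)
    String[] retailers = { "HomeOffice Depot", "Jeans-R-Us", "Childs Play" };
    Random random = new Random( System.currentTimeMillis() );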
    while( true )
    {
        int i = random.nextInt( retailers.length );
        PublishOptions publishOptions = new PublishOptions();
        publishOptions.putHeader( "retailer", retailers[ i ] );
        publishOptions.putHeader( "merchandise", "Jacket" );
        publishOptions.putHeader( "discount", random.nextInt( 100 ) + "" );
        MessageStatus messageStatus = Backendless.Messaging.publish( "SaleNews", "Discount Announcement", publishOptions );
        System.out.println( "Message published - " + messageStatus.getMessageId() + ", headers - " + publishOptions.getHeaders() );
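        // Pause between messages; Thread.sleep throws InterruptedException, which the enclosing method must declare or handle
        Thread.sleep( 500 );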
    }
    

When you run the code, you will see output similar to this:

    Message published - AFB94AE1-167B-4BB6-FF11-085C4BBBFC00, headers - {merchandise=Jacket, retailer=HomeOffice Depot, discount=35}
    Message published - 00FBE607-BE15-AFF4-FF02-5BB645BF1700, headers - {merchandise=Jacket, retailer=HomeOffice Depot, discount=24}
    Message published - BB98A2BC-D1F0-A5B9-FF59-2580503EAB00, headers - {merchandise=Jacket, retailer=Jeans-R-Us, discount=97}
    Message published - 02253B3D-9F8D-0FE6-FF67-A120EF60E400, headers - {merchandise=Jacket, retailer=Childs Play, discount=17}
    Message published - 9EF9A2D7-4CC6-D1EF-FFCA-FBE198068300, headers - {merchandise=Jacket, retailer=Childs Play, discount=89}
    Message published - 7959F340-5011-0A53-FF25-F29F3675B100, headers - {merchandise=Jacket, retailer=Childs Play, discount=63}
    Message published - 3D6DA5D7-CFBB-5B80-FF7E-7B8B340CBC00, headers - {merchandise=Jacket, retailer=HomeOffice Depot, discount=9}
    Message published - A7FD4B2F-EB9C-5C98-FF5E-52688CFF9900, headers - {merchandise=Jacket, retailer=Childs Play, discount=22}
    Message published - BAF59A95-0FAF-0222-FFB1-B6F0253B6200, headers - {merchandise=Jacket, retailer=Jeans-R-Us, discount=99}
    Message published - 8B4D21AA-A42C-1ED6-FF8B-331AB799D200, headers - {merchandise=Jacket, retailer=Childs Play, discount=71}
    Message published - 3244645A-5E87-B508-FF2C-FE8299D18100, headers - {merchandise=Jacket, retailer=Jeans-R-Us, discount=39}
    Message published - 71A5A0AE-69B8-C97E-FF32-A9394E51E400, headers - {merchandise=Jacket, retailer=HomeOffice Depot, discount=45}
    Message published - D4D80052-45D1-6099-FF2C-CBC678F5BE00, headers - {merchandise=Jacket, retailer=HomeOffice Depot, discount=77}
    Message published - 005030D0-9D49-49E6-FF2E-AA0B8375A100, headers - {merchandise=Jacket, retailer=Childs Play, discount=65}
    Message published - FE2F9E8C-2B74-D238-FF24-902ADF75CC00, headers - {merchandise=Jacket, retailer=HomeOffice Depot, discount=56}

Suppose a subscriber would like to receive notifications only from the “Jeans-R-Us” retailer, only for the “Jacket” or “Shorts” merchandise, and only when the discount is less than 50%. How do they express that condition in a selector? With the SQL syntax, it is straightforward:

    retailer = 'Jeans-R-Us' and merchandise in ('Jacket', 'Shorts') and discount < 50
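
To see how the three clauses combine, the selector can be restated as an ordinary Java predicate. This is only a sketch for reasoning about which messages match: Backendless evaluates the real selector itself, and how it does so is not shown here. The class and method names are hypothetical, and the discount is passed as an `int` for simplicity even though the publisher sends it as a string header.

```java
import java.util.Arrays;

// Hypothetical local restatement of the selector; this is NOT how Backendless evaluates it.
class SelectorPredicateSketch
{
    // Mirrors: retailer = 'Jeans-R-Us' and merchandise in ('Jacket', 'Shorts') and discount < 50
    static boolean matches( String retailer, String merchandise, int discount )
    {
        return "Jeans-R-Us".equals( retailer )
            && Arrays.asList( "Jacket", "Shorts" ).contains( merchandise )
            && discount < 50;
    }

    public static void main( String[] args )
    {
        // Header values taken from three lines of the publisher's sample output
        System.out.println( matches( "Jeans-R-Us", "Jacket", 39 ) );  // prints true
        System.out.println( matches( "Jeans-R-Us", "Jacket", 97 ) );  // prints false: discount is not below 50
        System.out.println( matches( "Childs Play", "Jacket", 17 ) ); // prints false: wrong retailer
    }
}
```

Because the clauses are joined with `and`, a message has to satisfy all three to be delivered; failing any single clause is enough to filter it out.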

Below is the source code for creating and registering a subscriber that uses this selector:

    AsyncCallback<List<Message>> subscriptionResponder = new AsyncCallback<List<Message>>()
    {
        @Override
        public void handleResponse( List<Message> messages )
        {
            // Print the payload and the headers of every delivered message
            for( Message message : messages )
            {
                Map<String, String> headers = message.getHeaders();
                String retailer = headers.get( "retailer" );
                String merchandise = headers.get( "merchandise" );
                String discount = headers.get( "discount" );
                System.out.println( String.format( "Received message - %s\n\tretailer - %s\n\tmerchandise - %s\n\tdiscount - %s",
                        message.getData(), retailer, merchandise, discount ) );
            }
        }
        @Override
        public void handleFault( BackendlessFault backendlessFault )
        {
            System.out.println( "Server reported an error " + backendlessFault.getMessage() );
        }
    };
    SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
    String selector = "retailer = 'Jeans-R-Us' and merchandise in ('Jacket', 'Shorts') and discount < 50";
    subscriptionOptions.setSelector( selector );
    Backendless.Messaging.subscribe( "SaleNews", subscriptionResponder, subscriptionOptions );

When you run the subscriber code, you will see that it receives only the messages that satisfy the selector criteria:

    Received message - Discount Announcement
    	retailer - Jeans-R-Us
    	merchandise - Jacket
    	discount - 36
    Received message - Discount Announcement
    	retailer - Jeans-R-Us
    	merchandise - Jacket
    	discount - 1
    Received message - Discount Announcement
    	retailer - Jeans-R-Us
    	merchandise - Jacket
    	discount - 19
    Received message - Discount Announcement
    	retailer - Jeans-R-Us
    	merchandise - Jacket
    	discount - 42

Enjoy!
