Mule ESB : Creating Custom Aggregator

Sep 1, 2016

CPOL

Creating a Custom Aggregator component in Mule ESB

Introduction

Sometimes the aggregators that ship with Mule do not fit your requirements. In that case, you can write a custom aggregator.

Using the Code

Create your own Aggregator by extending the abstract class org.mule.routing.AbstractAggregator, or by extending one of its standard implementations, such as org.mule.routing.SimpleCollectionAggregator (used in the Collection Aggregator component) or org.mule.routing.MessageChunkAggregator (used in the Message Chunk Aggregator component), and overriding the behavior you need.

Below is a sample Aggregator that appends the payloads of the received Mule messages and returns the concatenated string as the payload of the aggregation result.

import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.store.ObjectStoreException;
import org.mule.api.transformer.TransformerException;
import org.mule.routing.AbstractAggregator;
import org.mule.routing.AggregationException;
import org.mule.routing.EventGroup;
import org.mule.routing.correlation.CollectionCorrelatorCallback;
import org.mule.routing.correlation.EventCorrelatorCallback;

import java.util.Iterator;

public class TestAggregator extends AbstractAggregator
{
    @Override
    protected EventCorrelatorCallback getCorrelatorCallback(MuleContext muleContext)
    {
        // The callback defines how a group of correlated events is merged into one event.
        return new CollectionCorrelatorCallback(muleContext, false, storePrefix)
        {
            @Override
            public MuleEvent aggregateEvents(EventGroup events) throws AggregationException
            {
                // A local buffer is not shared between threads, so StringBuilder is sufficient.
                StringBuilder buffer = new StringBuilder(128);

                try
                {
                    for (Iterator<MuleEvent> iterator = events.iterator(); iterator.hasNext();)
                    {
                        MuleEvent event = iterator.next();
                        try
                        {
                            // Append each event's payload, converted to a string.
                            buffer.append(event.transformMessageToString());
                        }
                        catch (TransformerException e)
                        {
                            throw new AggregationException(events, null, e);
                        }
                    }
                }
                catch (ObjectStoreException e)
                {
                    // Reading the grouped events failed.
                    throw new AggregationException(events, null, e);
                }

                logger.debug("event payload is: " + buffer.toString());
                // Wrap the concatenated string in a new message and event for the group.
                return new DefaultMuleEvent(new DefaultMuleMessage
                  (buffer.toString(), muleContext), events.getMessageCollectionEvent());
            }
        };
    }
}


Similarly, you can create your own Aggregator implementations by changing the logic inside aggregateEvents.
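One way to keep such an aggregator easy to test is to move the merge logic into a small helper that has no Mule dependencies. The sketch below is only an illustration: the class PayloadJoiner and its join method are hypothetical names, not part of Mule, and it assumes the payloads have already been converted to strings (for example with event.transformMessageToString() inside aggregateEvents).

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper (not a Mule class): concatenates payload strings
 * in the order in which they are supplied.
 */
public class PayloadJoiner
{
    public static String join(Iterable<String> payloads)
    {
        StringBuilder buffer = new StringBuilder(128);
        for (String payload : payloads)
        {
            // Same append step as in the aggregator, without any Mule types.
            buffer.append(payload);
        }
        return buffer.toString();
    }

    public static void main(String[] args)
    {
        List<String> payloads = Arrays.asList("A", "B", "C");
        System.out.println(join(payloads)); // prints "ABC"
    }
}
```

With a helper like this, aggregateEvents would only collect the string payloads and delegate the concatenation, so the merge rule can be unit tested without starting Mule.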

Hope this helps!