Tuesday, December 6, 2016

Making queries on demand: MongoDB outbound gateway

1- Introduction


In order to read data from MongoDB, Spring Integration comes with the MongoDB inbound channel adapter. This adapter uses a poller to continuously retrieve documents from the database. However, sometimes we may need to query the database on demand, based on the result of another endpoint.

Taking advantage of Spring's extensibility, I implemented a MongoDB outbound gateway. The purpose of this gateway is to react to a request, make a query to the database and return the result.

In order to show you how the gateway works, I will use a simple example and modify it to implement the gateway with different configurations.

This example consists of a messaging gateway as the entry point to the integration flow. Once a message enters the flow, the MongoDB outbound gateway makes a query to the database and the result is then sent to another channel, where a service activator will process it.

The source code for these examples and the gateway implementation can be found in my repository.


2- Java DSL example


I implemented the MongoDb static class to ease the definition of the gateway. I took the idea from the Spring Integration Jpa class.

In the following configuration you can see the flow requestPerson. An invocation to PersonService's send method will send a message to the flow, where the MongoDB outbound gateway will then query the database with a pre-defined query ({id : 1}):
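A sketch of what that flow definition could look like. Keep in mind the MongoDb factory class is the custom implementation from the repository, so the builder method names here are illustrative assumptions, not official Spring API:

```java
// Illustrative sketch only: MongoDb is the custom factory class from the
// repository, modeled after Spring Integration's Jpa class. Builder method
// names are assumptions.
@Bean
public IntegrationFlow requestPerson(MongoDbFactory mongoDbFactory) {
    return f -> f
        .handle(MongoDb.outboundGateway(mongoDbFactory)
            .query("{id : 1}")                  // pre-defined, hardcoded query
            .collection("persons")
            .expectSingleResult(true)
            .entityClass(Person.class))
        .channel("personChannel");              // result goes to the handler
}
```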

The result handler is a "very useful" component which will log the retrieved person:

In order to start the flow, the following application sends a message to the PersonService gateway:

As a note, the abstract class just contains the logic to set up the database, which is used across all the other examples.


3- Java DSL example with dynamic query expression


The previous example was useful for seeing how to define the gateway, but a hardcoded query may not be the most common use case.

In this example, the query is defined in the message sent to the integration flow:

In the configuration file, the gateway's queryExpression property resolves the query dynamically by retrieving the data property of the message's payload:
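Using the same illustrative builder as before, the gateway definition would swap the hardcoded query for a SpEL expression (the payload property name is an assumption):

```java
// The query is now resolved per message from the payload's 'data' property.
.handle(MongoDb.outboundGateway(mongoDbFactory)
    .queryExpression("payload.data")   // e.g. the message carries "{id : 2}"
    .collection("persons")
    .expectSingleResult(true)
    .entityClass(Person.class))
```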


4- Java DSL example returning multiple results


The two previous examples retrieved a single document from the database. In this next example, the query returns a list with all documents matching the query:

In the request message we specify the query to find all documents in the persons collection:

In the configuration, we have to remove the expectSingleResult property from the gateway (or set it to false). Additionally, we can specify a limit:
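With the same illustrative builder, the multi-result configuration could look like this:

```java
// expectSingleResult is omitted (defaults to false), so the gateway
// returns a List<Person>; limit caps the number of documents returned.
.handle(MongoDb.outboundGateway(mongoDbFactory)
    .queryExpression("payload.data")   // e.g. "{}" to match every document
    .collection("persons")
    .limit(10)                         // optional: cap the returned list
    .entityClass(Person.class))
```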

Finally, we have to define another method in the ResultHandler class to handle multiple results:


5- Java Config example


In this last example, Java Config is used instead of Java DSL to configure the whole flow. On the application's side everything is the same. We just query the person service for a specific document:

When using Java Config, we have to build the MongoDbExecutor, which is used by the gateway to do the queries.

Listening to the gateway's output channel, we define a service activator to handle the retrieved person:


6- Conclusion


An outbound gateway is suitable when you need to make queries on demand instead of actively polling the database. Currently, this implementation supports setup with Java Config and Java DSL. For now, I haven't implemented the parsers needed to support XML configuration, since I think these two configuration styles cover the main use cases.

If you found this post useful, please share it or star my repository :)

I'm publishing my new posts on Google plus and Twitter. Follow me if you want to be updated with new content.

Monday, November 21, 2016

Spring Integration MongoDB adapters with Java DSL

1 Introduction


This post explains how to save and retrieve entities from a MongoDB database using Spring Integration. In order to accomplish that, we are going to configure inbound and outbound MongoDB channel adapters using the Java DSL configuration extension. As an example, we are going to build an application to allow you to write orders to a MongoDB store, and then retrieve them for processing.

The application flow can be split in two parts:

  • New orders are sent to the messaging system, where they will be converted to actual products and then stored to MongoDB.
  • On the other hand, another component is continuously polling the database and processing any new product it finds.

The source code can be found in my Spring Integration repository.


2 MessagingGateway - Entering the messaging system


Our application does not know anything about the messaging system. In fact, it will just create new orders and send them to an interface (OrderService):

Taking an initial look at the configuration, we can see that the OrderService is actually a messaging gateway.

Any order sent to the order method will be introduced to the messaging system as a Message<Order> through the 'sendOrder.input' direct channel.
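A minimal sketch of that gateway interface (the annotations are standard Spring Integration; the names follow the text):

```java
// Messaging gateway: hides the messaging system behind a plain interface.
@MessagingGateway
public interface OrderService {

    // Each Order argument is wrapped in a Message<Order> and sent
    // to the 'sendOrder.input' direct channel.
    @Gateway(requestChannel = "sendOrder.input")
    void order(Order order);
}
```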


3 First part - processing orders


The first part of the Spring Integration messaging flow is composed of the following components:



We use a lambda to create an IntegrationFlow definition, which registers a DirectChannel as its input channel. The name of the input channel is resolved as 'beanName + .input'. Hence, the name is the one we specified in the gateway: 'sendOrder.input'

The first thing the flow does when receiving a new order is to use a transformer to convert the order into a product. To register a transformer, we can use the Transformers factory provided by the DSL API. Here we have different possibilities; the one I chose is a PayloadTypeConvertingTransformer, which delegates the transformation of the payload into an object to a converter.

The next step in the orders flow is to store the newly created product to the database. Here, we use a MongoDB outbound adapter:

If you wonder what the message handler is actually doing internally, it uses a mongoTemplate to save the entity:
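Roughly, the handler's logic boils down to the following (a simplified sketch of what MongoDbStoringMessageHandler does, not its exact source):

```java
// Simplified sketch: evaluate the collection name, then delegate to MongoTemplate.
@Override
protected void handleMessageInternal(Message<?> message) {
    String collectionName = this.collectionNameExpression
            .getValue(this.evaluationContext, message, String.class);
    this.mongoTemplate.save(message.getPayload(), collectionName);
}
```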


4 Second part - processing products


In this second part we have another integration flow for processing products:



In order to retrieve previously created products, we have defined an inbound channel adapter which continuously polls the MongoDB database:

The MongoDB inbound channel adapter is responsible for polling products from the database. We specify the query in the constructor. In this case, we poll one unprocessed product at a time:
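A possible definition of that message source; the 'processed' field name is an assumption about the product document:

```java
// Inbound message source: polls one unprocessed product per poll cycle.
@Bean
public MessageSource<Object> mongoMessageSource(MongoDbFactory mongoDbFactory) {
    MongoDbMessageSource source = new MongoDbMessageSource(
            mongoDbFactory,
            new LiteralExpression("{'processed' : false}"));
    source.setEntityClass(Product.class);
    source.setExpectSingleResult(true);   // a single Product, not a List
    return source;
}
```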

The router definition shows how the product is sent to a different service activator method depending on the 'premium' field:

As a service activator, we have a simple bean which logs a message and sets the product as processed. Then, it will return the product so it can be handled by the next endpoint in the flow.

We set the product as processed because the next step updates its status in the database, so that it is not polled again. We save it by redirecting the flow to the MongoDB outbound channel adapter again.


5 Conclusion


You have seen which endpoints to use in order to interact with a MongoDB database using Spring Integration. The outbound channel adapter passively saves products to the database, while the inbound channel adapter actively polls the database to retrieve new products.

If you found this post useful, please share it or star my repository. I appreciate it :)


Tuesday, July 19, 2016

Spring Integration - Polling file creation and modification

1 Introduction


File support is another of Spring Integration's endpoints for communicating with external systems. In this case, it provides several components to read, write and transform files. In this post, we are going to write an application which monitors a directory in order to read all files in there. Concretely, it does the following:

  • When the application starts, it reads all files present in the directory.
  • The application will then keep an eye on the directory to detect new files and existing files which have been modified.

The source code can be found in Github.


2 Configuration


The application is built with Spring Boot, since it eases configuration significantly. To create the initial infrastructure of the application, you can go to https://start.spring.io/, select the Integration module and generate the project. Then you can open the zip file in your favourite IDE.

I added a couple of dependencies to the pom.xml, like Commons IO and Spring Integration Java DSL. My pom.xml file looks as follows:


The starting point is FileReadDirectoryApplication:

Starting from here, we are going to add the Spring Integration components for reading from a specific folder of the filesystem.


3 Adding the adapter


In order to read from the file system, we need an inbound channel adapter. The adapter is a file reading message source, which is responsible for polling the file system directory for files and creating a message from each file it finds.

We can prevent some types of files from being polled by setting a list of filters to the message source. For this example two filters have been included:

  • SimplePatternFileListFilter: Filter provided by Spring. Only files with the specified extension will be polled. In this case, only text files will be accepted.
  • LastModifiedFileFilter: Custom filter. This filter keeps track of already polled files and filters out files that have not been modified since they were last tracked.
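Putting the two filters together, the message source could be defined as follows (the monitored directory path is an assumption; LastModifiedFileFilter is the custom filter from the repository):

```java
// File reading message source with a composite filter:
// only *.txt files, and only when new or modified since the last poll.
@Bean
public FileReadingMessageSource fileReadingMessageSource() {
    CompositeFileListFilter<File> filter = new CompositeFileListFilter<>();
    filter.addFilter(new SimplePatternFileListFilter("*.txt"));
    filter.addFilter(new LastModifiedFileFilter());   // custom filter

    FileReadingMessageSource source = new FileReadingMessageSource();
    source.setDirectory(new File("files"));           // monitored directory
    source.setFilter(filter);
    return source;
}
```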


4 Processing the files


For each polled file, we will transform its content to String before passing it to the processor. For this purpose, Spring already provides a component:


Hence, instead of receiving a Message<File>, the processor will receive a Message<String>. The file processor is our custom component which will do something as advanced as printing the file content:


5 Building the flow


Now that we have all the required components in place, let's build the flow. We are using Spring Integration Java DSL, since it makes the flow more readable:
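The flow wires the message source, the transformer and the processor together. A sketch (the poller interval and bean names are assumptions):

```java
// Poll the directory, convert each File payload to String, then process it.
@Bean
public IntegrationFlow fileReadingFlow() {
    return IntegrationFlows
        .from(fileReadingMessageSource(),
              c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
        .transform(Transformers.fileToString())   // Message<File> -> Message<String>
        .handle("fileProcessor", "process")       // our custom processor bean
        .get();
}
```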


6 Running the application


In my directory, I already have a file called 'previousFile.txt'. After starting the application, we will create two files and modify one of them.

If we run the application, we should see the following print statements:

previousFile.txt received. Content: previous content
file1.txt received. Content: content
file2.txt received. Content: another file
file1.txt received. Content: content modified


7 Conclusion


This example shows how simple it is to read files from a directory using Spring Integration, with the help of Spring Boot to simplify the configuration. Depending on your needs, you can add your own custom filters to the message source, or use another of those provided by Spring, like RegexPatternFileListFilter. You can check out other implementations here.

If you found this post useful, please share it or star my repository :)


Tuesday, May 17, 2016

Data Aggregation Spring Data MongoDB: Nested results

1 Introduction


In a previous post, we built a basic example of an aggregation pipeline. Maybe you want to take a look at Data aggregation with Spring Data MongoDB and Spring Boot if you need more detail about how to create the project and configure the application. In this post, we will focus on learning a use case where it makes sense to group a portion of the result in a nested object.

Our test data is a collection of football players, with data about the league they belong to and how many goals they scored. The document would be like this:


It may be interesting to know how many goals were scored in each league, and also who each league's top goalscorer was. In the following section, we are going to implement a first simple example without using nested objects.

You can find the source code of all these examples at my Github repository.

2 Basic example


We can use the following class to store each league's result:


In order to retrieve the top scorers, we will first need to sort the documents by scored goals and then group them by league. In the repository, these two phases of the pipeline are implemented in the following methods:
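These two phases can be sketched with Spring Data's Aggregation static methods. Field names like "goals", "league" and "player" are assumptions based on the sample document described above:

```java
// Sort players by scored goals, descending.
private SortOperation buildSortOperation() {
    return Aggregation.sort(Sort.Direction.DESC, "goals");
}

// Group by league; since documents are already sorted,
// 'first' picks the top scorer of each league.
private GroupOperation buildGroupOperation() {
    return Aggregation.group("league")
        .sum("goals").as("totalGoals")
        .first("player").as("topPlayer")
        .first("goals").as("topGoals");
}
```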


That should do it. Let's aggregate the results using Spring's mongoTemplate:


If we retrieve the stats of the Spanish league, we get the following result:



Although this is good enough, I don't feel comfortable with the top scorer's information scattered throughout the result class. It would make much more sense to encapsulate all the scorer's data in a nested object. Fortunately, we can do that directly during the aggregation.


3 Nesting the result


Spring Data's nested method is designed to create sub-documents during the projection phase. This will allow us to create the top goalscorer class as a property of the output result class:


In the line above, a nested document called topScorer is emitted by the nested method, which will contain all the data about the current league’s top goalscorer. Its properties are mapped to the output class using the bind method (topPlayer, topGoals and topCountry).

MongoTemplate’s invocation reuses our previous sort and group operations, and then adds the projection operation:


Executing this query will result in a much more compact result, having all top goalscorer’s related data wrapped in its own class:



4 Conclusion


Spring Data MongoDB's nested method is very useful for creating well-structured output results from our aggregation queries. Doing this step during the aggregation helps us avoid writing Java code to post-process the result.


Tuesday, April 12, 2016

Data aggregation with Spring Data MongoDB and Spring Boot

The MongoDB aggregation framework is designed for grouping documents and transforming them into an aggregated result. The aggregation query consists of several stages that are executed in a pipeline. If you are interested in more in-depth details about the framework, the MongoDB docs are a good place to start.

The point of this post is to write a web application for querying MongoDB in order to get aggregated results from the database. We will do it in a very easy way thanks to Spring Boot and Spring Data. It is actually really fast to implement the application, since Spring Boot takes care of all the necessary setup and Spring Data helps us configure the repositories.

The source code can be found on my Github repository.


1 The application


Before going through the code let’s see what we want to do with our application.

Our domain is a collection of products we have distributed across several warehouses:

Our target is to collect all the products within a price range, grouped by warehouse and collecting the total revenue and the average price of each grouping.

In this example, our warehouses are storing the following products:

The application will query for products with a price between 5.0 and 70.0. The required aggregation pipeline steps will be as follows:



We will end up with aggregated results grouped by warehouse. Each group will contain the list of products of each warehouse, the average product price and the total revenue, which actually is the sum of the prices.


2 Maven dependencies


As you can see, we have a short pom.xml with Spring Boot dependencies:

By defining spring-boot-starter-parent as our parent pom, we get the default settings of Spring Boot. Mainly, it sets the versions of a bunch of libraries it may use, like Spring or Apache Commons. For example, Spring Boot 1.3.3, which is the one we are using, sets 4.2.5.RELEASE as the Spring framework version. As stated in previous posts, it does not add libraries to our application; it only sets versions.

Once the parent is defined, we only need to add three dependencies:

  • spring-boot-starter-web: Mainly includes Spring MVC libraries and an embedded Tomcat server.
  • spring-boot-starter-test: Includes testing libraries like JUnit, Mockito, Hamcrest and Spring Test.
  • spring-boot-starter-data-mongodb: This dependency includes the MongoDB Java driver, and the Spring Data Mongo libraries.



3 Application setup


Thanks to Spring Boot, the application setup is as simple as the dependencies setup:

When running the main method, we will start our web application listening on port 8080.


4 The repository


Now that we have the application properly configured, we can implement the repository. This isn't difficult either, since Spring Data takes care of all the wiring.

The following test proves that our application is correctly set up.

We didn’t have to implement the save and findOne methods. They are already defined, since our repository extends MongoRepository.


5 The aggregation query


With the application set up and all the steps explained, we can now focus on the aggregation query.

Since our aggregation query is not a basic query, we need to implement a custom repository. The steps are:

Create the custom repository with the method we need:

Modify the first repository in order to also extend our custom repository:

Create an implementation to write the aggregation query:

Now we are going to implement the stages of the MongoDB pipeline as explained at the beginning of the post.

Our first operation is the match operation. We will filter out all product documents that are beyond our price range:
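The match stage can be sketched like this (the 5.0 to 70.0 bounds follow the example above):

```java
// Keep only products whose price falls inside the requested range.
private MatchOperation buildMatchOperation(double from, double to) {
    return Aggregation.match(Criteria.where("price").gt(from).lt(to));
}
```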

The next stage of the pipeline is the group operation. In addition to grouping documents by warehouse, in this stage we are also doing the following calculations:

  • last: Returns the warehouse of the last document in the group.
  • addToSet: Collects all the unique product Ids of all the grouped documents, resulting in an array.
  • avg: Calculates the average of all prices in the group.
  • sum: Sums all prices in the group.
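Combining the four calculations, the group stage could look like this (the field names follow the product document described earlier):

```java
// Group products by warehouse and compute the aggregated values.
private GroupOperation buildGroupOperation() {
    return Aggregation.group("warehouse")
        .last("warehouse").as("warehouse")     // keep the warehouse name
        .addToSet("id").as("productIds")       // unique product ids
        .avg("price").as("averagePrice")
        .sum("price").as("totalRevenue");
}
```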


The last stage of the pipeline is the project operation. Here we specify the resulting fields of the aggregation:

The query is built as follows:

In the aggregate method, we indicate the input class, which is our Product document. The next argument is the output class, which is a DTO to store the resulting aggregation:

We should end the post with a test proving that results are what we expect:


6 Conclusion


Spring Data has good integration with the MongoDB aggregation framework. Adding Spring Boot to configure the application lets us focus on building the query. For the building process, the Aggregation class has several static methods that help us implement the different pipeline stages.


Tuesday, March 8, 2016

Grouping, transforming and reduction with Java 8

1 Introduction


In a previous post, I wrote about how we can group collections of objects with streams and the groupingBy collector. This is useful, but does not cover some specific use cases. For example, sometimes we do not only need to group things, but also to transform the result into a more appropriate object.

In this post, we will learn how to apply transformations and reduction to the groupingBy result.

Here you can view the source code of the following examples.


2 Grouping by and transform


Let's take the model I used in the previous post where we had a collection of persons who owned a pet.

Now we want to know which pets belong to persons living in New York. We are asking for pets, so we can't just make a grouping since we would be returning a collection of persons. What we need to do is group persons by city and then transform the stream to a collection of pets.

For this purpose, we use mapping on the result of the group by:
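A self-contained sketch of this grouping plus mapping. The model is reduced to the fields we need, using records for brevity (the original post's classes use plain getters), and the sample data mirrors the examples used throughout these posts:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupAndTransform {

    record Pet(String name, int age) {}
    record Person(String name, String city, Pet pet) {}

    // Group persons by city, then map each person to their pet.
    static Map<String, List<Pet>> petsByCity(List<Person> persons) {
        return persons.stream().collect(
            Collectors.groupingBy(
                Person::city,
                Collectors.mapping(Person::pet, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<Person> persons = List.of(
            new Person("John", "New York", new Pet("Max", 5)),
            new Person("Anna", "New York", new Pet("Buddy", 3)),
            new Person("Mike", "Chicago", new Pet("Duke", 4)));

        // Two pets belong to persons living in New York.
        System.out.println(petsByCity(persons).get("New York").size()); // 2
    }
}
```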

In the grouping phase, we group persons by city and then perform a mapping to get each person's pet.


3 Grouping, transforming and reducing


The previous example is useful for converting groupings of objects, but maybe we don’t want to obtain the whole list for each group. In this example, we still want to group pets by their owner’s city, but this time we only want to get the oldest pet of each list.

The collectingAndThen method from Collectors allows us to apply a final transformation to the result of the grouping:
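A self-contained sketch of the grouping, transformation and reduction together (again using records for brevity; the reduction uses maxBy-style logic via Stream.max):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupTransformReduce {

    record Pet(String name, int age) {}
    record Person(String name, String city, Pet pet) {}

    // Group persons by city, map them to their pets, and reduce each
    // city's list to the oldest pet.
    static Map<String, Pet> oldestPetByCity(List<Person> persons) {
        return persons.stream().collect(
            Collectors.groupingBy(
                Person::city,
                Collectors.collectingAndThen(
                    Collectors.mapping(Person::pet, Collectors.toList()),
                    pets -> pets.stream()
                                .max(Comparator.comparingInt(Pet::age))
                                .orElseThrow())));
    }

    public static void main(String[] args) {
        List<Person> persons = List.of(
            new Person("John", "New York", new Pet("Max", 5)),
            new Person("Anna", "New York", new Pet("Buddy", 3)));

        // Max (age 5) is older than Buddy (age 3).
        System.out.println(oldestPetByCity(persons).get("New York").name()); // Max
    }
}
```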

After grouping persons by city, in collectingAndThen we transform each person in each city’s list into their pet, and then apply a reduction to get the pet with the highest age in the list.


4 Conclusion


The Collectors API not only allows us to group collections of things, but also lets us apply transformations and reductions to obtain different objects depending on our needs.


Monday, February 29, 2016

Multi level grouping with streams

1 Introduction


With Java 8 streams it is pretty easy to group collections of objects based on different criteria. In this post, we will see how to move from simple single-level groupings to more complex ones involving several levels of grouping.

We will use two classes to represent the objects we want to group by: person and pet.

Person.class


Pet.class


In the main method we create the collection we will use in the following sections.


You can take a look at the source code here.


2 Single level grouping


The simplest form of grouping is the single-level grouping. In this example, we are going to group all persons in the collection by their country:
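A self-contained sketch of that grouping (the model is reduced to a record for brevity; the sample data matches the post's output below):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SingleLevelGrouping {

    record Person(String name, String country, String city) {}

    // One grouping criterion: the person's country.
    static Map<String, List<Person>> byCountry(List<Person> persons) {
        return persons.stream().collect(Collectors.groupingBy(Person::country));
    }

    public static void main(String[] args) {
        List<Person> persons = List.of(
            new Person("John", "USA", "New York"),
            new Person("Anna", "USA", "New York"),
            new Person("Mike", "USA", "Chicago"),
            new Person("Otto", "Germany", "Berlin"));

        System.out.println(byCountry(persons).get("USA").size()); // 3
    }
}
```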

If we take a look at the map, we can see how each country contains a list of its citizens:



The result shows persons living in the specified country:

Persons in USA: [Person{name='John', country='USA', city='New York'}, Person{name='Anna', country='USA', city='New York'}, Person{name='Mike', country='USA', city='Chicago'}]


3 Two level grouping


In this example, we will group not only by country but also by city. To accomplish this, we need to implement a two level grouping. We will group persons by country and for each country, we will group its persons by the city where they live.

In order to allow multi level grouping, the groupingBy method in class Collectors supports an additional Collector as a second argument:

Let’s use this method to implement our two level grouping:
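A self-contained sketch of the two-level grouping, nesting one groupingBy collector inside another (record model for brevity):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TwoLevelGrouping {

    record Person(String name, String country, String city) {}

    // Outer grouping by country, inner grouping by city.
    static Map<String, Map<String, List<Person>>> byCountryAndCity(List<Person> persons) {
        return persons.stream().collect(
            Collectors.groupingBy(
                Person::country,
                Collectors.groupingBy(Person::city)));
    }

    public static void main(String[] args) {
        List<Person> persons = List.of(
            new Person("John", "USA", "New York"),
            new Person("Anna", "USA", "New York"),
            new Person("Mike", "USA", "Chicago"));

        System.out.println(byCountryAndCity(persons).get("USA").get("New York").size()); // 2
    }
}
```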

If we debug the execution, we will see how people are distributed:




4 Three level grouping


In our final example, we will take a step further and group people by country, city and pet name. I have split it into two methods for readability:

Now we have three nested maps containing each list of persons:




5 Conclusion


The Java 8 Collectors API provides us with an easy way to group our collections. By nesting collectors, we can add different layers of groups to implement multi level groupings.
