Tuesday, May 17, 2016

Data Aggregation Spring Data MongoDB: Nested results

1 Introduction


In a previous post, we built a basic example of an aggregation pipeline. If you need more detail about how to create the project and configure the application, take a look at Data aggregation with Spring Data MongoDB and Spring Boot. In this post, we will focus on a use case where it makes sense to group a portion of the result in a nested object.

Our test data is a collection of football players, with data about the league they belong to and how many goals they scored. A document looks like this:


It may be interesting to know how many goals were scored in each league, and who was each league's top goalscorer. In the following section, we are going to implement our first simple example without using nested objects.

You can find the source code of all these examples in my GitHub repository.

2 Basic example


We can use the following class to store each league's result:


In order to retrieve the top scorers, we first need to sort the documents by goals scored and then group them by league. In the repository, these two phases of the pipeline are implemented in the following methods:
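To see why sorting before grouping is enough to find each league's top scorer, here is a minimal in-memory sketch of the same two stages written with plain Java streams. It is not the repository's Spring Data code: the Player and LeagueStats classes and the fields name, country, league, goals and totalGoals are assumptions, and only topPlayer, topGoals and topCountry come from the post. Once players are sorted by goals in descending order, the first player in each league's group is its top scorer.

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical player document; field names are assumptions for illustration.
class Player {
    final String name;
    final String country;
    final String league;
    final int goals;

    Player(String name, String country, String league, int goals) {
        this.name = name;
        this.country = country;
        this.league = league;
        this.goals = goals;
    }
}

// Hypothetical flat per-league result, like the basic example's output class.
class LeagueStats {
    final String league;
    final int totalGoals;
    final String topPlayer;
    final int topGoals;
    final String topCountry;

    LeagueStats(String league, int totalGoals, String topPlayer, int topGoals, String topCountry) {
        this.league = league;
        this.totalGoals = totalGoals;
        this.topPlayer = topPlayer;
        this.topGoals = topGoals;
        this.topCountry = topCountry;
    }
}

class SortThenGroup {
    static Map<String, LeagueStats> aggregate(List<Player> players) {
        // "Sort" stage: most goals first.
        // "Group" stage: toList() keeps that order inside each league's list.
        Map<String, List<Player>> byLeague = players.stream()
                .sorted(Comparator.comparingInt((Player p) -> p.goals).reversed())
                .collect(Collectors.groupingBy((Player p) -> p.league, LinkedHashMap::new, Collectors.toList()));

        Map<String, LeagueStats> result = new LinkedHashMap<>();
        byLeague.forEach((league, group) -> {
            Player top = group.get(0); // first player of the sorted group = top scorer
            int total = group.stream().mapToInt(p -> p.goals).sum();
            result.put(league, new LeagueStats(league, total, top.name, top.goals, top.country));
        });
        return result;
    }
}
```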


That should do it. Let's aggregate the results using Spring's MongoTemplate:


If we retrieve the stats of the Spanish league, we get the following result:



Although this is fair enough, I don't feel comfortable with the top scorer's information scattered throughout the result class. It would make much more sense to encapsulate all of the scorer's data in a nested object. Fortunately, we can do that directly during the aggregation.


3 Nesting the result


Spring Data's nested method is designed to create sub-documents during the projection phase. This will allow us to create the top goalscorer class as a property of the output result class:


In the line above, the nested method emits a nested document called topScorer, which contains all the data about the current league's top goalscorer. The bind method maps the fields produced by the group stage (topPlayer, topGoals and topCountry) to the properties of this nested document.
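The difference between the flat and the nested result can be sketched with two plain Java classes. The names TopScorer and NestedLeagueStats, and the nested field names, are hypothetical. The fromFlat method shows the kind of manual post-processing we would otherwise write to fold topPlayer, topGoals and topCountry into one object, which is the step the nested projection spares us.

```java
// Hypothetical value object for the topScorer sub-document.
class TopScorer {
    final String name;
    final int goals;
    final String country;

    TopScorer(String name, int goals, String country) {
        this.name = name;
        this.goals = goals;
        this.country = country;
    }
}

// Hypothetical output class: the scorer's data lives in one nested property.
class NestedLeagueStats {
    final String league;
    final int totalGoals;
    final TopScorer topScorer; // filled from the nested topScorer document

    NestedLeagueStats(String league, int totalGoals, TopScorer topScorer) {
        this.league = league;
        this.totalGoals = totalGoals;
        this.topScorer = topScorer;
    }

    // The Java post-processing we would need if the aggregation only returned flat fields.
    static NestedLeagueStats fromFlat(String league, int totalGoals,
                                      String topPlayer, int topGoals, String topCountry) {
        return new NestedLeagueStats(league, totalGoals, new TopScorer(topPlayer, topGoals, topCountry));
    }
}
```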

MongoTemplate’s invocation reuses our previous sort and group operations, and then adds the projection operation:


Executing this query produces a much more compact result, with all of the top goalscorer's data wrapped in its own class:



4 Conclusion


Spring Data MongoDB's nested method is very useful for creating well-structured output from our aggregation queries. Doing this step during the aggregation saves us from writing Java code to post-process the result.

I'm publishing my new posts on Google plus and Twitter. Follow me if you want to be updated with new content.

Tuesday, April 12, 2016

Data aggregation with Spring Data MongoDB and Spring Boot

The MongoDB aggregation framework is designed for grouping documents and transforming them into an aggregated result. An aggregation query consists of several stages that are executed in a pipeline. If you are interested in more in-depth details about the framework, the MongoDB docs are a good place to start.

The point of this post is to write a web application that queries MongoDB in order to get aggregated results from the database. We will do it in a very easy way thanks to Spring Boot and Spring Data. The application is quick to implement, since Spring Boot takes care of all the necessary setup and Spring Data helps us configure the repositories.

The source code can be found in my GitHub repository.


1 The application


Before going through the code let’s see what we want to do with our application.

Our domain is a collection of products we have distributed across several warehouses:

Our goal is to collect all the products within a price range, grouped by warehouse, along with the total revenue and the average price of each group.

In this example, our warehouses are storing the following products:

The application will query for products with a price between 5.0 and 70.0. The required aggregation pipeline steps will be as follows:



We will end up with aggregated results grouped by warehouse. Each group will contain the list of products in that warehouse, the average product price and the total revenue, which is simply the sum of the prices.


2 Maven dependencies


As you can see, we have a short pom.xml with Spring Boot dependencies:

By defining spring-boot-starter-parent as our parent pom, we get Spring Boot's default settings. Mainly, it sets the versions of a bunch of libraries Spring Boot may use, like Spring or Apache Commons. For example, Spring Boot 1.3.3, the version we are using, sets 4.2.5.RELEASE as the Spring Framework version. As stated in previous posts, it does not add libraries to our application; it only sets versions.

Once the parent is defined, we only need to add three dependencies:

  • spring-boot-starter-web: Mainly includes Spring MVC libraries and an embedded Tomcat server.
  • spring-boot-starter-test: Includes testing libraries like JUnit, Mockito, Hamcrest and Spring Test.
  • spring-boot-starter-data-mongodb: This dependency includes the MongoDB Java driver, and the Spring Data Mongo libraries.



3 Application setup


Thanks to Spring Boot, the application setup is as simple as the dependencies setup:

When we run the main method, our web application starts listening on port 8080.


4 The repository


Now that we have the application properly configured, we implement the repository. This isn't difficult either, since Spring Data takes care of all the wiring.

The following test proves that our application is correctly set up.

We didn't implement the save and findOne methods. They are already defined because our repository extends MongoRepository.


5 The aggregation query


With the application set up, we can finally focus on the aggregation query.

Since our aggregation query is not a basic query, we need to implement a custom repository. The steps are:

Create the custom repository with the method we need:

Modify the first repository in order to also extend our custom repository:

Create an implementation to write the aggregation query:

Now we are going to implement the stages of the MongoDB pipeline as explained at the beginning of the post.
Our first operation is the match operation. We filter out all product documents that fall outside our price range:

The next stage of the pipeline is the group operation. In addition to grouping documents by warehouse, in this stage we are also doing the following calculations:

  • last: Returns the warehouse of the last document in the group.
  • addToSet: Collects all the unique product Ids of all the grouped documents, resulting in an array.
  • avg: Calculates the average of all prices in the group.
  • sum: Sums all prices in the group.
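To make the stages concrete, here is a rough in-memory equivalent written with Java streams rather than Spring Data's Aggregation API. It assumes a hypothetical Product class with id, warehouse and price fields and a hypothetical WarehouseSummary output class, and it treats the price bounds as inclusive, which the post does not specify.

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical product document; the field names are assumptions.
class Product {
    final String id;
    final String warehouse;
    final double price;

    Product(String id, String warehouse, double price) {
        this.id = id;
        this.warehouse = warehouse;
        this.price = price;
    }
}

// Hypothetical output DTO: one instance per warehouse.
class WarehouseSummary {
    final String warehouse;
    final Set<String> productIds;
    final double averagePrice;
    final double revenue;

    WarehouseSummary(String warehouse, Set<String> productIds, double averagePrice, double revenue) {
        this.warehouse = warehouse;
        this.productIds = productIds;
        this.averagePrice = averagePrice;
        this.revenue = revenue;
    }
}

class WarehouseAggregation {
    static List<WarehouseSummary> aggregate(List<Product> products, double minPrice, double maxPrice) {
        // match: keep products inside the price range (inclusive bounds are an assumption)
        // group: one bucket per warehouse (TreeMap only gives a stable output order)
        Map<String, List<Product>> byWarehouse = products.stream()
                .filter(p -> p.price >= minPrice && p.price <= maxPrice)
                .collect(Collectors.groupingBy((Product p) -> p.warehouse, TreeMap::new, Collectors.toList()));

        List<WarehouseSummary> result = new ArrayList<>();
        byWarehouse.forEach((warehouse, group) -> {
            // addToSet: unique product ids of the group
            Set<String> ids = group.stream()
                    .map(p -> p.id)
                    .collect(Collectors.toCollection(LinkedHashSet::new));
            // avg and sum over the prices of the group
            double avg = group.stream().mapToDouble(p -> p.price).average().orElse(0.0);
            double sum = group.stream().mapToDouble(p -> p.price).sum();
            // last: every document in a group shares the key, so the key is the warehouse
            result.add(new WarehouseSummary(warehouse, ids, avg, sum));
        });
        return result;
    }
}
```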


The last stage of the pipeline is the project operation. Here we specify the resulting fields of the aggregation:

The query is built as follows:

In the aggregate method, we indicate the input class, which is our Product document. The next argument is the output class, which is a DTO to store the resulting aggregation:

Let's end the post with a test proving that the results are what we expect:


6 Conclusion


Spring Data integrates well with the MongoDB aggregation framework. Adding Spring Boot to configure the application lets us focus on building the query. For the building process, the Aggregation class has several static methods that help us implement the different pipeline stages.


Tuesday, March 8, 2016

Grouping, transforming and reduction with Java 8

1 Introduction


In a previous post, I wrote about how we can group collections of objects with streams and groupingBy. This is useful but does not cover every use case. For example, sometimes we not only need to group things but also transform the result into a more appropriate object.

In this post, we will learn how to apply transformations and reduction to the groupingBy result.

Here you can view the source code of the following examples.


2 Grouping by and transform


Let's take the model I used in the previous post where we had a collection of persons who owned a pet.

Now we want to know which pets belong to persons living in New York. We are asking for pets, so we can't just make a grouping since we would be returning a collection of persons. What we need to do is group persons by city and then transform the stream to a collection of pets.

For this purpose, we use Collectors.mapping as the downstream collector of groupingBy:
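A minimal sketch of this grouping, assuming simplified, hypothetical Person and Pet classes (the post's own classes are in the linked repository): Collectors.mapping is passed as the downstream collector, so each city's list holds pets instead of persons.

```java
import java.util.*;
import static java.util.stream.Collectors.*;

// Hypothetical, simplified model classes.
class Pet {
    private final String name;
    private final int age;

    Pet(String name, int age) {
        this.name = name;
        this.age = age;
    }

    String getName() { return name; }
    int getAge() { return age; }
}

class Person {
    private final String name;
    private final String country;
    private final String city;
    private final Pet pet;

    Person(String name, String country, String city, Pet pet) {
        this.name = name;
        this.country = country;
        this.city = city;
        this.pet = pet;
    }

    String getCity() { return city; }
    Pet getPet() { return pet; }
}

class PetGrouping {
    // Group persons by city, then replace each person in a group with their pet.
    static Map<String, List<Pet>> petsByCity(List<Person> persons) {
        return persons.stream()
                .collect(groupingBy(Person::getCity, mapping(Person::getPet, toList())));
    }
}
```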

In the grouping phase, we group persons by city and then perform a mapping to get each person's pet.


3 Grouping, transforming and reducing


The previous example is useful for converting groupings of objects, but maybe we don't want to obtain the whole list for each group. In this example, we still want to group pets by their owner's city, but this time we only want to get the oldest pet in each list.

The collectingAndThen method from Collectors allows us to apply a final transformation to the result of the grouping:
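A sketch of this reduction, using the same kind of hypothetical Person and Pet classes: each city's persons are first collected into a list, and the finisher function passed to collectingAndThen maps them to their pets and keeps the one with the highest age.

```java
import java.util.*;
import static java.util.stream.Collectors.*;

// Hypothetical, simplified model classes.
class Pet {
    private final String name;
    private final int age;

    Pet(String name, int age) {
        this.name = name;
        this.age = age;
    }

    String getName() { return name; }
    int getAge() { return age; }
}

class Person {
    private final String name;
    private final String city;
    private final Pet pet;

    Person(String name, String city, Pet pet) {
        this.name = name;
        this.city = city;
        this.pet = pet;
    }

    String getCity() { return city; }
    Pet getPet() { return pet; }
}

class OldestPet {
    // Group by city, collect each group into a list, then reduce that list to its oldest pet.
    static Map<String, Pet> oldestPetByCity(List<Person> persons) {
        return persons.stream()
                .collect(groupingBy(Person::getCity,
                        collectingAndThen(toList(),
                                (List<Person> people) -> people.stream()
                                        .map(Person::getPet)
                                        .max(Comparator.comparingInt(Pet::getAge))
                                        .get()))); // safe: groupingBy never creates empty groups
    }
}
```

Collectors.mapping(Person::getPet, Collectors.maxBy(...)) would give a similar result, but with each city's pet wrapped in an Optional.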

After we group persons by city, in collectingAndThen we are transforming each person in each city’s list to its pet, and then applying a reduction to get the pet with the highest age in the list.


4 Conclusion


The Collectors API not only allows us to group collections of things but also to apply transformations and reductions to obtain different objects depending on our needs.


Monday, February 29, 2016

Multi level grouping with streams

1 Introduction


With Java 8 streams it is pretty easy to group collections of objects based on different criteria. In this post, we will see how to go from simple single-level groupings to more complex ones involving several levels of grouping.

We will use two classes to represent the objects we want to group by: person and pet.

Person.class


Pet.class


In the main method we create the collection we will use in the following sections.


You can take a look at the source code here.


2 Single level grouping


The simplest form of grouping is the single level grouping. In this example we are going to group all persons in the collection by their country:
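A minimal sketch of the single-level grouping, assuming a simplified, hypothetical Person class (name, country, city) whose toString mimics the output format shown in this section:

```java
import java.util.*;
import static java.util.stream.Collectors.*;

// Hypothetical, simplified model class.
class Person {
    private final String name;
    private final String country;
    private final String city;

    Person(String name, String country, String city) {
        this.name = name;
        this.country = country;
        this.city = city;
    }

    String getCountry() { return country; }
    String getCity() { return city; }

    @Override
    public String toString() {
        return "Person{name='" + name + "', country='" + country + "', city='" + city + "'}";
    }
}

class SingleLevel {
    // One map entry per country, each holding that country's persons in encounter order.
    static Map<String, List<Person>> byCountry(List<Person> persons) {
        return persons.stream().collect(groupingBy(Person::getCountry));
    }
}
```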

If we inspect the map, we can see how each country contains a list of its citizens:



The result shows persons living in the specified country:

Persons in USA: [Person{name='John', country='USA', city='New York'}, Person{name='Anna', country='USA', city='New York'}, Person{name='Mike', country='USA', city='Chicago'}]


3 Two level grouping


In this example, we will group not only by country but also by city. To accomplish this, we need to implement a two level grouping. We will group persons by country and for each country, we will group its persons by the city where they live.

In order to allow multi level grouping, the groupingBy method in class Collectors supports an additional Collector as a second argument:

Let’s use this method to implement our two level grouping:
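With the same kind of hypothetical Person class, the two-level grouping passes a second groupingBy as the downstream collector of the first:

```java
import java.util.*;
import static java.util.stream.Collectors.*;

// Hypothetical, simplified model class.
class Person {
    private final String name;
    private final String country;
    private final String city;

    Person(String name, String country, String city) {
        this.name = name;
        this.country = country;
        this.city = city;
    }

    String getName() { return name; }
    String getCountry() { return country; }
    String getCity() { return city; }
}

class TwoLevel {
    // Outer map: country -> (inner map: city -> persons living there).
    static Map<String, Map<String, List<Person>>> byCountryAndCity(List<Person> persons) {
        return persons.stream()
                .collect(groupingBy(Person::getCountry, groupingBy(Person::getCity)));
    }
}
```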

If we debug the execution, we will see how people are distributed:




4 Three level grouping


In our final example, we will take a step further and group people by country, city and pet name. I have split it into two methods for readability:
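A sketch of the three-level grouping, again with hypothetical Person and Pet classes. Splitting it into two methods mirrors the post's approach, but the helper name byCityAndPet is our own:

```java
import java.util.*;
import java.util.stream.Collector;
import static java.util.stream.Collectors.*;

// Hypothetical, simplified model classes.
class Pet {
    private final String name;

    Pet(String name) { this.name = name; }

    String getName() { return name; }
}

class Person {
    private final String name;
    private final String country;
    private final String city;
    private final Pet pet;

    Person(String name, String country, String city, Pet pet) {
        this.name = name;
        this.country = country;
        this.city = city;
        this.pet = pet;
    }

    String getName() { return name; }
    String getCountry() { return country; }
    String getCity() { return city; }
    Pet getPet() { return pet; }
}

class ThreeLevel {
    // First level: country. The remaining levels come from the helper below.
    static Map<String, Map<String, Map<String, List<Person>>>> byCountryCityAndPet(List<Person> persons) {
        return persons.stream().collect(groupingBy(Person::getCountry, byCityAndPet()));
    }

    // Second and third levels: city, then pet name (hypothetical helper).
    static Collector<Person, ?, Map<String, Map<String, List<Person>>>> byCityAndPet() {
        return groupingBy(Person::getCity, groupingBy((Person p) -> p.getPet().getName()));
    }
}
```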

Now we have three nested maps containing each list of persons:




5 Conclusion


The Java 8 Collectors API provides us with an easy way to group our collections. By nesting collectors, we can add different layers of groups to implement multi level groupings.


Monday, July 13, 2015

Understanding Callable and Spring DeferredResult

1- Introduction


Asynchronous support introduced in Servlet 3.0 offers the possibility to process an HTTP request in another thread. This is especially interesting when you have a long-running task, since while another thread processes the request, the container thread is freed and can continue serving other requests.

This topic has been explained many times, but there seems to be a little bit of confusion regarding those classes provided by the Spring framework which take advantage of this functionality. I am talking about returning Callable and DeferredResult from a @Controller.

In this post I will implement both examples in order to show their differences.

All the examples shown here consist of implementing a controller which executes a long-running task and then returns the result to the client. The long-running task is processed by the TaskService:

The web application is built with Spring Boot. We will be executing the following class to run our examples:

The source code with all these examples can be found at the Github Spring-Rest repository.


2- Starting with a blocking controller


In this example, a request arrives at the controller. The servlet thread won't be released until the long-running method is executed and we exit the @RequestMapping annotated method.

If we run this example at http://localhost:8080/block and look at the logs, we can see that the servlet thread is not released until the long-running task has been processed (5 seconds later):

2015-07-12 12:41:11.849  [nio-8080-exec-6] x.s.web.controller.BlockingController    : Request received
2015-07-12 12:41:16.851  [nio-8080-exec-6] x.spring.web.service.TaskServiceImpl     : Slow task executed
2015-07-12 12:41:16.851  [nio-8080-exec-6] x.s.web.controller.BlockingController    : Servlet thread released



3- Returning Callable


In this example, instead of returning the result directly, we will return a Callable:

Returning a Callable implies that Spring MVC will invoke the task defined in the Callable in a different thread. Spring manages this thread by using a TaskExecutor. The servlet thread is released without waiting for the long task to finish.
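Spring MVC's internals are not reproduced here, but the idea can be sketched with the plain JDK: the request-handling thread hands a Callable to an executor and returns right away, while a pool thread runs the slow task. In this sketch an ExecutorService stands in for Spring's TaskExecutor, and slowTask is a hypothetical stand-in for the TaskService call (200 ms instead of 5 seconds).

```java
import java.util.concurrent.*;

class CallableSketch {
    // Hypothetical stand-in for the TaskService's slow method.
    static String slowTask() throws InterruptedException {
        Thread.sleep(200);
        return "Slow task executed on " + Thread.currentThread().getName();
    }

    // Plays the controller's role: wraps the work in a Callable, hands it to an
    // executor (standing in for Spring's TaskExecutor) and returns without waiting.
    static Future<String> handle(ExecutorService executor) {
        Callable<String> task = CallableSketch::slowTask;
        Future<String> pending = executor.submit(task);
        System.out.println(Thread.currentThread().getName() + ": request thread released");
        return pending;
    }
}
```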

Let's take a look at the logs:

2015-07-12 13:07:07.012  [nio-8080-exec-5] x.s.w.c.AsyncCallableController          : Request received
2015-07-12 13:07:07.013  [nio-8080-exec-5] x.s.w.c.AsyncCallableController          : Servlet thread released
2015-07-12 13:07:12.014  [      MvcAsync2] x.spring.web.service.TaskServiceImpl     : Slow task executed

You can see that we have returned from the servlet before the long running task has finished executing. This doesn't mean the client has received a response. The communication with the client is still open waiting for the result, but the thread that received the request has been released and can serve another client's request.


4- Returning DeferredResult


First, we need to create a DeferredResult object, which will be returned by the controller. The goal is the same as with Callable: to release the servlet thread while we process the long-running task in another thread.

So, what's the difference from Callable? This time, the thread is managed by us. It is our responsibility to set the result of the DeferredResult in a different thread.

What we have done in this example is create an asynchronous task with CompletableFuture, which runs our long-running task in another thread. It is in this thread that we set the result.

From which pool are we retrieving this thread? By default, the supplyAsync method in CompletableFuture runs the task in the common ForkJoinPool (if the common pool's parallelism is lower than two, a new thread is created for each task instead). If you want to use a different thread pool, you can pass an executor to the supplyAsync method:
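The threading side of this can be sketched with the plain JDK. Here a CompletableFuture named holder stands in for the DeferredResult (a controller would call setResult where the sketch calls complete), slowTask is a hypothetical stand-in for the TaskService, and the task-pool thread-name prefix is an arbitrary choice that makes it visible which pool ran the task.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class DeferredSketch {
    // Hypothetical stand-in for the TaskService's slow method.
    static String slowTask() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Slow task executed on " + Thread.currentThread().getName();
    }

    // A fixed pool whose threads are named "task-pool-N".
    static ExecutorService namedPool(int size) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(size,
                runnable -> new Thread(runnable, "task-pool-" + counter.incrementAndGet()));
    }

    // `holder` plays the DeferredResult: returned at once, completed later
    // from the thread that finishes the task.
    static CompletableFuture<String> handle(ExecutorService executor) {
        CompletableFuture<String> holder = new CompletableFuture<>();
        CompletableFuture.supplyAsync(DeferredSketch::slowTask, executor)
                .whenComplete((value, error) -> {
                    if (error != null) {
                        holder.completeExceptionally(error);
                    } else {
                        holder.complete(value);
                    }
                });
        return holder;
    }
}
```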

If we run this example, we will get the same result as with Callable:

2015-07-12 13:28:08.433  [io-8080-exec-10] x.s.w.c.AsyncDeferredController          : Request received
2015-07-12 13:28:08.475  [io-8080-exec-10] x.s.w.c.AsyncDeferredController          : Servlet thread released
2015-07-12 13:28:13.469  [onPool-worker-1] x.spring.web.service.TaskServiceImpl     : Slow task executed


5- Conclusion


At a high level, Callable and DeferredResult do exactly the same thing: they release the container thread and process the long-running task asynchronously in another thread. The difference is in who manages the thread executing the task.


Tuesday, April 14, 2015

Configure a Spring JMS application with Spring Boot and annotation support

1   Introduction


In previous posts we learned how to configure a project using Spring JMS. If you check the article introduction to messaging with Spring JMS, you will notice that it is configured using XML. This article takes advantage of the improvements introduced in Spring 4.1 and configures a JMS project using Java config only.

In this example we will also see how easy it can be to configure the project by using Spring Boot.

Before we get started, just note that as usual, you can take a look at the source code of the project used in the examples below.

See the example project on GitHub.

Sections:
  1. Introduction.
  2. The example application.
  3. Setting up the project.
  4. A simple example with JMS listener.
  5. Sending a response to another queue with @SendTo.
  6. Conclusion.

2   The example application


The application uses a Client service to send orders to a JMS queue, where a JMS listener will be registered and handle these orders. Once received, the listener will store the order through the Store service:



We will use the Order class to create orders:

Before moving on to the first example, we will first explore how the project structure is built.


3   Setting up the project


3.1   Configuring pom.xml

The first thing to do is to define the artifact spring-boot-starter-parent as our parent pom.

This parent basically sets several Maven defaults and provides the dependency management for the main dependencies that we will use, like the Spring version (which is 4.1.6).

It is important to note that this parent pom defines the version of many libraries but it does not add any dependency to our project. So don’t worry about getting libraries you won’t use.

The next step is to set the basic dependencies for Spring Boot:

In addition to the core Spring libraries, this dependency will bring the auto configuration functionality of Spring Boot. This will allow the framework to try to automatically set up the configuration based on the dependencies you add.

Finally, we will add the Spring JMS dependency and the ActiveMQ message broker, leaving the whole pom.xml as follows:

3.2   Spring Configuration with Java Config

We used @SpringBootApplication instead of the usual @Configuration annotation. This Spring Boot annotation is also annotated with @Configuration. In addition, it sets other configuration like Spring Boot auto configuration:

The configuration class does not need to define any bean. All the configuration is automatically set by Spring Boot. Regarding the connection factory, Spring Boot will detect that I included the ActiveMQ dependency on the classpath and will start and configure an embedded broker.

If you need to specify a different broker URL, you can declare it in the properties. Check the ActiveMQ support section for further details.

Everything is set up now. Since the JMS listener is configured with an annotation, we will see how to configure it in the example in the next section.


4   A simple example with JMS listener


4.1   Sending an order to a JMS queue

The ClientService class is responsible for sending a new order to the JMS queue. In order to accomplish this, it uses a JmsTemplate:

Here, we use a JmsTemplate to convert our Order instance and send it to the JMS queue. If you prefer to send a message directly through the send method, you can use the new JmsMessagingTemplate instead. This is preferable since it uses the more standardized Message abstraction from Spring Messaging.


4.2   Receiving an order sent to the JMS queue

Registering a JMS listener to a JMS listener container is as simple as adding the @JmsListener annotation to the method we want to use. This will create a JMS listener container under the covers that will receive messages sent to the specified queue and delegate them to our listener class:

The StoreService receives the order and saves it to a list of received orders:

4.3   Testing the application

Now let’s add a test to check if we did everything correctly:


5   Sending a response to another queue with @SendTo


Another addition to Spring JMS is the @SendTo annotation. This annotation allows a listener to send a message to another queue. For example, the following listener receives an order from the “in.queue” and after storing the order, sends a confirmation to the “out.queue”.

On that queue, another registered listener processes this confirmation id:


6   Conclusion


With annotation support, it is now much easier to configure a Spring JMS application, taking advantage of asynchronous message retrieval using annotated JMS listeners.


Sunday, March 8, 2015

Improving performance: non-blocking processing of streams

1   Introduction

Imagine we have an application that needs to access an external web service in order to gather information about clients and then process it. However, we can't get all this information in a single invocation: if we want to look up different clients, we need several invocations.

As shown in the graphic below, the example application will retrieve information about several clients, group them in a list and then process it to calculate the total amount of their purchases:


In this post, we will see different ways of gathering the information and which one is the best in terms of performance.

This is a Java related post. However, we will use the Spring framework to invoke a RESTful web service.

Sections:
  1. Introduction
  2. Explaining the example
  3. First attempt: Sequential stream
  4. Improving performance: Parallel stream
  5. Non-blocking processing with CompletableFuture
  6. Conclusion

The source code can be found at the Java 8 GitHub repository.

Additionally, you can access the source code of the web application exposing the RESTful web service at this repository.


2   Explaining the example

In our application, we have a list of 20 ids representing clients we want to retrieve from a web service. After retrieving all the clients, we look up what each client purchased and add it up to compute the total amount of money spent by all the clients.

There is one problem, though: this web service only allows retrieving one client per invocation, so we need to invoke it twenty times. In addition, the web service is a bit slow, taking at least two seconds to respond to each request.

If we take a look at the application implementing the web service, we can see that invocations are handled by the ClientController class:

A Thread.sleep is used to simulate the slowness in responding.

The domain class (Client) contains the information we need: how much money a client has spent.


3   First attempt: Sequential stream

In this first example we will sequentially invoke the service to get the information of all twenty clients:

Output:
Sequential | Total time: 42284 ms
Total purchases: 20.0

The execution of this program takes approximately 42 seconds. This is too much time. Let's see if we can improve its performance.
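A scaled-down sketch of the sequential approach, for readers who want to try it without the web service. ClientService.getClient is a hypothetical stand-in for the REST call: it sleeps 50 ms instead of two seconds, and every client is assumed to have spent 1.0, which matches the total of 20.0 printed above for 20 clients.

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical domain class: how much money a client has spent.
class Client {
    final int id;
    final double purchases;

    Client(int id, double purchases) {
        this.id = id;
        this.purchases = purchases;
    }
}

class ClientService {
    // Hypothetical stand-in for the slow REST call (50 ms here, 2 s in the post).
    static Client getClient(int id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Client(id, 1.0); // assumption: every client spent 1.0
    }
}

class SequentialExample {
    // One blocking call after another: total time is roughly ids.size() * latency.
    static double totalPurchases(List<Integer> ids) {
        return ids.stream()
                .map(ClientService::getClient)
                .mapToDouble(c -> c.purchases)
                .sum();
    }
}
```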


4   Improving performance: Parallel stream

Java 8 allows us to split a stream into chunks and process each one in a separate thread. What we need to do is simply create the stream in the previous example as a parallel stream.

Take into account that each chunk will be executed asynchronously in its own thread, so the order in which the chunks are processed must not matter. In our case we are summing the purchases, and the result of a sum does not depend on the order, so we can do it.

Let’s try this:

Output:
Parallel | Total time: 6336 ms
Total purchases: 20.0

Wow, that's a big improvement! But where does this number come from?

Parallel streams internally use the common ForkJoinPool, the pool behind the Fork/Join framework introduced in Java 7. By default, its parallelism is one less than the number of available processors, and the thread that starts the stream also takes part in the work, so roughly one task runs per processor. My laptop is a quad core that can handle 8 threads (you can check this by invoking Runtime.getRuntime().availableProcessors()), so it can make 8 invocations to the web service in parallel. Since we need 20 invocations, it needs at least 3 "rounds" of about two seconds each, which explains the 6 seconds:


Going from 42 seconds to 6 is quite an improvement, but can we improve it further? The answer is yes.
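For reference, the parallel version only changes stream() to parallelStream(). This scaled-down sketch reuses the same hypothetical Client and ClientService stand-ins (50 ms latency, 1.0 per client) and also prints the two numbers that decide how many calls can run at once on your machine.

```java
import java.util.*;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.*;

// Hypothetical domain class: how much money a client has spent.
class Client {
    final int id;
    final double purchases;

    Client(int id, double purchases) {
        this.id = id;
        this.purchases = purchases;
    }
}

class ClientService {
    // Hypothetical stand-in for the slow REST call (50 ms here, 2 s in the post).
    static Client getClient(int id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Client(id, 1.0); // assumption: every client spent 1.0
    }
}

class ParallelExample {
    // Same pipeline, split across the common pool's workers plus the calling thread.
    static double totalPurchases(List<Integer> ids) {
        return ids.parallelStream()
                .map(ClientService::getClient)
                .mapToDouble(c -> c.purchases)
                .sum();
    }

    // The numbers that bound how many blocking calls run at the same time.
    static void printPoolInfo() {
        System.out.println("Available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
    }
}
```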


5   Non-blocking processing with CompletableFuture

Let's analyse the previous solution.

We use 8 threads, each invoking the web service, but while the service is processing a request (two whole seconds), our processors do nothing but wait, since this is an I/O operation. Until these requests come back, we can't send more.

The question is: what if we could send all 20 requests asynchronously, freeing our processors, and process each response as soon as it is available? This is where CompletableFuture comes to the rescue:

Output:
Async with executor | Total time: 2192 ms
Total purchases: 20.0

It took a third of the time spent in the previous example.

We sent all 20 requests at the same time, so we only pay the I/O wait once. As soon as responses come back, we process them quickly.

Using an executor service is important here: we pass it as the optional second parameter of the supplyAsync method. We specified a pool of a hundred threads, so up to 100 requests can be in flight at the same time. If we don't specify an executor, the common ForkJoinPool is used by default.

You can try to remove the executor and you will see the same performance as in the parallel example.
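A scaled-down sketch of the CompletableFuture approach, with the same hypothetical Client and ClientService stand-ins (50 ms latency, 1.0 per client). The executor size of 100 follows the post; everything else is an assumption.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

// Hypothetical domain class: how much money a client has spent.
class Client {
    final int id;
    final double purchases;

    Client(int id, double purchases) {
        this.id = id;
        this.purchases = purchases;
    }
}

class ClientService {
    // Hypothetical stand-in for the slow REST call (50 ms here, 2 s in the post).
    static Client getClient(int id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Client(id, 1.0); // assumption: every client spent 1.0
    }
}

class AsyncExample {
    static double totalPurchases(List<Integer> ids, ExecutorService executor) {
        // Start every request first: collecting the futures into a list before
        // joining is what lets all calls be in flight at the same time.
        List<CompletableFuture<Client>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> ClientService.getClient(id), executor))
                .collect(Collectors.toList());

        // Then wait for each response and add up the purchases.
        return futures.stream()
                .map(CompletableFuture::join)
                .mapToDouble(c -> c.purchases)
                .sum();
    }
}
```

The two separate pipelines matter: chaining map(CompletableFuture::join) right after the supplyAsync call in one sequential stream would wait for each response before starting the next request, bringing back the sequential timing.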


6   Conclusion

We have seen that when executing operations that do not involve computing (like IO operations) we can use the CompletableFuture class to take advantage of our processors and improve the performance of our applications.
