Monday, 28 July 2014

Upcoming Infinispan 7.0.0 Map/Reduce is blazing fast


Our enthusiasm for the Infinispan Map/Reduce implementation has driven the new features and the performance improvements we have achieved over the past months. As we approach the final Infinispan 7 release, we cannot keep quiet about these improvements any longer. We want to share the most significant new Map/Reduce features, as well as a rather important performance improvement, along with the details of how we achieved it.

New features

In the new features category, the most notable is a scalability improvement that allows a MapReduceTask's results to be stored in a distributed cache instead of being returned to the calling application. Infinispan now gives users the option to specify a target cache to store the results of an executed MapReduceTask. The results are available after the execute method (which is synchronous) completes. This new variant of the execute method keeps the JVM of the master node from exceeding its maximum heap size. Users could, for example, use the new execute method if the objects produced by the reduce phase have a large memory footprint, or if multiple MapReduceTasks are executing concurrently on the master task node. We have provided two variants of the new execute method.

We also enhanced parallel execution of map/reduce functions at each node and improved handling of intermediate results. Users can now specify a custom intermediate cache for a particular MapReduceTask.

Performance improvements

Infinispan 7 makes a rather big leap from a single-threaded to a parallel execution model for both the map and reduce phases on each Infinispan grid node. The net result of this change is that a typical MapReduceTask executes, on average, five times faster.

Even though the reduce phase has to follow the map phase, each phase can itself be executed in parallel. If you recall, although Infinispan 6 executes the map phase on all nodes in parallel, execution on each node is single-threaded. Similarly, although the reduce phase is executed on multiple nodes in parallel, each node executes its portion of the reduction on a single thread.

Since we baselined Infinispan 7 on JDK 7, we decided to experiment with the fork/join threading framework for parallel execution of both the map and reduce phases [2]. If you recall, the fork/join framework enables high-performance, parallel, fine-grained task execution in Java. Although parallel, recursively decomposable tasks are the obvious fit for fork/join, it may come as a surprise that parallel iteration over entries in arrays, maps and other collections is a good fit as well. And we have a well-suited candidate for parallel fork/join iteration: the cache's data container itself! In fact, most of the work consists of iterating over entries in the data container and invoking the map/combine and reduce functions on those entries.
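To make the idea of fork/join iteration over a container concrete, here is a minimal sketch. It is not Infinispan code: the data container is stood in for by a plain array of strings, the map function is a word count, and the class names and the `THRESHOLD` cut-off are hypothetical. It is also written against current Java (lambdas, `Map.merge`) rather than JDK 7.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

final class ForkJoinMapSketch {

    /** Counts words in values[from, to), splitting the range until it is small enough. */
    static final class CountTask extends RecursiveTask<Map<String, Integer>> {
        private static final int THRESHOLD = 2; // hypothetical cut-off; tune per workload
        private final String[] values;
        private final int from;
        private final int to;

        CountTask(String[] values, int from, int to) {
            this.values = values;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Map<String, Integer> compute() {
            if (to - from <= THRESHOLD) {
                // Small slice: iterate sequentially and apply the "map" function.
                Map<String, Integer> counts = new HashMap<>();
                for (int i = from; i < to; i++) {
                    for (String word : values[i].split("\\s+")) {
                        if (!word.isEmpty()) {
                            counts.merge(word, 1, Integer::sum);
                        }
                    }
                }
                return counts;
            }
            // Large slice: fork the left half, compute the right half in this thread.
            int mid = (from + to) >>> 1;
            CountTask left = new CountTask(values, from, mid);
            CountTask right = new CountTask(values, mid, to);
            left.fork();
            Map<String, Integer> result = right.compute();
            // Merge the partial counts of the left half into the right half's map.
            left.join().forEach((word, count) -> result.merge(word, count, Integer::sum));
            return result;
        }
    }

    static Map<String, Integer> countWords(String[] values) {
        return ForkJoinPool.commonPool().invoke(new CountTask(values, 0, values.length));
    }

    public static void main(String[] args) {
        System.out.println(countWords(new String[] {"a b a", "b c", "a", "c c d"}));
    }
}
```

The recursion only splits index ranges, so no entry is copied; each worker thread touches its own slice and a private result map, and merging happens on the way back up.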

The map/combine phase is particularly interesting. Even if we use the fork/join framework, the map and combine phases are distinct and, until now, were executed serially. Serial execution of map and combine is not the only downside, as the map phase can be rather memory intensive. After all, it has to store all intermediate results in the provided collectors. The combine phase takes the intermediate values produced for a particular key and combines them into a single intermediate value. Therefore, it would be very useful to periodically invoke combine on the keys/values produced by map, thus limiting the total amount of memory used by the map phase. So the question is: how do we execute map/combine in parallel efficiently, speeding up execution while limiting the memory used? We found the answer in the producer/consumer threading paradigm.

In our case, producers are fork/join threads that, during the map phase, iterate over the key/value data container and invoke the Mapper's map function. The map function produces intermediate results that are stored in an Infinispan-provided queue of collectors. Consumers are also fork/join threads; they invoke the combine function on the key/value entries in those collectors. Note that this way the execution of the map/combine phase itself becomes parallel, and the mapping and combining phases are no longer serial. In the end, we have notably lowered memory usage and, at the same time, significantly improved the overall execution speed of the map/combine algorithm.
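The producer/consumer arrangement described above can be sketched roughly as follows. This is an illustration under several assumptions, not the Infinispan implementation: it uses a fixed thread pool instead of fork/join threads (blocking on a queue inside a fork/join pool needs extra care), a bounded `ArrayBlockingQueue` of small `HashMap` collectors, word count as the map function and integer sum as the combine function. All names, the `BATCH` size and the queue capacity are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class MapCombineSketch {
    private static final int BATCH = 2; // hypothetical collector size; bounds memory per producer
    // End-of-stream marker, recognised by identity rather than by content.
    private static final Map<String, Integer> DONE = new HashMap<>();

    static Map<String, Integer> wordCount(List<String> values, int producers, int consumers) {
        // Bounded queue: producers block when consumers fall behind, capping memory use.
        BlockingQueue<Map<String, Integer>> queue = new ArrayBlockingQueue<>(4);
        Map<String, Integer> combined = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(producers + consumers);
        try {
            List<Future<Void>> consumerRuns = new ArrayList<>();
            for (int c = 0; c < consumers; c++) {
                consumerRuns.add(pool.submit(combiner(queue, combined)));
            }
            List<Future<Void>> producerRuns = new ArrayList<>();
            for (int p = 0; p < producers; p++) {
                producerRuns.add(pool.submit(mapper(values, p, producers, queue)));
            }
            for (Future<Void> run : producerRuns) {
                run.get(); // wait until every producer has flushed its collectors
            }
            for (int c = 0; c < consumers; c++) {
                queue.put(DONE); // one end marker per consumer
            }
            for (Future<Void> run : consumerRuns) {
                run.get();
            }
            return combined;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    /** Producer: maps every stride-th value and hands off a collector once it reaches BATCH keys. */
    private static Callable<Void> mapper(List<String> values, int first, int stride,
                                         BlockingQueue<Map<String, Integer>> queue) {
        return () -> {
            Map<String, Integer> collector = new HashMap<>();
            for (int i = first; i < values.size(); i += stride) {
                for (String word : values.get(i).split("\\s+")) {
                    if (!word.isEmpty()) {
                        collector.merge(word, 1, Integer::sum);
                    }
                }
                if (collector.size() >= BATCH) {
                    queue.put(collector);
                    collector = new HashMap<>();
                }
            }
            if (!collector.isEmpty()) {
                queue.put(collector);
            }
            return null;
        };
    }

    /** Consumer: combines queued collectors into the shared result until it sees the end marker. */
    private static Callable<Void> combiner(BlockingQueue<Map<String, Integer>> queue,
                                           Map<String, Integer> combined) {
        return () -> {
            for (Map<String, Integer> c = queue.take(); c != DONE; c = queue.take()) {
                c.forEach((key, value) -> combined.merge(key, value, Integer::sum));
            }
            return null;
        };
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("a b a", "b c", "a", "c c d"), 2, 2));
    }
}
```

Because collectors are handed off as soon as they reach `BATCH` keys and the queue is bounded, the number of intermediate values held in memory stays small no matter how many entries are mapped, which is the memory benefit the paragraph above describes.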

Performance lab results

Although the initial performance results were more than promising, we were not satisfied. Throughput peaked for 32KB cache values but was much lower for larger and smaller values in our tests. We went back to the drawing board and devised the map/combine algorithm described above, using the fork/join framework and the producer/consumer approach. This time the results from the performance lab were excellent. For more details on the performance tests and the hardware used, please refer to [1].

As you can see from the graphs below, we have improved performance for all cache value sizes, although we were not able to significantly improve throughput for the largest 1MB and 2MB cache values. For all other cache value sizes, we have seen, on average, a five-fold throughput improvement. Since throughput improvement is directly proportional to the improvement in MapReduceTask execution speed, our users should expect their MapReduceTasks to execute, on average, five times faster in Infinispan 7 than in Infinispan 6.

[2] We backported the relevant classes so users can still run Infinispan 7 on JVM 6.
