We also enhanced parallel execution of map/reduce functions at each node and improved the handling of intermediate results. Users can now specify a custom intermediate cache for a particular MapReduceTask.
Even though the map and reduce phases run one after the other, each phase can itself be executed in parallel. If you recall, although Infinispan 6 executes the map and reduce phases on all nodes in parallel, execution on each node is single-threaded. In the reduce phase, for example, the work is spread across multiple nodes, yet each node executes its portion of the reduction on a single thread.
Since we baselined Infinispan 7 on JDK 7, we decided to experiment with the fork/join threading framework for parallel execution of both the map and reduce phases. If you recall, the fork/join framework enables high-performance, parallel, fine-grained task execution in Java. Although parallel, recursively decomposable tasks are well suited to the fork/join framework, it may come as a surprise that parallel iteration over entries in arrays, maps and other collections is a good fit as well. And we do have a well-suited candidate for parallel fork/join iteration: the cache's data container itself! In fact, most of the work consists of iterating over entries in the data container and invoking the map/combine and reduce functions on those entries.
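To make the idea of fork/join iteration concrete, here is a minimal sketch in plain JDK 7 Java. It is not Infinispan's code: the class ForkJoinMapSketch, the THRESHOLD value and the word-counting map function are all assumptions made for illustration, and a simple list of strings stands in for the data container. The task splits the entry range in half until a slice is small enough, runs the map function over that slice on one thread, and merges the partial results on the way back up.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative only: a stand-in for iterating a data container with fork/join.
class ForkJoinMapSketch {

    // Slice size below which a task stops splitting (arbitrary, assumed value).
    static final int THRESHOLD = 1_000;

    // Counts words in values[from, to) by recursively halving the range.
    static class CountTask extends RecursiveTask<Map<String, Integer>> {
        private final List<String> values;
        private final int from;
        private final int to;

        CountTask(List<String> values, int from, int to) {
            this.values = values;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Map<String, Integer> compute() {
            if (to - from <= THRESHOLD) {
                // Small slice: apply the "map function" sequentially.
                Map<String, Integer> counts = new HashMap<>();
                for (int i = from; i < to; i++) {
                    for (String word : values.get(i).split(" ")) {
                        add(counts, word, 1);
                    }
                }
                return counts;
            }
            int mid = (from + to) >>> 1;
            CountTask left = new CountTask(values, from, mid);
            CountTask right = new CountTask(values, mid, to);
            left.fork();                                   // run left half asynchronously
            Map<String, Integer> result = right.compute(); // run right half in this thread
            for (Map.Entry<String, Integer> e : left.join().entrySet()) {
                add(result, e.getKey(), e.getValue());     // merge partial results
            }
            return result;
        }
    }

    // Adds delta to the count stored under key.
    static void add(Map<String, Integer> counts, String key, int delta) {
        Integer old = counts.get(key);
        counts.put(key, old == null ? delta : old + delta);
    }

    static Map<String, Integer> countWords(List<String> values) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new CountTask(values, 0, values.size()));
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling ForkJoinMapSketch.countWords(values) returns one merged count per word. The useful property is that idle worker threads steal the forked halves, so the iteration spreads across cores without any explicit partitioning by the caller.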
The map/combine phase is particularly interesting. Even with the fork/join framework, map and combine are distinct phases and, until now, were executed serially. Serial execution is not the only downside, as the map phase can be rather memory intensive. After all, it has to store all intermediate results in the provided collectors. The combine phase takes the intermediate values produced for a particular key and combines them into a single intermediate value. Therefore, it would be very useful to periodically invoke combine on the keys/values produced by map, thus limiting the total amount of memory used by the map phase. So the question is: how do we execute map/combine in parallel efficiently, speeding up execution while at the same time limiting the memory used? We found the answer in the producer/consumer threading paradigm.
In our case, producers are fork/join threads that, during the map phase, iterate over the key/value data container and invoke the Mapper's map function. The map function produces intermediate results, which are stored in an Infinispan-provided queue of collectors. Consumers are also fork/join threads; they invoke the combine function on the key/value entries in those collectors. Note that this way the execution of the map/combine phase itself becomes parallel, and mapping and combining are no longer serial. In the end, we have notably lowered memory usage and, at the same time, significantly improved the overall execution speed of the map/combine algorithm.
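The producer/consumer arrangement described above can be sketched roughly as follows. This is a simplified illustration, not Infinispan's implementation: MapCombineSketch, COLLECTOR_LIMIT, QUEUE_CAPACITY and the word-counting map/combine functions are hypothetical, and a plain fixed thread pool with a bounded BlockingQueue is used in place of fork/join threads to keep the blocking hand-off simple. Producers fill a collector and hand it off once it reaches a size limit; consumers combine each collector as soon as it arrives, so only a bounded number of uncombined collectors exists at any time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative only: map (producers) and combine (consumers) running concurrently.
class MapCombineSketch {
    static final int COLLECTOR_LIMIT = 100; // emitted values per collector (assumed)
    static final int QUEUE_CAPACITY = 16;   // bounds uncombined collectors (assumed)
    private static final Map<String, List<Integer>> DONE = new HashMap<>(); // end marker

    static Map<String, Integer> run(final List<String> values, final int producers,
                                    int consumers) throws Exception {
        final BlockingQueue<Map<String, List<Integer>>> queue =
                new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        // One thread per producer and consumer, so a full queue cannot starve anyone.
        ExecutorService pool = Executors.newFixedThreadPool(producers + consumers);
        try {
            List<Future<Map<String, Integer>>> combining = new ArrayList<>();
            for (int c = 0; c < consumers; c++) {
                combining.add(pool.submit(new Callable<Map<String, Integer>>() {
                    @Override
                    public Map<String, Integer> call() throws InterruptedException {
                        Map<String, Integer> combined = new HashMap<>();
                        Map<String, List<Integer>> collector;
                        while ((collector = queue.take()) != DONE) {
                            for (Map.Entry<String, List<Integer>> e : collector.entrySet()) {
                                int sum = 0; // combine: collapse values for one key
                                for (int v : e.getValue()) sum += v;
                                add(combined, e.getKey(), sum);
                            }
                        }
                        return combined;
                    }
                }));
            }
            List<Future<Void>> mapping = new ArrayList<>();
            for (int p = 0; p < producers; p++) {
                final int first = p;
                mapping.add(pool.submit(new Callable<Void>() {
                    @Override
                    public Void call() throws InterruptedException {
                        Map<String, List<Integer>> collector = new HashMap<>();
                        int emitted = 0;
                        // Each producer maps every producers-th entry, starting at 'first'.
                        for (int i = first; i < values.size(); i += producers) {
                            for (String word : values.get(i).split(" ")) {
                                List<Integer> ones = collector.get(word);
                                if (ones == null) {
                                    ones = new ArrayList<>();
                                    collector.put(word, ones);
                                }
                                ones.add(1); // map: emit (word, 1)
                                emitted++;
                            }
                            if (emitted >= COLLECTOR_LIMIT) {
                                queue.put(collector); // hand off; blocks while queue is full
                                collector = new HashMap<>();
                                emitted = 0;
                            }
                        }
                        if (emitted > 0) queue.put(collector);
                        return null;
                    }
                }));
            }
            for (Future<Void> f : mapping) f.get();              // wait for mapping to finish
            for (int c = 0; c < consumers; c++) queue.put(DONE); // one marker per consumer
            Map<String, Integer> result = new HashMap<>();
            for (Future<Map<String, Integer>> f : combining) {
                for (Map.Entry<String, Integer> e : f.get().entrySet()) {
                    add(result, e.getKey(), e.getValue());
                }
            }
            return result;
        } finally {
            pool.shutdownNow();
        }
    }

    // Adds delta to the count stored under key.
    static void add(Map<String, Integer> counts, String key, int delta) {
        Integer old = counts.get(key);
        counts.put(key, old == null ? delta : old + delta);
    }
}
```

A call such as MapCombineSketch.run(values, 3, 2) runs three mapping threads against two combining threads. The bounded queue is what caps memory in this sketch: when combining falls behind, producers block on put instead of accumulating more intermediate values.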
Performance lab results
As you can see from the graphs below, we have improved performance for all cache value sizes, although we were not able to significantly improve throughput for the largest 1MB and 2MB cache values. For all other cache value sizes, we have seen, on average, a five-fold throughput improvement. As the throughput improvement is directly proportional to the improvement in MapReduceTask execution speed, our users should expect their MapReduceTasks to execute, on average, five times faster in Infinispan 7 than in Infinispan 6.
We backported the relevant classes so users can still run Infinispan 7 on a Java 6 JVM.