Monday, 19 February 2018

Distributed iteration improvements

Infinispan hasn't always provided a way for iterating upon entries in a distributed cache. In fact the first iteration wasn't until Infinispan 7. Then in Infinispan 8, with the addition of Java 8, we fully integrated this into distributed streams, which brought some minor iteration improvements in performance.

We are proud to announce that with Infinispan 9.2 there are even more improvements. This contains no API changes, although those will surely come in the future. This one is purely for performance and utilization.

New implementation details


There are a few different aspects that have been changed.  A lot of these revolve around the amount of entries being retrieved at once, which if you are familiar with DistributedStreams can be configured via the distributedBatchSize method. Note that if this is not specified it defaults to the chunk size in state transfer.

Entry retrieval is now pull based instead of push

Infinispan core (embedded) has added rxjava2 and reactive streams as dependencies and rewrote all of the old push style iterator code over to pull style to fully utilize the Publisher and Subscriber interfaces.

With this we only pull up to the batchSize in entries at a time from any set of nodes. The old style utilized push with call stack blocking, which could return up two times the amount of entries. Also since we aren't performing call stack blocking, we don't have to waste threads as these calls to retrieve entries are done async and finish very quickly irrespective of user interaction. The old method required multiple threads to be reserved for this purpose.

Streamed batches

The responses from a remote node are written directly to the output stream so there are no intermediate collections allocated. This means we only have to iterate upon the data once as we retain the iterator between requests. On the originator we still have to store the batches in a collection to be enqueued for the user to pull.

Rewritten Parallel Distribution

Great care was taken to implement parallel distribution in a way to vastly reduce contention and ensure that we properly follow the batchSize configuration.

When parallel distribution is in use the new implementation will start 4 remote node requests sharing the batch size (so each one gets 1/4). This way we can guarantee that we only have the desired size irrespective of the number of nodes in the cluster. The old implementation would request batchSize from all nodes at the same time. So not only did it reserve a thread for node but could easily swamp your JVM memory, causing OutOfMemoryErrors (which no one likes). The latter alone made us force the default to be sequential distribution when using an iterator.

The old implementation would write entries from all nodes (including local) to the same shared queue. The new implementation has a different queue for each request, which allows for faster queues with no locking to be used.

Due to these changes and other isolations between threads, we can now make parallel distribution the default setting for the iterator method. And as you will see this has improved performance nicely.


We have written a JMH test harness specifically for this blog post, testing 9.1.5.Final build against latest 9.2.0.SNAPSHOT. The test runs by default with 4GB of heap with 6 nodes in a distributed cache with 2 owners. It has varying entry count, entry sizes and distributed batch sizes.

Due to the variance in each test a large number of tests were ran and with different permutations to make sure it covered a large amount of test cases. The JMH test that was ran can be found at github. All the default settings were used for the run except -t4 (runs with 4 worker threads) was provided. This was all ran on my measly laptop (i7-4810MQ and 16 GB) - maxing out the CPU was not a hard task.

CAVEAT: The tests don't do anything with the iterator and just try to pull them as fast as they can. Obviously if you have a lot of processing done between iterations you will likely not see as good of a performance increase.

The entire results can be found here. It shows each permutation and how many operations per second and finds the difference (green shows 5% or more and red shows -5% or less).

Operation Average Gain Code
Specified Distribution Mode 3.5% .entrySet().stream().sequentialDistribution.iterator()
Default 11% .entrySet().iterator()
No Rehash 14% .entrySet().stream().disableRehashAware().iterator()

The above 3 rows show a few different ways you could have been invoking the iterator method. The second row is probably by far the most used case. In this case you should see around a 11% increase in performance (results will vary). This is due to the new pulling method as well as parallel distribution becoming the new default running mode. It is unlikely a user was using the other 2 methods, but are provided for a more complete view.

If you were specifying a distribution mode manually, either sequential or distribution you will only see a few percent faster run (3.5%), but every little bit helps! Also if you can switch to parallel you may want to think about doing so.

Also you can see if you were running with rehash disabled prior, it has even more gains (14%). Those don't even include the fact that no rehash was 28% faster than with before (which means it is about 32% faster in general now). So if you can get away with a at most once guarantee, disabling rehash will provide the best throughput.

Whats next? 

As was mentioned this is not exposed to the user directly. You still interact with the iterator as you would normally. We should remedy this at some point.

Expose new method

We would love to eventually expose a method to return a Publisher directly to the user so that they can get the full benefits of having a pull based implementation underneath.

This way any intermediate operations applied to the stream before would be distributed and anything applied to the Publisher would be done locally. And just like the iterator method this publisher would be fully rehash aware if you have it configured to do so and would make sure you get all entries delivered in an exactly once fashion (rehash disabled guarantees at most once).

Another side benefit is that the Subscriber methods could be called on different threads so there is no overhead required on the ISPN side for coordinating these into queue(s). Thus the Subscriber should be able to retrieve all entries faster than just doing an iterator.

Java 9 Flow

Also many of you may be wondering why we aren't using the new Flow API introduced in Java 9. Luckily the Flow API is a 1:1 conversion of reactive streams. So whenever Infinispan will start supporting Java 9 interfaces/classes, we hope to properly expose these as the JDK classes.

Segment Based Iteration 

With Infinispan 9.3, we hope to introduce data container and cache store segment aware iteration. This means when iterating over either we would only have to process entries that map to a given segment. This should reduce the time and processing for iteration substantially, especially for cache stores. Keep your eyes out for a future blog post detailing these as 9.3 development commences.

Give us Feedback

We hope you find a bit more performance when working with your distributed iteration. Also we value any feedback on what you want our APIs to look like or find any bugs. As always let us know at any of the places listed here.

No comments:

Post a Comment