Monday, 25 August 2014

Partitioned clusters tell no lies!

The problem

You are happily running a 10-node cluster. You want failover and speed, so you are using distributed mode with 2 copies of the data for each key (numOwners=2). But disaster strikes: a switch in your network crashes and 5 of your nodes can't reach the other 5 anymore! Now there are two independent clusters, each containing 5 nodes, which we are smartly going to name P1 and P2. Both P1 and P2 continue to serve user requests (puts and gets) as usual.

This split of a cluster into two or more parts is called partitioning or split brain. And it's bad for business, as in really bad! Bob and Alice share a bank account stored in the cache. Bob updates the account on P1, then Alice reads it from P2: she sees a stale value (or even no value at all, depending on what the split looks like). This is a consistency issue, as the two partitions have an inconsistent view of the data.

Our solution

In Infinispan 7.0.0.Beta1 we added support for reacting to split brains: if nodes leave, Infinispan acknowledges that data might have been lost and denies user access to such data. We won't deny access to all the data, just to the data that might have been affected by the partitioning. Or, more formally: Infinispan sacrifices data availability in order to offer consistency (CP in terms of Brewer's CAP theorem). For now partition handling is disabled by default, but we do intend to make it the default in an upcoming release. Running with partition handling off is like running with scissors: do it at your own risk and only if you (don't) know what you're doing.

How we do it

A partition is assumed to happen when numOwners or more nodes disappear at the same time. When this happens, two (or more) partitions form which are not aware of each other. Each such partition does not start a rebalance, but enters degraded mode:
  • requests (reads and writes) for entries that have all the copies on nodes within this partition are honored
  • requests for entries that are partially or totally owned by nodes that disappeared are rejected through an AvailabilityException
To exemplify, consider the initial cluster C0={A,B,C,D}, where A, B, C and D are nodes, configured in distributed mode with numOwners=2. Further, the cluster contains keys k1, k2 and k3 such that owners(k1) = {A,B}, owners(k2) = {B,C} and owners(k3) = {C,D}. Then a partition happens, leaving C1={A,B} and C2={C,D}. Degraded mode exhibits the following behavior:
  • on C1, k1 is available for read/write, k2 (partially owned) and k3 (not owned) are not available and accessing them results in an AvailabilityException
  • on C2, k1 and k2 are not available for read/write, k3 is available
A relevant aspect of the partition handling process is the fact that when a split brain happens, the resulting partitions rely on the original consistent hash function (the one that existed before the split brain) in order to calculate key ownership. So it doesn't matter if k1, k2 or k3 already exists in the cluster or not, as the availability is strictly determined by the consistent hash and not by the key existence.
If at a later point in time the initial cluster C0 forms again, as a result of the network healing and the C1 and C2 partitions being merged back together, then C0 exits degraded mode and becomes fully available again.
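
The availability rule from the C0/C1/C2 example can be sketched in a few lines of plain Java. This is only an illustrative model, not Infinispan code: the class name DegradedModeSketch, the OWNERS map and the isAvailable method are hypothetical, and a fixed map stands in for the pre-split consistent hash.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Illustrative model only: a fixed owners map stands in for the
// consistent hash that existed before the split.
final class DegradedModeSketch {
    static final Map<String, Set<String>> OWNERS = new LinkedHashMap<>();
    static final Set<String> C1 = setOf("A", "B");
    static final Set<String> C2 = setOf("C", "D");

    static {
        OWNERS.put("k1", setOf("A", "B"));
        OWNERS.put("k2", setOf("B", "C"));
        OWNERS.put("k3", setOf("C", "D"));
    }

    static Set<String> setOf(String... nodes) {
        return new HashSet<>(Arrays.asList(nodes));
    }

    // A key is served only if every one of its owners is inside the partition;
    // partially owned and not owned keys are both rejected.
    static boolean isAvailable(String key, Set<String> partition) {
        return partition.containsAll(OWNERS.get(key));
    }

    public static void main(String[] args) {
        for (String key : OWNERS.keySet()) {
            System.out.println(key + " on C1: " + isAvailable(key, C1)
                    + ", on C2: " + isAvailable(key, C2));
        }
    }
}
```

In this model only k1 is served on C1 and only k3 on C2, while k2 is rejected on both sides because its owners straddle the split.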

Configuration for partition handling functionality

In order to enable partition handling within the XML configuration:

The same can be achieved programmatically:

The actual implementation is work in progress and Beta2 will contain further improvements which we will publish here!

Mircea Markus

Wednesday, 20 August 2014

Hot Rod Remote Events #2: Filtering events

This blog post is the second in a series that looks at the forthcoming Hot Rod Remote Events functionality included in Infinispan 7.0. In the first blog post we looked at how to get started receiving remote events from Hot Rod servers. This time we are going to focus on how to filter events directly in the server.

Sending events to remote clients has a cost which increases with the number of clients: the more clients register remote listeners, the more events the server has to send. This cost also goes up with the number of modifications executed against the cache: the more cache modifications, the more events need to be sent.

A way to reduce this cost is by filtering the events to send server-side. If at the server level custom code decides that clients are not interested in a particular event, the event does not even need to leave the server, improving the overall performance of the system.

Remote event filters are created by implementing an org.infinispan.filter.KeyValueFilterFactory class. Each factory must have a name associated with it via the org.infinispan.filter.NamedFactory annotation.

When a listener is added, we can provide the name of the key value filter factory to use with it. The server then looks up the factory and invokes its getKeyValueFilter method to obtain an org.infinispan.filter.KeyValueFilter instance, which filters events server side.

Filtering can be done based on key or value information, and even based on cached entry metadata. Here's a sample implementation which will filter key "2" out of the events sent to clients:

Plugging the server with this key value filter requires deploying this filter factory (and associated filter class) within a jar file including a service definition inside the META-INF/services/org.infinispan.filter.KeyValueFilterFactory file:

With the server plugged with the filter, the next step is adding a remote client listener that will use this filter. For this example, we'll extend the EventLogListener implementation provided in the first blog post in the series and we override the @ClientListener annotation to indicate the filter factory to use with this listener:

Next, we add the listener via the RemoteCache API and we execute some operations against the remote cache:

If we check the system output, we'll see that the client receives events for all keys except those that have been filtered:

With Hot Rod remote events we have also tried to provide additional flexibility at the client side. When adding client listeners, users can provide parameters to the filter factory, so that filter instances with different behaviours can be generated out of a single filter factory based on client side information. To show this in action, we are going to enhance the filter factory above so that instead of filtering on a statically given key, it filters dynamically based on the key provided when adding the listener. Here's the revised version:
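
As a rough plain-Java model of the idea (named factory, looked up by the server, parameterised by the client), consider the sketch below. It deliberately does not use the Infinispan interfaces: EventFilter, EventFilterFactory, the factory name "dynamic-key-filter" and the eventsFor method are all hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for a server that filters events before sending them.
final class FilterFactorySketch {
    // Hypothetical simplifications of a key/value filter and its factory.
    interface EventFilter { boolean accept(String key, String value); }
    interface EventFilterFactory { EventFilter create(Object[] params); }

    private final Map<String, EventFilterFactory> factories = new HashMap<>();

    void register(String name, EventFilterFactory factory) {
        factories.put(name, factory);
    }

    // Looks up the factory by name, builds a filter from the client-supplied
    // parameters and returns the keys whose events would leave the server.
    List<String> eventsFor(String factoryName, Object[] params, Map<String, String> modifications) {
        EventFilter filter = factories.get(factoryName).create(params);
        List<String> sent = new ArrayList<>();
        for (Map.Entry<String, String> e : modifications.entrySet()) {
            if (filter.accept(e.getKey(), e.getValue())) {
                sent.add(e.getKey());
            }
        }
        return sent;
    }

    // One factory, many behaviours: the key to suppress comes from params[0].
    static FilterFactorySketch withDynamicKeyFilter() {
        FilterFactorySketch server = new FilterFactorySketch();
        server.register("dynamic-key-filter", params -> (key, value) -> !key.equals(params[0]));
        return server;
    }
}
```

Calling eventsFor("dynamic-key-filter", new Object[] {"3"}, modifications) on this model drops key "3", while passing "2" drops key "2" instead, without touching the factory itself.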

Finally, here's how we can now filter by "3" instead of "2":

And the output:

To summarise, we've seen how Hot Rod remote events can be filtered by providing key/value filter factories that create the filter instances deciding which events are sent to clients, and how these filters can act on client provided information.

In the next blog post, we'll look at how to customize remote events in order to reduce the amount of information sent to the clients, or on the contrary, provide even more information to our clients.


Tuesday, 12 August 2014

Hot Rod Remote Events #1: Getting started

Shortly after the first Hot Rod server implementation was released in 2010, ISPN-374 was created requesting cache events to be forwarded back to connected clients. Even though embedded caches have had access to these events since Infinispan's first release, propagating them to remote clients has taken a while, due to the increased complexity involved.

For Infinispan 7.0, we've finally addressed this. This is the first post in a series that looks at Hot Rod Remote Events and the different functionality we've implemented for this release. In this first post, we show you how to get started with Hot Rod Remote Events with the most basic of examples:

Start by downloading the Server distribution for the latest 7.0 (or higher) release from Infinispan's download page. The server contains the Hot Rod server with which the client will communicate. Once downloaded, start it up running the following from the root of the server:


Next up, we need to write a little application that interacts with the Hot Rod server. If you're using Maven, create an application with this dependency, changing version to 7.0.0.Beta1 or higher:

If not using Maven, adjust according to your chosen build tool or download the "all" distribution, which contains all Infinispan jars.

With the application dependencies in place, we can start writing the client application. We'll begin with a simple remote event listener that logs all events received:

Now it's time to write a simple main Java class which adds the remote event listener and executes some operations against the remote cache:

Once executed, we should see console output similar to this:

As you can see from the output, by default events come with the key and the internal data version associated with the current value. The actual value is not shipped back to the client for performance reasons. Clearly, receiving remote events has a cost, and as the cache size increases and more operations are executed, more events will be generated. To avoid inundating Hot Rod clients, remote events can either be filtered server side, or the event contents can be customised. In the next blog post in this series, we will see this functionality in action.


Monday, 11 August 2014

Infinispan 7.0.0.Beta1 is out!

Dear Community,

It is our pleasure to announce the first Beta release of Infinispan 7.0.0!

The release highlights are:

* Initial support for Cluster partitions (aka Split Brain) ISPN-263
* Added JGroups SASL support to Infinispan Server ISPN-4303
* Upgraded internal JGroups version to 3.5.0.CR2 ISPN-4609
* Map/Reduce tasks no longer have a timeout by default ISPN-4618
* Merge batching configuration with transaction modes to prevent ambiguity ISPN-4197
* Improve server test suite run times ISPN-4317
* Multiple improvements and bug fixes!

For a complete list of features and bug fixes included in this release please refer to the release notes. Visit our downloads section to find the latest release.

If you have any questions please check our forums, our mailing lists or ping us directly on IRC.


The Infinispan team.

Wednesday, 30 July 2014

Cross site replication with JBoss Data Grid

Are you afraid the Death Star or the Vogon Constructor Fleet are going to blow up your data on Earth?
Navin Surtani has recorded a nice video demonstrating the use of JBoss Data Grid Cross-Site Replication and how it can be used to keep backups of your precious data off-site.

The code for the demo is also available. Enjoy.

Tuesday, 29 July 2014

Infinispan Security #3: HotRod authentication

Let's continue our excursus into the security features that are being developed within Infinispan 7.0 by having a look at how our high-performance cache remoting protocol HotRod was enhanced to support authentication.

Since Infinispan 5.3, HotRod has featured SSL/TLS support which, aside from encryption, also provides some form of authentication by optionally requiring client certificates. While this does indeed stop unauthorized clients from connecting to a remote cache, the level of access-control ends there. Now that we have full role-based authorization checks at the cache and container level, we want to be able to recognize users and map their roles accordingly.

As usual, we didn't want to reinvent the wheel, but leverage existing security frameworks and integrate them into our existing platform. For this reason, the protocol chosen to implement HotRod authentication is SASL, which is in widespread use in other connection-oriented transports (e.g. LDAP, Memcached, etc).

Using SASL we can support the following authentication mechanisms out of the box (since they are part of the standard JDK/JRE):
  • PLAIN where credentials are exchanged in clear-text (insecure, but easiest to set up)
  • DIGEST-MD5 where credentials are hashed using server-provided nonces
  • GSSAPI where clients can use Kerberos tokens
  • EXTERNAL where the client-certificate identity of the underlying transport is used as the credentials
More SASL mechanisms can be plugged in by using the Java Cryptography Architecture (JCA).
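
To make the "part of the standard JDK" point concrete, here is a minimal sketch that uses only the JDK's javax.security.sasl API to create a PLAIN client and compute its initial response. It does not talk to a HotRod server: the protocol name "hotrod", the server name "localhost", the credentials and the class name SaslPlainSketch are assumptions made up for this example.

```java
import java.nio.charset.StandardCharsets;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;

// JDK-only illustration of the SASL PLAIN mechanism; no network traffic involved.
final class SaslPlainSketch {
    static byte[] initialResponse(String user, char[] password) throws SaslException {
        // The mechanism asks for credentials through standard JAAS callbacks.
        CallbackHandler handler = callbacks -> {
            for (Callback cb : callbacks) {
                if (cb instanceof NameCallback) {
                    ((NameCallback) cb).setName(user);
                } else if (cb instanceof PasswordCallback) {
                    ((PasswordCallback) cb).setPassword(password);
                } else {
                    throw new UnsupportedCallbackException(cb);
                }
            }
        };
        // "hotrod" and "localhost" are placeholder protocol and server names.
        SaslClient client = Sasl.createSaslClient(
                new String[] {"PLAIN"}, null, "hotrod", "localhost", null, handler);
        if (client == null) {
            throw new SaslException("PLAIN mechanism not available");
        }
        try {
            // PLAIN has an initial response, so an empty challenge is passed in.
            return client.evaluateChallenge(new byte[0]);
        } finally {
            client.dispose();
        }
    }

    public static void main(String[] args) throws SaslException {
        byte[] response = initialResponse("alice", "secret".toCharArray());
        // NUL separators are replaced with '|' to make the message printable.
        System.out.println(new String(response, StandardCharsets.UTF_8).replace('\0', '|'));
    }
}
```

The response contains the user name and password as readable bytes separated by NUL characters, which is why PLAIN is only advisable over an encrypted transport.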

Since our preferred server distribution is based on a stripped-down WildFly server, we are essentially reusing the Security Realms of the container. This gives us the ability to validate users and also to retrieve group membership against a number of sources (property files, LDAP, etc).

The following is an example server configuration which uses the ApplicationRealm to authenticate and authorize users. Since the <identity-role-mapper> is in use, role names will be mapped 1:1 from the realm into Infinispan roles.
The HotRod endpoint is using the SASL PLAIN mechanism. Note that two caches have been defined: the default cache, without authorization, and a secured cache, which instead requires authorization. This means that remote clients can access the default cache anonymously, but they will need to authenticate if they want to access the secured cache.
The following bit of code shows how to use the HotRod Java client to connect to the secured cache defined above:
All of the above is already available in Infinispan 7.0.0.Alpha5, so head on over to the download page to experience the goodness.

Monday, 28 July 2014

Upcoming Infinispan 7.0.0 Map/Reduce is blazing fast


Our enthusiasm about Infinispan's Map/Reduce implementation has been a driving force behind the new features and spectacular performance improvements we have achieved in the past months. As we approach the final Infinispan 7 release, we cannot keep quiet about these improvements any longer, and we want to share the most significant new Map/Reduce features as well as a rather important performance improvement, along with the details on how we achieved it.

New features

In the new features category, the most notable is a scalability improvement that allows storage of a MapReduceTask's results in a distributed cache instead of returning them to the calling application. Infinispan now gives users the option to specify a target cache to store the results of an executed MapReduceTask. The results are available after the execute method (which is synchronous) completes. This new variant of the execute method prevents the master JVM node from exceeding its allowed maximum heap size. Users could, for example, utilize the new execute method if objects that are the results of the reduce phase have a large memory footprint or if multiple MapReduceTasks are concurrently executing on the master task node. We have provided two variants of the new execute method:

We also enhanced parallel execution of map/reduce functions at each node and improved handling of intermediate results. Users can now specify a custom intermediate cache to be used for a particular MapReduceTask.

Performance improvements

Infinispan 7 makes a rather big leap from a single-threaded to a parallel execution model for both the map and reduce phases on each Infinispan grid node. The final result of this change is, on average, fivefold faster execution of your typical MapReduceTask.

Even though the map and reduce phases run one after the other, each phase can itself be executed in parallel. If you recall, although Infinispan 6 executes the map and reduce phases on all nodes in parallel, execution on each node is single-threaded. That is, although the reduce phase is executed on multiple nodes in parallel, each node executes its portion of the reduction on a single thread.

Since we baselined Infinispan 7 on JDK 7, we decided to experiment with the fork/join threading framework for parallel execution of both map and reduce phases [2]. If you recall, the fork/join framework enables high performance, parallel, fine-grained task execution in Java. Although parallel, recursively decomposable tasks are well suited for the fork/join framework, it may come as a surprise that parallel iteration of entries in arrays, maps and other collections is a good fit as well. And we do have a well suited candidate for parallel fork/join iteration: the cache's data container itself. In fact, most of the work is related to iterating entries from the data container and invoking the map/combine and reduce functions on those entries.
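
To illustrate how plain iteration over a collection can be phrased as a recursively decomposable fork/join task, here is a minimal sketch. It is not the Infinispan implementation: a List of strings stands in for the data container, the per-entry work (summing value lengths) stands in for a map function, and the class name and THRESHOLD value are arbitrary choices for the example.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Splits a range of entries in half until it is small enough to iterate directly.
final class ParallelIterationSketch extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 4; // arbitrary cut-off for the example

    private final List<String> values;
    private final int from;
    private final int to;

    ParallelIterationSketch(List<String> values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Integer compute() {
        if (to - from <= THRESHOLD) {
            // Small range: iterate sequentially and apply the per-entry function.
            int total = 0;
            for (int i = from; i < to; i++) {
                total += values.get(i).length();
            }
            return total;
        }
        int mid = (from + to) >>> 1;
        ParallelIterationSketch left = new ParallelIterationSketch(values, from, mid);
        ParallelIterationSketch right = new ParallelIterationSketch(values, mid, to);
        left.fork();                          // run the left half asynchronously
        return right.compute() + left.join(); // compute the right half in this thread
    }

    static int totalLength(List<String> values) {
        return ForkJoinPool.commonPool().invoke(new ParallelIterationSketch(values, 0, values.size()));
    }
}
```

The same shape applies to any per-entry function: only the body of the sequential loop changes, while the splitting logic stays the same.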

The map/combine phase is particularly interesting. Even if we use the fork/join framework, the map and combine phases are distinct and, until now, serially executed. Serial execution of map and combine is not the only downside, as the map phase can be rather memory intensive. After all, it has to store all intermediate results into the provided collectors. The combine phase takes the intermediate values produced for a particular key and combines them into a single intermediate value. Therefore, it would be very useful to periodically invoke combine on the keys/values produced by map, thus limiting the total amount of memory used for the map phase. So the question is: how do we execute map/combine in parallel efficiently, speeding up execution and at the same time limiting the memory used? We found the answer in the producer/consumer threading paradigm.

In our case producers are fork/join threads that, during the map phase, iterate the key/value data container and invoke the Mapper's map function. The map function produces intermediate results, which are stored into an Infinispan provided queue of collectors. Consumers are also fork/join threads; they invoke the combine function on key/value entries in those collectors. Note that this way the map/combine phase execution itself becomes parallel, and the mapping and combining phases are no longer serial. In the end, we have notably lowered memory usage and significantly improved the overall execution speed of the map/combine algorithm at the same time.
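
The producer/consumer arrangement can be sketched with the JDK alone. The following word count is only a model of the idea, not the Infinispan code: a Map of strings stands in for the data container, a queue of word lists stands in for the queue of collectors, Integer::sum plays the role of the combine function, and all names (MapCombineSketch, wordCount, DONE) are hypothetical. For simplicity the queue here is unbounded, so this sketch shows the concurrency of map and combine rather than any memory limit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.LinkedBlockingQueue;

final class MapCombineSketch {
    // Sentinel collector, compared by reference, telling a consumer to stop.
    private static final List<String> DONE = new ArrayList<>();

    static Map<String, Integer> wordCount(Map<String, String> container, int consumers) {
        BlockingQueue<List<String>> collectors = new LinkedBlockingQueue<>();
        Map<String, Integer> combined = new ConcurrentHashMap<>();
        // Leave room for producers even while every consumer is waiting.
        ForkJoinPool pool = new ForkJoinPool(consumers + 2);
        try {
            // Consumers: combine emitted words while mapping is still running.
            List<ForkJoinTask<?>> consumerTasks = new ArrayList<>();
            for (int i = 0; i < consumers; i++) {
                consumerTasks.add(pool.submit(() -> {
                    try {
                        for (List<String> batch = collectors.take(); batch != DONE; batch = collectors.take()) {
                            for (String word : batch) {
                                if (!word.isEmpty()) {
                                    combined.merge(word, 1, Integer::sum);
                                }
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }));
            }
            // Producers: "map" each value into a collector of words.
            List<ForkJoinTask<?>> producerTasks = new ArrayList<>();
            for (String value : container.values()) {
                producerTasks.add(pool.submit(() -> {
                    collectors.add(Arrays.asList(value.trim().split("\\s+")));
                }));
            }
            for (ForkJoinTask<?> task : producerTasks) {
                task.join();
            }
            // All mapping is done: hand one stop marker to each consumer.
            for (int i = 0; i < consumers; i++) {
                collectors.add(DONE);
            }
            for (ForkJoinTask<?> task : consumerTasks) {
                task.join();
            }
        } finally {
            pool.shutdown();
        }
        return combined;
    }
}
```

A bounded queue could be swapped in to cap the number of pending collectors, at the cost of having to ensure that blocked producers can never starve the consumers of pool threads.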

Performance lab results

Although initial performance results were more than promising, we were not satisfied. The throughput peaked for 32KB cache values but was much lower for larger and smaller values in our tests. We went back to the drawing board and devised the map/combine algorithm described above, using the fork/join framework and the producer/consumer approach. This time the results from the performance lab were excellent. For more details on the performance tests and hardware used please refer to [1].

As you can see on the graphs below, we have improved performance across cache value sizes, although we were not able to significantly improve throughput for the largest 1MB and 2MB cache values. For all other cache value sizes we have seen, on average, a five-fold throughput improvement. As throughput improvement is directly proportional to the improvement in MapReduceTask execution speed, our users should expect their MapReduceTasks to execute, on average, five times faster in Infinispan 7 than in Infinispan 6.

[2] We backported the relevant classes so users can still run Infinispan 7 on a Java 6 JVM.