Friday, 25 September 2015

Memory based eviction

Eviction Today

Since its inception, Infinispan has supported a way to help users control how much memory in-memory cache entries consume in the JVM.  This has always been limited to a number of entries.  In the past, users have had to estimate the average number of bytes their entries used on the heap.  With this average you can easily calculate how many entries can safely be stored in memory without running into issues.  For users whose keys and values are relatively similar in size this can work well.  However, when entries vary in size this can be problematic, and you end up calculating the average size based on the worst case.

Enter Memory Based Eviction


Infinispan 8 introduces memory based eviction counting.  That is, Infinispan will automatically keep track of how large the key, the value and, where possible, the overhead of each entry are.  It can then use these values to limit the cache to an amount of memory, such as 1 gigabyte, instead of a number of entries.

Key/Value limitations

Unfortunately this is currently limited to keys and values stored as primitives, primitive wrappers (e.g. Integer), java.lang.String(s) and arrays of any of the previously mentioned types.  This means this feature cannot be used with any custom classes.  If enough feedback is provided we could provide an SPI to allow users to plug in their own counter for their own classes, but this is not currently planned.

There are a couple of ways to easily get around this.  One is to use storeAsBinary, which will automatically store your keys and/or values as byte arrays, satisfying this limitation.  A second is to use a remote client such as HotRod, in which case the data is stored in its serialized (byte[]) form.  Note that compatibility mode prevents this from happening, so it cannot be used together with these configurations.
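To make the idea concrete, here is a minimal JDK-only sketch of an LRU map bounded by estimated bytes rather than by entry count. It is not Infinispan's implementation: the per-type size estimates and the 32-byte per-entry overhead are assumptions chosen purely for illustration. Like the feature described above, it refuses to size anything other than byte[], String and Integer.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Conceptual sketch of memory-based LRU eviction; NOT Infinispan's code.
// All byte figures below are assumed, illustrative estimates.
final class MemoryBoundedLru<K, V> {
    private static final long ENTRY_OVERHEAD = 32; // assumed fixed cost per entry

    // accessOrder = true: iteration starts at the least-recently-used entry
    private final LinkedHashMap<K, V> map = new LinkedHashMap<>(16, 0.75f, true);
    private final long maxBytes;
    private long usedBytes;

    MemoryBoundedLru(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Only a few types can be sized, mirroring the limitation described above
    static long estimate(Object o) {
        if (o instanceof byte[]) return 16 + ((byte[]) o).length;
        if (o instanceof String) return 40 + 2L * ((String) o).length();
        if (o instanceof Integer) return 16;
        throw new IllegalArgumentException("Cannot size " + o.getClass().getName());
    }

    private static long entrySize(Object key, Object value) {
        return ENTRY_OVERHEAD + estimate(key) + estimate(value);
    }

    void put(K key, V value) {
        long size = entrySize(key, value); // fails before mutating for unsupported types
        V old = map.put(key, value);
        if (old != null) usedBytes -= entrySize(key, old);
        usedBytes += size;
        // Evict least-recently-used entries until usage fits the budget again
        Iterator<Map.Entry<K, V>> it = map.entrySet().iterator();
        while (usedBytes > maxBytes && it.hasNext()) {
            Map.Entry<K, V> eldest = it.next();
            usedBytes -= entrySize(eldest.getKey(), eldest.getValue());
            it.remove();
        }
    }

    V get(K key) { return map.get(key); }
    int size() { return map.size(); }
    long usedBytes() { return usedBytes; }
}
```

With a 128-byte budget and Integer keys and values (64 estimated bytes per entry), only two entries fit, and the least recently read one is evicted first. Custom classes are rejected, which is why storeAsBinary (byte[]) helps.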

Eviction Type limitation

Due to the complexity of LIRS, memory based eviction is only supported with LRU at this time. See the types here.  This could be enhanced at a later point, but is also not planned.

How to enable

You can enable memory based eviction through either programmatic or declarative configuration.  Note that Infinispan added long support (limited to 2^48) for the size value, which directly helps memory based eviction when users want caches larger than 2 GB.
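A quick illustration of why the long size value matters, assuming the size is expressed in bytes: an int tops out just under 2 GiB, while the 2^48 cap mentioned above is 256 TiB. The class and method names are hypothetical helpers for this arithmetic only.

```java
// Byte-budget arithmetic for memory-based eviction sizes (illustrative only).
final class SizeLimits {
    static final long INT_MAX_BYTES = Integer.MAX_VALUE; // 2,147,483,647 bytes, just under 2 GiB
    static final long SIZE_CAP = 1L << 48;               // 2^48 = 281,474,976,710,656 bytes (256 TiB)

    static long gib(long n) {
        return n * 1024L * 1024L * 1024L;
    }

    // Would this budget still fit if the size were an int?
    static boolean fitsInInt(long bytes) {
        return bytes <= INT_MAX_BYTES;
    }

    static boolean withinCap(long bytes) {
        return bytes <= SIZE_CAP;
    }
}
```

For example, a 4 GiB budget is gib(4) = 4,294,967,296 bytes; narrowing it to an int with (int) gib(4) silently yields 0, which is exactly the kind of overflow a long size avoids.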

Programmatic


Declarative 


Supported JVMs

This was tested and written specifically for Oracle and OpenJDK JVMs.  In testing, these JVMs showed memory accuracy within 1% of the desired value.  Other JVMs may show incorrect values.

The algorithm takes into account JVM options, such as compressed pointers, and 32-bit vs 64-bit JVMs.  Keep in mind this only covers the data container and doesn't take into account additional overhead such as created threads or other runtime objects.
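A sketch of why those JVM options matter: the same array costs a different number of bytes depending on header size, reference width and alignment. The header sizes (12/16/24 bytes) and 8-byte alignment below are assumptions typical of HotSpot, hard-coded for illustration; Infinispan's actual algorithm is not shown in this post.

```java
// Illustrative shallow-size arithmetic for arrays under different JVM modes.
// Header sizes are assumed values typical of HotSpot; NOT Infinispan's code.
enum JvmLayout {
    BITS_32(12, 4),                 // 32-bit JVM: 4-byte references
    BITS_64_COMPRESSED_OOPS(16, 4), // 64-bit with compressed pointers
    BITS_64_UNCOMPRESSED(24, 8);    // 64-bit without compressed pointers

    final int arrayHeaderBytes; // header incl. length field, before element data
    final int referenceBytes;

    JvmLayout(int arrayHeaderBytes, int referenceBytes) {
        this.arrayHeaderBytes = arrayHeaderBytes;
        this.referenceBytes = referenceBytes;
    }
}

final class ShallowSize {
    static final int ALIGNMENT = 8; // assumed object alignment in bytes

    static long alignUp(long n) {
        return (n + ALIGNMENT - 1) / ALIGNMENT * ALIGNMENT;
    }

    static long byteArray(int length, JvmLayout jvm) {
        return alignUp(jvm.arrayHeaderBytes + (long) length);
    }

    // Only the array of references; the referenced objects are sized separately
    static long referenceArray(int length, JvmLayout jvm) {
        return alignUp(jvm.arrayHeaderBytes + (long) length * jvm.referenceBytes);
    }
}
```

Under these assumptions a 10-byte array costs 32 bytes with compressed pointers but 40 bytes without them, a difference that adds up quickly over millions of entries.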

Other JVMs, such as the IBM JVM, are not handled.  The IBM JVM was briefly tested and showed errors of more than 10% of the desired amount.  Support for other JVMs can be added later as interest is shown in them.

Closing Notes


I hope this feature helps people to better handle their memory constraints while using Infinispan!  Let us know if you have any feedback or concerns.

Cheers!

 - Will

Wednesday, 23 September 2015

Infinispan 8.1.0.Alpha1

Dear all,

release early, release often! The first Alpha release of Infinispan 8.1 is out. As is traditional, it is codenamed after a beer. This time it is "Mahou"!

The highlights for 8.1.0.Alpha1 are:

ISPN-5781 - Upgrade server to WildFly 10.0.0.CR1
ISPN-5742 - Add global persistent state path configuration

Read the complete release notes 

We're working on lots of cool things for 8.1 Final due at the end of November, so be sure to check our roadmap to see what's coming.

Get it, learn how to use it, help us improve it.

Enjoy !


The Infinispan team

Monday, 21 September 2015

Introducing the Infinispan Hadoop Connector

Version 0.1 of the Infinispan Hadoop connector has just been made available!

The connector will host several integrations with Hadoop-related projects, and in this first release it supports turning Infinispan server into a Hadoop-compliant data source by providing implementations of InputFormat and OutputFormat.

The InfinispanInputFormat and InfinispanOutputFormat

 

A Hadoop InputFormat is a specification of how a certain data source can be partitioned and how to read data from each of the partitions. Conversely, OutputFormat is used to write.

Looking closely at Hadoop's InputFormat interface, we can see two methods:

    List<InputSplit> getSplits(JobContext context);
 
    RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context);


The first method essentially defines a data partitioner, calculating one or more InputSplits that contain information about a certain partition of the data. Given an InputSplit, one can use it to obtain a RecordReader to iterate over the data. These two operations allow data processing to be parallelized across multiple nodes, and that's how Hadoop map reduce achieves high throughput over large datasets.

In Infinispan terms, each partition is a set of segments on a certain server, and a record reader is a remote iterator over those segments. The default partitioner shipped with the connector will create as many partitions as servers in the cluster, and each partition will contain the segments that are associated with that specific server.
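The default partitioning strategy just described can be sketched in a few lines of plain Java: group segments by the server that owns them and emit one split per server. This is a hypothetical illustration; the Split class and the segment-to-owner map are assumptions, and the connector's real classes are not shown in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of the default partitioning described above: one split
// per server, containing the segments that server owns. Not the connector's code.
final class SegmentPartitioner {
    static final class Split {
        final String server;               // where the record reader should connect
        final SortedSet<Integer> segments; // segments to iterate remotely

        Split(String server, SortedSet<Integer> segments) {
            this.server = server;
            this.segments = segments;
        }
    }

    // ownerBySegment: segment id -> owning server address (assumed input)
    static List<Split> splits(Map<Integer, String> ownerBySegment) {
        Map<String, SortedSet<Integer>> byServer = new TreeMap<>();
        for (Map.Entry<Integer, String> e : ownerBySegment.entrySet()) {
            byServer.computeIfAbsent(e.getValue(), s -> new TreeSet<>()).add(e.getKey());
        }
        List<Split> result = new ArrayList<>();
        byServer.forEach((server, segments) -> result.add(new Split(server, segments)));
        return result;
    }
}
```

Each resulting split maps naturally onto an InputSplit, and its record reader would be a remote iterator restricted to that split's segments.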

Not only map reduce


Although the InfinispanInputFormat and InfinispanOutputFormat can be used to run traditional Hadoop map reduce jobs over Infinispan data, they are not coupled to the Hadoop map reduce runtime. It is possible to leverage the connector to integrate Infinispan with other tools that, besides supporting the Hadoop I/O interfaces, are able to read and write data more efficiently. One of those tools is Apache Flink, which has a dataflow engine capable of both batch and stream data processing that supersedes the classic two-stage map reduce approach.

Apache Flink example

 

Apache Flink supports Hadoop's InputFormat as a data source to execute batch jobs, so integrating it with Infinispan is straightforward:


Please refer to the complete sample, which has docker images for both Apache Flink and Infinispan server, and detailed instructions on how to execute and customise the job.

Stay tuned


More details about the connector, maven coordinates, configuration options, sources and samples can be found at the project repository.

In upcoming versions we expect to have a tighter integration with the Hadoop platform in order to run Infinispan clusters as a YARN application (ISPN-5709), and also to support other tools from the ecosystem such as Apache Pig (ISPN-5749).

Wednesday, 16 September 2015

Simple cache

Infinispan local caches have several features that make them more than just a map: expiration and eviction, listeners, statistics, transactions, cache stores and so forth. However, this comes at a price: due to all the hooks and object allocation, a plain ConcurrentHashMap is faster than a local cache.

There are applications that need something in between: great performance, but some of those features too. In our case, the motivation was the internal caches in the Hibernate Second Level Cache. Therefore Infinispan 8.0.1.Final brings the simple cache, an alternative implementation of the AdvancedCache interface optimized for maximum performance when you need just the basics.

The table below shows which features are available in simple cache:

Feature | Available
Basic map-like API | Yes
Cache listeners (non-clustered) | Yes
Expiration | Yes
Eviction | Yes
Security | Yes
JMX access | Yes
Statistics | Yes
Transactions | No
Invocation batching | No
Persistence (cache stores and loader) | No
Map Reduce Framework | No
Distributed Executors Framework | No
Custom interceptors | No
Indexing (querying) | No
Compatibility (embedded/server) | No
Store as binary | No


Configuring simple cache is as simple as adding one attribute to the XML configuration:


While the configuration schema allows you to set up the unsupported features, doing so results in an exception when the cache is created.

You can also configure simple cache programmatically:


So, what kind of performance improvement can you expect? We ran a basic (single-threaded) benchmark using JMH and this is what we got:

Implementation | get() (operations/s) | put() (operations/s)
ConcurrentHashMap | 128,354,135 ± 2,178,755 | 33,980,088 ± 28,487
Simple cache | 86,969,897 ± 738,935 | 14,044,642 ± 14,280
Local cache | 17,280,018 ± 361,910 | 2,267,850 ± 44,814

Compared to a local cache, this gives us about a 5✕ speedup for reads and 6✕ for writes. Your mileage may vary, but simple cache clearly provides substantial performance benefits.
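As a quick check of the quoted speedups, here is the arithmetic on the mean throughputs from the benchmark table (the ± error margins are ignored):

```java
// Recomputes the simple cache vs local cache speedups from the table's means.
final class SimpleCacheSpeedup {
    // Mean throughputs in operations/s, copied from the benchmark table
    static final double SIMPLE_GET = 86_969_897;
    static final double LOCAL_GET = 17_280_018;
    static final double SIMPLE_PUT = 14_044_642;
    static final double LOCAL_PUT = 2_267_850;

    static double readSpeedup() {
        return SIMPLE_GET / LOCAL_GET;   // about 5.03
    }

    static double writeSpeedup() {
        return SIMPLE_PUT / LOCAL_PUT;   // about 6.19
    }
}
```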

So, if your use-case allows it, try out simple cache and let us know. It's as simple as one configuration attribute!

Monday, 14 September 2015

Initial Support for Apache Avro and Gora

Avro and Gora are two Apache projects that belong to the Hadoop ecosystem. Avro is a data serialization framework that relies on JSON for defining data types and protocols, and serializes data in a compact binary format. Its primary use in Hadoop is to provide a serialization format for persistent data, and a wire format for communication between Hadoop nodes, and from client programs to the Hadoop services. Gora is an open-source software framework that provides an in-memory data model and persistence for big data. Gora supports persisting to column stores, key/value stores or databases, and analyzing the data with extensive Apache Hadoop MapReduce support.

As an effort to run Hadoop-based applications atop Infinispan, the LEADS EU FP7 project has developed an Avro backend (infinispan-avro) and a Gora module (gora-infinispan). The former allows you to store, retrieve and query Avro-defined types via the HotRod protocol. The latter allows Gora-based applications to use Infinispan as a storage backend for their MapReduce jobs. In the current state of the implementation, the two modules use Infinispan 8.0.0.Final, Avro 1.7.6 and Gora 0.6.

What’s in it for you, Infinispan user

There are several use cases for which you can benefit from those modules.
  • With Infinispan’s Avro support, you can decide to persist your data in Infinispan using Avro’s portable format instead of Infinispan’s own format (or Java serialization’s format). This might help you standardize upon a common format for your data at rest. 
  • If you use Apache Gora to store/query some of your data in, or even out of, the Hadoop ecosystem, you can use Infinispan as the backend and benefit from the Infinispan features you have come to know, such as data distribution, partition handling and cross-site clustering. 
  • The last use case is to run legacy Hadoop applications, using Infinispan as the primary storage. For instance, it is possible to run the Apache Nutch web crawler atop Infinispan. A recent paper at IEEE Cloud 2015 gives a detailed description of such an approach in a geo-distributed environment (a preprint is available here). 



New Redis Cache Store Introduced in Infinispan 8

A new cache store for storing cache data in the Redis key/value server has been introduced with Infinispan 8. This allows all cache data to be stored in a centralised Redis deployment that all Infinispan clients access.

The cache store supports 3 Redis deployment topologies: single server, Sentinel and cluster (Redis v3 required). Redis versions 2.8+ and 3.0+ are currently supported.

Data expiration and purging are handled by Redis itself, relieving Infinispan servers of the work of manually deleting expired cache entries.

Topologies

Single Server

In a single server deployment, the cache store is given the location of a Redis master server, to which it connects directly to handle all data storage. Using this topology, Redis has no fault tolerance unless a custom solution is built on top of Redis. To declare a single server local cache store:


<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:8.0 http://www.infinispan.org/schemas/infinispan-config-8.0.xsd
                          urn:infinispan:config:store:redis:8.0 http://www.infinispan.org/schemas/infinispan-cachestore-redis-config-8.0.xsd"
    xmlns="urn:infinispan:config:8.0"
    xmlns:redis="urn:infinispan:config:store:redis:8.0" >

    <cache-container>
        <local-cache>
            <persistence passivation="false">
                <redis-store xmlns="urn:infinispan:config:store:redis:8.0"
                    topology="server" socket-timeout="10000" connection-timeout="10000">
                    <redis-server host="server1" />
                    <connection-pool min-idle="6" max-idle="10" max-total="20" min-evictable-idle-time="30000" time-between-eviction-runs="30000" />
                </redis-store>
            </persistence>
        </local-cache>
    </cache-container>
</infinispan>


Note the topology attribute is declared as server. This is needed to ensure a single server Redis topology is applied by the cache store. Only a single Redis server need be declared (only the first server will be used if multiple servers are declared), and the port defaults to the Redis port 6379 but can be overridden using the port attribute. All connections are handled via a connection pool, which can optionally also test the validity of a connection on creation, on lease, on return and while idle in the pool.

Sentinel

The Sentinel topology relies on Redis Sentinel servers to connect to a Redis master server. Here, Infinispan connects to the Redis Sentinel servers, requests the master by name, and is then directed to the current location of that Redis master server. This topology gives resilience via Redis Sentinel, providing failure detection and automatic failover of Redis servers.


<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:8.0 http://www.infinispan.org/schemas/infinispan-config-8.0.xsd
                          urn:infinispan:config:store:redis:8.0 http://www.infinispan.org/schemas/infinispan-cachestore-redis-config-8.0.xsd"
    xmlns="urn:infinispan:config:8.0"
    xmlns:redis="urn:infinispan:config:store:redis:8.0" >

    <cache-container>
        <local-cache>
            <persistence passivation="false">
                <redis-store xmlns="urn:infinispan:config:store:redis:8.0"
                    topology="sentinel" master-name="mymaster" socket-timeout="10000" connection-timeout="10000">
                    <sentinel-server host="server1" />
                    <sentinel-server host="server2" />
                    <sentinel-server host="server3" />
                    <connection-pool min-idle="6" max-idle="10" max-total="20" min-evictable-idle-time="30000" time-between-eviction-runs="30000" />
                </redis-store>
            </persistence>
        </local-cache>
    </cache-container>
</infinispan>


For a Sentinel deployment, the topology attribute changes to sentinel. A master name must also be specified to select the correct Redis master required as Sentinel can monitor multiple Redis master servers. The Sentinel server is declared using a sentinel-server XML tag, which you’ll notice is different to the main Redis servers used in single server and cluster topologies. This is to allow defaulting of the Sentinel port to 26379 if not declared. At least one Sentinel server must be declared, though if you run more Sentinel servers, they should all be declared too for the benefit of failure detection of the Sentinel servers themselves.

Cluster

A cluster topology gives Infinispan the ability to connect to a Redis cluster. One or more cluster nodes are declared to Infinispan (the more the better), which are then used to store all data. Redis cluster supports failure detection, so if a master node in the cluster fails, a slave takes over. Redis v3 is required to run a Redis cluster.


<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:8.0 http://www.infinispan.org/schemas/infinispan-config-8.0.xsd
                          urn:infinispan:config:store:redis:8.0 http://www.infinispan.org/schemas/infinispan-cachestore-redis-config-8.0.xsd"
    xmlns="urn:infinispan:config:8.0"
    xmlns:redis="urn:infinispan:config:store:redis:8.0" >

    <cache-container>
        <local-cache>
            <persistence passivation="false">
                <redis-store xmlns="urn:infinispan:config:store:redis:8.0"
                    topology="cluster" socket-timeout="10000" connection-timeout="10000">
                    <redis-server host="server1" port="6379" />
                    <redis-server host="server2" port="6379" />
                    <redis-server host="server3" port="6379" />
                    <connection-pool min-idle="6" max-idle="10" max-total="20" min-evictable-idle-time="30000" time-between-eviction-runs="30000" />
                </redis-store>
            </persistence>
        </local-cache>
    </cache-container>
</infinispan>


For cluster deployments, the topology attribute must change to cluster. One or more Redis cluster nodes must be declared, using the redis-server XML tag, to access the cluster. Note that when operating a cluster, database IDs are not supported.

Multiple Cache Stores, Single Redis Deployment

Redis single server and Sentinel deployments support the option of database IDs. A database ID allows a single Redis server to host multiple individual databases, referenced via an integer ID number. This allows Infinispan to support multiple cache stores on the same Redis deployment, isolating the data between the stores. Redis cluster does not support the database ID. A database ID is defined using the database attribute on the redis-store XML tag.


<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:8.0 http://www.infinispan.org/schemas/infinispan-config-8.0.xsd
                          urn:infinispan:config:store:redis:8.0 http://www.infinispan.org/schemas/infinispan-cachestore-redis-config-8.0.xsd"
    xmlns="urn:infinispan:config:8.0"
    xmlns:redis="urn:infinispan:config:store:redis:8.0" >

    <cache-container>
        <local-cache>
            <persistence passivation="false">
                <redis-store xmlns="urn:infinispan:config:store:redis:8.0"
                    topology="sentinel" master-name="mymaster" socket-timeout="10000" connection-timeout="10000" database="5">
                    <sentinel-server host="server1" />
                    <sentinel-server host="server2" />
                    <sentinel-server host="server3" />
                    <connection-pool min-idle="6" max-idle="10" max-total="20" min-evictable-idle-time="30000" time-between-eviction-runs="30000" />
                </redis-store>
            </persistence>
        </local-cache>
    </cache-container>
</infinispan>
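The topology rules scattered through this post (default ports, the required master name for Sentinel, no database IDs on a cluster, only the first server used in single server mode) can be summarised as a small validation sketch. This is a hypothetical model for illustration only; the class and method names are not part of Infinispan or Jedis.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the redis-store rules described in this post.
// It is not part of Infinispan or Jedis; names are illustrative.
final class RedisStoreSettings {
    enum Topology { SERVER, SENTINEL, CLUSTER }

    static final int REDIS_DEFAULT_PORT = 6379;
    static final int SENTINEL_DEFAULT_PORT = 26379;

    final Topology topology;
    final String masterName;  // required for SENTINEL
    final Integer database;   // null when unset; not supported by CLUSTER
    final List<String> hosts; // redis-server or sentinel-server hosts

    RedisStoreSettings(Topology topology, String masterName, Integer database, List<String> hosts) {
        this.topology = topology;
        this.masterName = masterName;
        this.database = database;
        this.hosts = hosts;
    }

    // Returns the list of rule violations; empty means the settings look valid
    List<String> validate() {
        List<String> errors = new ArrayList<>();
        if (hosts.isEmpty()) errors.add("at least one server must be declared");
        if (topology == Topology.SENTINEL && (masterName == null || masterName.isEmpty()))
            errors.add("sentinel topology requires master-name");
        if (topology == Topology.CLUSTER && database != null)
            errors.add("cluster topology does not support database IDs");
        return errors;
    }

    // Port assumed when a server element omits the port attribute
    int defaultPort() {
        return topology == Topology.SENTINEL ? SENTINEL_DEFAULT_PORT : REDIS_DEFAULT_PORT;
    }

    // A single server topology only uses the first declared server
    List<String> effectiveHosts() {
        return topology == Topology.SERVER ? hosts.subList(0, Math.min(1, hosts.size())) : hosts;
    }
}
```

For instance, the Sentinel example above with database="5" passes validation, while the same database attribute on a cluster topology is reported as an error.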

Redis Password Authentication

In order to secure access to a Redis server, a password can optionally be used in Redis. This then requires the cache store to declare the password when connecting. The password is added via a password attribute on the redis-store XML tag.


<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:8.0 http://www.infinispan.org/schemas/infinispan-config-8.0.xsd
                          urn:infinispan:config:store:redis:8.0 http://www.infinispan.org/schemas/infinispan-cachestore-redis-config-8.0.xsd"
    xmlns="urn:infinispan:config:8.0"
    xmlns:redis="urn:infinispan:config:store:redis:8.0" >

    <cache-container>
        <local-cache>
            <persistence passivation="false">
                <redis-store xmlns="urn:infinispan:config:store:redis:8.0"
                    topology="sentinel" master-name="mymaster" socket-timeout="10000" connection-timeout="10000" password="mysecret">
                    <sentinel-server host="server1" />
                    <sentinel-server host="server2" />
                    <sentinel-server host="server3" />
                    <connection-pool min-idle="6" max-idle="10" max-total="20" min-evictable-idle-time="30000" time-between-eviction-runs="30000" />
                </redis-store>
            </persistence>
        </local-cache>
    </cache-container>
</infinispan>

What about SSL support?

Redis does not provide protocol encryption, instead leaving this to other specialist software. At this time, the Redis client used to integrate Infinispan with Redis servers (Jedis) does not yet support SSL connection negotiation natively.

Thursday, 10 September 2015

Infinispan 8.0.1.Final (and 7.2.5.Final)

Dear all,

we've just cooked two new point releases of Infinispan to address a number of issues.

The highlights for 8.0.1.Final are:
 
  • ISPN-5717 Notify continuous query also when entry expires
  • ISPN-5591 Simple local cache without interceptor stack. This is an extremely fast cache with very few features (no transactions, no indexing, no persistence, etc). Its primary intended usage is as a 2nd-level cache for Hibernate, but we're sure you can find lots of other applications for it, provided you don't require all the bells and whistles that come with our fully-fledged caches.
  • Bump Hibernate Search to 5.5.0.CR1 and Lucene to 5.3.0
  • A number of query fixes, including indexing and searching of null non-string properties, aggregation expressions in orderBy, filter with both 'where' and 'having' in the same query
  • ISPN-5731 Cannot use aggregation expression in orderBy
  • Read the complete release notes

The highlights for 7.2.5.Final are:
  • ISPN-5607 Preemptively invalidate near cache after writes
  • ISPN-5670 Hot Rod server sets -1 for lifespan or maxIdle as default
  • ISPN-5677 RemoteCache async methods use flags
  • ISPN-5684 Make getAll work with compatibility mode in DIST
  • Read the complete release notes

Get it, learn how to use it, help us improve it.

Enjoy !


The Infinispan team

Tuesday, 8 September 2015

Functional Map API: Working with multiple entries

We continue with the blog series on the experimental Functional Map API which was released as part of Infinispan 8.0.0.Final. In this blog post we'll be focusing on how to work with multiple entries at the same time. For reference, here are the previous entries in the series:
  1. Functional Map Introduction
  2. Working with single entries
The approach taken by the Functional Map API when working with multiple keys is to provide a lazy, pull-style API. All multi-key operations take a collection parameter which indicates the keys to work with (and sometimes contains value information too), and a function to execute for each key/value pair. Each function's capabilities depend on the entry view received as a function parameter, which changes depending on the underlying map: ReadEntryView for ReadOnlyMap, WriteEntryView for WriteOnlyMap, or ReadWriteEntryView for ReadWriteMap. All multi-key operations, except the ones from WriteOnlyMap, return an instance of Traversable, which exposes methods for working with the data returned from each function execution. Let's see an example:


This example demonstrates some of the key aspects of working with multiple entries using the Functional Map API:
  • As explained in the previous blog post, all data-handling methods (including multi-key methods) for WriteOnlyMap return CompletableFuture<Void>, because there's nothing the function can provide that could not be computed in advance or outside the function.
  • Normally, the order of the Traversable matches the order of the input collection, though this is not currently guaranteed.
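To illustrate the shape of these multi-key operations without depending on Infinispan, here is a hypothetical JDK-only stand-in: a write-only operation that only signals completion, and a read-only operation that returns a lazy Stream (playing the role of Traversable) whose function runs only when a terminal operation is applied. The class and method names are invented for this sketch, it is not the Functional Map API, and the write side simply stores the given values rather than applying a per-entry function.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.stream.Stream;

// Hypothetical, JDK-only stand-in for the shape of multi-key operations.
// Names are illustrative; this is NOT Infinispan's Functional Map API.
final class ToyFunctionalMap<K, V> {
    private final ConcurrentHashMap<K, V> data = new ConcurrentHashMap<>();

    // Write-only: the caller already knows everything, so only completion is returned
    CompletableFuture<Void> writeMany(Map<K, V> entries) {
        return CompletableFuture.runAsync(() -> data.putAll(entries));
    }

    // Read-only: lazy, pull-style; f runs for each key only when a terminal
    // operation is applied to the returned Stream
    <R> Stream<R> readMany(Set<K> keys, BiFunction<K, Optional<V>, R> f) {
        return keys.stream().map(k -> f.apply(k, Optional.ofNullable(data.get(k))));
    }
}
```

Using an ordered input set here keeps the results in input order, but as noted above, the real Traversable does not currently guarantee that.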
There is a special type of multi-key operation which works on all keys/entries stored in Infinispan. The behaviour is very similar to the multi-key operations shown above, with the exception that these do not take a collection of keys (and/or values) as a parameter:


There are a few interesting things to note about working with all entries using the Functional Map API:
  • When working with all entries, the order of the Traversable is not guaranteed.
  • ReadOnlyMap's keys() and entries() offer the possibility to traverse all keys and entries present in the cache. When traversing entries, both keys and values, including metadata, are available. Contrary to Java's ConcurrentMap, there's no way to navigate only the values (and metadata), since there's little to be gained from such a method: once a key's entry has been retrieved, there's no extra cost to provide the key as well.
It's worth noting that when we sat down to think about how to work with multiple entries, we considered having a push-style API where the user would receive callbacks pushed as the entries to work with were located. This is the approach that reactive APIs such as Rx follow, but we decided against using such APIs at this level for several reasons:
  1. We have huge interest in providing an Rx-style API for Infinispan, but we didn't want the core API to have a dependency on Rx or Reactive Streams.
  2. We didn't want to reimplement a push-style async API since this is not trivial to do and requires careful thinking, especially around back-pressure and flow control.
  3. Push-style APIs require more work on the user side compared to pull-style APIs.
  4. Pull-style APIs can still be lazy and partly asynchronous since the user can decide to work with the Traversable at a later stage, and the separation between intermediate and terminating operations provides a good abstraction to avoid unnecessary computation.
In fact, it is this desire to keep a clear separation between intermediate and terminating operations in Traversable that has resulted in there being no manual way to iterate over a Traversable. In other words, there are no iterator() or spliterator() methods in Traversable, since these are often associated with manual, user-end iteration, and we want to avoid that because in the majority of cases Infinispan knows best how to iterate over the data.
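The design choice can be sketched with a hypothetical wrapper that exposes only intermediate operations (map, filter) and terminating operations (forEach, reduce, collect) and deliberately offers no iterator() or spliterator(). It wraps a JDK Stream purely for illustration; it is not Infinispan's Traversable.

```java
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Stream;

// Hypothetical sketch: a Traversable-like wrapper with only intermediate and
// terminating operations. Deliberately no iterator()/spliterator(), so the
// producer, not the caller, decides how the data is iterated.
final class ToyTraversable<T> {
    private final Stream<T> stream;

    private ToyTraversable(Stream<T> stream) {
        this.stream = stream;
    }

    static <T> ToyTraversable<T> of(Stream<T> stream) {
        return new ToyTraversable<>(stream);
    }

    // Intermediate operations: lazy, each returns a new traversable
    <R> ToyTraversable<R> map(Function<? super T, ? extends R> f) {
        return new ToyTraversable<>(stream.map(f));
    }

    ToyTraversable<T> filter(Predicate<? super T> p) {
        return new ToyTraversable<>(stream.filter(p));
    }

    // Terminating operations: these trigger the actual traversal
    void forEach(Consumer<? super T> action) {
        stream.forEach(action);
    }

    T reduce(T identity, BinaryOperator<T> op) {
        return stream.reduce(identity, op);
    }

    <R, A> R collect(Collector<? super T, A, R> collector) {
        return stream.collect(collector);
    }
}
```

Callers describe what to compute, for example filtering even numbers, multiplying by ten and summing, and the wrapper decides when and how the elements are visited.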

In the next blog post, we'll be looking at how to work with listeners using the Functional Map API.

Cheers,
Galder

Monday, 7 September 2015

Distributed Streams

Now that Infinispan supports Java 8, we can take full advantage of some of its new features.  One of the big features of Java 8 is the new Stream classes.  These turn data processing on its head: instead of having to iterate over the data yourself, the underlying Stream handles that and you just provide the operations to perform on it.  This lends itself well to distributed processing, as the iteration is handled entirely by the implementation (in this case Infinispan).

I am therefore glad to introduce a new feature for Infinispan 8, Distributed Streams!  This allows any operation you can perform on a regular Stream to also be performed on a distributed cache (assuming the operation and data are marshallable).

Marshallability

When using a distributed or replicated cache, the keys and values of the cache must be marshallable.  The same applies to intermediate and terminal operations when using distributed streams.  Normally you would have to provide an instance of some new class that is either Serializable or has an Externalizer registered for it, as described in the marshallable section of the user guide.

However, Java 8 also introduced lambdas, which can be defined as serializable very easily (although it is a bit awkward).  An example of this serialization can be found here.
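For reference, the usual plain-JDK way to make a lambda serializable is an intersection cast with Serializable. The sketch below round-trips such a lambda through standard Java serialization (not Infinispan's marshalling) and shows that an ordinary lambda is rejected; the helper names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Illustrates the intersection-cast idiom for serializable lambdas using plain
// Java serialization; helper names are hypothetical.
final class SerializableLambdas {
    static Function<String, Integer> length() {
        // The "& Serializable" cast makes the compiler emit a serializable lambda
        return (Function<String, Integer> & Serializable) String::length;
    }

    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj); // throws NotSerializableException for plain lambdas
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

The awkward part the post alludes to is visible here: the cast repeats the functional interface type just to add Serializable.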

Some of you may also be aware of the Collectors class, which is used with the collect method on a stream.  Unfortunately, none of the Collectors it produces can be marshalled.  As such, Infinispan has added a utility class that works in conjunction with the Collectors class.  This allows you to use any combination of the Collectors methods and still work properly when everything is required to be marshalled.

Parallelism

Java 8 streams naturally have a sense of parallelism.  That is, a stream can be marked as parallel, which in turn allows its operations to be performed in parallel using multiple threads.  The best part is how simple it is to do.  The stream can be made parallel when first retrieving it by invoking parallelStream, or you can optionally enable it after the Stream is retrieved by invoking parallel.

The new Distributed Streams from Infinispan take this one step further, with what I am calling parallel distribution.  Since data is already partitioned across nodes, operations can also be run simultaneously on different nodes.  This option is enabled by default, and it can be controlled using the new CacheStream interface discussed just below.  Also, to be clear, Java 8 parallelism can be used in conjunction with parallel distribution.  This just means you will have concurrent operations running on multiple nodes across multiple threads on each node.
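A hypothetical, single-JVM simulation of the two levels of parallelism: each "node" (a task on an executor) processes its own partition concurrently with the others, and within a node a parallel stream may use several threads. The partial results are then combined. This only mimics the idea; it is not how Infinispan dispatches work to real nodes.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

// Hypothetical simulation of "parallel distribution" inside one JVM.
final class ParallelDistributionSketch {
    static long sumOfSquares(List<List<Integer>> partitions, ExecutorService nodes) {
        // Launch one task per simulated node before waiting on any of them
        List<CompletableFuture<Long>> perNode = partitions.stream()
            .map(partition -> CompletableFuture.supplyAsync(
                // Within a "node", a parallel stream may use several threads
                () -> partition.parallelStream().mapToLong(Integer::longValue).map(x -> x * x).sum(),
                nodes))
            .collect(Collectors.toList());
        // Combine the partial results, as the originating node would
        return perNode.stream().mapToLong(CompletableFuture::join).sum();
    }
}
```

Collecting the futures into a list before joining matters: joining inside the same lazy pipeline would wait for each node in turn and lose the parallelism across nodes.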

CacheStream interface

There is a new CacheStream interface that allows for controlling additional options when using a Distributed Stream.  I am highlighting the added methods (note comments have been removed from the gist).


distributedBatchSize

This method controls how many elements are brought back at one time for operations that are key aware.  These operations are (spl)iterator and forEach.  This is useful to tweak how many keys are held in memory from a remote node.  Thus it is a tradeoff of performance (more keys) versus memory.  This defaults to the chunk size as configured by state transfer.

parallelDistribution / sequentialDistribution

This was discussed in the parallelism section above.  Note that all commands have this enabled by default except for the (spl)iterator methods.

filterKeys

This method can be used to have the distributed stream only operate on a given set of keys.  This is done in a very efficient way as it will only perform the operation on node(s) that own the given keys.  Using a given set of keys also allows for constant access time from the data container/store as the cache doesn't have to look at every single entry in the cache.

filterKeySegments (advanced users only)

This can be used to filter entries more efficiently.  Normally, you could use the filter intermediate operation, but this filter is applied before any of the stream operations are performed, limiting the entries presented for stream processing as early as possible.  For example, if only a subset of segments is required, it may not have to send a remote request.
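Why can filterKeys and filterKeySegments avoid work? A hypothetical sketch: map each requested key to a segment, map each segment to its owner, and only contact those owners. Infinispan uses its own hash function and consistent hash; Math.floorMod over hashCode() and the owner array here are simplifying assumptions.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of key-to-owner routing. Math.floorMod(hashCode) stands
// in for Infinispan's real hashing and consistent hash.
final class KeyRouting {
    static int segmentOf(Object key, int numSegments) {
        return Math.floorMod(key.hashCode(), numSegments);
    }

    // ownerBySegment[i] = node that owns segment i (assumed input)
    static Map<String, Set<Object>> keysByOwner(Collection<?> keys, String[] ownerBySegment) {
        Map<String, Set<Object>> result = new TreeMap<>();
        for (Object key : keys) {
            String owner = ownerBySegment[segmentOf(key, ownerBySegment.length)];
            result.computeIfAbsent(owner, o -> new HashSet<>()).add(key);
        }
        return result; // nodes absent from this map receive no request at all
    }
}
```

If all requested keys hash to segments owned by a single node, only that node is involved; the same reasoning applies when a subset of segments is requested directly.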

segmentCompletionListener (advanced users only)

Similar to the previous method, this is related to key segments.  This listener allows the end user to be notified when a segment has been completely processed.  This can be useful if you want to keep track of completion: if this node goes down, you can rerun the processing with only the unprocessed segments.  Currently, this listener is only supported for the (spl)iterator methods.
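The resume pattern described above can be sketched as a small tracker: record segments as completion notifications arrive, then compute which segments still need processing after a failure. This is a hypothetical helper; the wiring to Infinispan's listener is omitted and its exact signature is not assumed here.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper for tracking completed segments; the listener wiring to
// Infinispan is omitted and the names are illustrative.
final class SegmentProgress {
    // Notifications may arrive on other threads, so use a concurrent set
    private final Set<Integer> completed = ConcurrentHashMap.newKeySet();
    private final int numSegments;

    SegmentProgress(int numSegments) {
        this.numSegments = numSegments;
    }

    // Call from a segment-completion callback with the finished segments
    void onSegmentsCompleted(Set<Integer> segments) {
        completed.addAll(segments);
    }

    // Segments that would need to be processed again after a failure
    Set<Integer> remaining() {
        Set<Integer> rest = new TreeSet<>();
        for (int s = 0; s < numSegments; s++) {
            if (!completed.contains(s)) rest.add(s);
        }
        return rest;
    }
}
```

The remaining set could then be passed back as the segment filter for a rerun, so already completed segments are not processed twice.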

disableRehashAware (advanced users only)

By default, all stream operations are what is called rehash aware.  That is if a node joins or leaves the cluster while the operation is in progress the cluster will be aware of this and ensure that all data is processed properly with no loss (assuming no data was actually lost).

This can be disabled by calling disableRehashAware; however, if a rehash occurs in the middle of the operation, not all data may be processed.  It should be noted that data is not processed multiple times with this disabled; only a loss of data can occur.

This option is not normally recommended unless you can afford to operate on only a subset of the data. The tradeoff is that the operation can perform faster, especially for the (spl)iterator and forEach methods.


Map/Reduce

The age-old example of map/reduce is word count, and streams can do that as well! Consider a Cache containing String keys and values, where you want the count of all words in the values: a single stream pipeline that splits each value into words, then groups identical words and counts them, does the job. Some of you may be wondering how this relates to our existing map/reduce framework. The plan is to deprecate the existing Map/Reduce and eventually replace it completely with the new distributed streams.
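Here is a minimal sketch of that pipeline. It is written against a plain java.util.Map so it runs without a cluster, and the class and method names are our own. On Infinispan, the stream would come from cache.entrySet().stream() instead, and every lambda, plus the collector, would need to be marshallable.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCount {
    // Splits every value into whitespace-separated words and counts each word.
    static Map<String, Long> wordCount(Map<String, String> data) {
        return data.entrySet().stream()
                .map(e -> e.getValue().split("\\s+"))  // "map": value -> words
                .flatMap(Arrays::stream)
                .filter(word -> !word.isEmpty())        // leading whitespace yields ""
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); // "reduce"
    }

    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        data.put("1", "the quick brown fox");
        data.put("2", "the lazy dog");
        System.out.println(wordCount(data).get("the")); // 2
    }
}
```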


Remember though that distributed streams can do so much more than just map/reduce. And there are a lot of examples already out there for streams. To use the distributed streams, you just need to make sure your operations are marshallable, and you are good to go.

Here are a few pages with examples of how to use streams straight from Oracle:

http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html
http://www.oracle.com/technetwork/articles/java/architect-streams-pt2-2227132.html

I hope you enjoy Distributed Streams.  We hope they change how you interact with your data in the cluster!

Let us know what you think, any issues or usages you would love to share!

Cheers,

Will

Wednesday, 2 September 2015

Functional Map API: Working with single entries

In this blog post we'll continue with the introduction of the experimental Functional Map API, which was released as part of Infinispan 8.0.0.Final, focusing on how to manipulate data using single-key operations.

As mentioned in the Functional Map API introduction, there are three types of operations that can be executed against a functional map: read-only operations (executed via ReadOnlyMap), write-only operations (executed via WriteOnlyMap), and read-write operations (executed via ReadWriteMap).

First, we need to construct instances of ReadOnlyMap, WriteOnlyMap and ReadWriteMap to be able to work with them.

Next, let's see all three types of operations in action, chaining them to store a single key/value pair along with some metadata, then read it, and finally delete it, returning the previously stored data.
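The following sketch shows the same write, then read, then remove-and-return-previous chain built from CompletableFuture composition. This is not Infinispan's API: a plain ConcurrentHashMap stands in for the functional map, and the write, read and removeReturningPrevious helpers are hypothetical. What carries over is the shape. Each step returns a CompletableFuture, thenCompose starts the next step only when the previous one completes, and only the final join() blocks.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ChainedOps {
    // Hypothetical async stand-ins for write-only, read-only and read-write operations.
    static CompletableFuture<Void> write(Map<String, String> store, String key, String value) {
        return CompletableFuture.runAsync(() -> store.put(key, value));
    }

    static CompletableFuture<String> read(Map<String, String> store, String key) {
        return CompletableFuture.supplyAsync(() -> store.get(key));
    }

    static CompletableFuture<String> removeReturningPrevious(Map<String, String> store, String key) {
        return CompletableFuture.supplyAsync(() -> store.remove(key));
    }

    // Chains write -> read -> remove without blocking between the steps.
    static String writeReadRemove(Map<String, String> store) {
        return write(store, "key1", "value1")
                .thenCompose(ignored -> read(store, "key1"))
                .thenCompose(readValue -> {
                    System.out.println("read: " + readValue);
                    return removeReturningPrevious(store, "key1");
                })
                .join(); // the only blocking call
    }

    public static void main(String[] args) {
        Map<String, String> store = new ConcurrentHashMap<>();
        System.out.println("removed: " + writeReadRemove(store));        // removed: value1
        System.out.println("still present: " + store.containsKey("key1")); // still present: false
    }
}
```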

This example demonstrates some of the key aspects of working with single entries using the Functional Map API:
  • Single entry methods are asynchronous, returning CompletableFuture instances which provide methods to compose and chain operations so that it can feel as if they're being executed sequentially. Unfortunately Java does not have Haskell's do notation or Scala's for comprehensions to make this more palatable, but it's great news that Java finally offers mechanisms to work with CompletableFutures in a non-blocking way, even if they're a bit more verbose than what other languages offer.
  • All data-handling methods for WriteOnlyMap return CompletableFuture<Void>, meaning that the user can find out when the operation has completed but nothing else, because there's nothing the function can provide that could not be computed in advance or outside the function.
  • The return types of most data-handling methods in ReadOnlyMap (and ReadWriteMap) are quite flexible. A function can decide to return value information, or metadata, or, for convenience, the ReadEntryView it receives as a parameter. This can be useful for users wanting to return both the value and its metadata parameters.
  • The read-write operation demonstrated above showed how to remove an entry and return the previously associated value. In this particular case, we know there's a value associated with the entry, so we called ReadEntryView.get() directly; if we were not sure whether the value is present, we should call ReadEntryView.find() instead and return the Optional instance it provides.
  • In the example, the Lifespan metadata parameter is constructed using the new Java Time API available in Java 8, but it could equally have been done with java.util.concurrent.TimeUnit, as long as the value is converted to the number of milliseconds during which the entry should remain accessible.
  • Lifespan-based expiration works just as it does with other Infinispan APIs, so you can easily modify the example to lower the lifespan, wait for that duration to pass, and then verify that the value is no longer present.
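The Java Time versus TimeUnit point comes down to producing the same number of milliseconds. The class name and the one-hour duration below are arbitrary choices for illustration. Either long would be the value handed to the Lifespan metadata parameter.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class LifespanMillis {
    // One hour expressed with the Java 8 Time API...
    static long viaJavaTime() {
        return Duration.ofHours(1).toMillis();
    }

    // ...and the same hour expressed with java.util.concurrent.TimeUnit.
    static long viaTimeUnit() {
        return TimeUnit.HOURS.toMillis(1);
    }

    public static void main(String[] args) {
        System.out.println(viaJavaTime());                  // 3600000
        System.out.println(viaJavaTime() == viaTimeUnit()); // true
    }
}
```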
If storing a constant value, WriteOnlyMap.eval(K, Consumer) could be used instead of WriteOnlyMap.eval(K, V, Consumer), making the code clearer, but if the value is variable, WriteOnlyMap.eval(K, V, Consumer) should be used to avoid, as much as possible, functions capturing external variables. Clearly, the operations exposed by the functional map can't cover all scenarios, and there might be situations where external variables are captured by functions, but these should, in general, be a minority. One such case is implementing ConcurrentMap.replace(K, V, V), where external variable capturing is required because the expected old value has to be compared inside the function.
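The following sketch shows the capturing pattern in plain Java. ConcurrentHashMap.computeIfPresent stands in for an atomic read-write function. The CapturingReplace class is hypothetical, and this is not Infinispan's ReadWriteMap. The remapping function only receives the key and the current value, so the expected old value and the new value have to be captured from the enclosing scope, which is exactly the kind of capture described above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class CapturingReplace {
    // replace(key, oldValue, newValue) built on an atomic read-write function.
    // The lambda captures oldValue and newValue from the enclosing method.
    static boolean replace(ConcurrentHashMap<String, String> map, String key,
                           String oldValue, String newValue) {
        AtomicBoolean replaced = new AtomicBoolean(false);
        map.computeIfPresent(key, (k, current) -> {
            if (current.equals(oldValue)) {
                replaced.set(true);
                return newValue; // value matched: swap it in
            }
            return current;      // value differs: keep the current one
        });
        return replaced.get();
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("key1", "value1");
        System.out.println(replace(map, "key1", "value1", "value2")); // true
        System.out.println(replace(map, "key1", "value1", "value3")); // false
        System.out.println(map.get("key1"));                          // value2
    }
}
```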


The reason we didn't add a WriteOnly.eval(K, V, V, Consumer) to the API is that value-equality-based replace comparisons are just one type of replace operation that could be executed. In other cases, a comparison based on metadata parameters might be more suitable, e.g. the Hot Rod replace operation, where equality of the version (a type of metadata parameter) is the deciding factor in whether the replace should happen or not.

In the next blog post, we'll be looking at how to work with multiple entries using the Functional Map API.

Cheers,
Galder