Monday, October 31, 2011

Data Grid Pattern - Time series index for managing versioned data

Many critical applications use an append-only approach for dealing with transactional data. In other words, they never update data records, but instead insert new records with a greater timestamp (or sequence number, or any other kind of version used to identify the latest record). A common challenge for such a data model is how to retrieve the appropriate version of a record (e.g. the latest version, or the version at a certain moment in time). A simple query for the latest version of key 'A' would translate into a query as complex as
select * from versions where series='A' and version = (select max(version) from versions where series='A')
This query is already too complex for a data grid (and even an RDBMS would not be too happy about it).
Coincidentally, Ben Stopford has recently published a great article about this problem, outlining a lot of important aspects. I do not want to repeat him here; I suggest you read his article first, then continue with mine, which complements his two approaches with a third one using a custom index in Coherence.

Using custom index for accessing versioned data

In this approach, each version is stored as a separate entry in the cache.
The entry key is a composite key, including the logical key (series key) and some additional field making the version key unique (e.g. transaction ID, sequence number etc.). The value contains the actual business data (payload) and the timestamp we use in our queries (technically, the timestamp could be part of the composite key).
The series key should also be an affinity key - all versions related to the same series should physically reside on one Coherence node (alternatively, the affinity key can be a part of the series key, which also satisfies this requirement).
Normally, if you want to find a certain version by series key and timestamp, you have to aggregate over all versions of that series. In the two approaches described by Ben Stopford, the latest version is separated from all other versions (using a separate cache - approach 1, using a marker - approach 2). That solves the problem of finding the latest version, but does not help if we need to find the version for a certain moment in time.

Time series index structure

Normal Coherence indexes cannot help us much, due to the complexity of the query, but it is possible to create a custom index tailored specifically for this task.
A time series index is similar to a traditional inverted index, but instead of storing a set of entry references, it stores a nested index, which indexes the versions belonging to a certain series by timestamp. Using this index structure, you can find the latest version, or the version for a certain moment in time, without any aggregation - just by an index lookup.
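
To illustrate the idea, below is a minimal sketch of such a nested index structure in plain Java (this is not the actual GridKit implementation; all names are made up for illustration). The index maps each series key to a timestamp-ordered map of its versions, so both lookups become simple navigation calls without any aggregation.
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// hypothetical index structure: series key -> (timestamp -> cache entry key)
public class TimeSeriesIndexSketch<K> {

    private final Map<Object, NavigableMap<Long, K>> index = new HashMap<Object, NavigableMap<Long, K>>();

    // called when a new version is inserted into the cache
    public void add(Object seriesKey, long timestamp, K entryKey) {
        NavigableMap<Long, K> versions = index.get(seriesKey);
        if (versions == null) {
            versions = new TreeMap<Long, K>();
            index.put(seriesKey, versions);
        }
        versions.put(timestamp, entryKey);
    }

    // latest version - a single lookup, no aggregation
    public K getLatest(Object seriesKey) {
        NavigableMap<Long, K> versions = index.get(seriesKey);
        return versions == null ? null : versions.lastEntry().getValue();
    }

    // version effective at a given moment in time
    public K getAsOf(Object seriesKey, long timestamp) {
        NavigableMap<Long, K> versions = index.get(seriesKey);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, K> e = versions.floorEntry(timestamp);
        return e == null ? null : e.getValue();
    }
}
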
This index goes beyond the standard Coherence index API, so it requires a complementary implementation of a custom filter.

PROs and CONs

Below are PROs and CONs of this approach, compared to the approaches from Ben's article.

PRO

  • Inserting a new version doesn't require modification of any other versions. In particular, you do not need the hack of directly accessing the backing map, and you do not create extra replication traffic.
  • The time series index works efficiently for any point in time, not only for latest versions.
  • It can be used with any kind of cache (even with continuous queries).

CON

  • Though custom index usage is straightforward, troubleshooting can be very tricky unless you understand the index mechanics very well.

Source code

The time series index implementation is available in the GridKit project.

Thursday, October 27, 2011

Java, How to throw undeclared checked exception

Usually, when you have caught a checked exception (e.g. IOException or InterruptedException), you have to wrap it in a RuntimeException to rethrow it without declaring the exception in the method signature. But with the help of generics you can easily break the Java rules and throw, say, an IOException from a method which has no throws declaration.

The code below will allow you to throw anything anywhere.
public class AnyThrow {

    public static void throwUnchecked(Throwable e) {
        // the compiler infers E = RuntimeException here, so no throws clause is required
        AnyThrow.<RuntimeException>throwAny(e);
    }

    @SuppressWarnings("unchecked")
    private static <E extends Throwable> void throwAny(Throwable e) throws E {
        // the cast to E is erased at runtime, so any Throwable slips through
        throw (E)e;
    }
}
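
A quick usage example (hypothetical): the method below throws a checked IOException, yet compiles without any throws declaration.
public class AnyThrowDemo {

    public static void main(String[] args) {
        // compiles fine: no throws clause, yet a checked exception escapes
        AnyThrow.throwUnchecked(new java.io.IOException("undeclared!"));
    }
}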

I would say it is a very sad outcome of the Java implementation of generics. While the benefits of checked exceptions are arguable, they provided some rules you were able to rely upon. But not any more, thanks to generics.
But it is what it is :)




Tuesday, October 25, 2011

Data Grid Pattern - Proactive caching

The classic and most widely used approach to caching is the read-through pattern. Look up in the cache, then try to load from the primary data source if the entry is missing in the cache - that is how it works. This pattern is easy to implement, but it has a few unpleasant limitations:
  • caching may reduce average response time, but maximum response time is still bound by the response time of the back-end data source,
  • the cache may hold stale data; an expiry policy may relieve this problem to some extent, but aggressive expiry drastically reduces the performance gain from caching.

All in memory pattern

The caching concept is a close relative of the memory hierarchy principle. Memory hierarchy, one of the cornerstones of the Von Neumann architecture, relies on the fact that we have different kinds of memory (in terms of capacity and performance) in a system. With modern hardware, dynamic memory capacity is often large enough to keep the whole dataset. All-in-memory is a term used to describe a caching architecture where you have 100% of your data in the cache at all times. Having all data in the cache allows you to guarantee that no request will ever hit the slow back-end data source, and thus to offer a more aggressive SLA for max response time. It may also be required for workloads with highly random access to data, where traditional assumptions like 80/20 do not hold.
While the all-in-memory approach is a definite win in terms of performance, its implementation has a few serious challenges:
  • the cache should have enough capacity to hold 100% of the data,
  • the cache should be fault tolerant (losing a portion of cached data would render the cache defunct until the missing data is reloaded),
  • the preloading procedure is often non-trivial due to the scale of the data set,
  • the cache should be kept in sync with the back-end data source.
Capacity and fault tolerance are provided by modern distributed caches out of the box, but preloading procedures and cache update strategies are very application specific and fairly challenging to implement.
Below are a few practical approaches for keeping the cache in sync with the primary data source.

Refresh ahead

This approach is similar to an expiry policy, but instead of invalidating data, the cache proactively refreshes it from the master source. The refresh ahead pattern is quite simple, but not very practical. If the cached data set is large (and we are talking about the all-in-memory pattern), automatic refreshing is likely to overwhelm the back end with requests. And even if the data set is reasonably small, we still have to use a fairly long expiry time to make it practical.
So if you are looking for an all-in-memory cache, refresh ahead is unlikely to help you.

Proactive caching

In contrast to traditional (I would say reactive) caching, with the proactive caching pattern you insert/update a value in the cache as soon as it is updated in the back-end data source, not at the moment the data is requested from the cache.
Proactive caching does not necessarily have to be used with the all-in-memory pattern, but the combination of the two is very powerful.

Polling updates from DB

An evolutionary step from refresh ahead to proactive caching would be polling changes from the database. Some daemon component should periodically (and frequently) poll the database and fetch the changes made since the last cache update. Sounds simple, but you have to come up with a way to "fetch changes since last cache update". Usually some kind of timestamp is used - each record in a table to be cached gets a kind of last-modified field. A few more challenges:
  • what if the cache represents a query result, not just a single table,
  • the 'last modified' field should be indexed, otherwise frequent polls will bring the database to its knees,
  • the polling daemon should be fault tolerant,
  • the cache timestamp should be stored somewhere on the cache side.
So, implementing this approach will require some amount of work (on both sides, database and cache), but in the end you will have a very robust solution.
The only severe limitation of this approach is the lag between changes in the database and in the cache, which is never less than the poll period.
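
For illustration, here is a minimal sketch of such a polling daemon in plain JDBC (the table and column names are assumptions; a real implementation also needs fault tolerance and persistent storage of the cache timestamp).
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;

public class PollingDaemon {

    private final Connection connection;
    private Timestamp lastSeen; // should be persisted on the cache side

    public PollingDaemon(Connection connection, Timestamp startFrom) {
        this.connection = connection;
        this.lastSeen = startFrom;
    }

    // one poll iteration: fetch rows changed since the last poll
    public void pollOnce() throws SQLException {
        PreparedStatement ps = connection.prepareStatement(
            "select id, payload, last_modified from cached_table where last_modified > ?");
        ps.setTimestamp(1, lastSeen);
        ResultSet rs = ps.executeQuery();
        while (rs.next()) {
            // push the fresh value into the cache here
            Timestamp modified = rs.getTimestamp("last_modified");
            if (modified.after(lastSeen)) {
                lastSeen = modified;
            }
        }
        rs.close();
        ps.close();
    }
}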

Long poll

If the database has some means for wait/notification in its query language, you can use the long poll pattern.
Using long polls will reduce the load on your database server and probably reduce the lag between cache and database. Disadvantages of this approach: more code on the database side, and the need for dedicated thread(s) for polling (a thread will be blocked most of the time, waiting for notification on the database side).
In an Oracle database, a long poll can be implemented using the DBMS_ALERT package. Be aware, though, that use of DBMS_ALERT may cause serialization of update transactions and harm database performance.
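
A rough sketch of a long poll over DBMS_ALERT via JDBC (the alert name and timeout are made up; updating transactions are assumed to call DBMS_ALERT.SIGNAL('CACHE_REFRESH', ...)).
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.Types;

public class LongPollSketch {

    // blocks until the alert is signaled or the timeout expires
    public static boolean waitForChanges(Connection connection) throws Exception {
        // registration is needed once per session
        CallableStatement reg = connection.prepareCall("{ call DBMS_ALERT.REGISTER(?) }");
        reg.setString(1, "CACHE_REFRESH");
        reg.execute();
        reg.close();

        CallableStatement wait = connection.prepareCall("{ call DBMS_ALERT.WAITONE(?, ?, ?, ?) }");
        wait.setString(1, "CACHE_REFRESH");
        wait.registerOutParameter(2, Types.VARCHAR); // message passed by SIGNAL
        wait.registerOutParameter(3, Types.INTEGER); // 0 - alert received, 1 - timeout
        wait.setInt(4, 60);                          // timeout in seconds
        wait.execute();
        boolean signaled = wait.getInt(3) == 0;
        wait.close();
        return signaled; // if true - fetch changes and update the cache
    }
}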

Database notifications

Some databases can push data change notifications directly to clients, without any need for polling. E.g. Oracle database has the DCN (data change notification) mechanism. Using DCN, you can register callbacks which will be notified when certain data has been changed in the database. The notification mechanism has a few advantages over polling:
  • less load on the database while no data is actually changing,
  • smaller lag between a change in the database and the reaction in the cache.
The notification approach has disadvantages too:
  • the API is usually more complicated, and you have to learn more quirks to make it work,
  • connection hang problems - the application is listening for events on a connection which has become defunct for some reason,
  • notifications may be lost in transit for some reason.
One particular problem with using Oracle DCN in Java was leaking of subscriptions. A DCN subscription remains active on the database side after the Java process terminates (unless it was deregistered explicitly), and eventually you are going to hit the limit of active subscriptions. Of course you should deregister the subscription before terminating the client process, but in practice you cannot always guarantee a graceful shutdown.
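
For illustration, a sketch of DCN registration with best effort cleanup, based on the oracle.jdbc.dcn API as I understand it (the query, property values and cache refresh logic are placeholders).
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.OracleStatement;
import oracle.jdbc.dcn.DatabaseChangeEvent;
import oracle.jdbc.dcn.DatabaseChangeListener;
import oracle.jdbc.dcn.DatabaseChangeRegistration;

public class DcnSketch {

    public static void listen(final OracleConnection connection) throws Exception {
        Properties props = new Properties();
        props.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
        final DatabaseChangeRegistration dcr = connection.registerDatabaseChangeNotification(props);
        dcr.addListener(new DatabaseChangeListener() {
            public void onDatabaseChangeNotification(DatabaseChangeEvent event) {
                // refresh affected cache entries here
            }
        });
        // associate a query with the registration
        Statement stmt = connection.createStatement();
        ((OracleStatement) stmt).setDatabaseChangeRegistration(dcr);
        ResultSet rs = stmt.executeQuery("select id, payload from cached_table");
        rs.close();
        stmt.close();
        // best effort deregistration - will not run if the process is killed
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                try {
                    connection.unregisterDatabaseChangeNotification(dcr);
                } catch (Exception e) {
                    // log and move on
                }
            }
        });
    }
}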

Hooking into database replication

All mature databases have a replication feature and thus a replication wire protocol. Sometimes they also provide an API to hook into the replication channel and programmatically receive all updates (essentially change notifications).
Replication is implemented differently in different databases (or even in different replication solutions for the same database), but the general idea is to make the cache act as a replication slave of its source database.
Compared to polling or data change notifications, using replication usually requires more effort on the DBA side (they have to set up a replication slave of the master database). It may also cost you something in licensing fees, depending on your replication solution.
The MySQL slave protocol does not require any setup on the master, so replication links can be created ad hoc. But MySQL has another catch: you should set up row based replication on the master database, unless you want to parse and execute SQL statements in your cache.
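
For reference, row based replication is enabled on the MySQL master with a my.cnf configuration along these lines (binary logging must be on; the values are illustrative):
[mysqld]
server-id     = 1
log-bin       = mysql-bin
binlog_format = ROW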

Single cache in front of multiple sources

In a large scale system, the primary data source may itself be distributed (e.g. using a sharded database). Having a single read-through cache may be a problem in this case (doing read-through, you have to know which shard to consult about data missing in the cache), but with proactive caching such a setup is much more straightforward. In read-through caching, the cache responsibilities of serving requests and acquiring data are coupled. With proactive caching, the responsibilities of storing data/serving read requests and of feeding the cache with data updates can be separated. This way, you can have a single cache instance and multiple other components pushing data into it (e.g. each shard could push its data into the single cache).
In this role the cache can be thought of as a kind of "materialized view" based upon the data in the primary (potentially distributed) data source.

Few more links

Using Database Change Notification (DCN) with a Coherence Cache

Thursday, October 20, 2011

CMS heap fragmentation. Follow up 1

I have recently published an article, Java GC, HotSpot's CMS and heap fragmentation. The post has led to a number of interesting discussions, and here I want to present some hard numbers from my experiments to back up those discussions.

Experiment details

Application

The test application is synthetic. It allocates byte arrays of random size and eventually releases them (with a fairly complex pattern). Though the application is rather simple, it shows heap space fragmentation symptoms similar to those of a real application. The application has 2 threads, each consuming 100% of a CPU core. The object allocation sequence is deterministic, so I use the number of young collections as the time axis on the diagrams.

JVM settings

A 64bit HotSpot JVM was used in the experiments. The baseline configuration is below.
-server
-Xmx1700m -Xms1700m -Xmn300m
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-XX:+UseConcMarkSweepGC
-XX:-UseCompressedOops
-XX:+PrintPromotionFailure
-XX:CMSInitiatingOccupancyFraction=30
-XX:+UseCMSInitiatingOccupancyOnly
-XX:ParallelGCThreads=2
-XX:ConcGCThreads=1
-XX:PrintFLSStatistics=1
The JVM process was limited to 2 cores (using taskset) to simulate high CPU load (the real application consumes all cores at peak loads, but for the test application I wanted to stick with a single thread to keep its behavior deterministic and reproducible).
The live data set of the application is about 1GiB (which leaves about 400MiB in old space as headroom for the CMS collector). These numbers are scaled down from the real application setup.
For test runs with reduced old PLAB size, the following options were added to the JVM:
-XX:OldPLABSize=16
-XX:-ResizeOldPLAB

Diagrams

The diagrams show the max chunk size and the total amount of free memory in old space on the Y axis (in heap words, 1 heap word = 8 bytes). The X axis shows the number of young collections since the start of the JVM.

Fragmentation in different HotSpot revisions

 

As you can see, something is really broken in HotSpot 6u24. Upgrading to version 6u25 or above should be a good idea. See CR-6999988 for more details.
Anyway, in both revisions, reducing old PLAB size considerably reduces fragmentation.

Adding more memory

Let's see how adding more memory to the JVM affects fragmentation. In the next experiment the heap size is increased by 200 MiB without increasing the live data set.

As you can see, increasing the CMS headroom drastically decreases fragmentation (though the max available chunk size continues to decrease over time).

Adding more CPU cores

As I said before, the test application consumes all available cores, and the concurrent GC thread has to compete with the application for CPU cycles. In this test, I have increased the number of cores available to the JVM process to three. No other parameters of the experiment have been changed.

The results are a little surprising. Even though the average amount of free memory is lower in the 3 core test vs. the 2 core test, the fragmentation issue goes away!
In other words, if your application is starving for CPU, this may lead to increased fragmentation of the CMS space and an eventual Full GC freeze, even though the amount of free memory reported in GC logs and via JMX shows no indication of a problem.
I really wish the CMS free list space statistics were exposed via JMX. Now I have to reconsider my baseline JVM health monitoring approach for production environments :)

Conclusions

  • HotSpot prior to 6u25 has a severe issue leading to increased fragmentation.
  • Increasing CMS headroom (the gap between live data set and old space size) drastically reduces fragmentation.
  • CPU shortage may lead to fragmentation of memory; this problem cannot be detected by normal monitoring (only by parsing GC logs with the options above).
  • Limiting OldPLABSize shows less fragmentation compared with default PLAB settings in all experiments (though it has a negative performance impact).
Though the synthetic CMS fragmentation tests have produced some interesting data, it is still just a synthetic test. Real systems are much less predictable and, unfortunately, not so easy to test.
So, my main advice to everyone having to deal with GC issues - use thoughtful monitoring in your production environments.

Wednesday, October 12, 2011

Java GC, HotSpot's CMS and heap fragmentation

Concurrent Mark Sweep (CMS) is one of the garbage collection algorithms implemented in the HotSpot JVM. CMS is designed to be mostly concurrent, requiring just two short stop-the-world pauses per old space garbage collection cycle. But if CMS cannot keep up with the application's memory allocation, it may fall back to a stop-the-world whole heap collection - the infamous Full GC pause. A Full GC is a very noticeable application pause; on large heaps it can take several dozens of seconds to collect and compact the whole heap.

There are two different failure scenarios for CMS collection (both of them lead to longer than usual stop-the-world pauses):
  • concurrent mode failure,
  • promotion failure.

Concurrent mode failure

At the beginning of each young GC, the collector should ensure that there is enough free memory in old space to promote aged objects from young space. The modern CMS collector estimates the size of objects to be promoted using statistics from previous collections. If old space does not have enough free bytes to hold the estimated promotion amount, a concurrent mode failure will be raised. A concurrent mode failure doesn't necessarily lead to a Full GC; in certain cases the JVM will just wait for the concurrent collection cycle to finish, but the application will remain in an STW pause until the young collection has finished.

The most frequent reason for concurrent mode failure is late initiation of the CMS cycle. The JVM tries to estimate the amount of garbage in the heap and the duration of the CMS cycle, and starts the cycle as late as possible to avoid wasting CPU. Unfortunately this estimation may be too optimistic. You can advise the JVM to start CMS earlier using the following flags:
-XX:CMSInitiatingOccupancyFraction=30
-XX:+UseCMSInitiatingOccupancyOnly
The settings above will force a CMS cycle if more than 30% of old space is in use. The second option disables the JVM heuristics; without it, the JVM may not obey the CMS initiating occupancy fraction setting.

Normally, for server type applications, you want CMS to run continuously. If you are experiencing concurrent mode failures even though the next CMS cycle starts right after the previous one, it means that CMS throughput is simply not enough. In this case you should increase the size of the old generation and give the CMS collector more headroom to do its job. Alternatively, you may try to dedicate more CPU cores to the concurrent collector, but on modern servers CPU is usually an even more limited resource than memory.

In summary, there are two reasons for the concurrent mode failure STW pause mentioned above, and both of them can be remedied fairly easily with JVM options.

Promotion failure

Promotion failure is a more complicated situation. The CMS collector does not compact free memory in old space; instead it has to deal with fragmented free space (a set of free memory chunks). It is possible that all free bytes are scattered across small chunks, and it is impossible to find a certain amount of contiguous memory to promote a particular large object, even though the total number of free bytes is large enough.

Heap fragmentation is a well known problem, and there are a few effective techniques for reducing it.

The CMS memory manager uses separate free lists for chunks of different sizes. Using these free lists, it can effectively fill small holes in the fragmented memory space with objects of exactly the right size. This technique is known to be fairly effective (and is widely used in C/C++ memory managers). But, surprisingly, it doesn't seem to work well for the JVM in many real life situations.

What is wrong with the way the JVM allocates memory in old space?

PLABs

Update: According to the comment below, I may be very wrong here. Please read the comment from Ramki and give me some time to grok this stuff and rewrite the section.
Also take a look at the next article, which has some numbers to back me up:
CMS heap fragmentation. Follow up 1

Update 2: I have written another article specifically explaining PLABs in CMS. See Java GC, HotSpot's CMS promotion buffers

The section below describes the old PLAB logic, which is still present in the code base, but not used any more.
Let me take a step aside and explain some details of JVM memory management. You may know that the JVM uses TLABs (thread local allocation buffers) to avoid concurrent allocation from a single memory source by multiple threads. The same idea is also used in the parallel young space collector. Collecting threads copy live objects (a young collection is a copying collection), and they need to allocate memory for the objects being copied. To avoid CPU cache contention (due to synchronization of caches between CPUs), each thread has two PLABs (parallel or promotion local allocation buffers) of its own - one for young space and one for old space. Using a PLAB in young space makes total sense, as free memory is always contiguous there, but using a PLAB for old space in practice makes a huge contribution to fragmentation.

Let me illustrate this by example.
Each collector thread allocates a chunk of contiguous memory in old space. Then, during collection, these chunks are filled with objects of different sizes (large objects are allocated directly in old space, but small objects are placed in the PLAB).
Once the young collection is finished, our PLABs are full of objects. Then, after some time, a CMS cycle takes place and the sweeper marks a few dead objects as free space, creating small free memory chunks.
But on the next young collection, instead of reusing the small chunks of memory, each GC thread will allocate a new large contiguous block and fill it with small objects.
Effectively, due to the use of PLABs, the free lists are not used for small objects, and small chunks will never be reused (until the JVM can coalesce them into larger chunks). In practice, our applications have a lot of small objects which live long enough to die in old space. Strings and hash map entries are just a few such cases.

Ironically, while CMS has sophisticated free list machinery, it renders that machinery inefficient by using PLABs.

Important: There is a bug in JDK prior to 6u25 which makes fragmentation even worse. See CR-6999988 for more details.

Can we work around this issue?
Yes, we can reduce the size of the PLAB, improving reuse of small chunks.
-XX:OldPLABSize=16
-XX:-ResizeOldPLAB
The options above will force the JVM to use PLABs as small as 16 heap words (heap word = 4 bytes in a 32 bit JVM, or a 64 bit JVM with compressed pointers enabled; 8 bytes otherwise).

My experiments have shown a drastic reduction of heap fragmentation in a few applications with known promotion failure problems. Though the options above will help you reduce fragmentation, I would recommend using them only if you HAVE problems with fragmentation. Using PLABs is important from a multi core perspective, and such soft disabling of PLABs may have a serious impact on young collection pause times.

Reminder: The section above was based on my misunderstanding of PLABs. A correct description can be found here.
The next section is ok :)

How to diagnose problems with fragmentation?

Monitoring heap fragmentation

Below are the options that will help you diagnose and monitor problems with CMS heap fragmentation.
-XX:+PrintGCDetails
-XX:+PrintPromotionFailure
-XX:PrintFLSStatistics=1
PrintGCDetails is a MUST HAVE baseline GC diagnostic option. PrintPromotionFailure will print a few more details about promotion failures (see example below).
592.079: [ParNew (0: promotion failure size = 2698)  (promotion failed): 135865K->134943K(138240K), 0.1433555 secs]
PrintFLSStatistics=1 will print free list statistics on each young or old collection (see example below).
Statistics for BinaryTreeDictionary:
------------------------------------
Total Free Space: 40115394
Max   Chunk Size: 38808526
Number of Blocks: 1360
Av.  Block  Size: 29496
Tree      Height: 22
The most important parameter of the FLS (free list space) statistics is the max chunk size. If the size of the max chunk decreases over time, that means increasing fragmentation of the heap and a growing risk of promotion failure.
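
For illustration, a quick and dirty way to extract the max chunk size trend from a GC log (a hypothetical sketch; the line format matches the sample above but may vary between JVM revisions).
import java.io.BufferedReader;
import java.io.FileReader;

public class MaxChunkTracker {

    public static void main(String[] args) throws Exception {
        BufferedReader in = new BufferedReader(new FileReader(args[0]));
        int n = 0;
        String line;
        while ((line = in.readLine()) != null) {
            // "Max   Chunk Size: 38808526" lines are produced by -XX:PrintFLSStatistics=1
            if (line.startsWith("Max   Chunk Size:")) {
                long maxChunk = Long.parseLong(line.substring(line.indexOf(':') + 1).trim());
                System.out.println((n++) + "\t" + maxChunk);
            }
        }
        in.close();
    }
}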

Conclusion                     

Let me summarize how to reduce the risk of concurrent mode and promotion failures with the CMS collector.
1. Provide CMS with enough headroom; more free space - less risk of fragmentation. Remember, any garbage collector needs some extra space to work efficiently.
2. Let CMS run continuously, using the initiating occupancy settings (you may want to reduce the number of concurrent threads though).
3. If you are still having problems - reduce the old PLAB size.
4. Avoid spurious allocation of very large objects in the application (resizing of a large hash table is a good example). Regular allocation of large objects is ok, though.

See also

Java GC, HotSpot's CMS promotion buffers 
CMS heap fragmentation. Follow up 1
HotSpot JVM garbage collection options cheat sheet
Understanding GC pauses in JVM, HotSpot's minor GC
Understanding GC pauses in JVM, HotSpot's CMS collector
How to tame java GC pauses? Surviving 16GiB heap and greater



Friday, October 7, 2011

Coherence write behind, finding not-yet-stored entries


The write behind strategy may be very useful in certain cases. With Coherence you can use this pattern in a very smart way - synchronously replicate your data in memory over several nodes, then asynchronously write it to slow external storage. This way you will not lose any data in case of a single server outage. One unpleasant thing is that Coherence does not retain the sequence of updates in certain cases, but hey, this is a distributed system after all :)

But there is a catch. Let's assume you are receiving messages via JMS, putting data into the Coherence grid, starting processing etc. But you do not want to acknowledge a message until it is written to persistent storage.
While Coherence lets your data survive a single node failure, there could be a network failure, a logical bug in your system, or an outage of the persistent storage you are writing to. You want to be consistent: you can process a message, but you do not want to confirm its delivery (delete it from one persistent storage) until it is written into another persistent storage.

So, is it possible to find out which entries in the cache are not persisted yet?
Yes, it is. Internally, Coherence marks not-yet-stored entries with a special flag. This flag is usually invisible to the application, but we can access it using some low level Coherence API.

Below is StoreFlagExtractor, which returns FALSE for vulnerable entries.
import java.io.IOException;

import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.Binary;
import com.tangosol.util.BinaryEntry;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.util.extractor.AbstractExtractor;

public class StoreFlagExtractor extends AbstractExtractor implements PortableObject {

    private static final long serialVersionUID = 20010915L;

    public StoreFlagExtractor() {
    }
   
    @Override
    public int compare(Object object1, Object object2) {
        // comparison makes no sense for this extractor
        throw new UnsupportedOperationException();
    }

    @Override
    public int compareEntries(com.tangosol.util.QueryMap.Entry entry1, com.tangosol.util.QueryMap.Entry entry2) {
        // comparison makes no sense for this extractor
        throw new UnsupportedOperationException();
    }

    @Override
    public Object extract(Object object) {
        // the decoration can only be extracted from a binary entry
        throw new UnsupportedOperationException();
    }

    @Override
    @SuppressWarnings("rawtypes")
    public Object extractFromEntry(java.util.Map.Entry entry) {
        BinaryEntry binEntry = (BinaryEntry) entry;
        Binary binValue = binEntry.getBinaryValue();
        return extractInternal(binValue, binEntry);
    }

    @Override
    public Object extractOriginalFromEntry(com.tangosol.util.MapTrigger.Entry entry) {
        BinaryEntry binEntry = (BinaryEntry) entry;
        Binary binValue = binEntry.getOriginalBinaryValue();
        return extractInternal(binValue, binEntry);
    }
   
    private Object extractInternal(Binary binValue, BinaryEntry entry) {
        if (ExternalizableHelper.isDecorated(binValue)) {
            // the DECO_STORE decoration is Boolean.FALSE while the entry is waiting to be stored
            Binary store = ExternalizableHelper.getDecoration(binValue, ExternalizableHelper.DECO_STORE);
            if (store != null) {
                Object st = ExternalizableHelper.fromBinary(store, entry.getSerializer());
                return st;
            }
        }
        // no decoration - the entry has already been persisted
        return Boolean.TRUE;
    }

    @Override
    public void readExternal(PofReader paramPofReader) throws IOException {
        // do nothing
    }

    @Override
    public void writeExternal(PofWriter paramPofWriter) throws IOException {
        // do nothing
    }
}

Using this extractor you can easily query unstored entries, set up listeners, and even build indexes and CQs over such entries.
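
For example, a hypothetical query for entries which have not been pushed to the cache store yet (the cache name is made up):
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.filter.EqualsFilter;

public class FindUnstored {

    public static void main(String[] args) {
        NamedCache cache = CacheFactory.getCache("write-behind-cache");
        // entries whose STORE decoration is FALSE are not persisted yet
        java.util.Set keys = cache.keySet(new EqualsFilter(new StoreFlagExtractor(), Boolean.FALSE));
        System.out.println("Not yet stored: " + keys);
    }
}
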
UPDATE: Changes of the STORE decoration do not trigger cache events (only backing map events) or index updates. So neither CQs, nor indexes, nor listeners will work.