Tuesday, September 10, 2013

Coherence 101 - EntryProcessor traffic amplification

The Oracle Coherence data grid has a powerful tool for in-place data manipulation - the EntryProcessor. Using an entry processor you can get reasonable atomicity guarantees without locks or transactions (and without the drastic performance penalties associated with them).

A good example of an entry processor is the built-in ConditionalPut processor, which verifies a certain condition before overwriting a value. This, in turn, can be used to implement optimistic locking and other patterns.
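Here is a minimal sketch of optimistic locking with ConditionalPut (ConditionalPut, EqualsFilter and invoke() are standard Coherence API; the MyValue class with its getVersion() accessor and the cache name are made up for illustration):

    import com.tangosol.net.CacheFactory;
    import com.tangosol.net.NamedCache;
    import com.tangosol.util.filter.EqualsFilter;
    import com.tangosol.util.processor.ConditionalPut;

    NamedCache cache = CacheFactory.getCache("my-cache");

    // replace the entry only if its version is still the one we have read;
    // if another process has updated it meanwhile, the filter fails and
    // the put is silently skipped
    int expectedVersion = 41;
    MyValue newValue = new MyValue(42, "updated payload"); // MyValue is hypothetical
    cache.invoke("key-1",
            new ConditionalPut(new EqualsFilter("getVersion", expectedVersion), newValue));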

ConditionalPut accepts only one value, but a ConditionalPutAll processor is also available. ConditionalPutAll accepts a map of keys/values, so we can update multiple cache entries with a single call to the NamedCache API.
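Continuing the snippet above, a bulk put-if-absent takes one invokeAll() call (NotFilter and PresentFilter are standard Coherence filters):

    import com.tangosol.util.filter.NotFilter;
    import com.tangosol.util.filter.PresentFilter;
    import com.tangosol.util.processor.ConditionalPutAll;

    import java.util.HashMap;
    import java.util.Map;

    // insert each value only if its key is not present yet;
    // all entries are updated with a single invokeAll() call
    Map values = new HashMap();
    values.put("key-1", new MyValue(1, "a"));
    values.put("key-2", new MyValue(1, "b"));
    cache.invokeAll(values.keySet(),
            new ConditionalPutAll(new NotFilter(PresentFilter.INSTANCE), values));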

But there is one caveat.

We have placed the values for all keys in a single map instance inside the entry processor object. In a distributed cache, however, the keys are spread across different processes.
How will the right values be delivered to the right keys?

The answer is simple - every node owning at least one of the keys to be updated will receive a copy of the whole map of values.
In other words, in a mid-sized cluster (e.g. 20 nodes) you may actually transfer 20 times more data over the network than is really needed.

Modern networks are quite good, and you may not notice this traffic amplification effect for some time (as long as your network bandwidth can handle it). But once traffic reaches the network limit, things start to break apart.

The Coherence TCMP protocol is very aggressive at grabbing as much network bandwidth as it can, so other communication protocols will likely perish first.
JDBC connections are a likely victim of bandwidth shortage.
Coherence*Extend connections may also suffer (they use TCP), and proxy nodes may start to fail in unusual ways (e.g. with OutOfMemoryError due to transmission backlog overflow).

This problem may be hard to diagnose. TCP is much more vulnerable to bandwidth shortage, so you will be kept distracted by TCP communication problems while the root cause is excessive TCMP cluster traffic.

Monitoring TCMP statistics (available via MBeans) can give you insight into TCMP's network bandwidth consumption and overall network health, and help you find the root cause.
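A quick way to eyeball those statistics is to query the Coherence node MBeans over JMX. Below is a sketch, assuming management is enabled on the cluster member (e.g. with -Dtangosol.coherence.management=all) so that the Coherence:type=Node MBeans are registered in the platform MBean server:

    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    import java.lang.management.ManagementFactory;
    import java.util.Set;

    public class TcmpStats {
        public static void main(String[] args) throws Exception {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            Set<ObjectName> nodes =
                server.queryNames(new ObjectName("Coherence:type=Node,*"), null);
            for (ObjectName node : nodes) {
                // success rates close to 1.0 are healthy; a dropping publisher
                // success rate and a growing resend count usually indicate
                // packet loss caused by bandwidth saturation
                System.out.println("node " + node.getKeyProperty("nodeId")
                    + " publisherSuccessRate=" + server.getAttribute(node, "PublisherSuccessRate")
                    + " receiverSuccessRate=" + server.getAttribute(node, "ReceiverSuccessRate")
                    + " packetsResent=" + server.getAttribute(node, "PacketsResent"));
            }
        }
    }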

Isolating TCMP traffic on a separate switch is also a good practice, by the way.

But how do we fix it?

Manual data splitting

A simple solution is to split the key set by owning node, and then invoke the entry processor for each subset individually. The Coherence API allows you to find the node owning a particular key (a sketch of this approach follows the list below).
This approach is far from ideal though:

  • it will not work for Extend clients,
  • you either have to process all subsets sequentially or use threads to make several parallel calls to the Coherence API,
  • splitting the key set complicates application logic.
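A minimal sketch of such splitting (getKeyOwner() is part of the PartitionedService API, as mentioned in the comments below; the helper class and its ProcessorFactory callback are made up for illustration):

    import com.tangosol.net.Member;
    import com.tangosol.net.NamedCache;
    import com.tangosol.net.PartitionedService;
    import com.tangosol.util.InvocableMap.EntryProcessor;

    import java.util.HashMap;
    import java.util.Map;

    public class ManualSplit {

        // hypothetical callback so the caller can build e.g. a
        // ConditionalPutAll from each per-node subset of values
        public interface ProcessorFactory {
            EntryProcessor create(Map subset);
        }

        public static void invokeSplit(NamedCache cache, Map values, ProcessorFactory factory) {
            PartitionedService service = (PartitionedService) cache.getCacheService();

            // group the values by the member owning each key
            Map<Member, Map> byOwner = new HashMap<Member, Map>();
            for (Object o : values.entrySet()) {
                Map.Entry e = (Map.Entry) o;
                Member owner = service.getKeyOwner(e.getKey());
                Map subset = byOwner.get(owner);
                if (subset == null) {
                    byOwner.put(owner, subset = new HashMap());
                }
                subset.put(e.getKey(), e.getValue());
            }

            // one invokeAll() per owner, each carrying only that node's values
            for (Map subset : byOwner.values()) {
                cache.invokeAll(subset.keySet(), factory.create(subset));
            }
        }
    }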
Triggers

Another option is relocating your logic from an entry processor to a trigger and replacing invokeAll() with putAll() (putAll() does not suffer from traffic amplification, since each node receives only the entries it owns). This solution is fairly good and fast, but it has certain drawbacks too (see the sketch after the list):

  • it is less transparent (a put() is not just a put() any more),
  • a trigger is configured once for all cache operations (not just for one putAll() call),
  • you can have only one trigger per cache, and it has to handle all your data update needs.
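For illustration, here is a sketch of a trigger that enforces ConditionalPut-style version checks on every put (MapTrigger and MapTriggerListener are standard Coherence API; MyValue is the made-up versioned class from the earlier snippets):

    import com.tangosol.net.CacheFactory;
    import com.tangosol.net.NamedCache;
    import com.tangosol.util.MapTrigger;
    import com.tangosol.util.MapTriggerListener;

    import java.io.Serializable;

    public class VersionCheckTrigger implements MapTrigger, Serializable {

        public void process(MapTrigger.Entry entry) {
            if (entry.isOriginalPresent()) {
                MyValue oldValue = (MyValue) entry.getOriginalValue();
                MyValue newValue = (MyValue) entry.getValue();
                if (newValue.getVersion() <= oldValue.getVersion()) {
                    // stale update - quietly roll the change back
                    entry.setValue(oldValue);
                }
            }
        }
    }

    // registration (once, typically at application startup):
    // NamedCache cache = CacheFactory.getCache("my-cache");
    // cache.addMapListener(new MapTriggerListener(new VersionCheckTrigger()));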
Synthetic data keys

Finally, you can use the DataSplittingProcessor from the CohKit project. This utility class uses synthetic cache keys to transfer the data associated with the real keys, then uses the backing map API to access the real entries.

This solution has its pros and cons too:

  • it is a good drop-in replacement for ConditionalPutAll and the like,
  • but it is prone to deadlocks if run concurrently with other bulk updates (this is partially mitigated by sorting keys before locking).

Choosing the right solution

In practice I have used all three techniques listed above.

Sometimes triggers fit the overall cache design quite well.
Sometimes manual data splitting has its advantages.
And sometimes DataSplittingProcessor is just the right remedy for existing entry processors.

3 comments:

  1. >Coherence API allows you to find node owning particular key.

    Could you please give a hint where and how I can find that API?

  2. It is a part of DistributedCacheService API.

    ((DistributedCacheService) cache.getCacheService()).getKeyOwner(key)
