Monday, November 8, 2010

Data Grid Pattern - Network shared memory

Using shared memory for inter-process communication is a very popular pattern in the UNIX world. Many popular RDBMSes (both open source and commercial) are implemented as sets of OS processes communicating via shared memory. Designing a system as a set of functionally specialized, cooperating processes helps to keep the system transparent and maintainable, while using shared memory for IPC keeps communication overhead very low. Shared memory is very efficient compared to other forms of IPC: several processes can exchange or share information without involving the OS (i.e. no syscalls and no context-switching overhead).
Unfortunately, shared memory is useful only if all communicating processes are hosted on the same server (i.e. connected to the same physical memory). While we cannot use real shared memory when building a distributed system, the idea of building a system as a set of cooperating specialized processes remains attractive.
The key point of shared memory is that all processes can access and modify the same data; all processes have a consistent view and can perform atomic operations on the data (e.g. compare-and-set). Data grid technologies give us the same features, but in a network environment. Modern data grid products (e.g. Oracle Coherence, GemStone GemFire, etc.) provide the key features for shared-memory-like communication:
  • sharing data with strong consistency guarantees across processes in the cluster;
  • atomic operations over a single key in the shared storage (a minimal sketch of this follows the list).
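As a rough illustration of the atomic-operation point, here is a minimal sketch in plain Java that uses java.util.concurrent.ConcurrentMap as a local stand-in for a grid cache; real grid products expose comparable conditional-put or entry-processor primitives, and the key name below is made up.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AtomicCounterSketch {

    // Stand-in for a shared grid cache; in a real cluster this map would be
    // backed by the data grid rather than by local memory.
    private final ConcurrentMap<String, Long> cache = new ConcurrentHashMap<String, Long>();

    /** Atomically increments a shared counter using compare-and-set. */
    public long increment(String key) {
        while (true) {
            Long current = cache.get(key);
            if (current == null) {
                if (cache.putIfAbsent(key, 1L) == null) {
                    return 1L;
                }
            } else if (cache.replace(key, current, current + 1)) {
                return current + 1;
            }
            // another process won the race; retry
        }
    }
}
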
Still, data grids remain a very different technology, with different tradeoffs compared to shared memory. It is impossible to take PostgreSQL and make it clustered using Oracle Coherence. But we can reuse and extend the architectural approach and develop a “network shared memory” pattern as a way to build distributed systems.

“Network shared memory” pattern

First, you need to separate data and processes in your mind. Next, you should identify the operations on data that happen in your system: serving requests using the data, updating it, importing, exporting, transforming, and so on. Ideally you should have a separate process dedicated to each operation (though this is not always possible for various reasons, so be pragmatic). Once you have finished analyzing your data and designing the processes, you can start working on the physical model for storing your data in the grid. Remember, data for a grid should always be modeled with the access pattern in mind; just copying the data model from an RDBMS may produce disastrous results (data modeling for grids is another large topic).

Why is having multiple processes better?

Why is having multiple processes better than, say, multiple threads within a single process?
Here are a few answers:
  • You can distribute your processes between servers (e.g. to optimize resource utilization)
  • You can start/stop processes independently
  • You can upgrade processes independently
  • You have better failure isolation (e.g. memory leak in one process will not bring down whole system)
  • You can use less heap per JVM, and have individual memory options for different processes
Of course, there are also some drawbacks. A few of them:
  • You will have more JVMs running and thus more overhead from the JVMs themselves.
  • In the end, all processes have to communicate over the network, which is not as fast as sharing memory between threads.

Advanced uses of “Network shared memory”

Data grids have high availability built in, and we can leverage this to achieve high availability for our processes. The first pattern is a “hot standby”. We may have several instances of the same process running (e.g. on different boxes), but only one of them active. If the active process goes down, the grid detects the failure and we can promote one of the standbys to be the new active instance. It sounds simple, but this approach requires “death detection”, “peer discovery” and “distributed consensus”, none of which is easy to implement. Fortunately, the data grid already has all of this implemented; you just need to use its API. The data grid can also be used to store the internal state of a process, so you can implement failover even for stateful processes.
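To make the hot-standby idea concrete, here is a hedged sketch of leader election over a shared key, again with java.util.concurrent.ConcurrentMap standing in for the grid cache. The key and node ids are made up, and a real implementation would rely on the grid's death detection to trigger the promotion.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class HotStandbySketch {

    // Stand-in for a replicated grid cache shared by all process instances.
    private final ConcurrentMap<String, String> controlCache = new ConcurrentHashMap<String, String>();
    private final String nodeId;

    public HotStandbySketch(String nodeId) {
        this.nodeId = nodeId;
    }

    /** Tries to become the active instance; returns true if this node won. */
    public boolean tryBecomeActive() {
        String winner = controlCache.putIfAbsent("active-node", nodeId);
        return winner == null || winner.equals(nodeId);
    }

    /** Called when the grid reports the active node's death: promote a standby.
     *  The compare-and-set only succeeds if the dead node is still registered. */
    public boolean promoteAfterFailure(String deadNodeId) {
        return controlCache.replace("active-node", deadNodeId, nodeId);
    }
}
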
The next evolutionary step of this approach is load balancing between processes. The data grid can help coordinate processes by sharing a request routing table, sharing state for stateful operations, and/or using distributed locks to control access to resources.

A data grid is more than just a distributed data store

While data grid technology is primarily designed for working with large data sets, the advanced features of modern data grid products may benefit your system even if all your data fits in the memory of a single server. Their high availability and distributed coordination features can be invaluable for designing modular distributed solutions.

Wednesday, October 13, 2010

Data Grid Pattern - Data flow mediator

I want to start a series of articles about data-grid-oriented architectural patterns. The first pattern I want to present is the “Data flow mediator”. Let me start with an example.
Imagine you have a large ecommerce web application and want to do some real-time analysis of user actions. You have a stream of simple events, let's say clicks, and you need to do real-time aggregation by various dimensions: by user, by product, etc. At large scale this task is quite challenging: the number of writes is enormous, the different dimensions make sharding challenging, and the business wants the analytics as close to real time as possible (say, a few seconds of delay). With or without a data grid this task remains challenging, but data grid technology has strong advantages here.
Let me now introduce the “data flow mediator” pattern.
In this pattern, the data grid is used as a buffer between the systems that produce events (clicks) and the systems that consume the information (real-time analysis modules). A sketch of both sides follows the lists below.
From the producer's point of view:
  • The grid provides high and scalable write throughput,
  • The grid provides a reasonable balance between durability, performance and cost: data can be stored in memory only, protected by multiple redundant copies.
From the consumers' (real-time analysis modules) point of view:
  • Advanced data grids (e.g. Coherence, GemFire, etc.) provide the required querying/aggregation tools (implementing efficient queries over multiple dimensions in a grid is still an art, but it is doable),
  • High and scalable read throughput; different analysis modules may share the same “mediator”.
From the architect's point of view:
  • The mediator decouples data producers from data consumers, thus localizing the impact of changes in each component,
  • The data grid is self-managing. Imagine managing a DB with 50 shards plus HA replication plus dynamically adding and removing servers in the cluster, and you will treasure this feature.
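As a rough illustration of the mediator, here is a minimal sketch in plain Java, with java.util.concurrent.ConcurrentMap standing in for the grid cache and a deliberately naive local aggregation. The ClickEvent shape and the “clicks by product” dimension are illustrative assumptions, not any product's API; a real grid would run the aggregation in parallel on the storage nodes via its query/aggregator facilities.

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ClickMediatorSketch {

    /** Illustrative event shape produced by the web application. */
    public static class ClickEvent {
        final String userId;
        final String productId;
        final long timestamp;
        ClickEvent(String userId, String productId, long timestamp) {
            this.userId = userId;
            this.productId = productId;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for the grid cache acting as the mediator buffer.
    private final ConcurrentMap<String, ClickEvent> buffer = new ConcurrentHashMap<String, ClickEvent>();

    /** Producer side: the web tier just puts events into the grid. */
    public void publish(ClickEvent event) {
        buffer.put(UUID.randomUUID().toString(), event);
    }

    /** Consumer side: a naive local scan that shows the shape of the
     *  computation; a real grid would push this work to the data. */
    public Map<String, Integer> clicksByProduct() {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (ClickEvent e : buffer.values()) {
            Integer c = counts.get(e.productId);
            counts.put(e.productId, c == null ? 1 : c + 1);
        }
        return counts;
    }
}
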
I have demonstrated this pattern with ecommerce examples, but there are similar use cases in finance and telecom. The key prerequisites for this pattern are:
  • a large number of small updates,
  • data loss is not fatal (either we can tolerate it or we can restore the data from somewhere else),
  • a large number of read queries,
  • queries are reasonably simple, but more complicated than a get by primary key,
  • low response-time requirements for both reads and writes,
  • scale beyond what a single RDBMS can handle.
I hope this article helps you better understand data grid technology and its use cases.

Monday, October 11, 2010

Coherence, magic trick with cache index

A technical article related to Oracle Coherence.

Oracle Coherence has the ability to query data in a cache not only by key but also by value (or value attributes). Of course, querying by secondary attributes is not as efficient as querying by primary key, but it can still be very useful. Oracle Coherence also supports indexes, which significantly improve the performance of value-based queries.
But sometimes an index may not behave exactly as you expect.

The full text of the article is available at the GridDynamics blog - http://blog.griddynamics.com/2010/10/coherence-magic-trick-with-cache-index.html

Friday, July 16, 2010

Coherence, ReflectionPofSerializer now supports POF extractor

A technical article related to Oracle Coherence.

A year ago I implemented and open sourced ReflectionPofSerializer. This class has saved me from writing thousands of lines of boring serialization code for various Filters, EntryProcessors, Aggregators, Invocables and other mobile objects in Coherence.
Recently I was asked about POF extractor support in ReflectionPofSerializer. My answer was no, it doesn't support extraction from POF directly. But it turns out that ReflectionPofSerializer can easily be extended to support it. A bit of coding and voilà, let me introduce ReflectionPofExtractor.

The full text of the article is available at the GridDynamics blog - http://blog.griddynamics.com/2010/07/coherence-reflectionpofserializer-now.html

Wednesday, June 30, 2010

Coherence. Read through and operation bundling.

A technical article about Oracle Coherence data grid.

If you are using read-write-backing-map, you should know that the CacheLoader interface has the methods load(...) and loadAll(…) for bulk loading. And if your cache loader is loading data from a DB, your implementation of loadAll(…) is probably designed to load all requested objects via a single DB query (because that is usually an order of magnitude more efficient than issuing a DB query per key). Let's assume your read-through cache is working just fine, and at some point you decide to implement cache preloading – a popular pattern. To implement preloading you have found some way to collect the keys (e.g. via a query to the DB) and want to use a getAll(…) call to load the data into the cache. Everything looks OK, the unit tests pass, but on real data it takes forever to preload the cache! Let's investigate …
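For context, here is a hedged sketch of the kind of bulk-loading cache loader the paragraph describes, written against the classic CacheLoader shape with load(Object) and loadAll(Collection). The table and column names and the connection lookup are illustrative, error handling is reduced to the bare minimum, and it assumes the id column round-trips to the same key objects that were requested.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import com.tangosol.net.cache.CacheLoader;

public class ProductCacheLoader implements CacheLoader {

    public Object load(Object key) {
        return loadAll(java.util.Collections.singleton(key)).get(key);
    }

    public Map loadAll(Collection keys) {
        Map result = new HashMap();
        // single SELECT ... IN (...) for the whole key set, instead of one query per key
        StringBuilder sql = new StringBuilder("SELECT id, name FROM products WHERE id IN (");
        for (int i = 0; i < keys.size(); i++) {
            sql.append(i == 0 ? "?" : ", ?");
        }
        sql.append(")");
        try {
            Connection con = getConnection();
            PreparedStatement ps = con.prepareStatement(sql.toString());
            int n = 1;
            for (Iterator it = keys.iterator(); it.hasNext();) {
                ps.setObject(n++, it.next());
            }
            ResultSet rs = ps.executeQuery();
            while (rs.next()) {
                result.put(rs.getObject("id"), rs.getString("name"));
            }
            rs.close();
            ps.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
        return result;
    }

    private Connection getConnection() {
        // obtain a pooled connection here; omitted in this sketch
        throw new UnsupportedOperationException("connection pool lookup not shown");
    }
}
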

The full text of the article is available at the GridDynamics blog - http://blog.griddynamics.com/2010/06/coherence-read-through-and-operation.html

Tuesday, April 13, 2010

RIT Conf 2010 (Moscow Internet conference)

Today I gave a talk at РИТ++ 2010, a Moscow Internet conference.
Below is the slide deck (in Russian).


Friday, February 26, 2010

4 levels of replication technologies

An essay on the state of the art in replication technologies.
Replication is widely used to achieve various goals in information systems, such as better performance, fault tolerance, backup, etc. Four classes of replication technologies are available. Your decision about which technology to use will depend on the logical presentation of the data you are trying to replicate.

The full text of the article is available at the GridDynamics blog - http://blog.griddynamics.com/2010/02/4-levels-of-replication-technologies.html

Thursday, January 28, 2010

Data storages and read vs write controversy

While working with various high-load systems recently, I noticed a paradoxical contradiction in data models. Let me explain through the following comparisons. First, let's think about normalized vs. denormalized data models.


Normalized vs. denormalized

Read:
  • Normalized – bad: queries become complex; joins and nested selects are slow.
  • Denormalized – good: fast queries, no joins; queries are mostly single index lookups (assuming the schema is tailored to the application's needs).

Write:
  • Normalized – good: consistency is easier to keep; no self-contradiction within the schema; fewer rows to update.
  • Denormalized – bad: potential inconsistency in the data; more rows to update (data may be duplicated in several places); complex update procedures.

Let's continue with the next comparison -- single vs. multiple copies of data.


Single copy vs. multiple copies

Read:
  • Single copy – bad: the single copy becomes a bottleneck.
  • Multiple copies – good: operations can be balanced between copies.

Write:
  • Single copy – good: no need to keep copies in sync; a single place to update.
  • Multiple copies – bad: updates must be performed on every copy; synchronization and consistency issues; transactional updates become distributed transactions.

In general, the pattern is simple. If we think of our data as a set of facts, we can anticipate that each fact will become several data records (either through database replicas or through denormalization of the model). For write access, dealing with each fact as a single record is much more efficient. The impact of this difference between what is and what should be can be mitigated by using the optimal storage technology for a given application.


The picture above is very simple (or even simplistic) but it provides some insight for selecting the proper data storage for your applications (or even specific parts of your application). If you have any comments about this, please reply to this post.

Key/value and document-oriented storages

These are data-storage techniques that don't support joins, and they usually break first normal form. Due to limited query support, the schema is often prepared in such a way that application queries become simple index lookups. This can be a very effective approach, but the flip side is usually duplication of an entity's attributes across several tables (or their analogs), which makes updating data more expensive. These types of storage also tend to use asynchronous disk operations, which further undermines their value as system-of-record storage. The lack of ACID properties does not help either.

Search indexes

Search indexes such as Lucene and Sphinx provide excellent query performance. They use data structures designed for information retrieval at the cost of expensive updates. Search engines also require a denormalized data model, which further complicates writes.

RDBMS

Relational databases rely strongly on normalization of the data model. It is extremely difficult for an RDBMS to be effective for both read and write operations. An RDBMS will never be as fast or as simple as a key/value hash table, because its write operations will always be quite expensive (due to indexes and consistency checks), but it can be a good middle ground between key/value hash tables and MQ-based storage.

MQ

It may be surprising to some people that I have included message queues in the same discussion as databases. But persistent queues and publish/subscribe systems with guaranteed delivery are similar to databases: submitting a message is like a write operation and receiving a message is like a read. The important thing about MQs is that they can be very efficient for write operations. Read operations in MQ environments are very limited, but experience has shown that "write-only" data storage can fill an important niche.

Convergence of paradigms

In the early days of relational databases, many people were skeptical about their future. The main argument against them was performance. But RDBMS technology has survived. Vendors have made the indexes faster and the query optimizers smarter. Overall, the performance and reliability have improved significantly. While maintaining a strong position in their niche, they have slowly assimilated key features of competing technologies to expand their appeal.

Materialized views are actually a smart way to get the benefits of denormalized data while keeping the schema normalized when it comes to updating. Message queues are also becoming a part of RDBMS offerings (e.g., Oracle RDBMS has queues at its core and PostgreSQL has production-ready built-in queues used by Skype).

We are living in an interesting time. Networking has led to exponential growth in the amount of data created by, and used by, applications. Data-storage technologies have to adapt and evolve quickly, and it's fascinating to watch this evolution. Good luck with your data storage!

Wednesday, January 6, 2010

Java tricks, reducing memory consumption

In this blog post, I want to discuss optimization of Java memory usage. The Sun JDK has two simple but powerful tools for memory profiling – jmap and jhat.
jmap has two important capabilities for memory profiling. It can:
  • create a heap dump file for any live Java process
  • show a heap distribution histogram
Neither of these capabilities requires any special parameters for the Java virtual machine (JVM). Below is a heap distribution histogram produced by jmap.

>jmap -histo:live 16608
num     #instances         #bytes  class name
----------------------------------------------
   1:        260317       25199232  [C
   2:        166085       24205896 
   3:        265765       13336072 
   4:        166085       13304504 
   5:         15645        9921728 
   6:        295513        7092312  java.lang.String
   7:         27721        7048392  [I
   8:         15645        7032592 
   9:         12920        5839232 
  10:         20621        5702816  [B
  11:         27440        3493744  [Ljava.util.HashMap$Entry;
  12:        140640        3375360  java.util.HashMap$Entry
  13:        102400        3276800  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
  14:        102400        3276800  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
  15:         51198        2880304  [Ljava.lang.Object;
  16:        106925        2566200  java.util.concurrent.locks.ReentrantLock$NonfairSync
  17:         22668        1693408  [S
  18:         16749        1607904  java.lang.Class
  19:         23731        1086064  [[I
  20:         30176        1060240  [Ljava.lang.String;
  21:         25600        1024000  org.jboss.netty.util.internal.ConcurrentIdentityHashMap
  22:         25600        1024000  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$KeyIterator
  23:         29038         929216  java.util.LinkedHashMap$Entry
  24:         22857         914280  java.util.HashMap
  25:         13255         848320  org.eclipse.core.internal.resources.ResourceInfo
  26:         25600         819200  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment;
  27:         17506         805264  [[C
  28:         27531         660744  java.util.ArrayList
...
Total       2587755      166479680

For each class we can see the class name, the number of instances of the class in the heap, and the number of bytes used in the heap by all instances of the class. The table is sorted by consumed heap space. Although it is very simple, it is extremely useful. This information is enough to diagnose 60% of heap capacity problems.

A [ symbol before a class name means an array of objects; arrays of primitive types look like:
[B - byte[]
[C - char[]
[I - int[]

If the heap distribution histogram is too high-level, you can try jhat. jhat can read and explore a heap dump. It has a web interface, and you can click through your dumped object graph. Of course, clicking through a few million objects is not fun. Fortunately, jhat supports a query language. Let's give it a try.

>jmap -dump:file=eclipse.heap 16608
Dumping heap to C:\Program Files (x86)\Java\jdk1.6.0_11\bin\eclipse.heap ...
Heap dump file created

>jhat -J-Xmx1G eclipse.heap
Reading from eclipse.heap...
Dump file created Wed Oct 14 08:41:07 MSD 2009
Snapshot read, resolving...
Resolving 2300585 objects...
Chasing references, expect 460 dots.............................................
................................................................................
................................................................................
.......................................
Eliminating duplicate references................................................
................................................................................
................................................................................
....................................
Snapshot resolved.
Started HTTP server on port 7000
Server is ready.

Now open http://localhost:7000 and you will see a summary of all classes. You can use the standard queries via links, or go straight to the “Execute Object Query Language (OQL) query” link at the bottom and type your own query. The query language is quite awkward (it is based on the JavaScript engine) and may not work well with large numbers of objects, but it is a very powerful tool.
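For example, a query in the spirit of the samples from jhat's OQL help page lists all strings longer than 100 characters (s.count refers to the Java 6 String field discussed below; the threshold is arbitrary):

select s from java.lang.String s where s.count > 100
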

Enterprise applications are 80% strings and maps

Let’s look at the jmap histogram again. In the top row, we can see the "[C" class consuming most of the heap space. These char arrays are actually part of String objects, and we can see that String instances also consume considerable space. From my experience, 60-80% of the heap in an enterprise application is consumed by strings and hash maps.

Strings

Let's look at how the JVM stores strings. String objects are semantically immutable. Each instance has four fields (all except hash are marked final):
  • a reference to a char array
  • an integer offset
  • an integer count of characters
  • an integer string hash (lazily evaluated; once evaluated it never changes)
How much memory does one String instance consume?

Here and below, size calculations are for the Sun JVM (32-bit); they should be similar for other vendors.
Object header (8 bytes) + 1 reference (4 bytes) + 3 ints (12 bytes) = 24 bytes. But the String instance is only the header; the actual text is stored in the char array (2 bytes per character + 12 bytes of array header).


String instances can share char arrays with each other. When you call substring(…) on a String instance, no new char array is allocated; instead, a new String referencing a subrange of the existing char array is created. Sometimes this can become a problem. Imagine you are loading a text file (e.g., CSV): first you load the entire line as a string, then you seek the position of a field and call substring(…). Now your small field-value string holds a reference to the char array of the entire line. Sometime later the string header for the line is collected, but its characters stay in memory because they are still referenced by the other string object.
In the illustration above, useful data is marked in yellow. We can see that some characters cannot be accessed any more, but they still occupy space in the heap. How can you avoid such problems?
If you create a String object with a constructor, a new char array is always allocated. To copy the content of a substring you can use the following construct (it looks a bit strange, but it works):
new String(a.substring(…))
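A minimal, self-contained sketch of the difference, assuming the Java 6 era String described above where substring(…) shares the backing char array (later JDKs changed substring() to copy, which removes this particular problem):

import java.util.ArrayList;
import java.util.List;

public class SubstringRetention {
    public static void main(String[] args) {
        List<String> fields = new ArrayList<String>();
        for (int i = 0; i < 1000; i++) {
            String line = buildLongCsvLine(i);
            // fields.add(line.substring(0, 5));           // keeps the whole line's char[] alive
            fields.add(new String(line.substring(0, 5)));  // copies just the characters we need
        }
        System.out.println(fields.size() + " fields retained");
    }

    private static String buildLongCsvLine(int i) {
        // imitates one long CSV line read from a file
        StringBuilder sb = new StringBuilder("id").append(i);
        for (int k = 0; k < 10000; k++) {
            sb.append(",padding");
        }
        return sb.toString();
    }
}
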

String.intern() – think twice

The String class has a method intern() that guarantees the following:
a.equals(b)  => a.intern() == b.intern()
There is a table inside the JVM used to store canonical forms of strings. If the text is found in the table, the value from the table is returned by intern(); otherwise the string is added to the table. So a reference returned by intern() is always an object from the JVM intern table.
The string intern table keeps weak references to its objects, so unused strings can be garbage collected when no references exist other than the one from the intern table itself. It may look like a great idea to use intern() to eliminate all duplicated strings in an application. Many have tried … and many have regretted that decision. I cannot say this is true for every JVM vendor, but if you are using Sun's JVM, you should never do this.
Why?
  • JVM string intern tables are stored in PermGen – a separate region of the heap (not included in ‑Xmx size) used for the JVM’s internal needs. Garbage collection in this area is expensive and size is limited (though configurable).
  • Inserting a new string into the table has O(n) complexity, where n is the table size.
The JVM's string intern table works perfectly for the JVM's own needs (new entries are added only while loading new classes, so insertion time is not a big issue) and it is very compact. But it is completely unsuitable for storing millions of application strings; it just was not designed for such a use case.

Removing duplicates

The JVM’s string intern table is not an option but the idea of eliminating string duplicates is very attractive. What can we do?
import java.util.HashMap;
import java.util.Map;

public class InternTable<X> {

    private final Map<X, X> table = new HashMap<X, X>();

    public X intern(X val) {
        X in = table.get(val);
        if (in == null) {
            table.put(val, val);
            in = val;
        }
        return in;
    }
}
Above is a simple snippet of a custom intern table. It can be used for strings or any other immutable objects. Looks good, but it has a problem. Such an implementation will prevent objects from being collected by GC. What can we do?
We need to use weak references. There is a WeakHashMap class in the JDK – a hash table that uses weak references for its keys. Let's try to use it.
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.WeakHashMap;

public class InternTable<X> {

    private final Map<X, WeakReference<X>> table = new WeakHashMap<X, WeakReference<X>>();

    public X intern(X val) {
        WeakReference<X> ref = table.get(val);
        X in = ref == null ? null : ref.get();
        if (in == null) {
            table.put(val, new WeakReference<X>(val));
            in = val;
        }
        return in;
    }
}
Did you notice that we also need a weak reference to wrap the value? This will work, but what is the cost?
  • reference from the hash table – 1 ref * ratio of unused slots (~6 bytes)
  • WeakHashMap$Entry object – 40 bytes
  • value WeakReference – 24 bytes

The cost per entry in the table is about 70 bytes. That’s expensive. Can we reduce it?
Right now we need two weak references per entry. If we rewrite WeakHashMap to return the entry from the get(...) method (instead of the value), we can drop the second weak reference and save 24 bytes. But the cost of such an intern table is still high. You should analyze and experiment with your data to see whether this trick brings real benefits in your case.

UTF8 strings

Java strings are Unicode and encoded as UTF-16 in memory. If converted to UTF-8, the actual text is likely to consume half the memory. But using UTF-8 breaks compatibility with the String class: you have to create your own class and convert it to a standard String for the majority of operations. This increases CPU usage, but in some edge cases the approach can be useful for overcoming heap size limitations (e.g., storing large text indexes in memory).
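A hedged sketch of the idea; the class name is made up, and equals(), hashCode() and other conveniences are omitted for brevity:

import java.nio.charset.Charset;

/** Stores text as UTF-8 bytes and converts to String only on demand. */
public final class Utf8String {

    private static final Charset UTF8 = Charset.forName("UTF-8");

    private final byte[] bytes;

    public Utf8String(String text) {
        this.bytes = text.getBytes(UTF8);   // roughly half the size for mostly-ASCII text
    }

    @Override
    public String toString() {
        return new String(bytes, UTF8);     // pay the CPU cost only when a String is needed
    }
}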

Maps and sets

Good old HashMap

Good old java.util.HashMap is used everywhere. The standard implementation in the JDK uses an open hash table data structure (chained entries).
References to the key and the value are stored in an Entry object (which also keeps the key's hash for faster access). If several keys map to the same hash slot, they are stored as a list of linked entries. The size of each entry structure is: object header + 3 references + int = 24 bytes. java.util.HashSet uses java.util.HashMap under the hood, so its overhead is the same.
Can we store a map/set in a more compact form?
Sure.

Sorted array map

If the keys are comparable, we can store the map or set as sorted arrays of interleaved key/value pairs (array of keys in the case of a set). Binary search can be used for fast lookups in an array, but insertion and deletion of entries will have to shift elements. Due to the high cost of updates in such a structure, it will be effective only for smaller collection sizes, and for operation patterns that are mostly read. Fortunately, this is usually what we have – map/set of a few dozen objects that are read more often than modified.
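Here is a minimal sketch of the idea, using parallel sorted arrays rather than a single interleaved array; keys are assumed to be Comparable, and removal is omitted:

import java.util.Arrays;

/** Read-mostly map backed by a sorted key array and a parallel value array. */
public class SortedArrayMap<K extends Comparable<K>, V> {

    private Object[] keys = new Object[0];
    private Object[] values = new Object[0];

    @SuppressWarnings("unchecked")
    public V get(K key) {
        int i = Arrays.binarySearch(keys, key);    // O(log n) lookup
        return i >= 0 ? (V) values[i] : null;
    }

    public void put(K key, V value) {
        int i = Arrays.binarySearch(keys, key);
        if (i >= 0) {                              // key already present: overwrite
            values[i] = value;
            return;
        }
        int at = -(i + 1);                         // insertion point: shift elements, O(n)
        keys = insert(keys, at, key);
        values = insert(values, at, value);
    }

    private static Object[] insert(Object[] a, int at, Object v) {
        Object[] b = new Object[a.length + 1];
        System.arraycopy(a, 0, b, 0, at);
        b[at] = v;
        System.arraycopy(a, at, b, at + 1, a.length - at);
        return b;
    }
}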

Closed hash table

If we have a larger collection, we should use a hash table. Can we make the hash table more compact? Again, the answer is yes. There are data structures called closed hash tables that do not require entry objects.
In a closed hash table, references from the table point directly to the key object. What if we want to put in a reference to a key but the hash slot is already occupied? In that case we find another slot (e.g., the next one). How do you look up a key? Search through the adjacent slots until the key or a null reference is found. As you can see from the algorithm, it is very important to have enough empty slots in the table; the density of a closed hash table should be kept below 0.5 to avoid performance degradation.
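A minimal sketch of a closed hash table used as a set, with linear probing and the density kept below 0.5; removal is omitted because it needs extra care (tombstones or re-insertion):

/** Closed hash table (open addressing with linear probing) used as a set. */
public class ClosedHashSet<T> {

    private Object[] slots = new Object[16];
    private int size;

    public boolean add(T value) {
        if ((size + 1) * 2 > slots.length) {       // keep density below 0.5
            rehash();
        }
        boolean added = insert(slots, value);
        if (added) size++;
        return added;
    }

    public boolean contains(T value) {
        int i = indexFor(value, slots.length);
        while (slots[i] != null) {                 // probe until an empty slot is found
            if (slots[i].equals(value)) return true;
            i = (i + 1) % slots.length;
        }
        return false;
    }

    private static boolean insert(Object[] table, Object value) {
        int i = indexFor(value, table.length);
        while (table[i] != null) {
            if (table[i].equals(value)) return false;   // already present
            i = (i + 1) % table.length;                  // try the next slot
        }
        table[i] = value;
        return true;
    }

    private void rehash() {
        Object[] bigger = new Object[slots.length * 2];
        for (Object o : slots) {
            if (o != null) insert(bigger, o);
        }
        slots = bigger;
    }

    private static int indexFor(Object value, int length) {
        return (value.hashCode() & 0x7fffffff) % length;
    }
}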

Map structures summary

Open hash map/set
Cost per entry:
  • 1 ref * 1.33 (for 0.75 density)
  • Entry object
  • Total: ~30 bytes
+ Better handling of hash collisions.
+ Fast access to hash code.
Closed hash map/set
Cost per entry (map):
  • 2 ref * 2 (for 0.5 density)
  • Total: 16 bytes
Cost per entry (set):
  • 1 ref * 2 (for 0.5 density)
  • Total: 8 bytes
- Generally slower than open version.
Sorted array map/set
Cost per entry (map):
  • 2 ref
  • Total: 8 bytes
Cost per entry (set):
  • 1 ref
  • Total: 4 bytes
- For small collections only.
- Expensive updates/deletes.

The JDK uses the open hash table structure because in the general case it is the better structure. But when you are desperate to save memory, the other two options are worth considering.

Conclusion

Optimization is an art. There are no magical data structures capable of solving every problem. As you can see, you have to fight for every byte. Memory optimization is a complex process. Remember that you should design your data so each object can be referenced from different collections (instead of having to copy data). It is usually better to use semantically immutable objects because you can easily share them instead of copying them. And from my experience, in a well-designed application, optimization and tuning can reduce memory usage by 30-50%. If you have a very large amount of data, you have to be ready to handle it.