JConsole and its successor JVisualVM are awesome tools, but most of the information used by these tools is available via JMX to any JMX-capable tool (though JVisualVM extensively uses other APIs too). JMX, and especially the set of standard JVM MBeans, is awesome too. You can get a lot of diagnostic data from a stock JVM for free.
Damn, if JMX had a standard RESTful HTTP protocol, it would be the best thing since sliced bread! :)
But despite their awesomeness, there are a few things which regularly frustrate me. Here they are:
I often have SSH-only access to a server box (other ports are blocked). JMX uses Java RMI, and RMI is very unfriendly to constrained networks. While it is possible to set up RMI tunneling via SSH, it is a real pain (and in a chained SSH environment it is hardly possible at all).
I want to see the portion of CPU consumed by each thread. You know, like top, but for threads inside a JVM. Why is this simple thing not in JConsole?
I still have to parse GC logs. Heap usage diagrams are nice, but I need numbers. Visual GC is cool at diagramming, though.
What I would really like is a command line tool which could get me that information from a running JVM.
With Java 1.6 and above this is actually fairly simple to achieve, thanks to the attach API (which allows you to run an agent in another JVM by process ID). Some digging into the JConsole code and voila … my wish came true.
jtop
jtop connects to a JVM by PID and periodically dumps thread CPU utilization to the console. Just type
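a command along the lines of the one below, where the jar name is only assumed by analogy with gcrep and may differ in the actual distribution:
java -jar jtop.jar <PID>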
Like jtop, gcrep connects to a running JVM, but instead of thread usage, it collects GC information and mimics a GC log.
java -jar gcrep.jar <PID>
[GC: G1 Young Generation#30 time: 54ms mem: G1 Survivor: 2048k+0k->2048k G1 Eden: 93184k-93184k->0k G1 Old Gen: 72253k+134k->72387k]
[GC: G1 Young Generation#31 time: 51ms interval: 9648ms mem: G1 Survivor: 2048k+0k->2048k[rate:0.00kb/s] G1 Eden: 93184k-93184k->0k[rate:-9658.37kb/s] G1 Old Gen: 72387k+0k->72387k[rate:0.00kb/s]]
Unlike normal GC logs, here I have detailed information for each space, deltas and an approximate allocation rate. Of course, you can get the same thing from a GC log with just a few hundred lines of awk, provided the GC log output is not mangled by a race condition between concurrent phases (which does happen sometimes).
Read-through is a technique which allows a cache to automatically populate an entry from an external data source upon a cache miss. Oracle Coherence supports this technique via read-write-backing-map and application-provided cache loaders (you can read more in the Coherence documentation).
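As a minimal sketch of such an application-provided loader (the class name and fetchFromDatabase method below are hypothetical placeholders), a plain CacheLoader only needs to answer load/loadAll requests; it is then plugged into the read-write-backing-map scheme in the cache configuration:
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import com.tangosol.net.cache.CacheLoader;

// hypothetical loader; fetchFromDatabase() stands in for real DB access
public class OrderCacheLoader implements CacheLoader {

    @Override
    public Object load(Object key) {
        // invoked by Coherence on a cache miss when read-through is enabled
        return fetchFromDatabase(key);
    }

    @Override
    public Map loadAll(Collection keys) {
        // bulk variant; see the load() vs. loadAll() discussion below
        Map result = new HashMap();
        for (Object key : keys) {
            Object value = fetchFromDatabase(key);
            if (value != null) {
                result.put(key, value);
            }
        }
        return result;
    }

    private Object fetchFromDatabase(Object key) {
        return null; // placeholder for a real database lookup
    }
}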
The read-write backing map can work with binary objects, which allows you to avoid unneeded serialization/deserialization in some cases. It is also possible to distinguish inserts vs. updates using BinaryEntryStore. The BinaryEntry interface gives you access to both the new and the previous version of the value, which may be very useful.
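For example, a BinaryEntryStore can tell an insert from an update by checking whether the entry has an original (previous) value. The sketch below assumes hypothetical insertRow/updateRow persistence methods and omits the load/erase logic:
import java.util.Set;

import com.tangosol.net.cache.BinaryEntryStore;
import com.tangosol.util.BinaryEntry;

public class AuditingEntryStore implements BinaryEntryStore {

    @Override
    public void store(BinaryEntry entry) {
        if (entry.getOriginalBinaryValue() == null) {
            insertRow(entry); // no previous version in cache - treat as insert
        } else {
            updateRow(entry); // previous version exists - treat as update
        }
    }

    @Override
    public void storeAll(Set entries) {
        for (Object e : entries) {
            store((BinaryEntry) e);
        }
    }

    @Override
    public void load(BinaryEntry entry) { /* read-through omitted in this sketch */ }

    @Override
    public void loadAll(Set entries) { /* omitted */ }

    @Override
    public void erase(BinaryEntry entry) { /* omitted */ }

    @Override
    public void eraseAll(Set entries) { /* omitted */ }

    private void insertRow(BinaryEntry entry) { /* hypothetical INSERT */ }

    private void updateRow(BinaryEntry entry) { /* hypothetical UPDATE */ }
}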
Why does Coherence call load() before store()?
Assume that we are working with a key which does not exist in the cache. If you just put(…) a new key via the named cache interface, Coherence works as expected: it adds the object to the cache and calls store(…) on the cache store plug-in. But if you use an entry processor and setValue(…) for an entry which is not in the cache – surprise, surprise – Coherence will first load(…) the key and then store(…) the new value.
The reason is simple: setValue(…) should return the previous value as the result of the operation. Use the other version of the method – setValue(value, false) – to avoid the unnecessary load(…) call. BTW, putAll(…) should be preferred over put(…) for the same reason – putAll(…) is not required to return the previous value.
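A minimal entry processor sketch illustrating the point (the class name and value are hypothetical); it relies on the two-argument InvocableMap.Entry.setValue(value, fSynthetic), which does not return the previous value and therefore does not force a load(…):
import java.io.Serializable;

import com.tangosol.util.InvocableMap;
import com.tangosol.util.processor.AbstractProcessor;

public class BlindPutProcessor extends AbstractProcessor implements Serializable {

    private final Object value;

    public BlindPutProcessor(Object value) {
        this.value = value;
    }

    @Override
    public Object process(InvocableMap.Entry entry) {
        // entry.setValue(value) would have to return the previous value,
        // triggering a read-through load(...) for a key which is not in the cache;
        // the two-argument form skips that load
        entry.setValue(value, false);
        return null;
    }
}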
load() vs. loadAll() methods
Assume that your cache loader uses SQL to fetch data from an RDBMS. It is clear that a single SQL select retrieving N entries at once (e.g. using in (…) in the where clause) is better than N subsequent SQL selects, each fetching only one entry.
Prior to Coherence 3.7, the read-write backing map implementation was using the sequential approach (making bulk cache preloading with read-through impractical). In Coherence 3.7 this was fixed (but you should use at least version 3.7.1.3; earlier versions have known bugs related to read-through).
So, in 3.7 getAll() will use loadAll() under the hood (but remember that your key set will be split by partition, distributed across storage members, and each storage member will process read-through in a partition-parallel fashion).
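In other words, a bulk read-through on 3.7+ is as simple as the sketch below (the cache name and keys are hypothetical); a single getAll(…) lets each storage member batch its missing keys into loadAll(…) calls:
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

public class BulkPreloadExample {
    public static void main(String[] args) {
        NamedCache cache = CacheFactory.getCache("orders"); // hypothetical cache name
        Set keys = new HashSet(Arrays.asList(1, 2, 3, 4, 5));
        // missing entries are fetched via loadAll(...) per partition owner
        // instead of N individual load(...) calls
        Map loaded = cache.getAll(keys);
        System.out.println("Loaded " + loaded.size() + " entries");
    }
}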
But will it work with aggregators and entry processors invoked over a collection of keys? – not so fast …
BTW, if you stick with 3.6 or earlier, you can read about a workaround here.
Aggregator warm up
Assume that you know the key set you want to aggregate using Coherence distributed aggregation, but many of these keys may not be in the cache (i.e. not yet loaded). Read-through is enabled.
The instance of your aggregator started on a storage node will receive a set of BinaryEntrys from Coherence. But that does not mean that all these entries are present in the cache; Coherence will not try to preload the working set for the aggregator. Actually, the aggregator may decide to ignore data which is not in the cache (see the isPresent() method). But if it calls any kind of "get" method on an entry, Coherence will load the value via the cache loader plug-in. The problem is – it will be done sequentially, so this may take A LOT of time.
Can we work around this? – Sure.
The simplest workaround is to call getAll() before invoking the aggregator (but that kills the idea of distributed aggregation). A smarter way is to dig through the internal cache layers and load entries via a call to the read-write backing map. The snippet below can be used for effective preloading of a set of entries in aggregators and entry processors.
public static void preloadValuesViaReadThrough(Set<BinaryEntry> entries) {
    CacheMap backingMap = null;
    Set<Object> keys = new HashSet<Object>();
    for (BinaryEntry entry : entries) {
        if (backingMap == null) {
            // all entries of a partition share the same backing map
            backingMap = (CacheMap) entry.getBackingMapContext().getBackingMap();
        }
        if (!entry.isPresent()) {
            // collect binary keys of entries which are not in the cache yet
            keys.add(entry.getBinaryKey());
        }
    }
    if (backingMap != null && !keys.isEmpty()) {
        // bulk read-through via the read-write backing map (loadAll() on 3.7+)
        backingMap.getAll(keys);
    }
}
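For illustration, this is roughly where the helper would be called from: a hypothetical parallel-aware aggregator whose counting logic is just a placeholder, assuming the preload helper above is visible to it (e.g. via a static import):
import java.io.Serializable;
import java.util.Collection;
import java.util.Set;

import com.tangosol.util.BinaryEntry;
import com.tangosol.util.InvocableMap;

public class PreloadingCountAggregator implements InvocableMap.ParallelAwareAggregator, Serializable {

    @Override
    public InvocableMap.EntryAggregator getParallelAggregator() {
        return this; // run the same logic on each storage member
    }

    @Override
    public Object aggregateResults(Collection partialResults) {
        long total = 0;
        for (Object partial : partialResults) {
            total += ((Number) partial).longValue();
        }
        return total;
    }

    @Override
    public Object aggregate(Set entries) {
        // bulk-load missing values through the read-write backing map first
        // (helper from the snippet above, assumed statically imported)
        preloadValuesViaReadThrough((Set<BinaryEntry>) entries);
        long count = 0;
        for (Object o : entries) {
            BinaryEntry entry = (BinaryEntry) o;
            if (entry.getValue() != null) {
                count++; // getValue() is now served from the cache, not one-by-one read-through
            }
        }
        return count;
    }
}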
Aggregation, expiry and past expiry entry resurrection
If you are using a read-write backing map in combination with expiry, you may be prone to the following effect.
Assume that your cache has been idle for some time and some of the cache entries are already past their expiry. Now you issue an aggregator over all cache data (in my case it was a regular housekeeping job interested only in live cache data). Filters in Coherence can match only data already in the cache (they never trigger read-through), but surprisingly, the operation described above starts storming the DB with read-through requests!
What has happened?
Lazy expiry
The local cache (acting as the internal map of the read-write backing map) performs expiry passively. If you are not touching it, it cannot expire anything. But if you call any of its methods, an expiry check will be triggered and entries may be physically removed from the cache at that point.
Key index of partitioned cache service
The partitioned cache service has an internal structure called the "key index" – it is simply a set of all keys in the local backing map. When you issue a filter-based operation, Coherence calculates the key set first (using the filter), then performs the operation (e.g. aggregation) over the known set of keys. The set of all keys is passed to the filter, which may then decide which keys to process (it can consult indexes at this point) and whether further filtering by value is required. AlwaysFilter is very simple; it does not require any value filtering, so Coherence just passes the whole "key index" content as input for the aggregation without consulting the backing map.
Together
A lot of entries in the cache are past expiry, but they are still in the cache because it is idle and the local cache has had no opportunity to perform an expiry check. An aggregator with AlwaysFilter is issued, and the Coherence storage member will perform the aggregation against all keys currently in the "key index" (including keys past their expiry). Access to the first entry from the aggregator will trigger an expiry check in the backing map, effectively wiping out expired entries. But the aggregator instance has already started and its entry set already contains these keys. By processing the recently expired entries in its entry set, the aggregator will be triggering read-through, resurrecting them (and of course it will be doing it one by one – read SLOW).
How to prevent this?
Well, my conditions are a little exotic. You will probably never hit exactly this problem, but an understanding of such effects may still be helpful in related cases.
The workaround is dead simple – call size() on the cache just before issuing the aggregator. size() will hit the backing map, which will get a chance to process expiry, and by the moment the aggregator arrives the dead entries will have been removed from the "key index", so no unexpected read-through will happen.
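A minimal sketch of the workaround (the cache name is hypothetical and a built-in Count aggregator stands in for the real housekeeping job):
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.aggregator.Count;
import com.tangosol.util.filter.AlwaysFilter;

public class HousekeepingJob {
    public static void main(String[] args) {
        NamedCache cache = CacheFactory.getCache("orders"); // hypothetical cache name
        // touching the cache gives the backing map a chance to run its expiry check,
        // so expired keys are dropped from the "key index" before the aggregator captures it
        cache.size();
        Object liveEntries = cache.aggregate(new AlwaysFilter(), new Count());
        System.out.println("Live entries: " + liveEntries);
    }
}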
Conclusion
Life is full of surprises when it comes to complex distributed systems. Keep your eyes open ;)
Coherence*Extend is a protocol which allows non-members of the cluster to access Coherence services. Extend uses a TCP connection to one of the cluster members (which should host a proxy service) and uses this member as a relay. Normally a client process creates a single TCP connection, which is shared between all NamedCache instances and threads of the client process. To be more precise, it creates a single TCP connection per remote service, but normally you have just one remote cache service (and in some cases another remote invocation service).
Of course the TCP connection will fail over automatically in case of proxy or network problems, so for most practical cases it is OK to share a single TCP connection per process. The proxy member of the cluster acts as a relay (in most cases it doesn't even deserialize data passing through), so a single client process is unlikely to overload the proxy process … unless you are using invocation services. In the case of an invocation service, the proxy performs logic on behalf of the client, and that logic can be arbitrarily complex, so it may be desirable to spread requests across multiple proxy processes.
Here is a simple trick: we should create as many remote services as connections we want to have. There is a slight problem: you cannot have the same cache name associated with different remote services at the same time … unless you are using multiple cache factories.
Below is a snippet of code which manages creating cache factories and mangling service names to create a separate set of Extend connections per ExtendConnection instance.
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tangosol.net.ConfigurableCacheFactory;
import com.tangosol.net.DefaultConfigurableCacheFactory;
import com.tangosol.net.InvocationService;
import com.tangosol.net.NamedCache;
import com.tangosol.net.Service;
import com.tangosol.run.xml.XmlElement;
import com.tangosol.run.xml.XmlHelper;

public class ExtendConnection {

    private static Logger LOGGER = Logger.getLogger(ExtendConnection.class.getName());

    private static AtomicInteger CONNECTION_COUNTER = new AtomicInteger();

    private int connectionId = CONNECTION_COUNTER.incrementAndGet();
    private ConfigurableCacheFactory cacheFactory;
    private ConcurrentMap<Service, Service> activeServices = new ConcurrentHashMap<Service, Service>(4, 0.5f, 2);

    public ExtendConnection(String configFile) {
        cacheFactory = initPrivateCacheFactory(configFile);
    }

    private DefaultConfigurableCacheFactory initPrivateCacheFactory(String configFile) {
        LOGGER.info("New Extend connection #" + connectionId + " is going to be created, config: " + configFile);
        XmlElement xml = XmlHelper.loadFileOrResource(configFile, "Coherence cache configuration for Extend connection #" + connectionId);
        // transform the configuration: make remote service names unique per connection
        XmlElement schemes = xml.getSafeElement("caching-schemes");
        for (Object o : schemes.getElementList()) {
            XmlElement scheme = (XmlElement) o;
            if (isRemoteScheme(scheme)) {
                String name = scheme.getSafeElement("service-name").getString();
                if (name != null && name.length() > 0) {
                    String nname = name + "-" + connectionId;
                    scheme.getElement("service-name").setString(nname);
                }
            }
        }
        return new DefaultConfigurableCacheFactory(xml);
    }

    private boolean isRemoteScheme(XmlElement scheme) {
        String name = scheme.getName();
        return "remote-cache-scheme".equals(name) || "remote-invocation-scheme".equals(name);
    }

    public NamedCache getCache(String name) {
        NamedCache cache = cacheFactory.ensureCache(name, null);
        Service service = cache.getCacheService();
        activeServices.putIfAbsent(service, service);
        return cache;
    }

    public InvocationService getInvocationService(String serviceName) {
        // the service name is mangled the same way as in the transformed configuration
        InvocationService service = (InvocationService) cacheFactory.ensureService(serviceName + "-" + connectionId);
        activeServices.putIfAbsent(service, service);
        return service;
    }

    /**
     * Warning: this method is not concurrency safe, you may get in trouble if you are accessing caches or services via this connection during shutdown.
     */
    public void disconnect() {
        for (Service service : new ArrayList<Service>(activeServices.keySet())) {
            try {
                if (service.isRunning()) {
                    service.stop();
                }
            }
            catch (Exception e) {
                LOGGER.log(Level.WARNING, "Exception during remote service shutdown", e);
            }
        }
    }
}
Each instance of the class above manages a physical TCP Extend connection (or several connections if you have multiple remote services in the configuration). To create multiple connections just create multiple instances, but make sure that you are not leaking them. Extend connections will not be closed automatically by GC, so you should pool them carefully.
This technique is also useful if you want to keep connections to several different clusters at the same time.
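A hypothetical usage sketch (the configuration file name and cache name below are placeholders): two instances give you two independent Extend connections to the same cache, and each must be disconnected explicitly.
// "extend-client-config.xml" and "orders" are assumed names for this sketch
ExtendConnection connection1 = new ExtendConnection("extend-client-config.xml");
ExtendConnection connection2 = new ExtendConnection("extend-client-config.xml");

// same cache name, but each NamedCache instance talks over its own TCP connection
NamedCache orders1 = connection1.getCache("orders");
NamedCache orders2 = connection2.getCache("orders");

// ... spread requests across orders1 / orders2 ...

// connections are not closed by GC, release them explicitly
connection1.disconnect();
connection2.disconnect();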