Coherence*Extend is a protocol that lets processes which are not members of the
cluster access Coherence services. An Extend client opens a TCP connection to one of
the cluster members (which must host a proxy service) and uses that member as a relay.
Normally a client process creates a single TCP connection, and this connection is
shared between all NamedCache instances and threads of the client process. To be more
precise, it creates a single TCP connection per remote service, but normally you have
just one remote cache service (and in some cases one more remote invocation service).
Of course, the TCP connection will fail over automatically in case of proxy or network
problems, so for most practical cases it is fine to share a single TCP connection
per process. The proxy member of the cluster acts as a relay (in most cases it
doesn’t even deserialize the data passing through), so a single client process is
unlikely to overload a proxy process … unless you are using invocation services. With
an invocation service, the proxy performs logic on behalf of the client, and that logic
can be arbitrarily complex, so it may be desirable to spread requests across multiple
proxy processes.
Here is a simple trick: create as many remote services as the number of connections
you want to have. There is a slight problem: you cannot have the same cache name
associated with different remote services at the same time … unless you are using
multiple cache factories.
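To make the renaming idea concrete, here is a minimal sketch of that one step in isolation. It uses the JDK DOM API as a stand-in for Coherence's own XML classes, so it runs without Coherence on the classpath. The class name ServiceNameMangler and its helper methods are assumptions made for illustration and are not part of Coherence. The sketch appends a connection number to the service-name of every remote scheme, which is what would give each private cache factory its own set of remote services.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/** Illustration only: the JDK DOM API stands in for Coherence's XmlElement. */
final class ServiceNameMangler {

    /** Parses a cache configuration and appends "-<connectionId>" to remote service names. */
    static Document mangle(String configXml, int connectionId) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(configXml)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse cache configuration", e);
        }
        NodeList sections = doc.getElementsByTagName("caching-schemes");
        if (sections.getLength() == 0) {
            return doc; // no schemes, nothing to rename
        }
        NodeList schemes = sections.item(0).getChildNodes();
        for (int i = 0; i < schemes.getLength(); i++) {
            Node scheme = schemes.item(i);
            if (!(scheme instanceof Element) || !isRemoteScheme(scheme.getNodeName())) {
                continue; // local, distributed and other schemes are left untouched
            }
            // Only direct children are inspected, so nested elements are not renamed.
            for (Node child = scheme.getFirstChild(); child != null; child = child.getNextSibling()) {
                if (child instanceof Element && "service-name".equals(child.getNodeName())) {
                    child.setTextContent(child.getTextContent().trim() + "-" + connectionId);
                }
            }
        }
        return doc;
    }

    static boolean isRemoteScheme(String name) {
        return "remote-cache-scheme".equals(name) || "remote-invocation-scheme".equals(name);
    }

    /** Returns the service name of the first scheme with the given element name, or null. */
    static String serviceNameOf(Document doc, String schemeName) {
        NodeList schemes = doc.getElementsByTagName(schemeName);
        if (schemes.getLength() == 0) {
            return null;
        }
        NodeList names = ((Element) schemes.item(0)).getElementsByTagName("service-name");
        return names.getLength() == 0 ? null : names.item(0).getTextContent();
    }
}
```

Calling mangle twice on the same configuration text with connection numbers 1 and 2 yields two configurations whose remote services have distinct names, so the same cache name can be served by a different remote service in each cache factory.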
Below is a snippet of code that manages the creation of cache factories and mangles
service names to create a separate set of Extend connections per ExtendConnection
instance.
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tangosol.net.ConfigurableCacheFactory;
import com.tangosol.net.DefaultConfigurableCacheFactory;
import com.tangosol.net.InvocationService;
import com.tangosol.net.NamedCache;
import com.tangosol.net.Service;
import com.tangosol.run.xml.XmlElement;
import com.tangosol.run.xml.XmlHelper;

public class ExtendConnection {

    private static final Logger LOGGER = Logger.getLogger(ExtendConnection.class.getName());
    private static final AtomicInteger CONNECTION_COUNTER = new AtomicInteger();

    // Unique suffix appended to every remote service name of this connection
    private final int connectionId = CONNECTION_COUNTER.incrementAndGet();
    private final ConfigurableCacheFactory cacheFactory;
    // Explicit tracking of started services, so that disconnect() can stop them
    private final ConcurrentMap<Service, Service> activeServices =
            new ConcurrentHashMap<Service, Service>(4, 0.5f, 2);

    public ExtendConnection(String configFile) {
        cacheFactory = initPrivateCacheFactory(configFile);
    }

    private DefaultConfigurableCacheFactory initPrivateCacheFactory(String configFile) {
        LOGGER.info("New Extend connection #" + connectionId
                + " is going to be created, config: " + configFile);

        XmlElement xml = XmlHelper.loadFileOrResource(configFile,
                "Coherence cache configuration for Extend connection #" + connectionId);

        // transforming configuration: give each remote service a name unique to this connection
        XmlElement schemes = xml.getSafeElement("caching-schemes");
        for (Object o : schemes.getElementList()) {
            XmlElement scheme = (XmlElement) o;
            if (isRemoteScheme(scheme)) {
                // getElement() returns null if the scheme has no explicit service name
                XmlElement serviceName = scheme.getElement("service-name");
                String name = (serviceName == null) ? null : serviceName.getString();
                if (name != null) {
                    serviceName.setString(name + "-" + connectionId);
                }
            }
        }

        return new DefaultConfigurableCacheFactory(xml);
    }

    private boolean isRemoteScheme(XmlElement scheme) {
        String name = scheme.getName();
        return "remote-cache-scheme".equals(name) || "remote-invocation-scheme".equals(name);
    }

    public NamedCache getCache(String name) {
        NamedCache cache = cacheFactory.ensureCache(name, null);
        Service service = cache.getCacheService();
        activeServices.putIfAbsent(service, service);
        return cache;
    }

    public InvocationService getInvocationService(String serviceName) {
        // The service name must be mangled the same way as in the configuration
        InvocationService service =
                (InvocationService) cacheFactory.ensureService(serviceName + "-" + connectionId);
        activeServices.putIfAbsent(service, service);
        return service;
    }

    /**
     * Warning: this method is not concurrency safe; you may get into trouble
     * if you are accessing caches or services via this connection during shutdown.
     */
    public void disconnect() {
        for (Service service : new ArrayList<Service>(activeServices.keySet())) {
            try {
                if (service.isRunning()) {
                    service.stop();
                }
            } catch (Exception e) {
                LOGGER.log(Level.WARNING, "Exception during remote service shutdown", e);
            }
        }
    }
}
Each instance of the class above manages one physical TCP Extend connection (or
several, if you have multiple remote services in the configuration). To create
multiple connections, just create multiple instances, but make sure that you are not
leaking them. Extend connections will not be closed automatically by GC, so you
should pool them carefully.
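One way to pool connections carefully is a small fixed-size pool that hands connections out round-robin and closes all of them explicitly. The sketch below is only an illustration under assumptions: RoundRobinPool is a hypothetical helper, not part of Coherence, and it is generic so that it can run without Coherence on the classpath. The factory and the closer are supplied by the caller.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper: a fixed-size pool that hands out connections round-robin. */
final class RoundRobinPool<C> implements AutoCloseable {

    private final List<C> connections;
    private final Consumer<C> closer;
    private final AtomicInteger cursor = new AtomicInteger();

    RoundRobinPool(int size, Supplier<C> factory, Consumer<C> closer) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<C> created = new ArrayList<C>(size);
        for (int i = 0; i < size; i++) {
            created.add(factory.get()); // all connections are created eagerly
        }
        this.connections = Collections.unmodifiableList(created);
        this.closer = closer;
    }

    /** Returns the next connection in rotation; safe to call from many threads. */
    C next() {
        // floorMod keeps the index valid even after the counter overflows
        int index = Math.floorMod(cursor.getAndIncrement(), connections.size());
        return connections.get(index);
    }

    /** Closes every connection, even if closing some of them fails. */
    @Override
    public void close() {
        RuntimeException first = null;
        for (C connection : connections) {
            try {
                closer.accept(connection);
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

With the class from this post, such a pool could be created as new RoundRobinPool<ExtendConnection>(4, () -> new ExtendConnection("client-cache-config.xml"), ExtendConnection::disconnect), where the configuration file name is made up. A thread should obtain its NamedCache from the connection returned by next() and keep using that cache, and close() should be called only once no thread is using the pooled connections any more.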
This technique is also useful if you want to keep connections to several different
clusters at the same time.
Looks like there is a bug in the disconnect code. When called, I get the following:
2012-04-18 11:03:59,331 [main] WARN com.overstock.cache.multiextend.ExtendConnection - Exception during service shutdown:
java.lang.IllegalArgumentException: Missing scheme for service: "cache1ExtendedTcpProxyService1-1"
at com.tangosol.net.DefaultConfigurableCacheFactory.findServiceScheme(DefaultConfigurableCacheFactory.java:767)
at com.tangosol.net.DefaultConfigurableCacheFactory.ensureService(DefaultConfigurableCacheFactory.java:337)
at com.tangosol.net.CacheFactory.getService(CacheFactory.java:161)
at com.xxx.cache.multiextend.ExtendConnection.disconnect(ExtendConnection.java:48)
at com.xxx.cache.multiextend.Main.main(Main.java:16)
2012-04-18 11:03:59.328/2.479 Oracle Coherence GE 3.7.1.0 (thread=main, member=n/a): Loaded cache configuration from "file:/home/slandis/clients/config/cache1-client-config.xml"
Shouldn't the disconnect be using the same name that was generated in initialization?
Hi,
Indeed, there was a bug in my code. I have updated the code snippet now.
Private cache factories are a bit tricky to work with. In short, I have just added explicit tracking of all active services. It seems to be impossible to get a list of running local services from SafeCluster without accidentally triggering cluster formation (which is totally unneeded in an Extend client process, and dangerous).
Sorry for posting broken code (I have just never disconnected from a remote cluster in my applications so far).
Thank you for finding this.
Thanks Alexey, I will give this a try and report back.
Hi Alexey,
we are currently using the ExtendConnection class in our multi-threaded Extend client.
We do observe that one connection is opened per thread, but we are a bit disappointed with the performance (multi-process clients with one thread each are much more performant than a single-process client with multiple threads).
We also observe via JMX and the ConnectionMBean that only one connection is really used during communication with the Extend client (the total messages sent and received only grow for one connection).
Do you observe the same thing in your tests?
regards,
Christophe
Hi Christophe,
You are probably doing something wrong. To utilize multiple connections you should create multiple ExtendConnection objects, and each thread should use a NamedCache retrieved from its own ExtendConnection object (you should not call CacheFactory at all).
Regards,
Alexey
Hi,
Exactly, there was a bug in our service layer which made the NamedCache static and so shared by all threads.
Thanks for your response.
Regards,
Christophe