Tuesday, April 10, 2012

Coherence, managing multiple Extend connections

Coherence*Extend is a protocol used by processes that are not members of the cluster to access Coherence services. An Extend client opens a TCP connection to one of the cluster members (which must host a proxy service) and uses that member as a relay. Normally a client process creates a single TCP connection, which is shared between all NamedCache instances and all threads of the process. To be more precise, it creates a single TCP connection per remote service, but normally you have just one remote cache service (and, in some cases, a remote invocation service as well).
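To make the sharing concrete, here is a deliberately simplified model. It is not Coherence code, and the class and service names are hypothetical. Connections are keyed by remote service name, so every cache mapped to the same service reuses one connection, no matter which thread asks for it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical model of Extend connection sharing (not Coherence code):
// one connection per remote service name, reused by every cache mapped to that service.
final class SharedConnectionModel {

    /** Stand-in for a physical TCP connection to a proxy member. */
    static final class TcpConnection {
        final String serviceName;

        TcpConnection(String serviceName) {
            this.serviceName = serviceName;
        }
    }

    // remote service name -> the single connection owned by that service
    private final Map<String, TcpConnection> connections = new ConcurrentHashMap<>();
    // cache name -> remote service name, as resolved from the cache configuration
    private final Map<String, String> cacheToService;

    SharedConnectionModel(Map<String, String> cacheToService) {
        this.cacheToService = cacheToService;
    }

    /** Returns the connection a cache uses; caches on the same service share it. */
    TcpConnection connectionFor(String cacheName) {
        String service = cacheToService.get(cacheName);
        if (service == null) {
            throw new IllegalArgumentException("No remote service mapped for cache: " + cacheName);
        }
        // computeIfAbsent is atomic on ConcurrentHashMap, so concurrent callers
        // get the same connection instance as well
        return connections.computeIfAbsent(service, TcpConnection::new);
    }

    int connectionCount() {
        return connections.size();
    }
}
```

With two caches mapped to one service, both get the same connection object. A second connection only appears when a cache maps to a different remote service. That is exactly the behavior the trick below exploits.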

Of course, the TCP connection fails over automatically in case of proxy or network problems, so for most practical cases it is fine to share a single TCP connection per process. The proxy member acts as a relay (in most cases it doesn't even deserialize the data passing through), so a single client process is unlikely to overload a proxy process … unless you are using invocation services. With an invocation service, the proxy executes logic on behalf of the client, and that logic can be arbitrarily complex, so it may be desirable to spread requests across multiple proxy processes.
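Once you have several connections, spreading invocation requests over them can be as simple as round-robin. Below is a minimal sketch using a hypothetical RoundRobin helper. The targets could be, for example, InvocationService instances obtained from separate connections. Note that Coherence decides which proxy each connection actually lands on, based on the configured remote addresses. This sketch does not model that decision.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: spreads requests over several independent targets
// (e.g. invocation services obtained from separate connections) in round-robin order.
final class RoundRobin<T> {
    private final List<T> targets;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobin(List<T> targets) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("at least one target is required");
        }
        // defensive, read-only copy so the rotation cannot change under callers
        this.targets = Collections.unmodifiableList(new ArrayList<T>(targets));
    }

    /** Returns the next target; safe to call from many threads. */
    T pick() {
        // floorMod keeps the index non-negative after the counter overflows
        int index = Math.floorMod(next.getAndIncrement(), targets.size());
        return targets.get(index);
    }
}
```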

Here is a simple trick: create as many remote services as the number of connections you want. There is a slight problem: you cannot have the same cache name associated with different remote services at the same time … unless you are using multiple cache factories.
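The renaming part of the trick does not depend on Coherence itself. As a sketch, here is the same kind of transformation done with the JDK's DOM API on a cache configuration string. The class name and sample service names are hypothetical, and the sketch assumes unprefixed element names. Each remote scheme's service-name gets a "-connectionId" suffix. Once the rewritten copy is loaded into its own cache factory, it defines its own set of remote services, and therefore its own TCP connections.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

// Sketch of service-name mangling with the JDK DOM API instead of Coherence's XmlElement.
// Assumes unprefixed element names (no namespace prefix in the configuration).
final class ServiceNameMangler {

    /** Parses the configuration and appends "-<connectionId>" to every remote service name. */
    static Document mangle(String cacheConfigXml, int connectionId) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(cacheConfigXml.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse cache configuration", e);
        }
        NodeList schemesList = doc.getElementsByTagName("caching-schemes");
        if (schemesList.getLength() == 0) {
            return doc; // no schemes, nothing to rename
        }
        NodeList schemes = schemesList.item(0).getChildNodes();
        for (int i = 0; i < schemes.getLength(); i++) {
            Node scheme = schemes.item(i);
            if (scheme.getNodeType() != Node.ELEMENT_NODE || !isRemoteScheme(scheme.getNodeName())) {
                continue; // skip whitespace text and non-remote schemes
            }
            // only direct <service-name> children belong to this scheme
            NodeList children = scheme.getChildNodes();
            for (int j = 0; j < children.getLength(); j++) {
                Node child = children.item(j);
                if ("service-name".equals(child.getNodeName())) {
                    String name = child.getTextContent().trim();
                    if (!name.isEmpty()) {
                        child.setTextContent(name + "-" + connectionId);
                    }
                }
            }
        }
        return doc;
    }

    private static boolean isRemoteScheme(String elementName) {
        return "remote-cache-scheme".equals(elementName)
                || "remote-invocation-scheme".equals(elementName);
    }

    /** Reads the service-name of the first scheme element with the given tag. */
    static String serviceNameOf(Document doc, String schemeTag) {
        Element scheme = (Element) doc.getElementsByTagName(schemeTag).item(0);
        return scheme.getElementsByTagName("service-name").item(0).getTextContent();
    }
}
```

Non-remote schemes are left untouched. Only the client-side remote services are duplicated per connection.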

Below is a snippet of code that creates private cache factories and mangles service names, so that each ExtendConnection instance gets its own separate set of Extend connections.

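import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tangosol.net.ConfigurableCacheFactory;
import com.tangosol.net.DefaultConfigurableCacheFactory;
import com.tangosol.net.InvocationService;
import com.tangosol.net.NamedCache;
import com.tangosol.net.Service;
import com.tangosol.run.xml.XmlElement;
import com.tangosol.run.xml.XmlHelper;

public class ExtendConnection {

    private static final Logger LOGGER = Logger.getLogger(ExtendConnection.class.getName());

    // global counter giving each connection a unique suffix for its service names
    private static final AtomicInteger CONNECTION_COUNTER = new AtomicInteger();

    private final int connectionId = CONNECTION_COUNTER.incrementAndGet();
    private final ConfigurableCacheFactory cacheFactory;
    // used as a concurrent set of the services started through this connection
    private final ConcurrentMap<Service, Service> activeServices = new ConcurrentHashMap<Service, Service>(4, 0.5f, 2);

    public ExtendConnection(String configFile) {
        cacheFactory = initPrivateCacheFactory(configFile);
    }

    private DefaultConfigurableCacheFactory initPrivateCacheFactory(String configFile) {
        LOGGER.info("New Extend connection #" + connectionId + " is going to be created, config: " + configFile);

        XmlElement xml = XmlHelper.loadFileOrResource(configFile, "Coherence cache configuration for Extend connection #" + connectionId);
        // transforming configuration: rename every remote service, e.g.
        // "ExtendTcpCacheService" -> "ExtendTcpCacheService-1", so services of
        // this private factory do not clash with those of other connections
        XmlElement schemes = xml.getSafeElement("caching-schemes");
        for (Object o : schemes.getElementList()) {
            XmlElement scheme = (XmlElement) o;
            if (isRemoteScheme(scheme)) {
                // getElement() returns null if the scheme has no explicit <service-name>
                XmlElement serviceName = scheme.getElement("service-name");
                String name = serviceName == null ? null : serviceName.getString();
                if (name != null && name.length() > 0) {
                    serviceName.setString(name + "-" + connectionId);
                }
            }
        }

        return new DefaultConfigurableCacheFactory(xml);
    }

    private boolean isRemoteScheme(XmlElement scheme) {
        String name = scheme.getName();
        return "remote-cache-scheme".equals(name) || "remote-invocation-scheme".equals(name);
    }

    public NamedCache getCache(String name) {
        NamedCache cache = cacheFactory.ensureCache(name, null);
        // remember the service so that disconnect() can stop it later
        Service service = cache.getCacheService();
        activeServices.putIfAbsent(service, service);
        return cache;
    }

    public InvocationService getInvocationService(String serviceName) {
        // must use the same suffix that initPrivateCacheFactory() appended
        InvocationService service = (InvocationService) cacheFactory.ensureService(serviceName + "-" + connectionId);
        activeServices.putIfAbsent(service, service);
        return service;
    }

    /**
     * Stops all remote services started through this connection, closing their TCP connections.
     * Warning: this method is not concurrency safe; you may get into trouble if you access
     * caches or services via this connection during shutdown.
     */
    public void disconnect() {
        for (Service service : new ArrayList<Service>(activeServices.keySet())) {
            try {
                if (service.isRunning()) {
                    service.stop();
                }
            }
            catch (Exception e) {
                LOGGER.log(Level.WARNING, "Exception during remote service shutdown", e);
            }
        }
    }
}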

Each instance of the class above manages a physical TCP Extend connection (or several, if your configuration has multiple remote services). To create multiple connections, just create multiple instances, but make sure you are not leaking them. Extend connections are not closed automatically by GC, so you should pool them carefully.

This technique is also useful if you want to keep connections to several different clusters at the same time.
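Extend connections are not closed by GC, so it helps to give them a single owner. Below is a sketch of a hypothetical fixed-size pool that is not part of Coherence. It creates N connections up front, pins each thread to one of them, and closes each of them exactly once. The pinning matters because threads that share one NamedCache (for example, through a static field) send all of their traffic through a single connection.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical fixed-size pool (not part of Coherence): creates all connections up front,
// pins each calling thread to one of them, and closes every connection exactly once.
final class PinnedConnectionPool<C> implements AutoCloseable {
    private final List<C> connections;
    private final Consumer<C> closer;
    private final AtomicInteger nextIndex = new AtomicInteger();
    private final ThreadLocal<C> pinned;
    private volatile boolean closed;

    PinnedConnectionPool(int size, Supplier<C> factory, Consumer<C> closer) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<C> created = new ArrayList<C>(size);
        for (int i = 0; i < size; i++) {
            created.add(factory.get());
        }
        final List<C> snapshot = Collections.unmodifiableList(created);
        this.connections = snapshot;
        this.closer = closer;
        // a thread's first call assigns it the next connection in round-robin order;
        // later calls from the same thread return that same connection
        this.pinned = ThreadLocal.withInitial(
                () -> snapshot.get(Math.floorMod(nextIndex.getAndIncrement(), snapshot.size())));
    }

    /** Returns the connection pinned to the current thread. */
    C get() {
        if (closed) {
            throw new IllegalStateException("pool is closed");
        }
        return pinned.get();
    }

    /** Closes every connection once; later calls are no-ops. */
    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        RuntimeException failure = null;
        for (C connection : connections) {
            try {
                closer.accept(connection);
            } catch (RuntimeException e) {
                // keep closing the remaining connections, report the failures afterwards
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}
```

With the ExtendConnection class above, the pool could be created as new PinnedConnectionPool<ExtendConnection>(4, () -> new ExtendConnection("client-cache-config.xml"), ExtendConnection::disconnect). The file name here is only a placeholder. Each thread then calls pool.get().getCache(...) instead of going through CacheFactory. The same caveat as for disconnect() applies: do not use the pool while it is being closed.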

6 comments:

  1. Looks like there is a bug in the disconnect code. When called, I get the following:

    2012-04-18 11:03:59,331 [main] WARN com.overstock.cache.multiextend.ExtendConnection - Exception during service shutdown:
    java.lang.IllegalArgumentException: Missing scheme for service: "cache1ExtendedTcpProxyService1-1"
    at com.tangosol.net.DefaultConfigurableCacheFactory.findServiceScheme(DefaultConfigurableCacheFactory.java:767)
    at com.tangosol.net.DefaultConfigurableCacheFactory.ensureService(DefaultConfigurableCacheFactory.java:337)
    at com.tangosol.net.CacheFactory.getService(CacheFactory.java:161)
    at com.xxx.cache.multiextend.ExtendConnection.disconnect(ExtendConnection.java:48)
    at com.xxx.cache.multiextend.Main.main(Main.java:16)
    2012-04-18 11:03:59.328/2.479 Oracle Coherence GE 3.7.1.0 (thread=main, member=n/a): Loaded cache configuration from "file:/home/slandis/clients/config/cache1-client-config.xml"


    Shouldn't the disconnect be using the same name that was generated in initialization?

    1. Hi,

      Indeed, there was a bug in my code. I have updated the code snippet now.

      Private cache factories are a bit tricky to work with. In short, I have just added explicit tracking of all active services. It seems to be impossible to get the list of running local services from SafeCluster without accidentally triggering cluster formation (which is totally unneeded in an Extend client process, and dangerous).

      Sorry for posting broken code (my current applications never disconnect from the remote cluster, so I had not exercised that path).

      Thank you for finding this.

  2. Thanks Alexey, I will give this a try and report back.

  3. Hi Alexey,

    we are currently using the ExtendConnection class in our multi-threaded extend client.
    We do observe that one connection is open per thread, but we are a bit disappointed with the performance (multi-process clients with one thread each perform much better than a single-process client with multiple threads).

    We also observe with JMX and ConnectionMBean that only one connection is really used when communicating with the extend client (the total messages sent and received only grow for one connection).

    Do you observe the same thing in your tests?

    regards,

    Christophe

    1. Hi Christophe,

      You are probably doing something wrong. To utilize multiple connections, you should create multiple ExtendConnection objects, and each thread should use a NamedCache retrieved from its own ExtendConnection object (you should not call CacheFactory at all).

      Regards,
      Alexey

    2. Hi,

      Exactly, there was a bug in our service layer that made the NamedCache static, and therefore shared by all threads.

      Thanks for your response.

      Regards,

      Christophe
