Tuesday, 12 May 2015

DCP Magic

What's not that magical is what DCP stands for: it simply means Database Change Protocol. What's really magical is what DCP does for you behind the scenes.

Since Couchbase Server 3.0, DCP has been the internal de facto standard protocol for handling changes to a bucket (a bucket is something like a container in which your data lives; other systems call it a database). Several subsystems use DCP, for instance intra-cluster replication (storing a copy of the data on another node in the cluster for failover purposes), XDCR (Cross Data Center Replication, used to transfer data and changes from one Couchbase cluster to another) and the View Engine (which updates an index, synchronously or asynchronously, for querying purposes). With DCP, changes made to documents in memory are immediately streamed to be indexed. Before 3.0, the data had to hit the disk before it was picked up by the View Engine.

Here are three important terms:

  • DCP Client: A special client that streams data (mutation messages) from one or more Couchbase Server nodes.
  • Mutation: An event that is raised when a document with a specific key is deleted or when the value of a document with a specific key changes. Mutations are therefore caused by creates, updates, deletes or expirations (deletes that happen when the time to live of a document expires).
  • Sequence number: Each mutation that occurs on a vBucket (a partition/shard of a bucket) is assigned a strictly increasing number. Such a sequence number can be used to order that event against other mutations within the same vBucket.
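
To make the role of the sequence number a bit more tangible, here is a minimal sketch that does not use the Couchbase SDK at all. The Mutation class and its fields are hypothetical stand-ins for a real mutation message. The point it illustrates: sequence numbers only order events within the same vBucket, so the events are grouped by vBucket first and sorted afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SequenceOrdering {

    /** Hypothetical stand-in for a mutation message; this is not an SDK class. */
    static final class Mutation {
        final short vBucket;
        final long seqNo;
        final String key;

        Mutation(int vBucket, long seqNo, String key) {
            this.vBucket = (short) vBucket;
            this.seqNo = seqNo;
            this.key = key;
        }
    }

    /** Groups the mutations by vBucket and sorts each group by sequence number. */
    static Map<Short, List<Mutation>> orderPerVBucket(List<Mutation> mutations) {
        Map<Short, List<Mutation>> byVBucket = new TreeMap<>();
        for (Mutation m : mutations) {
            byVBucket.computeIfAbsent(m.vBucket, k -> new ArrayList<>()).add(m);
        }
        // Sequence numbers of different vBuckets are never compared with each other
        for (List<Mutation> group : byVBucket.values()) {
            group.sort(Comparator.comparingLong(m -> m.seqNo));
        }
        return byVBucket;
    }

    public static void main(String[] args) {
        List<Mutation> received = Arrays.asList(
                new Mutation(1, 7, "user::2"),
                new Mutation(0, 3, "user::1"),
                new Mutation(1, 5, "user::3"),
                new Mutation(0, 1, "user::1"));

        orderPerVBucket(received).forEach((vb, group) -> {
            for (Mutation m : group) {
                System.out.println("vBucket " + vb + ", seqNo " + m.seqNo + ", key " + m.key);
            }
        });
    }
}
```

Comparing the sequence numbers of two different vBuckets says nothing about which change happened first, which is why the sketch never sorts across groups.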

I was recently asked how a Couchbase bucket could be used as a kind of master database: as soon as something in this master database changes, something in another system has to change, too. One idea would be to use Kafka and Storm to put the mutation messages into a message queue and then filter/extract the information in the next step. So guess what! Couchbase's Kafka connector consumes the DCP stream ( https://github.com/couchbase/couchbase-kafka-connector ). Under the hood, the 2.x Couchbase Java Client/SDK is used.
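
The shape of that idea (publish every change to a queue, filter in the next step) can be sketched without Kafka or Storm. In the following minimal example an in-memory BlockingQueue stands in for the message queue; the class ChangeFeedSketch, the Change type and the key prefixes are all made up for illustration and are not part of any Couchbase or Kafka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class ChangeFeedSketch {

    /** Hypothetical change event; a real pipeline would carry the mutation message. */
    static final class Change {
        final String key;
        final boolean deleted;

        Change(String key, boolean deleted) {
            this.key = key;
            this.deleted = deleted;
        }
    }

    /** In-memory stand-in for the message queue (Kafka in the scenario above). */
    private final BlockingQueue<Change> queue = new LinkedBlockingQueue<>();

    /** Producer side: called for every change seen on the master bucket. */
    void publish(Change change) {
        queue.add(change);
    }

    /** Consumer side: drains the queue and keeps only the changes whose key starts with the prefix. */
    List<Change> drainMatching(String keyPrefix) {
        List<Change> drained = new ArrayList<>();
        queue.drainTo(drained);

        List<Change> matching = new ArrayList<>();
        for (Change c : drained) {
            if (c.key.startsWith(keyPrefix)) {
                matching.add(c);
            }
        }
        return matching;
    }

    public static void main(String[] args) {
        ChangeFeedSketch feed = new ChangeFeedSketch();
        feed.publish(new Change("customer::1", false));
        feed.publish(new Change("order::17", false));
        feed.publish(new Change("customer::2", true));

        // Only the customer changes are forwarded to the other system
        for (Change c : feed.drainMatching("customer::")) {
            System.out.println((c.deleted ? "delete " : "upsert ") + c.key);
        }
    }
}
```

A real message queue adds persistence and lets several consumers read independently, which this in-memory stand-in does not attempt.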

Important! Before we dig deeper into the Couchbase Java Client and how you can receive a DCP stream, you need to understand that this feature is not yet officially exposed and is only supported as part of connectors that are officially provided by Couchbase.

It's still cool to play around with it a bit in your dev/test environment. *g*

The source code examples can be found here: https://github.com/dmaier-couchbase/cb-dcp-receiver .

We are using core functionality of the Couchbase 2.x Java library. 'Core' means that it is usually only used under the hood, wrapped by a higher-level API.

So let's start to formulate some spells in order to see the magic! First we need a CoreEnvironment. Such an environment is usually used to pass specific parameters, so we just derive from the DefaultCoreEnvironment.

package com.couchbase.example.dcp;

import com.couchbase.client.core.env.DefaultCoreEnvironment;

/**
 * The environment of the receiver
 * 
 * Let's just use the DefaultCoreEnvironment in this case
 * 
 * @author David Maier <david.maier at couchbase.com>
 */
public class ReceiverEnv extends DefaultCoreEnvironment {

    public ReceiverEnv()
    {  
        super(new ReceiverEnvBuilder());
    }

    public ReceiverEnv(Builder b)
    {
        super(b);

    }    
}

The Builder of such an environment is also just derived from the default one:

package com.couchbase.example.dcp;

import com.couchbase.client.core.env.DefaultCoreEnvironment;

/**
 * The environment builder
 * 
 * Let's just use the DefaultCoreEnvironment.Builder in this case
 * 
 * @author David Maier <david.maier at couchbase.com>
 */
public class ReceiverEnvBuilder extends DefaultCoreEnvironment.Builder {


}

The implementation of a DCP receiver could look like the following one. At construction time it is passed a list of cluster nodes (used for bootstrapping purposes only), the bucket details and a handler that will process the incoming DCP messages. There are basically two methods. The 'connect' method bootstraps the core and then sends the request for opening the bucket. The 'stream' method opens the DCP connection, gathers the number of vBuckets, requests a DCP stream for each vBucket and finally calls the handler for each message of the DCP stream.

package com.couchbase.example.dcp;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.CouchbaseCore;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.cluster.OpenBucketRequest;
import com.couchbase.client.core.message.cluster.SeedNodesRequest;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import com.couchbase.example.dcp.handler.IHandler;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import rx.Observable;


/**
 * This is the receiver of the DCP stream
 * 
 * @author David Maier <david.maier at couchbase.com>
 */
public class Receiver {

    /**
     * A logger
     */
    private static final Logger LOG = Logger.getLogger(Receiver.class.getName());


    /**
     * The connection timeout to the cluster in seconds
     */
    public static final int CONNECT_TIMEOUT = 2;

    /**
     * The name of the DCP stream
     */
    public static final String STREAM_NAME = "dcp-receiver";

    /**
     * At first a Core environment is required
     */
    private final ReceiverEnv env;

    /**
     * The core of the Couchbase client. DCP is not yet completely exposed, 
     * so we are using a core component here which is used by the Couchbase
     * client itself.
     */
    private final ClusterFacade core;


    /**
     * The nodes to connect to during the bootstrap phase
     */
    private final String[] nodes;


    /**
     * The bucket to connect to
     */
    private final String bucket;

    /**
     * The bucket password
     */
    private final String password;

    /**
     * The connection state
     */
    private boolean connected = false;

    /**
     * To handle the DCP result
     */
    private IHandler handler;

    /**
     * Initialize the receiver
     * @param nodes
     * @param bucket
     * @param password
     */
    public Receiver(String[] nodes, String bucket, String password, IHandler handler) {

        this(nodes, bucket, password, handler, new ReceiverEnv());

    }

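    /**
     * Initialize the receiver by passing an environment
     * 
     * @param nodes the nodes to connect to during the bootstrap phase
     * @param bucket the bucket to connect to
     * @param password the bucket password
     * @param handler the handler for the incoming DCP messages
     * @param env the core environment to use
     */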
    public Receiver(String[] nodes, String bucket, String password, IHandler handler, ReceiverEnv env) {

        this.env = env;
        this.core = new CouchbaseCore(env);
        this.nodes = nodes;
        this.bucket = bucket;
        this.password = password;
        this.handler = handler;

    }

    /**
     * Once the Receiver is initialized, we want to connect to the Couchbase cluster
     */
    public void connect()
    {
        //This sets up the bootstrap nodes
        core.send(new SeedNodesRequest(nodes))
                .timeout(CONNECT_TIMEOUT, TimeUnit.SECONDS)
                .toBlocking()
                .single();

        //Now open a bucket connection
        core.send(new OpenBucketRequest(bucket, password))
                .timeout(CONNECT_TIMEOUT, TimeUnit.SECONDS)
                .toBlocking()
                .single();

        connected = true;
    }

    /**
     * Open the DCP streams and handle them by using the passed handler
     * 
     */
    public void stream()
    {
        core.send(new OpenConnectionRequest(STREAM_NAME, bucket))
                .toList()
                .flatMap(resp -> numOfVBuckets()) //Send a cluster config request and map the result to the number of vBuckets
                .flatMap(count -> requestStreams(count)) //Stream by taking the number of vBuckets into account
                .toBlocking()
                .forEach(dcp -> this.handler.handle(dcp)); //Now handle every result of the stream here
    }

    /**
     * Retrieve the number of vBuckets of the bucket
     * @return an Observable that emits the number of vBuckets
     */
    private Observable<Integer> numOfVBuckets()
    {
        return core.<GetClusterConfigResponse>send(new GetClusterConfigRequest())
                .map(cfg -> { 

                    CouchbaseBucketConfig bucketCfg = (CouchbaseBucketConfig) cfg.config().bucketConfig(bucket);
                    return bucketCfg.numberOfPartitions();

                } );
    }

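    /**
     * Request the streams for all vBuckets
     * 
     * @param numOfVBuckets the number of vBuckets of the bucket
     * @return the merged stream of the DCP messages of all vBuckets
     */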
    private Observable<DCPRequest> requestStreams(int numOfVBuckets)
    {
        return Observable.merge( //Merge the streams to one stream
                Observable.range(0, numOfVBuckets) //For each vBucket
                .flatMap(vBucket -> core.<StreamRequestResponse>send(new StreamRequestRequest(vBucket.shortValue(), bucket))) //Request a stream
                .map(response -> response.stream()) //Return the stream as Observable of DCPRequest
        );               
    }  
}


The following is the implementation of a simple log handler. It takes each received mutation message and prints its key and CAS value to the standard output; all other DCP messages are ignored.

package com.couchbase.example.dcp.handler;

import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.MutationMessage;

/**
 * A logging DCP handler
 * 
 * @author David Maier <david.maier at couchbase.com>
 */
public class LogHandler implements IHandler {

    @Override
    public void handle(DCPRequest dcp) {

        if (dcp instanceof MutationMessage)
        {
            MutationMessage msg = (MutationMessage) dcp;

            System.out.println("key = " +  msg.key() + ", cas = " +  msg.cas());      
        }      

    }
}
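
A handler does not have to be stateless. As a minimal sketch that is independent of the SDK, the following class remembers the highest sequence number seen per vBucket; a handler could call record for every message it receives. The class name SeqNoTracker and both of its methods are assumptions made for illustration. How the vBucket id and the sequence number are read from a real DCP message depends on the version of the core library, so that part is deliberately left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SeqNoTracker {

    /** Highest sequence number seen so far, per vBucket. */
    private final Map<Short, Long> highest = new ConcurrentHashMap<>();

    /** Records one event; the arguments stand in for values taken from a DCP message. */
    void record(short vBucket, long seqNo) {
        // Keep the larger of the stored and the new sequence number
        highest.merge(vBucket, seqNo, Long::max);
    }

    /** Returns the highest sequence number seen for the vBucket, or 0 if none was seen yet. */
    long lastSeen(short vBucket) {
        return highest.getOrDefault(vBucket, 0L);
    }

    public static void main(String[] args) {
        SeqNoTracker tracker = new SeqNoTracker();
        tracker.record((short) 12, 4L);
        tracker.record((short) 12, 9L);
        tracker.record((short) 12, 6L);
        System.out.println("vBucket 12 -> " + tracker.lastSeen((short) 12));
    }
}
```

Because sequence numbers strictly increase within a vBucket, the stored value tells you how far each vBucket has been consumed.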

To test it, just execute the following test case and insert/update/delete some documents in your bucket:

package com.couchbase.example.dcp;

import com.couchbase.example.dcp.handler.LogHandler;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;

/**
 * Test case
 *
 * @author David Maier <david.maier at couchbase.com>
 */
public class ReceiverTest {


    private static Receiver r;

    @BeforeClass
    public static void setUpClass() {

        //Make sure that DCP is usable
        System.setProperty("com.couchbase.dcpEnabled", "true");
        r = new Receiver(new String[]{"192.168.7.160"}, "test", "test", new LogHandler());
    }

    @Test
    public void testReceiveStream() {

        r.connect();
        r.stream();
    }
}

So after saying just a few spells you can see how log messages regarding your data changes magically appear on the console :-). Hope you had fun with it. The next blog article will focus more on how to use our Kafka connector directly.

6 comments:

  1. Hi David,

    Thanks for the article. I've been trying to use Couchbase's kafka-connector for a while with limited success. I find that only some DCPRequests actually make it through, and some seem to disappear into the ether without any errors or anything. I see the same when I implement a simplified version of the connector like your example above. Have you experienced this or have any ideas on what might be going on?

    To be clear, I've copied your example exactly as above, and when I update documents only a few of them (although seemingly always the same ones) cause a DCPRequest to come through; some never get seen.

    Any help would be much appreciated.
    Tom

  2. This line of code doesn't compile under Couchbase 4.0 (core-io-1.2.3.jar), as there is no "stream()" method.
    Please advise on how to update this sample to work on newer code...


    .map(response -> response.stream()) //Return the stream as Observable

  3. Hi Leon,

    It seems the core library changed in the meantime. I will update the code examples and ping you again then. This might take until the end of the week.

    Regards,
    David

  4. Hello Leon,

    Streaming is even simpler with 2.2.5 (and the new core library) than before. There is a BucketStreamAggregator class which already does the job. All you need to do is establish the bucket connections. In my original example, the 'Receiver' class needs to be replaced by the following one: https://gist.github.com/dmaier-couchbase/7de28724139f07c9ba37 . The BucketStreamAggregator has two implementations of the feed method. The default one streams all mutations, whereas the other one allows you to specify a state in order to stream only the mutations based on this state. If you are interested in what the inner implementation of the BucketStreamAggregator looks like, then please take a look here: https://github.com/couchbase/couchbase-jvm-core/blob/master/src/main/java/com/couchbase/client/core/dcp/BucketStreamAggregator.java .

    Hope this helps.

    Regards,
    David

  5. BucketStreamAggregator got deleted in latest core :(
    What do we do now?

  6. Do you have any updates on the status of the DCP client?
