Working on the database project Keva has given me the chance to learn and experience many new things. While learning about the replication feature, I found that one small part of the process, called partial synchronization, requires the master node to send slave nodes chunks of write commands starting from a specific offset. This means the master node needs a data structure to store those write commands, and that data must be easy to copy and send to a slave node.

As I based the features mainly on Redis, I copied their solution and tried to reimplement it in Java.

The requirements

The use case of partial synchronization is to keep data replication correct through network turbulence, where nodes lose their connections to one another for a short period of time. From this need, what we basically want is a buffer of bytes that nodes can read after reconnecting, so they can catch up quickly without having to download the whole snapshot file of the DB. Since it stores short-term data, somewhat like a cache, we'd like to be able to limit its size to something reasonable. When reading, we want sequential access to preserve the order of commands. And of course we also want it to be fast. To summarize, we'll need a data structure with these properties:

- Bounded memory usage, evicting the oldest data first
- Sequential reads starting from an arbitrary offset, preserving command order
- Fast, ideally O(1), appends and evictions
- Safe concurrent access

A simple test to illustrate the functionality would look like this:

@Test
void whenCopyOffset_withDataTrimmed_returnCorrectData() {
    ReplicationBacklog backlog = new ReplicationBacklog(4, 1);
    backlog.put(new byte[]{1,});
    backlog.put(new byte[]{2, 2});
    backlog.put(new byte[]{3, 3, 3});
    byte[] bytes;
    assertEquals(3, backlog.getStartingOffset());
    bytes = backlog.copyFromOffset(3);
    assertArrayEquals(new byte[]{3, 3, 3}, bytes);

    backlog.put(new byte[]{4, 4, 4, 4});
    assertEquals(6, backlog.getStartingOffset());
    bytes = backlog.copyFromOffset(6);
    assertArrayEquals(new byte[]{4, 4, 4, 4}, bytes);
}

@Test
void whenCopyOffset_withStringBytes_returnCorrectData() {
    ReplicationBacklog backlog = new ReplicationBacklog(999, 10);
    backlog.put("set a b\r\n".getBytes(StandardCharsets.UTF_8));
    backlog.put("set abc def\n".getBytes(StandardCharsets.UTF_8));

    byte[] bytes = backlog.copyFromOffset(0);
    assertEquals("set a b\r\nset abc def\n", new String(bytes));
}

Sadly, I’ve searched everywhere, and an efficient lock-free data structure that satisfies all of the above doesn’t quite seem to exist yet, so what we’ll implement here only addresses the first three points. Concurrency can be obtained by adding locks later on.
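To hint at how that later step could look, here is a minimal illustrative sketch (not part of the Keva codebase; `GuardedBacklog` and its fields are hypothetical) of guarding a backlog-like structure with a `ReentrantReadWriteLock`, so several replicas can copy concurrently while writes stay exclusive:

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: a toy backlog of byte chunks guarded by a read-write lock.
// Writers take the write lock; readers (replicas copying data) share the read lock.
public class GuardedBacklog {

    private final Deque<byte[]> chunks = new ArrayDeque<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public void put(byte[] command) {
        lock.writeLock().lock();
        try {
            chunks.addLast(command.clone());
        } finally {
            lock.writeLock().unlock();
        }
    }

    public byte[] copyAll() {
        lock.readLock().lock();
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (byte[] chunk : chunks) {
                out.write(chunk, 0, chunk.length);
            }
            return out.toByteArray();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

A read-write lock fits this workload well, since a master typically serves many concurrent readers but has a single replication stream writing.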

Structure overview

We’ll be using a double-ended linked list where each node holds a byte array. The double-ended nature helps ensure ordering and simplifies memory cleanup: we can add new nodes at the tail and remove nodes from the front in O(1) time. The byte array in each node contains our data, in this case the byte representation of a ‘command’, e.g. “put a 2”.

package com.vietblu;

public class BlockNode {

    private final byte[] array;
    private int used = 0;

    public BlockNode(int capacity) {
        this.array = new byte[capacity];
    }

    public byte[] getArray() {
        return array;
    }

    public int getUsed() {
        return used;
    }

    public int getCapacity() {
        return array.length;
    }

    /**
     * Copies len bytes of data, starting at start, into this block.
     * Callers must first check that the block has enough free space.
     */
    public void put(byte[] data, int start, int len) {
        System.arraycopy(data, start, array, used, len);
        used += len;
    }

}

We keep track of the remaining free bytes in each block. When the next command requires more space than the last block has left, a new node is appended to our list. Here put uses arraycopy for demonstration purposes; in real server usage, the code should try to write directly from the network buffer into this array. With our building blocks in place, we can start implementing the main structure.
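As an aside on that network-buffer point, here is a sketch (not from the Keva codebase; names are made up) of how a channel can read directly into the free region of a block’s array, using an in-memory channel as a stand-in for a client socket:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

public class DirectReadDemo {

    // Reads one command straight into a block-style backing array
    // and returns what landed there
    static String readOne() throws IOException {
        byte[] block = new byte[16]; // a BlockNode-style backing array
        int used = 0;

        // Stand-in for a client socket; a real server would use a SocketChannel
        ReadableByteChannel channel = Channels.newChannel(
                new ByteArrayInputStream("put a 2\n".getBytes()));

        // Wrap the free region of the block so the channel writes into it
        // directly, skipping the usual intermediate copy
        ByteBuffer free = ByteBuffer.wrap(block, used, block.length - used);
        int read = channel.read(free);
        used += read;
        return new String(block, 0, used);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readOne()); // prints "put a 2" and the command's trailing newline
    }
}
```

The key detail is `ByteBuffer.wrap(array, offset, length)`, which exposes only the unused tail of the block to the channel.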

    private final int blockSize;
    private final int capacity;
    private final Deque<BlockNode> blockList;
    private int endingOffset = 0; // increases on every write by the length of the command written
    private int startingOffset = 0;
    private int spaceUsed = 0;
For example, after buffering the commands "put a 2\n" and "get a\n", logical offsets map to bytes like this (with "_" marking spaces; slots 14 and 15 are still free):

offset: 0  1  2  3  4  5  6  7   8  9  10 11 12 13 14 15
byte:   p  u  t  _  a  _  2  \n  g  e  t  _  a  \n
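The snippets in this article leave out the constructor. One way it could look (a sketch with assumed names; the actual repository may differ) is to seed the deque with a single empty block, so that put() can always call blockList.getLast():

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ReplicationBacklog {

    // Minimal stand-in for the BlockNode class shown earlier,
    // repeated here only so this sketch compiles on its own
    static class BlockNode {
        final byte[] array;
        int used = 0;
        BlockNode(int capacity) { array = new byte[capacity]; }
    }

    private final int blockSize;
    private final int capacity;
    private final Deque<BlockNode> blockList;
    private int endingOffset = 0;
    private int startingOffset = 0;
    private int spaceUsed = 0;

    public ReplicationBacklog(int capacity, int blockSize) {
        this.capacity = capacity;
        this.blockSize = blockSize;
        this.blockList = new ArrayDeque<>();
        // Seed with one empty block so getLast() always has a node to return
        this.blockList.addLast(new BlockNode(blockSize));
    }

    public int getStartingOffset() {
        return startingOffset;
    }
}
```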

Operations

There are three operations we’ll need to support:

- Storing new commands
- Trimming old commands
- Reading off the buffer

Storing new commands

Let’s start with how to store new commands:

    /**
     * Buffers the command; if the capacity limit is exceeded, the oldest blocks are deleted
     *
     * @param command the command
     */
    public void put(byte[] command) {
        endingOffset += command.length;
        spaceUsed += command.length;
        BlockNode lastNode = blockList.getLast();
        int available = lastNode.getCapacity() - lastNode.getUsed();
        if (available >= command.length) {
            lastNode.put(command, 0, command.length);
        } else {
            // fill what remains of the last block, then spill the rest into a new one
            lastNode.put(command, 0, available);
            BlockNode newNode = new BlockNode(Math.max(command.length, blockSize));
            newNode.put(command, available, command.length - available);
            blockList.addLast(newNode);
        }
        // trim only after writing, so we never write into a block that
        // trimFront() has already detached from the list
        trimFront();
    }

Trimming old commands

The process to free up older data is straightforward: as long as the space used is still larger than our capacity, remove the first block and update the relevant tracking values.

    /**
     * Removes front nodes while the memory used exceeds the capacity
     */
    private void trimFront() {
        while (spaceUsed > capacity) {
            BlockNode blockNode = blockList.removeFirst();
            spaceUsed -= blockNode.getUsed();
            startingOffset += blockNode.getUsed();
        }
    }

Reading off the buffer

Finally, we look into reading off the buffer, starting from a caller-provided offset.

    /**
     * Copy data from the provided offset up to the ending offset.
     *
     * @param startReadOffset the offset to start reading from, provided by the caller
     * @return a byte array containing the commands
     * @throws IllegalArgumentException if the offset is larger than the ending offset or smaller than the starting offset
     */
    public byte[] copyFromOffset(int startReadOffset) {
        if (startReadOffset < startingOffset || startReadOffset > endingOffset) {
            throw new IllegalArgumentException("Offset not available");
        }
        int readingOffset = startingOffset;
        Iterator<BlockNode> iter = blockList.iterator();
        int nextWriteOffset = 0;
        byte[] result = new byte[endingOffset - startReadOffset];
        // Skip blocks that end before the requested offset,
        // then copy the tail of the block that contains it
        while (iter.hasNext()) {
            BlockNode node = iter.next();
            readingOffset += node.getUsed();
            if (readingOffset >= startReadOffset) {
                int startPos = node.getUsed() - (readingOffset - startReadOffset);
                System.arraycopy(node.getArray(), startPos, result, nextWriteOffset, readingOffset - startReadOffset);
                nextWriteOffset += readingOffset - startReadOffset;
                break;
            }
        }
        // All remaining blocks lie entirely past the offset, so copy them whole
        while (iter.hasNext()) {
            BlockNode node = iter.next();
            System.arraycopy(node.getArray(), 0, result, nextWriteOffset, node.getUsed());
            nextWriteOffset += node.getUsed();
        }
        return result;
    }

The steps can be summarized as follows:

1. Validate that the requested offset lies between the starting and ending offsets.
2. Walk the block list, skipping blocks that end before the requested offset.
3. Copy the tail of the block containing the offset into the result array.
4. Copy every remaining block in full and return the result.
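To see the pieces working together, here is a condensed, self-contained version of the whole structure (assembled from the snippets above, with naming slightly shortened, and with trimming done after writing so no bytes land in a detached block), exercised against the first test’s expectations:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

public class BacklogDemo {

    // Condensed BlockNode, as described earlier
    static class BlockNode {
        final byte[] array;
        int used = 0;

        BlockNode(int capacity) { array = new byte[capacity]; }

        void put(byte[] data, int start, int len) {
            System.arraycopy(data, start, array, used, len);
            used += len;
        }
    }

    static class ReplicationBacklog {
        private final int blockSize;
        private final int capacity;
        private final Deque<BlockNode> blockList = new ArrayDeque<>();
        private int endingOffset = 0;
        private int startingOffset = 0;
        private int spaceUsed = 0;

        ReplicationBacklog(int capacity, int blockSize) {
            this.capacity = capacity;
            this.blockSize = blockSize;
            blockList.addLast(new BlockNode(blockSize)); // seed so getLast() works
        }

        int getStartingOffset() { return startingOffset; }

        void put(byte[] command) {
            endingOffset += command.length;
            spaceUsed += command.length;
            BlockNode last = blockList.getLast();
            int available = last.array.length - last.used;
            if (available >= command.length) {
                last.put(command, 0, command.length);
            } else {
                last.put(command, 0, available); // fill the last block
                BlockNode node = new BlockNode(Math.max(command.length, blockSize));
                node.put(command, available, command.length - available); // spill the rest
                blockList.addLast(node);
            }
            trimFront(); // trim after writing so no bytes land in a detached block
        }

        private void trimFront() {
            while (spaceUsed > capacity) {
                BlockNode node = blockList.removeFirst();
                spaceUsed -= node.used;
                startingOffset += node.used;
            }
        }

        byte[] copyFromOffset(int startReadOffset) {
            if (startReadOffset < startingOffset || startReadOffset > endingOffset) {
                throw new IllegalArgumentException("Offset not available");
            }
            int readingOffset = startingOffset;
            Iterator<BlockNode> iter = blockList.iterator();
            int nextWriteOffset = 0;
            byte[] result = new byte[endingOffset - startReadOffset];
            while (iter.hasNext()) { // find the block containing the offset
                BlockNode node = iter.next();
                readingOffset += node.used;
                if (readingOffset >= startReadOffset) {
                    int startPos = node.used - (readingOffset - startReadOffset);
                    System.arraycopy(node.array, startPos, result, nextWriteOffset, readingOffset - startReadOffset);
                    nextWriteOffset += readingOffset - startReadOffset;
                    break;
                }
            }
            while (iter.hasNext()) { // the rest is copied whole
                BlockNode node = iter.next();
                System.arraycopy(node.array, 0, result, nextWriteOffset, node.used);
                nextWriteOffset += node.used;
            }
            return result;
        }
    }

    public static void main(String[] args) {
        ReplicationBacklog backlog = new ReplicationBacklog(4, 1);
        backlog.put(new byte[]{1});
        backlog.put(new byte[]{2, 2});
        backlog.put(new byte[]{3, 3, 3});
        System.out.println(backlog.getStartingOffset()); // 3
        System.out.println(java.util.Arrays.toString(backlog.copyFromOffset(3))); // [3, 3, 3]
    }
}
```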

Conclusion

There are a lot of areas for improvement here, such as clearer code structure, better naming, more validation, and concurrency support. However, this article illustrated the general idea of how a reusable backlog can be implemented using a double-ended list backed by nodes of byte arrays. For more details, check out the full code in this repository