Working on the database project Keva has given me the chance to learn and experiment with many new things. While learning about the replication feature, I came across a small part of the process called partial synchronization, which requires the master node to send slave nodes chunks of write commands starting from a specific offset. This means the master node needs a data structure to store those write commands, and that data must be easy to copy and send to the slave nodes.
As I mainly based the features on Redis, I copied their solution and tried to reimplement it in Java.
The requirements
The use case of partial synchronization is to keep data replication correct during network turbulence, where nodes lose connection to one another for a short period of time. What we basically want is a buffer of bytes that nodes can access after reconnecting, so they can catch up quickly without having to download the whole snapshot file of the DB. Since it stores short-term data, somewhat like a cache, we’d like to be able to limit its size to something reasonable. When reading, we would like to read sequentially to keep the commands in the correct order. And of course we also want it to be fast. To summarize, we’ll need a data structure with these properties:
- Configurable memory usage: it uses a limited amount of memory, and when more data comes in, we can discard the oldest.
- Data can be retrieved in the correct sequential order.
- Discarding old data, writing new data, and retrieving in order should all be relatively fast.
- Support multi-threaded concurrency if possible.
A simple test to illustrate the functionality would look like this:
@Test
void whenCopyOffset_withDataTrimmed_returnCorrectData() {
    ReplicationBacklog backlog = new ReplicationBacklog(4, 1);
    backlog.put(new byte[]{1});
    backlog.put(new byte[]{2, 2});
    backlog.put(new byte[]{3, 3, 3});
    byte[] bytes;
    assertEquals(3, backlog.getStartingOffset());
    bytes = backlog.copyFromOffset(3);
    assertArrayEquals(new byte[]{3, 3, 3}, bytes);
    backlog.put(new byte[]{4, 4, 4, 4});
    assertEquals(6, backlog.getStartingOffset());
    bytes = backlog.copyFromOffset(6);
    assertArrayEquals(new byte[]{4, 4, 4, 4}, bytes);
}
@Test
void whenCopyOffset_withStringBytes_returnCorrectData() {
    ReplicationBacklog backlog = new ReplicationBacklog(999, 10);
    backlog.put("set a b\r\n".getBytes(StandardCharsets.UTF_8));
    backlog.put("set abc def\n".getBytes(StandardCharsets.UTF_8));
    byte[] bytes = backlog.copyFromOffset(0);
    assertEquals("set a b\r\nset abc def\n", new String(bytes));
}
Sadly, I’ve searched everywhere and an efficient lock-free data structure that satisfies all of the above doesn’t quite seem to exist yet, so what we’ll implement here only addresses the first three points. Concurrency can be obtained by adding locks later on.
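For example, one straightforward (if not lock-free) way to add thread safety later would be to guard the structure with a read-write lock. The `SynchronizedBacklog` wrapper below is just a hypothetical sketch of that idea, not part of the actual implementation:

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical wrapper: serializes writers, lets readers run concurrently
public class SynchronizedBacklog {

    private final ReplicationBacklog backlog;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    public SynchronizedBacklog(ReplicationBacklog backlog) {
        this.backlog = backlog;
    }

    public void put(byte[] command) {
        lock.writeLock().lock();
        try {
            backlog.put(command); // mutates the block list, needs exclusive access
        } finally {
            lock.writeLock().unlock();
        }
    }

    public byte[] copyFromOffset(int offset) {
        lock.readLock().lock();
        try {
            return backlog.copyFromOffset(offset); // read-only, can run in parallel
        } finally {
            lock.readLock().unlock();
        }
    }
}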
Structure overview
We’ll be using a double-ended linked list where each node holds a byte array. The double-ended nature helps ensure ordering and memory cleanup: we can add new nodes to the tail and remove nodes from the front in O(1) time. Each node’s byte array contains our data, in this case the byte representation of a ‘command’, e.g. “put a 2”.
package com.vietblu;

public class BlockNode {

    private final byte[] array;
    private int used = 0;

    public BlockNode(int capacity) {
        this.array = new byte[capacity];
    }

    public byte[] getArray() {
        return array;
    }

    public int getUsed() {
        return used;
    }

    public int getCapacity() {
        return array.length;
    }

    // Copies len bytes of data (starting at start) into the free region
    public void put(byte[] data, int start, int len) {
        System.arraycopy(data, start, array, used, len);
        used += len;
    }
}
We keep track of the remaining free bytes. When the next command requires more space than what is left, a new node is appended to our list. Here `put` uses `arraycopy` for demonstration purposes; in server usage, the code should try to write directly from the network buffer into this buffer. With our building blocks in place, we can start implementing the main structure.
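As a rough sketch of that direct-write idea (an assumption on my part, not part of the actual implementation), a hypothetical `readFrom` method could be added to `BlockNode` so a network channel fills the block’s free region without an intermediate array:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Hypothetical addition to BlockNode: fill the unused tail of the backing
// array straight from a channel, skipping the intermediate byte[] copy
public int readFrom(ReadableByteChannel channel) throws IOException {
    // Wrap only the free region between 'used' and the end of the array
    ByteBuffer buffer = ByteBuffer.wrap(array, used, array.length - used);
    int read = channel.read(buffer);
    if (read > 0) {
        used += read;
    }
    return read;
}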
private final int blockSize;
private final int capacity;
private final Deque<BlockNode> blockList;
private int endingOffset = 0; // increases on every write by the length of the command written
private int startingOffset = 0;
private int spaceUsed = 0;
`blockSize` controls the number of bytes in each block node while `capacity` denotes the combined limit for the entire buffer. `blockList` is our main data storage. `endingOffset` points to the end of the available data while `startingOffset` tells us where we can start reading bytes from. For example, after storing the commands “put a 2\n” and “get a\n” with a block size of 8 bytes, the data will be stored as

[p u t _ a _ 2 \n] [g e t _ a \n X X]

with ‘_’ denoting a space and ‘X’ denoting empty slots. Here `startingOffset` would be 0 while `endingOffset` would be 14.

0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
p | u | t | _ | a | _ | 2 | \n | g | e | t | _ | a | \n | X | X |

`spaceUsed` combined with `capacity` lets us know when our limit is reached and we have to trim previously stored data.
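One piece the snippets don’t show is the constructor. A minimal sketch, assuming it takes the capacity and block size in the order used by the tests above, backs the Deque with an ArrayDeque, and eagerly creates the first empty block so getLast() always has a node to return:

// Minimal constructor sketch (an assumption; not shown in the snippets)
public ReplicationBacklog(int capacity, int blockSize) {
    this.capacity = capacity;
    this.blockSize = blockSize;
    this.blockList = new ArrayDeque<>();
    // Seed the list with one empty block so put() can always call getLast()
    this.blockList.addLast(new BlockNode(blockSize));
}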
Operations
There are three operations we’ll need to support:
- Storing a command into the buffer. The server will continuously store new commands using this method.
- Trimming the buffer when we run out of memory space. This is executed internally by our data structure to free up slots for new commands.
- Reading from a custom starting position. This is used when a replica asks for the latest commands. For example, a replica could have read up to offset 100 and then gone down; when it comes back up, it will ask the main node for the commands that happened after that offset.
Storing new commands
Let’s start with how to store new commands:
/**
 * Buffer the command. If the capacity limit is reached, the oldest blocks are deleted.
 *
 * @param command the command
 */
public void put(byte[] command) {
    endingOffset += command.length;
    spaceUsed += command.length;
    BlockNode lastNode = blockList.getLast();
    int available = lastNode.getCapacity() - lastNode.getUsed();
    if (available >= command.length) {
        lastNode.put(command, 0, command.length);
    } else {
        // Fill up the current block, then spill the rest into a new block
        lastNode.put(command, 0, available);
        BlockNode newNode = new BlockNode(Math.max(command.length, blockSize));
        newNode.put(command, available, command.length - available);
        blockList.addLast(newNode);
        // Trim after writing so we never evict the block we are appending to
        trimFront();
    }
}
- First we increase the `endingOffset`, so when reading we know where to stop, and update `spaceUsed`, which will be used when trimming.
- Next, check if the last block can contain the full command. If it does, store the entire command in that block.
- If it doesn’t, we have to add a new block. To maximize space usage, store as much as we can into the current block and the rest into the new block. Finally, free up memory if needed; trimming after writing ensures we never discard the block we were just appending to (a short example follows).
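To make the split concrete, here is a hypothetical trace in the style of the earlier tests (assuming the constructor sketch from above): with a block size of 4, a command that overflows the current block is divided between the old block and a freshly appended one.

ReplicationBacklog backlog = new ReplicationBacklog(64, 4);
backlog.put(new byte[]{1, 2});             // block 1: [1 2 X X]
backlog.put(new byte[]{3, 4, 5, 6, 7, 8}); // block 1: [1 2 3 4], block 2: [5 6 7 8 X X]
// Reading from offset 0 stitches the blocks back together in order
assertArrayEquals(new byte[]{1, 2, 3, 4, 5, 6, 7, 8}, backlog.copyFromOffset(0));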
Trimming old commands
The process of freeing up older data is straightforward: as long as the space used is larger than our capacity, remove the first block and update the relevant tracking values.
/**
 * Remove front nodes while memory used exceeds capacity
 */
private void trimFront() {
    while (spaceUsed > capacity) {
        BlockNode blockNode = blockList.removeFirst();
        spaceUsed -= blockNode.getUsed();
        startingOffset += blockNode.getUsed();
    }
}
Reading off the buffer
Finally, we look at reading from the buffer starting at a provided offset.
/**
 * Copy data from the provided offset to the ending offset.
 *
 * @param startReadOffset the offset to start reading from, provided by the caller
 * @return byte array containing the commands
 * @throws IllegalArgumentException if the offset is outside the currently available range
 */
public byte[] copyFromOffset(int startReadOffset) {
    if (startReadOffset < startingOffset || startReadOffset > endingOffset) {
        throw new IllegalArgumentException("Offset not available");
    }
    int readingOffset = startingOffset;
    Iterator<BlockNode> iter = blockList.iterator();
    int nextWriteOffset = 0;
    byte[] result = new byte[endingOffset - startReadOffset];
    // Skip ahead to the block containing startReadOffset, then copy its tail
    while (iter.hasNext()) {
        BlockNode node = iter.next();
        readingOffset += node.getUsed();
        if (readingOffset >= startReadOffset) {
            int startPos = node.getUsed() - (readingOffset - startReadOffset);
            System.arraycopy(node.getArray(), startPos, result, nextWriteOffset, readingOffset - startReadOffset);
            nextWriteOffset += readingOffset - startReadOffset;
            break;
        }
    }
    // Copy every remaining block in full
    while (iter.hasNext()) {
        BlockNode node = iter.next();
        System.arraycopy(node.getArray(), 0, result, nextWriteOffset, node.getUsed());
        nextWriteOffset += node.getUsed();
    }
    return result;
}
The steps can be summarized as follows, with a usage sketch after the list:
- Validate the position to start reading from.
- Allocate the result array, sized from where the read starts to the ending offset.
- Traverse the blocks one by one until we reach the block containing the requested offset.
- Copy the data in that block, from our offset to the end of the block.
- Traverse the rest of the blocks and copy everything into our result.
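To tie it together, here is a hypothetical sketch of how a master node might serve a partial synchronization request; replicaAckedOffset is an assumed variable holding the last offset the replica acknowledged, and the fallback path is up to the server:

// Hypothetical partial-sync flow on the master side
int replicaAckedOffset = 100; // assumed: last offset the replica confirmed
try {
    byte[] delta = backlog.copyFromOffset(replicaAckedOffset);
    // send delta to the replica so it can catch up
} catch (IllegalArgumentException e) {
    // the offset was already trimmed away: fall back to a full resynchronization
}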
Conclusion
There are a lot of areas for improvement here, such as a clearer code structure, better naming, more validation, and concurrency support. However, this article illustrated the general idea of how a reusable backlog can be implemented using a doubly linked list backed by nodes of byte arrays. For more details, check out the full code in this repository