While developing the server for Keva, a learning-project database, I got the chance to learn a bit more about non-blocking I/O (NIO) and its libraries in Java.

However, when trying to implement it, I ran into many problems with reading data coming in from the connection. Searching for examples on the Internet didn’t yield satisfactory results: most of them are simple servers that read the whole message in one go, nothing I could copy. Luckily, I found this amazing article that helped me understand the basic idea behind it. That, along with reading more about Netty’s implementation, finally allowed me to build a working prototype by myself. You can check out the source code here (it’s only a few short files).

The basics

There are two basic parts to this problem: the NIO part and the server part.

Fundamentally, NIO at the application level just means not waiting around. For example, when we call a “read” method on a socket, the call returns immediately whether there is data to read or not, and the process continues with the next line of code instead of waiting. We can pass in a callback function to handle the result whenever it’s ready.
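To make that concrete, here’s a minimal sketch (assuming an already-connected AsynchronousSocketChannel named channel; this isn’t the project’s code yet):

ByteBuffer buf = ByteBuffer.allocate(256);
channel.read(buf, null, new CompletionHandler<Integer, Void>() {
    @Override
    public void completed(Integer bytesRead, Void attachment) {
        // Called later, on a pool thread, once some data has arrived
        System.out.println("Read " + bytesRead + " bytes");
    }

    @Override
    public void failed(Throwable exc, Void attachment) {
        exc.printStackTrace();
    }
});
// Execution reaches this line immediately, without waiting for the read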

The server’s primary logic is to take in messages from clients, process them, and return the results to those clients, all via the network.

In a traditional blocking server, when we read bytes from a connection, the thread has to wait for the whole message to arrive before processing, since a single read only returns a limited amount of data into the buffer. To handle multiple clients, we spawn multiple threads.
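For contrast, a classic blocking echo server could look roughly like this (a simplified sketch, one thread per client):

try (ServerSocket server = new ServerSocket(8989)) {
    while (true) {
        Socket socket = server.accept(); // blocks until a client connects
        new Thread(() -> {
            try (BufferedReader in = new BufferedReader(
                         new InputStreamReader(socket.getInputStream()));
                 PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
                String line;
                while ((line = in.readLine()) != null) { // blocks until a full line arrives
                    out.println(line); // echo it back
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}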

For the NIO server, a thread doesn’t need to stop and wait for the whole message: it can read what is available, continue to do other work, and come back to read again when new data arrives. The main problem is how to manage bytes that are read asynchronously so that we can reconstruct correct messages. This is the problem I struggled with and finally managed to solve (though probably not in the optimal way).

The idea

My idea for this problem is to use an event-driven architecture. Specifically, we can have two thread groups: the main group, which is responsible for accepting connections (this can be just one thread), and the worker group, which is responsible for reading, parsing, and writing the results to the socket. The worker group is very important since it executes the reads and writes, and it’s also used by Java’s NIO2 library to invoke completion handlers. The reason for having two different thread groups is that I/O processing tasks can take a long time and use up all the threads; if we used only one group, new users wouldn’t be able to connect when the workload is high. Having a separate, small thread pool for accepting means clients can still connect even when all worker threads are busy, which increases availability.

For example purposes, this will be a TCP echo server, and messages will use the \n line-ending character as the delimiter between them.

So what happens when data arrives? Well, it could come in any of the forms below:

  1. part\n : It could be a full message or the last part of a message.
  2. a partial mess : A partial message; we need a way to store it while waiting for the rest of the message to arrive.
  3. last part\n mess 2\n mess 3\n start new : We can also receive several messages, or portions of them, in a single socket read.
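All three forms reduce to one rule: append whatever arrives to a per-connection holder, emit a message every time a delimiter shows up, and keep the leftover for the next read. Here is an illustrative sketch of that rule (the names onChunk and partial are mine, not from the project):

final StringBuilder partial = new StringBuilder(); // carry-over between reads

List<String> onChunk(String chunk) {
    partial.append(chunk);
    List<String> messages = new ArrayList<>();
    int delim;
    while ((delim = partial.indexOf("\n")) != -1) {
        messages.add(partial.substring(0, delim + 1)); // a complete message, delimiter included
        partial.delete(0, delim + 1);                  // drop it from the carry-over
    }
    return messages; // anything left in `partial` waits for the next chunk
}

So onChunk("part\n") yields one message, onChunk("a partial mess") yields none and stores the text, and a chunk containing several delimiters yields several messages at once.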

The flow

So the process will look like this:

Bootstrapping the server

private final ExecutorService worker = Executors.newFixedThreadPool(4);
private final ExecutorService main = Executors.newFixedThreadPool(1);
// The channel group uses the worker pool to invoke completion handlers
group = AsynchronousChannelGroup.withThreadPool(worker);
server = AsynchronousServerSocketChannel.open(group);
final int port = 8989;
server.bind(new InetSocketAddress(port));
// Start accepting connections; the AcceptHandler re-arms the accept for each new client
main.submit(() -> server.accept(null, new AcceptHandler(server, main, worker)));
System.out.println("Server started at: " + port);
System.in.read(); // block the main thread so the process stays alive

When a client connects:

public void completed(AsynchronousSocketChannel channel, Object attachment) {
    // Re-arm the accept so the next client can connect right away
    main.submit(() -> server.accept(null, this));

    // Per-connection state: a read buffer, a holder for the current partial
    // message, and a queue of complete messages waiting to be written back
    final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
    final StringBuffer messBuf = new StringBuffer();
    final Queue<String> writeQueue = new ConcurrentLinkedQueue<>();
    worker.submit(() -> channel.read(buffer, null,
                                       new ReadHandler(worker, channel, buffer, messBuf, writeQueue)));
}

When a read finishes:

// `frame` is the chunk just read, decoded from the buffer into a String
int startIdx = 0;
int endIdx;
while (frame.indexOf(DELIM, startIdx) != -1) {
    endIdx = frame.indexOf(DELIM, startIdx) + 1;
    messBuf.append(frame, startIdx, endIdx); // complete the pending message
    writeQueue.add(messBuf.toString());      // queue it for writing
    this.messBuf = new StringBuffer();       // start a fresh holder for the next message
    startIdx = endIdx;
}
messBuf.append(frame, startIdx, frame.length()); // keep any trailing partial message
channel.read(buffer, null, this);                // re-arm the read for the next chunk
if (!writeQueue.isEmpty()) {
    String message = writeQueue.peek();
    if (message != null) {
        writeQueue.remove();
        ByteBuffer writeBuf = ByteBuffer.wrap(message.getBytes());
        channel.write(writeBuf, null, new WriteHandler(worker, channel, writeBuf, writeQueue));
    }
}
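The snippet above assumes frame has already been decoded from the buffer. A minimal way to produce it (my assumption of the elided step, not necessarily the project’s exact code) is to flip the buffer, decode the readable bytes, and clear it before re-arming the read:

buffer.flip();                                                   // switch the buffer to reading mode
final String frame = StandardCharsets.UTF_8.decode(buffer).toString();
buffer.clear();                                                  // ready for the next read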

When a write finishes:

public void completed(Integer bytesWritten, Object attachment) {
    if (bytesWritten > 0 && writeBuf.hasRemaining()) {
        // Write not finished; continue writing this buffer
        worker.submit(() -> channel.write(writeBuf, null, this));
    } else {
        // This buffer is done; continue with the next message from the queue
        String message = writeQueue.peek();
        if (message != null) {
            writeQueue.remove();
            ByteBuffer nextBuf = ByteBuffer.wrap(message.getBytes());
            channel.write(nextBuf, null, new WriteHandler(worker, channel, nextBuf, writeQueue));
        }
    }
}
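One detail worth noting: an AsynchronousSocketChannel allows only one outstanding write at a time (starting a second write before the first completes throws a WritePendingException), which is why a queue like writeQueue is useful here: completed messages are written one after another instead of concurrently.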

The result

Well, the implementation worked (at least for the test suite I wrote for it):

@Test
void buf8_echo1Less8_success() throws Exception {
  final SocketClient client = startClient();
  final String abcde = client.exchange("abcde");
  client.disconnect();
  assertEquals("abcde", abcde);
}

// From another test: several delimited messages exchanged in one go
final SocketClient client = startClient();
final List<String> abcd = client.exchange("12345678\n987654321\nabc\nd", 4);
client.disconnect();

assertEquals("12345678", abcd.get(0));
assertEquals("987654321", abcd.get(1));
assertEquals("abc", abcd.get(2));
assertEquals("d", abcd.get(3));

// From a load test: 10,000 concurrent echo exchanges
// (tasks and mess16 are defined elsewhere in the test)
final ExecutorService executor = Executors.newFixedThreadPool(3);
final int taskNum = 10000;
for (int i = 0; i < taskNum; i++) {
    tasks.add(() -> {
        final SocketClient client = startClient();
        final String res = client.exchange(mess16);
        client.disconnect();
        return res;
    });
}

Maybe the way I test is kinda wrong; if you notice a mistake, I’m open to feedback. This is just one way to implement it, and it’s actually a very naive, slow one. I mainly used strings in my code, so I had to convert the buffer to a string; a better approach would be to deal with the bytes directly. Also, the way I implemented the writeQueue requires bytes to be copied from the buffers into the string holders. Modern NIO servers use zero-copy techniques for dealing with buffers; for example, Netty has its own buffer type that stores pointers to the original buffers used for reading. That could be a topic for more research.
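For a taste of what that looks like, here’s a tiny sketch using Netty’s Unpooled helpers (not what this project uses): wrappedBuffer builds a composite view over existing buffers without copying their contents.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

ByteBuf part1 = Unpooled.copiedBuffer("last part\n", CharsetUtil.UTF_8);
ByteBuf part2 = Unpooled.copiedBuffer("mess 2\n", CharsetUtil.UTF_8);
// A composite buffer that points at part1 and part2; no bytes are copied
ByteBuf whole = Unpooled.wrappedBuffer(part1, part2);

Anyway, I’m quite satisfied with these results for now. Hope this was useful to you.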