This is part of Early Gnutella

Using NIO in the LimeWire Core

Christopher Rohrs

November 15, 2002

Introduction

Currently LimeWire uses two threads per connection, one for reading and one for writing. These threads consume memory (from thread stacks) and CPU (from context switching). Hence LimeWire ultrapeers cannot scale to hundreds of leaf nodes.

Java 1.4 provides a solution to this problem: NIO ("new IO"), which adds non-blocking and multiplexed IO. NIO uses two important abstractions: Buffer and Channel. Buffer is simply a chunk of data (not to be confused with com.limegroup.gnutella.util.Buffer); subclasses like ByteBuffer provide additional functionality. Channel is a bidirectional stream, similar to InputStream and OutputStream combined. NIO programs read data from a Channel to a Buffer, and write data from a Buffer to a Channel. Channels can be non-blocking, which means, for example, that no IO operation will block waiting for data to be read from or written to the network. NIO also introduces Selector, an important class that allows a single thread to service multiple Channels.
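The Buffer/Channel relationship can be illustrated with a small sketch. It is not LimeWire code: the class name BufferChannelSketch is hypothetical, an in-process java.nio.channels.Pipe stands in for a network socket, and the code is written against a current JDK rather than Java 1.4.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

class BufferChannelSketch {
    /** Sends a short text through an in-process Pipe and returns what was read back. */
    static String roundTrip(String text) throws IOException {
        Pipe pipe = Pipe.open();
        try {
            // Write side: data flows from a Buffer into a Channel.
            ByteBuffer out = ByteBuffer.wrap(text.getBytes(StandardCharsets.US_ASCII));
            while (out.hasRemaining()) {
                pipe.sink().write(out);
            }
            // Read side: data flows from a Channel into a Buffer.
            ByteBuffer in = ByteBuffer.allocate(out.capacity());
            while (in.hasRemaining()) {
                if (pipe.source().read(in) < 0) break;   // end of stream
            }
            in.flip();   // switch the buffer from being filled to being drained
            return StandardCharsets.US_ASCII.decode(in).toString();
        } finally {
            pipe.sink().close();
            pipe.source().close();
        }
    }

    /** A non-blocking read on an empty channel returns 0 at once instead of waiting. */
    static int readFromEmptyPipe() throws IOException {
        Pipe pipe = Pipe.open();
        try {
            pipe.source().configureBlocking(false);
            return pipe.source().read(ByteBuffer.allocate(16));
        } finally {
            pipe.sink().close();
            pipe.source().close();
        }
    }
}
```

The second method shows the property that matters for the rest of this document: in non-blocking mode, a read with nothing available returns immediately, so the caller must be prepared to come back later.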

Unfortunately non-blocking IO substantially increases the complexity of networking code. State that is implicitly stored on the call stack (local variables, stack pointer, etc.) of blocking code must be explicitly stored during non-blocking IO. For example, when using non-blocking IO, the Connection.receive() method may only read half a message. This half must be buffered in Connection so subsequent calls to receive() can finish the receive.

A prototype version of LimeWire using non-blocking IO is available from the nio-branch branch of the CVS repository. This branch was split off from the mainline around version 2.5.3. The branch point is tagged nio-branch-point. Our goal was to make the minimal changes to increase the number of leaves per ultrapeer. For this reason we only use NIO to read and write messages. We continue to use blocking IO for connection handshaking, uploads, and downloads.

The remainder of this document describes our implementation. It assumes knowledge of LimeWire's basic design (see the LimeWire design document). First we describe changes to implement message reading, writing, and routing in a non-blocking manner. Then we describe changes to drive message handling on systems with and without NIO. Finally we describe future work, including non-blocking connection fetching and handshaking.

Connection Changes

The class diagram below shows the new design of Connection/ManagedConnection. Each Connection now has a reference to a ConnectionListener callback in order to deliver connection events. As we will see in the next section, ConnectionListener provides the critical link between each connection and the rest of the backend. Each Connection also has a MessageReader and MessageWriter object to keep track of state:

Image:NIOSupportImage002.png

Reading Messages

To read a message from a Connection c, one repeatedly calls c.read(). When a message has been completely read, it is passed to c's ConnectionListener callback cb via cb.read(..). To use an analogy, calling read() is like turning a crank on a pump; after a while, a message pops out via the callback. It may take several calls to read() to trigger one callback. Likewise, a single call to read() is allowed to trigger multiple callbacks, though the current implementation will never do this. Here is the complete specification of the read() method:

/**
 * Reads as much data from the channel as possible.  For any messages m
 * received (zero or many), calls listener.read(this, m).  Calls
 * listener.read(this, error) for any non-fatal errors, and
 * listener.error(this) for any fatal errors, like the connection being closed.
 */
public void read();

The Connection.read() method is implemented by delegating to MessageReader. If only part of a message is available from the network, MessageReader buffers the part until read() is called again. MessageReader has two states: reading the header and reading the payload. Message is basically unchanged, though the message creation logic has been moved from Message.read(..) to MessageReader.
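The two-state reader can be sketched as follows. This is an illustration under stated assumptions, not the MessageReader on nio-branch: the class name SketchMessageReader and the MAX_PAYLOAD limit are hypothetical, completed messages are returned as raw payload bytes rather than delivered as Message objects through the listener, and the header layout is the standard 23-byte Gnutella header with a little-endian payload length in its last four bytes.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

class SketchMessageReader {
    static final int HEADER_SIZE = 23;          // Gnutella message header length
    static final int MAX_PAYLOAD = 64 * 1024;   // hypothetical sanity limit

    private final ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
    private ByteBuffer payload;                 // null while in the "reading header" state

    /** Reads whatever is available and returns the payloads of all completed messages. */
    List<byte[]> read(ReadableByteChannel channel) throws IOException {
        List<byte[]> done = new ArrayList<>();
        while (true) {
            ByteBuffer current = (payload == null) ? header : payload;
            if (current.hasRemaining()) {
                if (channel.read(current) < 0) {
                    throw new IOException("connection closed");
                }
                if (current.hasRemaining()) {
                    return done;   // partial data: state is kept for the next call
                }
            }
            if (payload == null) {
                // Header complete: switch to the "reading payload" state.
                int length = header.order(ByteOrder.LITTLE_ENDIAN).getInt(19);
                if (length < 0 || length > MAX_PAYLOAD) {
                    throw new IOException("bad payload length: " + length);
                }
                payload = ByteBuffer.allocate(length);
            } else {
                // Payload complete: emit the message and return to the header state.
                done.add(payload.array());
                payload = null;
                header.clear();
            }
        }
    }
}
```

The fields header and payload hold exactly the state that blocking code would have kept implicitly on the call stack between two calls to InputStream.read().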

Writing Messages

To write a message m to a Connection c, one calls c.write(m). This attempts to send as much of the message as possible. If the entire message could not be sent, one calls c.write() repeatedly until the message goes through. To use the same analogy, calling the write(..) methods is like turning the crank on a pump. Here is the complete specification of the write methods:

/**
 * Returns true if this has queued data.  For Connection, hasQueued()
 * implies that this is unable to accept more messages; subclasses with
 * additional buffering may change that.
 */
public boolean hasQueued();

/**
 * Attempts to send m, or as much as possible.  First attempts to add m to
 * this' send queue, possibly discarding other queued messages (or m) for
 * which sending has not yet started.  Then attempts to send as much data to
 * the network as possible without blocking.  Calls
 * listener.needsWrite(this) if not all data was sent.  Calls
 * listener.error(this) if connection closed.
 *
 * This method is called from deep within the bowels of the message handling
 * code.  That's why it doesn't block and generates needsWrite events through
 * a callback in addition to a return value.
 *
 * @return true iff this still has unsent queued data.  If true, the caller
 *  must subsequently call write() again
 */
public boolean write(Message m);

/**
 * Sends as much queued data as possible, if any.  Calls
 * listener.error(this) if connection closed.  Typically does NOT call
 * listener.needsWrite(this), though subclasses may change this behavior.
 *
 * This method is called within the Selector code, which typically
 * needs to know whether to register the write operation.  That's why this
 * returns a value instead of using the callback.
 *
 * @return true iff this still has unsent queued data.  If true, the caller
 *  must subsequently call write() again
 */
public boolean write();

The "queue" mentioned above is essentially a single message stored in MessageWriter. If a message cannot be sent completely in a call to write(m), e.g. if the network is congested, MessageWriter buffers the remaining bytes inside a ByteBuffer. ManagedConnection extends Connection to provide additional buffering of sent messages, just like on the mainline. As usual, this queue is prioritized to implement the SACHRIFC flow control algorithm.
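A minimal sketch of this single-message "queue" follows. The class name SketchMessageWriter is hypothetical, messages are plain byte arrays, and the listener callbacks are omitted. As a simplification, the sketch rejects a new message while one is still pending instead of applying the discard policy described in the specification above.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

class SketchMessageWriter {
    private final WritableByteChannel channel;
    private ByteBuffer pending;   // unsent bytes of the one queued message, or null

    SketchMessageWriter(WritableByteChannel channel) {
        this.channel = channel;
    }

    /** Returns true iff part of a message is still waiting to be sent. */
    boolean hasQueued() {
        return pending != null;
    }

    /** Queues the message and sends as much as possible; returns true iff data remains. */
    boolean write(byte[] message) throws IOException {
        if (hasQueued()) {
            // Simplification: the real write(m) may discard messages instead.
            throw new IllegalStateException("previous message still queued");
        }
        pending = ByteBuffer.wrap(message.clone());
        return write();
    }

    /** Sends as much queued data as possible; returns true iff data remains. */
    boolean write() throws IOException {
        if (pending != null) {
            channel.write(pending);   // may accept only part of the buffer
            if (!pending.hasRemaining()) {
                pending = null;
            }
        }
        return hasQueued();
    }
}
```

The ByteBuffer's position records how far the send has progressed, so no separate offset needs to be tracked between calls.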

Connection Driving

The ConnectionListener interface links connections with the rest of the LimeWire backend. Each LimeWire client uses a single ConnectionListener instance to decide when to call the read(..) and write(..) methods. There are two key implementations of ConnectionListener, both derived from ConnectionDriver: BlockingConnectionDriver and NonBlockingConnectionDriver. The newConnectionDriver() factory method of ConnectionDriver returns an appropriate subclass instance depending on the system's support for NIO. ConnectionDriver and ConnectionManager reference each other. This is shown in the class diagram below:

Image:NIOSupportImage004.png

The read(c, m) method of ConnectionDriver delivers a message m from a connection c to the rest of the backend via the MessageRouter instance. (Actually it takes a detour through ManagedConnection first, but only for historical reasons.) Likewise, the error(c) method of ConnectionDriver removes the connection c via ConnectionManager.

Users without Java 1.4 (this includes all Macintosh users) cannot use NIO. On these systems, ConnectionManager uses a single BlockingConnectionDriver instance. BlockingConnectionDriver instantiates two threads per connection: one for reading and one for writing. These threads simply loop forever, calling read and write, which will block in the underlying socket calls. The write thread is analogous to the inner thread of ManagedConnection on the main source code.

On systems with NIO support, ConnectionManager uses a single NonBlockingConnectionDriver instance. The NonBlockingConnectionDriver instance creates a single thread to read and write from all connections. This thread repeatedly calls Selector.select to switch between connections, as shown below:

while (true) {
    register read events with selector for any new connections;
    selector.select();
    for each readable connection c
        c.read();     //may trigger message routing and non-blocking writes
    register any connections needing write;
    for each connection c needing a write
        c.write();    //rarely used
}

Note that the calls to Connection.read() can trigger message routing events and result in non-blocking calls to Connection.write(m). The calls to Connection.write() are only needed if a previous call to write(m) could not successfully write the complete message m.
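For readers unfamiliar with the Selector API, one pass of the read half of such a loop might look like the sketch below. The names SelectLoopSketch and ReadHandler are hypothetical; ReadHandler stands in for Connection.read() and is stored as the attachment of each registered key. Write handling (OP_WRITE) is left out.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

class SelectLoopSketch {
    /** Hypothetical stand-in for Connection.read(). */
    interface ReadHandler {
        void read() throws IOException;
    }

    /**
     * Runs one pass of the loop: waits up to timeoutMillis for activity, then
     * services every readable channel.  Returns the number of channels serviced.
     */
    static int selectOnce(Selector selector, long timeoutMillis) throws IOException {
        selector.select(timeoutMillis);
        int serviced = 0;
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();   // the selector never clears the selected-key set itself
            if (key.isValid() && key.isReadable()) {
                ((ReadHandler) key.attachment()).read();
                serviced++;
            }
        }
        return serviced;
    }
}
```

A driver thread would call selectOnce in a loop after registering each non-blocking channel with channel.register(selector, SelectionKey.OP_READ, handler).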

Future Work

The code on nio-branch of CVS is mostly stable. A few odds and ends still need to be done:

  • Re-enable outgoing message throttling in ManagedConnection. This is non-trivial, because the implementation of ThrottledOutputStream inherently relied on blocking IO behavior.
  • Update tests to make sure they work. This will require using the new Connection/ConnectionListener interface. The SimpleConnection class of tests/com/limegroup/gnutella/connection may help.
  • Fix odd TODO items in the code.

Our implementation no longer implements old-fashioned "reject" connections. These are difficult to implement with the NIO design. If necessary, they could be implemented with a State pattern, but I believe they are obsolete now that X-Try/X-Try-Ultrapeer headers are widely used.

A few non-trivial issues remain. These are discussed below. The first two, backwards Java compatibility and random Java crashes, must be solved for release. The remaining issue, adding non-blocking handshaking, is simply an optimization.

Older Versions of Java

While the code on nio-branch works with blocking IO, it still requires Java 1.4.1. This creates a problem for Macintosh users and users with Java 1.3. This is not a fatal problem, however; we simply wrap all NIO classes with our own interfaces. The key idea is to replace every reference to ByteBuffer with our own abstract class, LByteBuffer, for "light ByteBuffer" or "LimeWire ByteBuffer". (Alternatively, our classes could have the same names but different package names, i.e. com.limegroup.gnutella.nio.ByteBuffer, much like the com.sun.java.util.collections classes.) LByteBuffer provides a similar interface to ByteBuffer. There are two implementations of LByteBuffer: LByteBufferAdaptor simply delegates to a real ByteBuffer, while LByteBufferImpl reimplements basic ByteBuffer functionality. A factory method in LByteBuffer allocates LByteBufferAdaptor if Java 1.4 is available, or LByteBufferImpl otherwise:

/** Provides a ByteBuffer-like interface. */
public abstract class LByteBuffer {
    //java14 is a flag, set elsewhere, that is true iff Java 1.4 is available
    public static LByteBuffer allocate(int bytes) {
        if (java14)
            return new LByteBufferAdaptor(ByteBuffer.allocate(bytes));
        else
            return new LByteBufferImpl(bytes);
    }

    public abstract byte get(int i);
    public abstract boolean hasRemaining();
    ...
}

/** For Java 1.4, delegates to a real ByteBuffer */
class LByteBufferAdaptor extends LByteBuffer {
    ByteBuffer delegate;

    public LByteBufferAdaptor(ByteBuffer buf) {
        this.delegate=buf;
    }

    public byte get(int i) {
        return delegate.get(i);
    }
    ...
}

/** For earlier versions of Java, implement ourselves. */
class LByteBufferImpl extends LByteBuffer {
    byte[] bytes;

    public LByteBufferImpl(int size) {
        this.bytes=new byte[size];
    }

    public byte get(int i) {
        return bytes[i];
    }
    ...
}

Likewise, we need to replace all references to SocketChannel with LSocketChannel. One subclass, LSocketChannelImpl, simply uses streams for reading and writing and does not support non-blocking or multiplexed IO. Another subclass, LSocketChannelAdaptor, accesses LByteBufferAdaptor's ByteBuffer directly:

/** Provides a SocketChannel-like interface */
public abstract class LSocketChannel {
    public static LSocketChannel getLChannel(Socket socket) throws IOException {
        if (java14)
            return new LSocketChannelAdaptor(socket.getChannel());
        else
            return new LSocketChannelImpl(socket);
    }

    public abstract int read(LByteBuffer dst) throws IOException;
    public abstract int write(LByteBuffer src) throws IOException;
    /** @throws UnsupportedOperationException if non-blocking IO is unavailable */
    public abstract void configureBlocking(boolean block) throws IOException;
    /** @throws UnsupportedOperationException if multiplexed IO is unavailable */
    public abstract void register(Selector selector, int ops, Object attachment)
        throws IOException;
    ...
}

/** For Java 1.4, delegates to real SocketChannel and ByteBuffer. */
class LSocketChannelAdaptor extends LSocketChannel {
    private SocketChannel delegate;

    public LSocketChannelAdaptor(SocketChannel channel) {
        this.delegate=channel;
    }

    public void configureBlocking(boolean block) throws IOException {
        delegate.configureBlocking(block);
    }

    public void register(Selector selector, int ops, Object attachment)
            throws IOException {
        delegate.register(selector, ops, attachment);
    }

    public int read(LByteBuffer dst) throws IOException {
        //Unwrap the real ByteBuffer; dst must be an LByteBufferAdaptor here
        return delegate.read(((LByteBufferAdaptor)dst).delegate);
    }
    ...
}

/** For earlier versions of Java, implement with blocking streams. */
class LSocketChannelImpl extends LSocketChannel {
    InputStream in;
    OutputStream out;

    public LSocketChannelImpl(Socket s) throws IOException {
        in=s.getInputStream();
        out=s.getOutputStream();
    }

    public void configureBlocking(boolean block) {
        throw new UnsupportedOperationException();
    }

    public void register(Selector selector, int ops, Object attachment) {
        throw new UnsupportedOperationException();
    }
    ...
}

The code above still has one serious problem: the LSocketChannel class statically uses LSocketChannelAdaptor, which in turn uses NIO classes (SocketChannel and ByteBuffer). This can cause NoClassDefFoundErrors on older systems, since Java is allowed to aggressively load any statically referred classes. (See Chapter 12.1.2 of the first edition of The Java Language Specification. I've actually seen this behavior on Linux.) The solution? The factory methods of LSocketChannel and LByteBuffer should use reflection when instantiating the LSocketChannelAdaptor and LByteBufferAdaptor classes. If many NIO adaptors are needed, it could be wise to create a factory class instead of using factory methods. I do not think this is currently needed.
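A reflective factory along these lines could look like the following sketch. All names here (LBuffer, LBufferAdaptor, LBufferImpl, nioAvailable) are hypothetical stand-ins for the LByteBuffer classes, and the code is written against a current JDK. The point is only that the factory names the adaptor by string, so the base class carries no static reference to a class that touches java.nio.

```java
import java.lang.reflect.Constructor;

abstract class LBuffer {
    abstract byte get(int i);
    abstract void put(int i, byte b);

    /** True iff the NIO classes can be loaded on this JVM. */
    static boolean nioAvailable() {
        try {
            Class.forName("java.nio.ByteBuffer");
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    static LBuffer allocate(int bytes) {
        if (nioAvailable()) {
            try {
                // Looked up by name so this class never links against the adaptor.
                Class<?> c = Class.forName(LBuffer.class.getName() + "Adaptor");
                Constructor<?> ctor = c.getDeclaredConstructor(int.class);
                return (LBuffer) ctor.newInstance(bytes);
            } catch (ReflectiveOperationException | LinkageError e) {
                // Fall through to the pure-Java implementation.
            }
        }
        return new LBufferImpl(bytes);
    }
}

/** Only this class refers to java.nio; it is loaded on demand. */
class LBufferAdaptor extends LBuffer {
    private final java.nio.ByteBuffer delegate;
    LBufferAdaptor(int bytes) { delegate = java.nio.ByteBuffer.allocate(bytes); }
    byte get(int i) { return delegate.get(i); }
    void put(int i, byte b) { delegate.put(i, b); }
}

/** Fallback that needs nothing beyond a byte array. */
class LBufferImpl extends LBuffer {
    private final byte[] bytes;
    LBufferImpl(int size) { bytes = new byte[size]; }
    byte get(int i) { return bytes[i]; }
    void put(int i, byte b) { bytes[i] = b; }
}
```

Callers see the same behavior from either implementation; which one they get depends only on what the JVM can load.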

Random Java Exceptions

The Java NIO library appears to be somewhat flaky. Java 1.4.0 is extremely problematic and should never be used; OP_WRITE selector events, for example, are problematic on Windows. Even Java 1.4.1 still has some problems. We've reported three bugs with socket closing, for example: 4724030, 4726957, and 4720952. There appears to be a serious bug in Java 1.4.1-beta on Windows that can result in NullPointerExceptions when selecting between channels. (I have not yet tried the 1.4.1 FCS release.) This happens randomly and it results in stack traces as follows:

      java.lang.NullPointerException
         at sun.nio.ch.WindowsSelectorImpl$SubSelector.processFdSet(WindowsSelectorImpl.java:303)
         at sun.nio.ch.WindowsSelectorImpl$SubSelector.processSelectedKeys(
                                                                 WindowsSelectorImpl.java:281)
         at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$2600(WindowsSelectorImpl.java:245)
         at sun.nio.ch.WindowsSelectorImpl.updateSelectedKeys(WindowsSelectorImpl.java:406)
         at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:142)
         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:62)
         at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:185)
         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
         at java.io.InputStream.read(InputStream.java:88)
         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:64)
         at com.limegroup.gnutella.ByteReader.readLine(ByteReader.java:125)
         at com.limegroup.gnutella.Connection.readLine(Connection.java:663)
         at com.limegroup.gnutella.Connection.readHeaders(Connection.java:606)
         at com.limegroup.gnutella.Connection.readHeaders(Connection.java:589)
         at com.limegroup.gnutella.Connection.concludeOutgoingHandshake(Connection.java:373)
         at com.limegroup.gnutella.Connection.initializeOutgoing(Connection.java:342)
         at com.limegroup.gnutella.Connection.initializeWithoutRetry(Connection.java:303)
         at com.limegroup.gnutella.Connection.initialize(Connection.java:224)
         at com.limegroup.gnutella.ManagedConnection.initialize(ManagedConnection.java:349)

Here is another similar stack trace that is apparently caused by the same bug:

    java.lang.NullPointerException
        at sun.nio.ch.WindowsSelectorImpl$SubSelector.processFdSet(
                                              WindowsSelectorImpl.java:303)
        at sun.nio.ch.WindowsSelectorImpl$SubSelector.processSelectedKeys(
                                              WindowsSelectorImpl.java:281)
        at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$2600(
                                              WindowsSelectorImpl.java:245)
        at sun.nio.ch.WindowsSelectorImpl.updateSelectedKeys(
                                              WindowsSelectorImpl.java:406)
        at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:142)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:62)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:67)
        at com.limegroup.gnutella.connection.ConnectionDriver.loopForMessages(
                                               ConnectionDriver.java:121)

It is unclear whether these bugs are showstoppers. Perhaps we can simply catch NullPointerException around all NIO calls.
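As a rough illustration of that workaround, a select call could be wrapped as below. The class name SafeSelect is hypothetical, and treating the exception as "nothing ready" is an assumption; whether the selector remains usable after such a failure has not been verified.

```java
import java.io.IOException;
import java.nio.channels.Selector;

class SafeSelect {
    /**
     * Calls selector.select(timeoutMillis).  Returns the number of ready keys,
     * or 0 if the call failed with a NullPointerException.
     */
    static int select(Selector selector, long timeoutMillis) throws IOException {
        try {
            return selector.select(timeoutMillis);
        } catch (NullPointerException e) {
            // Assumed to be the JVM bug shown in the traces above; report
            // "nothing ready" so the caller simply retries on its next pass.
            return 0;
        }
    }
}
```

Note that the first stack trace above passes through the blocking handshake code, so the same guard would also be needed around stream reads on channel-backed sockets.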

Handshaking

With the changes implemented above, LimeWire ultrapeers should support considerably more leaves. However, additional performance is possible by using non-blocking IO for connection fetching and handshaking. This is non-trivial. It requires an outgoing (incoming) Connection to keep track of five states:

  • STATE1_CONNECTING: establishing socket
  • STATE2_HANDSHAKING_CONNECT: writing (reading) “GNUTELLA CONNECT/0.6” and headers
  • STATE3_HANDSHAKING_OK1: reading (writing) "GNUTELLA/0.6 200 OK" and headers
  • STATE4_HANDSHAKING_OK2: writing (reading) "GNUTELLA/0.6 200 OK" and headers
  • STATE5_INITIALIZED: reading and writing messages

Here is partial code to implement this for the outgoing case:

class Connection {
    private int state=STATE1_CONNECTING;
    private ByteBuffer handshakeBuf;

    /** Advances the handshake process.  Non-blocking.
     *  @return true iff initialized. */
    private boolean initializeOutgoing() throws IOException {
        if (state==STATE1_CONNECTING) {
            //Try to establish socket
            if (channel.finishConnect()) {
                state=STATE2_HANDSHAKING_CONNECT;
                handshakeBuf=ByteBuffer.allocate(MAX_HEADER_SIZE);
                handshakeBuf.put("GNUTELLA CONNECT/0.6\r\n".getBytes());
                handshakeBuf.put(outgoingHeaders());  //assumed to return encoded bytes
                handshakeBuf.flip();                  //prepare to write to channel
            }
        }
        if (state==STATE2_HANDSHAKING_CONNECT) {
            //Try to write "GNUTELLA CONNECT/0.6" and headers
            channel.write(handshakeBuf);
            if (!handshakeBuf.hasRemaining()) {
                state=STATE3_HANDSHAKING_OK1;
                handshakeBuf.clear();                 //prepare to read from channel
            }
        }
        if (state==STATE3_HANDSHAKING_OK1) {
            //Try to read "GNUTELLA/0.6 200 OK" and headers
            channel.read(handshakeBuf);
            if (parseHeaders(handshakeBuf)) {  //returns true if done
                state=STATE4_HANDSHAKING_OK2;
                handshakeBuf.clear();
                handshakeBuf.put("GNUTELLA/0.6 200 OK\r\n".getBytes());
                handshakeBuf.put(responseHeaders());
                handshakeBuf.flip();
            }
        }
        if (state==STATE4_HANDSHAKING_OK2) {
            //Try to write "GNUTELLA/0.6 200 OK" and headers
            channel.write(handshakeBuf);
            if (!handshakeBuf.hasRemaining()) {
                state=STATE5_INITIALIZED;
                handshakeBuf=null;
                return true;
            }
        }
        return false;
    }

    ...
}

To drive the handshaking process, we need to call initializeOutgoing()/initializeIncoming() from the read() and write() methods of Connection:

public void read() {
    if (state!=STATE5_INITIALIZED) {
        try {
            boolean done=outgoing
                       ? initializeOutgoing()
                       : initializeIncoming();
            if (done)
                _listener.initialized(this);
        } catch (IOException e) {
            _listener.error(this);   //handshake failed
        }
    } else {
        //Read message as before
        ...
    }
}

public void write() {
    if (state!=STATE5_INITIALIZED) {
        try {
            boolean done=outgoing
                       ? initializeOutgoing()
                       : initializeIncoming();
            if (done)
                _listener.initialized(this);
        } catch (IOException e) {
            _listener.error(this);   //handshake failed
        }
    } else {
        //Write message as before
        ...
    }
}

This code is obviously quite complex. The State pattern may be a good way of simplifying this implementation. This would make it easier to implement the 0.4 protocol. It may also help implement Anurag's authentication changes, which require additional states.

With these primitives, we can now add support for non-blocking connection fetching. The idea is to add ManagedConnections to the Selector described under Connection Driving as soon as the connection is instantiated. The connection is only added to the list of initialized connections when it is initialized.
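To make the State pattern suggestion concrete, here is one possible shape for the outgoing handshake. Every name in it (HandshakeState, WritingState, ReadingState, InitializedState, OutgoingHandshake) is hypothetical. It assumes the socket is already connected, does not check the response status line, and may read past the end of the header block; a real implementation would have to deal with all three.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.charset.StandardCharsets;

interface HandshakeState {
    /** Does as much work as possible without blocking; returns the state to use next. */
    HandshakeState advance(ByteChannel channel) throws IOException;
}

/** Writes a fixed block of text, then hands over to the next state. */
class WritingState implements HandshakeState {
    private final ByteBuffer out;
    private final HandshakeState next;
    WritingState(String text, HandshakeState next) {
        this.out = ByteBuffer.wrap(text.getBytes(StandardCharsets.US_ASCII));
        this.next = next;
    }
    public HandshakeState advance(ByteChannel channel) throws IOException {
        channel.write(out);
        return out.hasRemaining() ? this : next;
    }
}

/** Reads until a blank line ends the header block, then hands over. */
class ReadingState implements HandshakeState {
    private final StringBuilder in = new StringBuilder();
    private final HandshakeState next;
    ReadingState(HandshakeState next) { this.next = next; }
    public HandshakeState advance(ByteChannel channel) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        if (channel.read(buf) < 0) throw new IOException("closed during handshake");
        buf.flip();
        in.append(StandardCharsets.US_ASCII.decode(buf));
        return in.indexOf("\r\n\r\n") >= 0 ? next : this;
    }
}

/** Terminal state: the connection is ready for messages. */
enum InitializedState implements HandshakeState {
    INSTANCE;
    public HandshakeState advance(ByteChannel channel) { return this; }
}

class OutgoingHandshake {
    /** Header arguments are complete "Name: value\r\n" lines, possibly empty. */
    static HandshakeState start(String requestHeaders, String responseHeaders) {
        HandshakeState ok2 = new WritingState(
            "GNUTELLA/0.6 200 OK\r\n" + responseHeaders + "\r\n", InitializedState.INSTANCE);
        HandshakeState ok1 = new ReadingState(ok2);
        return new WritingState(
            "GNUTELLA CONNECT/0.6\r\n" + requestHeaders + "\r\n", ok1);
    }
}
```

With this shape, Connection would hold a single HandshakeState field and replace it with the result of advance() on each call to read() or write(). Adding the 0.4 protocol or authentication would mean adding state classes rather than extending a chain of if statements.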

Conclusion

Non-blocking IO is the only way to significantly reduce the number of threads needed by LimeWire ultrapeers. It is possible to add non-blocking IO to the LimeWire core while still supporting blocking IO on older platforms. Unfortunately this is not trivial. It requires changes to a large number of core classes, including some of the trickiest ones (e.g., ManagedConnection). It also makes several classes (e.g., Connection) noticeably harder to use. It is still unclear whether NIO will yield enough return on investment. Perhaps it makes sense to study ultrapeers more closely before making these changes. If studies show that ultrapeers are spread too thin, it may be wise to make this change.