NodeIDType
- This class is designed for NIO transport. All steps are truly
asynchronous, including connect calls: there is no blocking on
connect, read, or write anywhere, and no polling in the code.
The code can be used to send a byte stream to a numbered node
(with ID->InetSocketAddress mappings specified using the
NodeConfig interface) or to directly send messages to an
InetSocketAddress. Received data is processed using the
DataProcessingWorker interface. Received data cannot be
associated with a node ID because it looks like a plain
byte stream to the receiving node. The supplied
DataProcessingWorker is expected to know what to do with the
received byte streams.
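Because received data arrives as an undifferentiated byte stream, a worker typically has to reassemble message boundaries itself. The following is a minimal sketch of one way to do that, assuming a 4-byte big-endian length header in front of each message. The class name StreamReassemblerSketch, the feed method, and the header format are hypothetical illustrations and are not taken from NIOTransport or DataProcessingWorker.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a byte stream into length-prefixed messages. */
public class StreamReassemblerSketch {
    // Bytes received so far that do not yet form a complete message.
    private byte[] leftover = new byte[0];

    /** Accepts the next chunk of the stream and returns every message it completes. */
    public List<byte[]> feed(byte[] chunk) {
        // Prepend whatever was left over from earlier chunks.
        byte[] all = new byte[leftover.length + chunk.length];
        System.arraycopy(leftover, 0, all, 0, leftover.length);
        System.arraycopy(chunk, 0, all, leftover.length, chunk.length);

        ByteBuffer buf = ByteBuffer.wrap(all);
        List<byte[]> messages = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt(); // assumed 4-byte big-endian header
            if (length < 0) {
                throw new IllegalStateException("corrupt length header: " + length);
            }
            if (buf.remaining() < length) {
                buf.reset(); // payload incomplete; wait for more bytes
                break;
            }
            byte[] msg = new byte[length];
            buf.get(msg);
            messages.add(msg);
        }
        // Keep the unconsumed tail for the next call.
        leftover = new byte[buf.remaining()];
        buf.get(leftover);
        return messages;
    }
}
```

A chunk boundary may fall anywhere, including inside a header, which is why the sketch keeps the unconsumed tail between calls.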
The high-level organization of this code has a selector thread
that waits for connect, accept, read, or write events on socket
channels that could result from accepting connections (server
side) or from initiating connections upon send requests by
application threads. The selector thread also writes to socket
channels and reads incoming data from socket channels and passes
it off to DataProcessingWorker. The selector thread is the only
thread to touch socket channel selection keys, e.g., to change ops
or select() on them. Other application threads may invoke send()
concurrently to send data. This data is queued in pendingWrites, a
synchronized structure between application threads and the
selector thread. A pending write is associated with an
InetSocketAddress, so the selector thread can always re-establish
a connection to continue writing the byte stream in case the
existing connection fails. The selector thread reads from
pendingWrites and sets selection ops to wait for write-ready
events accordingly.
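The hand-off between application threads and the selector thread can be sketched as follows. This is a simplified illustration, not NIOTransport's actual code: it uses a java.nio.channels.Pipe sink instead of a socket channel so that it runs without a network, and the class name PendingWritesSketch and the method selectOnce are hypothetical. The point it shows is that send() only queues data and wakes the selector, while the selection key is touched only inside selectOnce(), which stands in for one iteration of the selector thread's loop.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch of a pendingWrites queue drained by a selector thread. */
public class PendingWritesSketch {
    private final Selector selector;
    private final Pipe.SinkChannel sink;
    private final SelectionKey key;
    private final Queue<ByteBuffer> pendingWrites = new ArrayDeque<>();

    public PendingWritesSketch(Pipe.SinkChannel sink) throws IOException {
        this.selector = Selector.open();
        this.sink = sink;
        sink.configureBlocking(false);
        this.key = sink.register(selector, 0); // no interest until data is queued
    }

    /** Called by application threads: queues data, never touches the key. */
    public void send(byte[] data) {
        synchronized (pendingWrites) {
            pendingWrites.add(ByteBuffer.wrap(data));
        }
        selector.wakeup(); // let the selector thread notice the new work
    }

    /** One iteration of the selector thread's loop; returns bytes written. */
    public int selectOnce() throws IOException {
        synchronized (pendingWrites) {
            if (!pendingWrites.isEmpty()) {
                key.interestOps(SelectionKey.OP_WRITE);
            }
        }
        selector.select(100);
        int written = 0;
        if (selector.selectedKeys().remove(key) && key.isWritable()) {
            synchronized (pendingWrites) {
                ByteBuffer head;
                while ((head = pendingWrites.peek()) != null) {
                    written += sink.write(head);
                    if (head.hasRemaining()) {
                        break; // channel is full; wait for the next write-ready event
                    }
                    pendingWrites.remove();
                }
                if (pendingWrites.isEmpty()) {
                    key.interestOps(0); // nothing left to write
                }
            }
        }
        return written;
    }
}
```

In a real transport the loop would run on a dedicated thread and would also handle connect, accept, and read events; the sketch covers only the write path.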
To enable non-blocking connects, the application threads queue
connect events in the synchronized structure pendingConnects. The
selector thread reads from pendingConnects and sets selection ops
to wait for a connect event as needed.
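A non-blocking connect driven by a queue of connect requests might look like the sketch below. It is an illustration under assumptions, not the class's implementation: PendingConnectsSketch, requestConnect, and selectOnce are hypothetical names, and error handling is reduced to propagating IOException, whereas a real transport would clean up the key and retry.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch of queued, non-blocking connects. */
public class PendingConnectsSketch {
    private final Selector selector;
    private final Queue<InetSocketAddress> pendingConnects = new ConcurrentLinkedQueue<>();

    public PendingConnectsSketch() throws IOException {
        this.selector = Selector.open();
    }

    /** Called by application threads: queues the request and returns at once. */
    public void requestConnect(InetSocketAddress isa) {
        pendingConnects.add(isa);
        selector.wakeup();
    }

    /** One selector-thread iteration; returns a newly connected channel or null. */
    public SocketChannel selectOnce() throws IOException {
        InetSocketAddress isa;
        while ((isa = pendingConnects.poll()) != null) {
            SocketChannel ch = SocketChannel.open();
            ch.configureBlocking(false);
            if (ch.connect(isa)) {
                return ch; // local connections may complete immediately
            }
            ch.register(selector, SelectionKey.OP_CONNECT);
        }
        selector.select(100);
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            if (key.isConnectable()) {
                SocketChannel ch = (SocketChannel) key.channel();
                if (ch.finishConnect()) { // throws IOException if the connect failed
                    key.interestOps(0);   // connected; stop waiting for OP_CONNECT
                    return ch;
                }
            }
        }
        return null;
    }
}
```

The application thread never calls connect() itself, so it cannot block on a slow or unreachable peer.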
A map SockAddrToSockChannel keeps track of the current socket
channel being used to send data to a given InetSocketAddress. Note
that this mapping can change if connections fail and are
re-established. A failed connection can get re-established on
demand by an application thread or by the selector thread when it
tries to actually write the data to a socket channel and
encounters an exception.
public class NIOTransport<NodeIDType> extends java.lang.Object implements java.lang.Runnable, HandshakeCallback
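The address-to-channel mapping described in the class overview can be sketched as a small lookup that replaces a channel once it is no longer usable. This is only an illustration: ChannelMapSketch and channelFor are hypothetical names, and the sketch treats a closed channel as the sign of a failed connection, which is a simplifying assumption.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a socket-address-to-channel map with on-demand replacement. */
public class ChannelMapSketch {
    private final Map<InetSocketAddress, SocketChannel> sockAddrToSockChannel = new HashMap<>();

    /** Returns the current channel for isa, replacing it if it has been closed. */
    public synchronized SocketChannel channelFor(InetSocketAddress isa) throws IOException {
        SocketChannel current = sockAddrToSockChannel.get(isa);
        if (current != null && current.isOpen()) {
            return current; // existing channel is still usable
        }
        // No channel yet, or the old one failed: map the address to a fresh channel.
        SocketChannel fresh = SocketChannel.open();
        fresh.configureBlocking(false);
        sockAddrToSockChannel.put(isa, fresh);
        return fresh;
    }
}
```

Callers should look the channel up on every send instead of caching it, because the mapping can change after a failure.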
Modifier and Type | Class and Description |
---|---|
protected static class | NIOTransport.AlternatingByteBuffer |
Modifier and Type | Field and Description |
---|---|
protected static SSLDataProcessingWorker.SSL_MODES | DEFAULT_SSL_MODE: A flag to easily enable or disable SSL by default. |
static int | MAX_PAYLOAD_SIZE: Invoked only by the selector thread. |
static int | MAX_QUEUED_SENDS: Number of sends that can be queued because the connection was established but the remote end crashed before the send was complete. |
static int | MIN_INTER_CONNECT_TIME: Milliseconds before reconnection attempts. |
protected NodeIDType | myID: Usually an id that corresponds to a socket address as specified in NodeConfig. |
protected NodeConfig<NodeIDType> | nodeConfig |
protected DataProcessingWorker | worker |
protected static int | WRITE_BUFFER_SIZE |
Constructor and Description |
---|
NIOTransport(java.net.InetAddress address, int port, DataProcessingWorker worker, SSLDataProcessingWorker.SSL_MODES sslMode) |
NIOTransport(java.net.InetSocketAddress isa, NodeConfig<NodeIDType> nc, DataProcessingWorker worker, boolean start, SSLDataProcessingWorker.SSL_MODES sslMode) |
NIOTransport(int port, DataProcessingWorker worker) |
NIOTransport(NodeIDType id, NodeConfig<NodeIDType> nc, DataProcessingWorker worker): The constructor to use for ID-based communication. |
NIOTransport(NodeIDType id, NodeConfig<NodeIDType> nc, DataProcessingWorker worker, boolean start, SSLDataProcessingWorker.SSL_MODES sslMode) |
NIOTransport(NodeIDType id, NodeConfig<NodeIDType> nc, DataProcessingWorker worker, SSLDataProcessingWorker.SSL_MODES sslMode) |
Modifier and Type | Method and Description |
---|---|
protected static void | cleanup(java.nio.channels.SelectionKey key) |
static boolean | getCompression() |
static int | getCompressionThreshold() |
protected java.net.InetAddress | getListeningAddress() |
protected int | getListeningPort() |
java.net.InetSocketAddress | getListeningSocketAddress() |
static java.util.logging.Logger | getLogger() |
protected java.net.InetAddress | getNodeAddress() |
protected int | getNodePort() |
protected static int | getPayloadLength(java.nio.ByteBuffer buf) |
protected int | getPendingSize() |
SSLDataProcessingWorker.SSL_MODES | getSSLMode() |
void | handshakeComplete(java.nio.channels.SelectionKey key) |
boolean | isDisconnected(NodeIDType node) |
protected boolean | isStarted() |
boolean | isStopped() |
static void | main(java.lang.String[] args) |
protected static boolean | outOfRange(int length) |
void | run() |
int | send(java.net.InetSocketAddress isa, byte[] data) |
int | send(java.net.InetSocketAddress isa, byte[] data, int batchSize): For performance testing. |
int | send(NodeIDType id, byte[] data): send() methods are called by external application threads. |
static void | setCompression(boolean b) |
static void | setCompressionThreshold(int t) |
void | setMaxQueuedSends(int maxQ) |
void | setMinInterConnectTime(int minInterConnectTime) |
NIOTransport<NodeIDType> | setName(java.lang.String name) |
static void | setUseSenderTask(boolean b) |
void | stop(): To close NIOTransport instances gracefully. |
java.lang.String | toString() |
public static final int MAX_QUEUED_SENDS
public static final int MIN_INTER_CONNECT_TIME
protected static final int WRITE_BUFFER_SIZE
protected final NodeIDType myID
Can be null, which means the wildcard address, or it can be an
InetSocketAddress in the case of Local Name Servers.
protected final DataProcessingWorker worker
protected final NodeConfig<NodeIDType> nodeConfig
protected static final SSLDataProcessingWorker.SSL_MODES DEFAULT_SSL_MODE
public static final int MAX_PAYLOAD_SIZE
public NIOTransport(NodeIDType id, NodeConfig<NodeIDType> nc, DataProcessingWorker worker) throws java.io.IOException
id
- nc
- worker
- java.io.IOException
public NIOTransport(NodeIDType id, NodeConfig<NodeIDType> nc, DataProcessingWorker worker, boolean start, SSLDataProcessingWorker.SSL_MODES sslMode) throws java.io.IOException
id
- nc
- worker
- start
- sslMode
- java.io.IOException
public NIOTransport(java.net.InetSocketAddress isa, NodeConfig<NodeIDType> nc, DataProcessingWorker worker, boolean start, SSLDataProcessingWorker.SSL_MODES sslMode) throws java.io.IOException
isa
- nc
- worker
- start
- sslMode
- java.io.IOException
public NIOTransport(NodeIDType id, NodeConfig<NodeIDType> nc, DataProcessingWorker worker, SSLDataProcessingWorker.SSL_MODES sslMode) throws java.io.IOException
id
- nc
- worker
- sslMode
- java.io.IOException
public NIOTransport(int port, DataProcessingWorker worker) throws java.io.IOException
port
- worker
- java.io.IOException
public NIOTransport(java.net.InetAddress address, int port, DataProcessingWorker worker, SSLDataProcessingWorker.SSL_MODES sslMode) throws java.io.IOException
address
- port
- worker
- sslMode
- java.io.IOException
public void setMaxQueuedSends(int maxQ)
maxQ
- Refer to MAX_QUEUED_SENDS.
public void setMinInterConnectTime(int minInterConnectTime)
minInterConnectTime
- Refer to MIN_INTER_CONNECT_TIME.
public static final java.util.logging.Logger getLogger()
public NIOTransport<NodeIDType> setName(java.lang.String name)
name
- Returns this.
public int send(NodeIDType id, byte[] data) throws java.io.IOException
id
- data
- java.io.IOException
public int send(java.net.InetSocketAddress isa, byte[] data) throws java.io.IOException
isa
- data
- java.io.IOException
public int send(java.net.InetSocketAddress isa, byte[] data, int batchSize) throws java.io.IOException
For performance testing. Sends data batchSize number of
times in order to simulate the performance of batched sends.
isa
- data
- batchSize
- java.io.IOException
protected static final int getPayloadLength(java.nio.ByteBuffer buf) throws java.io.IOException
java.io.IOException
public void run()
run
in interface java.lang.Runnable
protected boolean isStarted()
public void stop()
public boolean isStopped()
protected java.net.InetAddress getNodeAddress()
protected int getNodePort()
public boolean isDisconnected(NodeIDType node)
node
- Returns whether node got disconnected.
public static final void setCompression(boolean b)
b
-
public static final boolean getCompression()
public static final void setCompressionThreshold(int t)
t
-
public static final int getCompressionThreshold()
public static final void setUseSenderTask(boolean b)
b
-
public SSLDataProcessingWorker.SSL_MODES getSSLMode()
protected static final void cleanup(java.nio.channels.SelectionKey key)
public java.net.InetSocketAddress getListeningSocketAddress()
protected int getListeningPort()
protected java.net.InetAddress getListeningAddress()
protected static final boolean outOfRange(int length)
public java.lang.String toString()
toString
in class java.lang.Object
public void handshakeComplete(java.nio.channels.SelectionKey key)
handshakeComplete
in interface HandshakeCallback
protected int getPendingSize()
public static void main(java.lang.String[] args)
args
-