amqp-client源码解析2:通信过程
1. SocketFrameHandler
/**
 * A {@link FrameHandler} that exchanges AMQP frames over a {@link Socket},
 * wrapping the socket's streams in buffered data streams.
 *
 * <p>Frame reads are serialized on the input stream and header/frame writes
 * are serialized on the output stream.
 */
public class SocketFrameHandler implements FrameHandler {
    /** The socket all traffic goes through. */
    private final Socket _socket;

    /** Executor used to bound the final flush in {@link #close()}; may be {@code null}. */
    private final ExecutorService _shutdownExecutor;

    /** Buffered view of the socket's input stream. */
    private final DataInputStream _inputStream;

    /** Buffered view of the socket's output stream. */
    private final DataOutputStream _outputStream;

    /**
     * Value, in seconds, used by {@link #close()} both as the SO_LINGER setting
     * and as the maximum wait for the final flush when an executor is present.
     */
    public static final int SOCKET_CLOSING_TIMEOUT = 1;

    /**
     * Creates a handler without a shutdown executor.
     *
     * @param socket the socket to read from and write to
     * @throws IOException if the socket's streams cannot be obtained
     */
    public SocketFrameHandler(Socket socket) throws IOException {
        this(socket, null);
    }

    /**
     * Creates a handler.
     *
     * @param socket           the socket to read from and write to
     * @param shutdownExecutor executor on which {@link #close()} runs the final
     *                         flush, or {@code null} to flush on the calling thread
     * @throws IOException if the socket's streams cannot be obtained
     */
    public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor) throws IOException {
        _socket = socket;
        _shutdownExecutor = shutdownExecutor;
        _inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
        _outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
    }

    /** @return the buffered input stream frames are read from */
    public DataInputStream getInputStream() {
        return _inputStream;
    }

    /**
     * Writes the protocol header: the ASCII bytes {@code AMQP}, a zero byte,
     * then {@code major}, {@code minor} and {@code revision}, and flushes.
     *
     * @param major    protocol major version
     * @param minor    protocol minor version
     * @param revision protocol revision
     * @throws IOException if writing or flushing fails; an
     *                     {@link SSLHandshakeException} raised by the flush is
     *                     logged and rethrown
     */
    public void sendHeader(int major, int minor, int revision) throws IOException {
        synchronized (_outputStream) {
            _outputStream.write("AMQP".getBytes("US-ASCII"));
            _outputStream.write(0);
            _outputStream.write(major);
            _outputStream.write(minor);
            _outputStream.write(revision);
            try {
                _outputStream.flush();
            } catch (SSLHandshakeException e) {
                LOGGER.error("TLS connection failed: {}", e.getMessage());
                throw e;
            }
        }
    }

    /**
     * Sends the header for the protocol version declared in {@code AMQP.PROTOCOL}
     * and, when the socket is an {@link SSLSocket}, hands its session to
     * {@code TlsUtils.logPeerCertificateInfo}.
     */
    @Override
    public void sendHeader() throws IOException {
        sendHeader(AMQP.PROTOCOL.MAJOR, AMQP.PROTOCOL.MINOR, AMQP.PROTOCOL.REVISION);
        if (this._socket instanceof SSLSocket) {
            TlsUtils.logPeerCertificateInfo(((SSLSocket) this._socket).getSession());
        }
    }

    /** Starts the connection's main loop. */
    @Override
    public void initialize(AMQConnection connection) {
        connection.startMainLoop();
    }

    /** Reads one frame from the input stream, holding the input stream's monitor. */
    @Override
    public Frame readFrame() throws IOException {
        synchronized (_inputStream) {
            return Frame.readFrom(_inputStream);
        }
    }

    /** Writes one frame to the output stream, holding the output stream's monitor. */
    @Override
    public void writeFrame(Frame frame) throws IOException {
        synchronized (_outputStream) {
            frame.writeTo(_outputStream);
        }
    }

    /** Flushes the buffered output stream. */
    @Override
    public void flush() throws IOException {
        _outputStream.flush();
    }

    /**
     * Closes the socket after a best-effort flush of pending output.
     *
     * <p>Without a shutdown executor the flush runs on the calling thread.
     * With one, the flush is submitted to it and awaited for at most
     * {@link #SOCKET_CLOSING_TIMEOUT} seconds; if that fails or times out the
     * task is cancelled. Every failure on this path is deliberately ignored so
     * that the socket is always closed.
     */
    @Override
    public void close() {
        try {
            _socket.setSoLinger(true, SOCKET_CLOSING_TIMEOUT);
        } catch (Exception _e) {
            // best effort: closing proceeds even if SO_LINGER cannot be set
        }

        // Typed as Callable<Void> (not the raw type) so submit() yields a Future<Void>.
        Callable<Void> flushCallable = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                flush();
                return null;
            }
        };

        Future<Void> flushTask = null;
        try {
            if (this._shutdownExecutor == null) {
                flushCallable.call();
            } else {
                flushTask = this._shutdownExecutor.submit(flushCallable);
                flushTask.get(SOCKET_CLOSING_TIMEOUT, TimeUnit.SECONDS);
            }
        } catch (Exception e) {
            // flush failed or timed out: stop the background flush, if any, and carry on closing
            if (flushTask != null) {
                flushTask.cancel(true);
            }
        }

        try {
            _socket.close();
        } catch (Exception _e) {
            // best effort: nothing more can be done if the close itself fails
        }
    }
}
2. ConnectionFactory
public class ConnectionFactory implements Cloneable {
    /**
     * Creates a new broker connection.
     *
     * <p>When automatic recovery is enabled, an {@code AutorecoveringConnection}
     * is created and initialized. Otherwise each address supplied by
     * {@code addressResolver} is tried in order and the first connection that
     * starts successfully is returned.
     *
     * @param executor           executor passed to {@code params(...)} when
     *                           building the connection parameters
     * @param addressResolver    source of the candidate broker addresses
     * @param clientProvidedName optional name stored in the client properties
     *                           under {@code "connection_name"}; ignored when
     *                           {@code null}
     * @return the opened connection
     * @throws IOException      if the last attempted address failed with an
     *                          {@code IOException}, or if no address was attempted
     * @throws TimeoutException if the last attempted address failed with a
     *                          {@code TimeoutException}
     */
    public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
            throws IOException, TimeoutException {
        if (this.metricsCollector == null) {
            this.metricsCollector = new NoOpMetricsCollector();
        }
        FrameHandlerFactory fhFactory = createFrameHandlerFactory();
        ConnectionParams params = params(executor);
        if (clientProvidedName != null) {
            // Copy before adding the name so the map returned by params is not mutated.
            Map<String, Object> properties = new HashMap<>(params.getClientProperties());
            properties.put("connection_name", clientProvidedName);
            params.setClientProperties(properties);
        }

        if (isAutomaticRecoveryEnabled()) {
            AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
            conn.init();
            return conn;
        } else {
            List<Address> addrs = addressResolver.getAddresses();
            Exception lastException = null;
            for (Address addr : addrs) {
                try {
                    FrameHandler handler = fhFactory.create(addr, clientProvidedName);
                    AMQConnection conn = createConnection(params, handler, metricsCollector);
                    conn.start();
                    this.metricsCollector.newConnection(conn);
                    return conn;
                } catch (IOException | TimeoutException e) {
                    // Remember the failure and move on to the next address.
                    lastException = e;
                }
            }
            // Every address failed: surface the most recent failure with its original type.
            if (lastException != null) {
                if (lastException instanceof IOException) {
                    throw (IOException) lastException;
                } else if (lastException instanceof TimeoutException) {
                    throw (TimeoutException) lastException;
                }
            }
            throw new IOException("failed to connect");
        }
    }
}
3. AMQConnection
3.1. start()
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
/**
 * Performs the client side of the connection handshake: sends the protocol
 * header, waits for the server's Connection.Start, negotiates channel-max,
 * frame-max and heartbeat, sends Connection.TuneOk and opens the virtual host.
 *
 * On every failure path shown here the frame handler is closed before the
 * exception is propagated.
 *
 * @throws IOException      on I/O failure, or when a ShutdownSignalException is
 *                          wrapped via AMQChannel.wrap
 * @throws TimeoutException if Connection.Start is not received in time
 */
public void start() throws IOException, TimeoutException {
initializeConsumerWorkService();
initializeHeartbeatSender();
this._running = true;
// Register a blocking continuation on channel 0 before the header is sent;
// its reply is read below and cast to Connection.Start.
AMQChannel.SimpleBlockingRpcContinuation connStartBlocker = new AMQChannel.SimpleBlockingRpcContinuation();
_channel0.enqueueRpc(connStartBlocker);
try {
_frameHandler.setTimeout(handshakeTimeout);
_frameHandler.sendHeader();
} catch (IOException ioe) {
_frameHandler.close();
throw ioe;
}
// Hand the connection to the frame handler for initialization.
this._frameHandler.initialize(this);
AMQP.Connection.Start connStart;
AMQP.Connection.Tune connTune = null;
try {
// Wait up to half of the handshake timeout for the server's Connection.Start.
connStart = (AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout / 2).getMethod();
_serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());
Version serverVersion = new Version(connStart.getVersionMajor(), connStart.getVersionMinor());
if (!Version.checkVersion(clientVersion, serverVersion)) {
throw new ProtocolVersionMismatchException(clientVersion, serverVersion);
}
// NOTE(review): connTune is still null at this point and nothing visible in
// this listing assigns it, so connTune.getChannelMax() below would throw a
// NullPointerException. Presumably the step that obtains Connection.Tune from
// the server was omitted from this excerpt -- confirm against the full source.
} catch (TimeoutException te) {
_frameHandler.close();
throw te;
} catch (ShutdownSignalException sse) {
_frameHandler.close();
throw AMQChannel.wrap(sse);
} catch(IOException ioe) {
_frameHandler.close();
throw ioe;
}
try {
// Negotiate channel-max, then clamp it with ensureUnsignedShort and warn if the value changed.
int negotiatedChannelMax = negotiateChannelMax(this.requestedChannelMax, connTune.getChannelMax());
int channelMax = ConnectionFactory.ensureUnsignedShort(negotiatedChannelMax);
if (channelMax != negotiatedChannelMax) {
LOGGER.warn("Channel max must be between 0 and {}, value has been set to {} instead of {}",
MAX_UNSIGNED_SHORT, channelMax, negotiatedChannelMax);
}
_channelManager = instantiateChannelManager(channelMax, threadFactory);
int frameMax = negotiatedMaxValue(this.requestedFrameMax, connTune.getFrameMax());
this._frameMax = frameMax;
// Same negotiate-then-clamp treatment for the heartbeat interval.
int negotiatedHeartbeat = negotiatedMaxValue(this.requestedHeartbeat, connTune.getHeartbeat());
int heartbeat = ConnectionFactory.ensureUnsignedShort(negotiatedHeartbeat);
if (heartbeat != negotiatedHeartbeat) {
LOGGER.warn("Heartbeat must be between 0 and {}, value has been set to {} instead of {}",
MAX_UNSIGNED_SHORT, heartbeat, negotiatedHeartbeat);
}
setHeartbeat(heartbeat);
// Tell the server the agreed values, then open the virtual host via an RPC on channel 0.
_channel0.transmit(new AMQP.Connection.TuneOk.Builder()
.channelMax(channelMax)
.frameMax(frameMax)
.heartbeat(heartbeat)
.build());
_channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder().virtualHost(_virtualHost).build());
} catch (IOException ioe) {
// From here on the heartbeat sender is shut down as well before closing the handler.
_heartbeatSender.shutdown();
_frameHandler.close();
throw ioe;
} catch (ShutdownSignalException sse) {
_heartbeatSender.shutdown();
_frameHandler.close();
throw AMQChannel.wrap(sse);
}
}
}
3.2. startMainLoop()
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
    /**
     * Creates and starts the thread that runs this connection's {@link MainLoop}.
     * The thread is named {@code "AMQP Connection <host>:<port>"}.
     */
    public void startMainLoop() {
        final MainLoop reader = new MainLoop();
        mainLoopThread = Environment.newThread(
                threadFactory, reader, "AMQP Connection " + getHostAddress() + ":" + getPort());
        mainLoopThread.start();
    }

    /**
     * Reads frames from the frame handler and dispatches them for as long as
     * {@code _running} is true.
     */
    private class MainLoop implements Runnable {
        /**
         * Runs the read/dispatch loop. Any throwable other than an
         * {@link InterruptedException} is passed to {@code handleFailure};
         * the final shutdown is performed unless the loop ended with an
         * {@link InterruptedException}.
         */
        @Override
        public void run() {
            boolean finalShutdownNeeded = true;
            try {
                while (_running) {
                    readFrame(_frameHandler.readFrame());
                }
            } catch (Throwable failure) {
                if (!(failure instanceof InterruptedException)) {
                    handleFailure(failure);
                } else {
                    finalShutdownNeeded = false;
                }
            } finally {
                if (finalShutdownNeeded) {
                    doFinalShutdown();
                }
            }
        }
    }

    /**
     * Dispatches one inbound frame.
     *
     * <p>A {@code null} frame is treated as a socket timeout. Any other frame
     * resets the missed-heartbeat counter; heartbeat frames need nothing more.
     * Channel-0 frames go to {@code _channel0}; other frames go to the channel
     * with the frame's channel number, but only while the connection is open
     * and a channel manager exists. Frames for unknown channels are logged and
     * dropped.
     *
     * @param frame the frame just read, or {@code null}
     * @throws IOException if the receiving channel fails to handle the frame
     */
    private void readFrame(Frame frame) throws IOException {
        if (frame == null) {
            handleSocketTimeout();
            return;
        }

        _missedHeartbeats = 0;
        if (frame.type == AMQP.FRAME_HEARTBEAT) {
            return;
        }
        if (frame.channel == 0) {
            _channel0.handleFrame(frame);
            return;
        }
        if (!isOpen()) {
            return;
        }

        // Read the field once so the null check and the lookup use the same manager.
        final ChannelManager manager = _channelManager;
        if (manager == null) {
            return;
        }

        final ChannelN target;
        try {
            target = manager.getChannel(frame.channel);
        } catch (UnknownChannelException e) {
            LOGGER.info("Received a frame on an unknown channel, ignoring it");
            return;
        }
        target.handleFrame(frame);
    }
}
4. AMQChannel
4.1. rpc()
public abstract class AMQChannel extends ShutdownNotifierComponent {
    /** Timeout passed to the continuation when waiting for an RPC reply; {@code NO_RPC_TIMEOUT} means wait without a timeout. */
    protected final int _rpcTimeout;

    /** The RPC currently awaiting a reply on this channel, or {@code null} when none is outstanding. */
    private RpcWrapper _activeRpc = null;

    /**
     * Sends {@code m} and blocks until its reply arrives.
     *
     * @param m the method to send
     * @return the reply command
     * @throws IOException             if sending fails or the reply times out
     * @throws ShutdownSignalException as propagated from the underlying calls
     */
    public AMQCommand rpc(Method m) throws IOException, ShutdownSignalException {
        return privateRpc(m);
    }

    /**
     * Issues the RPC with a blocking continuation and waits for the reply,
     * applying {@code _rpcTimeout} unless it equals {@code NO_RPC_TIMEOUT}.
     * A timeout is converted with {@code wrapTimeoutException}.
     */
    private AMQCommand privateRpc(Method m) throws IOException, ShutdownSignalException {
        SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
        rpc(m, k);
        if (_rpcTimeout == NO_RPC_TIMEOUT) {
            return k.getReply();
        } else {
            try {
                return k.getReply(_rpcTimeout);
            } catch (TimeoutException e) {
                throw wrapTimeoutException(m, e);
            }
        }
    }

    /**
     * Registers {@code k} as the outstanding RPC and sends {@code m}, after
     * checking that the channel is open. Runs under {@code _channelMutex}.
     *
     * @param m the method to send
     * @param k the continuation to register for the reply
     * @throws IOException if sending fails
     */
    public void rpc(Method m, RpcContinuation k) throws IOException {
        synchronized (_channelMutex) {
            ensureIsOpen();
            quiescingRpc(m, k);
        }
    }

    /**
     * Same as {@link #rpc(Method, RpcContinuation)} but without the upfront
     * open check.
     *
     * @param m the method to send
     * @param k the continuation to register for the reply
     * @throws IOException if sending fails
     */
    public void quiescingRpc(Method m, RpcContinuation k) throws IOException {
        synchronized (_channelMutex) {
            enqueueRpc(k);
            quiescingTransmit(m);
        }
    }

    /**
     * Waits until no RPC is outstanding, then installs the wrapper produced by
     * {@code rpcWrapperSupplier} as the active RPC.
     *
     * <p>An interrupt received while waiting does not abort the wait; it is
     * remembered and the thread's interrupt status is restored afterwards.
     *
     * @param rpcWrapperSupplier produces the wrapper to install
     */
    private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
        synchronized (_channelMutex) {
            boolean waitClearedInterruptStatus = false;
            while (_activeRpc != null) {
                try {
                    _channelMutex.wait();
                } catch (InterruptedException e) {
                    waitClearedInterruptStatus = true;
                }
            }
            if (waitClearedInterruptStatus) {
                Thread.currentThread().interrupt();
            }
            _activeRpc = rpcWrapperSupplier.get();
        }
    }

    /**
     * Transmits {@code c}. A command whose method carries content first waits
     * while {@code _blockContent} is set, re-checking after every wake-up that
     * the channel is still open.
     *
     * <p>An interrupt received while waiting is remembered and the interrupt
     * status is restored once the wait is over (or when {@code ensureIsOpen()}
     * throws). Restoring it inside the loop would make the next
     * {@code wait()} throw again at once, so the loop would spin while still
     * holding {@code _channelMutex}.
     *
     * @param c the command to transmit
     * @throws IOException if transmission fails
     */
    public void quiescingTransmit(AMQCommand c) throws IOException {
        synchronized (_channelMutex) {
            if (c.getMethod().hasContent()) {
                boolean interrupted = false;
                try {
                    while (_blockContent) {
                        try {
                            _channelMutex.wait();
                        } catch (InterruptedException ignored) {
                            // remember the interrupt; it is restored in the finally block below
                            interrupted = true;
                        }
                        ensureIsOpen();
                    }
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            this._trafficListener.write(c);
            c.transmit(this);
        }
    }
}
4.2. handleFrame()
public abstract class AMQChannel extends ShutdownNotifierComponent {
    /** The inbound command currently being assembled from frames. */
    private AMQCommand _command = new AMQCommand();

    /**
     * Feeds {@code frame} into the command under assembly. When the command
     * reports that it is complete, a fresh command is installed for later
     * frames and the completed one is handled.
     *
     * @param frame the inbound frame
     * @throws IOException if handling the frame or the completed command fails
     */
    public void handleFrame(Frame frame) throws IOException {
        final AMQCommand pending = _command;
        if (!pending.handleFrame(frame)) {
            return;
        }
        _command = new AMQCommand();
        handleCompleteInboundCommand(pending);
    }

    /**
     * Handles a fully assembled inbound command.
     *
     * <p>The command is reported to the traffic listener and offered to
     * {@link #processAsync(Command)}. If that returns {@code false}, the
     * command is delivered to the outstanding RPC, if there is one. When
     * {@code _checkRpcResponseType} is set and the active RPC says it cannot
     * handle this reply, the command is dropped and the active RPC is left in
     * place.
     *
     * @param command the completed inbound command
     * @throws IOException if asynchronous processing fails
     */
    public void handleCompleteInboundCommand(AMQCommand command) throws IOException {
        this._trafficListener.read(command);
        if (processAsync(command)) {
            return;
        }

        if (_checkRpcResponseType) {
            synchronized (_channelMutex) {
                final boolean replyRejected = _activeRpc != null && !_activeRpc.canHandleReply(command);
                if (replyRejected) {
                    return;
                }
            }
        }

        final RpcWrapper waiting = nextOutstandingRpc();
        if (waiting == null) {
            return;
        }
        waiting.complete(command);
        markRpcFinished();
    }

    /**
     * Takes the active RPC, clears the slot and wakes every thread waiting on
     * {@code _channelMutex}.
     *
     * @return the RPC that was active, or {@code null} if there was none
     */
    public RpcWrapper nextOutstandingRpc() {
        synchronized (_channelMutex) {
            final RpcWrapper outstanding = _activeRpc;
            _activeRpc = null;
            _channelMutex.notifyAll();
            return outstanding;
        }
    }

    /**
     * Offers an inbound command for asynchronous handling.
     *
     * @param command the inbound command
     * @return {@code true} if the command was consumed here; {@code false} if
     *         it should be delivered to the outstanding RPC instead
     * @throws IOException if processing fails
     */
    public abstract boolean processAsync(Command command) throws IOException;
}
5. ChannelN#processAsync()
public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel {
    /**
     * Decides whether an inbound command is consumed here or left for the
     * outstanding RPC.
     *
     * <p>{@code Channel.Close} always triggers an asynchronous shutdown. While
     * the channel is open, deliveries, returns, flow control, acks, nacks and
     * {@code Basic.Cancel} are consumed; everything else (including
     * {@code Basic.RecoverOk}) is left for the RPC. Once the channel is no
     * longer open, every command except {@code Channel.CloseOk} is consumed.
     *
     * @param command the inbound command
     * @return {@code true} if the command was consumed, {@code false} if it
     *         should go to the outstanding RPC
     * @throws IOException if handling the command fails
     */
    @Override
    public boolean processAsync(Command command) throws IOException {
        final Method method = command.getMethod();

        if (method instanceof Channel.Close) {
            asyncShutdown(command);
            return true;
        }

        if (!isOpen()) {
            // Not open: only CloseOk is passed through to the waiting RPC.
            return !(method instanceof Channel.CloseOk);
        }

        if (method instanceof Basic.Deliver) {
            processDelivery(command, (Basic.Deliver) method);
            return true;
        }
        if (method instanceof Basic.Return) {
            callReturnListeners(command, (Basic.Return) method);
            return true;
        }
        if (method instanceof Channel.Flow) {
            final Channel.Flow flow = (Channel.Flow) method;
            synchronized (_channelMutex) {
                _blockContent = !flow.getActive();
                transmit(new Channel.FlowOk(!_blockContent));
                // wake threads waiting on the mutex so they re-check _blockContent
                _channelMutex.notifyAll();
            }
            return true;
        }
        if (method instanceof Basic.Ack) {
            final Basic.Ack ack = (Basic.Ack) method;
            callConfirmListeners(command, ack);
            handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);
            return true;
        }
        if (method instanceof Basic.Nack) {
            final Basic.Nack nack = (Basic.Nack) method;
            callConfirmListeners(command, nack);
            handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
            return true;
        }

        // Of the remaining methods only Basic.Cancel is consumed here;
        // Basic.RecoverOk and anything else is left for the outstanding RPC.
        return method instanceof Basic.Cancel;
    }
}