阐述Dubbo服务提供方的解码原理
温馨提示:这篇文章已超过370天没有更新,请注意相关的内容是否还可用!
1 解码概述
在Dubbo服务消费方(客户端)和服务提供方(服务端)进行网络通信时,服务提供方会通过socket把需要发送的内容序列化为二进制流后发出。接着二进制流通过网络流向服务提供方。服务提供方接收到该请求后会解析该请求包,对接收到的数据进行反序列化后对请求进行处理。
(图片来源网络,侵删)
服务提供方接收到请求后解析请求包(即Dubbo协议的内容)的过程即服务提供方的解码过程。
2 解码原理解析
2.1 解码入口
服务提供方解析请求包最终是借助于 InternalDecoder 的 decode 方法来实现的。具体实现代码如下所示。
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List out) throws Exception {
ChannelBuffer message = new NettyBackedChannelBuffer(input);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
// decode object.
do {
int saveReaderIndex = message.readerIndex();
Object msg = codec.decode(channel, message);
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
//is it possible to go here ?
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
if (msg != null) {
out.add(msg);
}
}
} while (message.readable());
}
2.2 解码实现细节
在 InternalDecoder 的 decode 方法内部通过调用具体的解码实现来进行解码。
如 ExchangeCodec 的 decode 方法,其主要内容为:
(1)解析 Dubbo 协议头-header;
(2)解析 Dubbo 协议帧数据部分-body。
具体实现如下所示。
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
// 获取dubbo协议帧的字节数
int readable = buffer.readableBytes();
// 将dubbo协议头读取到数据header
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);
// 解析dubbo协议帧数据部分
return decode(channel, buffer, readable, header);
}
其中将dubbo协议头读取到header数组的实现如下所示。
@Override
public void readBytes(byte[] dst) {
readBytes(dst, 0, dst.length);
}
2.2.1 解析 Dubbo 协议帧的 header
解析 Dubbo 协议头时主要做的事情为:
(1)校验协议头的魔法值是否有效;
(2)校验 Dubbo 协议帧是否为一个完整的协议帧。
判断的方式包括:dubbo协议帧实际字节数是否小于协议头长度;dubbo协议帧实际字节数是否小于理论上的dubbo协议帧总字节数(header长度+body长度)。
具体实现如下所示。
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number.
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn(TRANSPORT_SKIP_UNUSED_STREAM, "", "", "Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(TRANSPORT_SKIP_UNUSED_STREAM, "", "", e.getMessage(), e);
}
}
}
}
2.2.2 解析 Dubbo 协议帧的body
对dubbo协议帧的body进行解析时调用了 decodeBody 方法,其中包括了服务提供方对请求内容的解析和服务消费方对响应信息的解析。
具体实现如下所示。
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// decode response.
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(true);
}
// get status.
byte status = header[3];
res.setStatus(status);
try {
if (status == Response.OK) {
Object data;
if (res.isEvent()) {
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
data = null;
} else {
data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);
}
} else {
data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(channel, res, id));
}
res.setResult(data);
} else {
res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF());
}
} catch (Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
// decode request.
Request req;
try {
Object data;
if ((flag & FLAG_EVENT) != 0) {
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
req = new HeartBeatRequest(id);
((HeartBeatRequest) req).setProto(proto);
data = null;
} else {
req = new Request(id);
data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);
}
req.setEvent(true);
} else {
req = new Request(id);
data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
}
req.setData(data);
} catch (Throwable t) {
// bad request
req = new Request(id);
req.setBroken(true);
req.setData(t);
}
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
return req;
}
}
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!
