Node.js 之 RPC 调用
什么是 RPC 调用
RPC 全称为 Remote Procedure Call(远程过程调用)。
- 是指两个计算机之间的网络通信。
- 客户端与服务端需约定一个数据格式。
- 基于 TCP 或 UDP 协议。
TCP 通信方式
单工通信
Client 端和 Server 端,只能单方向传输数据,不能双向传输。
Client --> Server
或者Client <-- Server
半双工通信
Client 端和 Server 端,可以双向交替传输数据。在同一时间段内,只能由其中一端向另外一端传输数据。
全双工通信
Client 端和 Server 端,可以双向同时传输数据。在同一时间段内,可双向交叉传输数据。
Node.js Buffer 模块
使用 Node.js Buffer 模块,编码/解码二进制数据包。
官方文档:Buffer | Node.js v14.20.0 Documentation
知识扩展:
Node.js Net 模块
使用 Node.js Net 模块,搭建多路复用的 RPC 通道。
示例:单工通信
由客户端向服务端传送数据。
客户端 Client
const net = require('net');
const socket = new net.Socket({});
socket.connect({
host: '127.0.0.1',
port: 5000,
});
socket.write('Hello World!');
服务端 Server
const net = require('net');
const server = net.createServer((socket) => {
// 监听 data 事件
socket.on('data', (data) => {
// 参数 data 为 Buffer 类型
console.log(data, data.toString());
});
});
server.listen(5000);
示例:半双工通信
客户端 Client
const net = require('net');
const socket = new net.Socket({});
// 建立连接
socket.connect({
host: '127.0.0.1',
port: 5000,
});
// id 集合
const ids = [
10001,
10002,
10003,
10004,
10005,
10006,
10007,
10008,
10009,
10010,
];
// id 随机索引
let index = Math.floor(Math.random() * ids.length);
// 传送第一条数据
socket.write(encode(index));
// 监听从服务端返回的数据
socket.on('data', (buffer) => {
console.log(buffer.toString());
// 接收到数据之后,按照半双工通信逻辑,开始下一次请求。
index = Math.floor(Math.random() * ids.length);
socket.write(encode(index));
});
// 编码请求包
function encode(index) {
buffer = Buffer.alloc(4);
buffer.writeInt32BE(
ids[index]
);
return buffer;
}
服务端 Server
const net = require('net');
const dataList = {
10001: '第 1 条数据',
10002: '第 2 条数据',
10003: '第 3 条数据',
10004: '第 4 条数据',
10005: '第 5 条数据',
10006: '第 6 条数据',
10007: '第 7 条数据',
10008: '第 8 条数据',
10009: '第 9 条数据',
10010: '第 10 条数据',
};
const server = net.createServer((socket) => {
socket.on('data', (buffer) => {
const id = buffer.readInt32BE();
// 伪造延迟 100 毫秒
setTimeout(() => {
socket.write(
Buffer.from(dataList[id])
);
}, 100);
})
});
server.listen(5000);
示例:全双工通信
客户端 Client
const net = require('net');
const socket = new net.Socket({});
socket.connect({
host: '127.0.0.1',
port: 4000
});
const LESSON_IDS = [
10001,
10002,
10003,
10004,
10005,
10006,
10007,
10008,
10009,
10010,
]
let id = Math.floor(Math.random() * LESSON_IDS.length);
let oldBuffer = null;
socket.on('data', (buffer) => {
// 把上一次data事件使用残余的buffer接上来
if (oldBuffer) {
buffer = Buffer.concat([oldBuffer, buffer]);
}
let completeLength = 0;
// 只要还存在可以解成完整包的包长
while (completeLength = checkComplete(buffer)) {
const package = buffer.slice(0, completeLength);
buffer = buffer.slice(completeLength);
// 把这个包解成数据和seq
const result = decode(package);
console.log(`包${result.seq},返回值是${result.data}`);
}
// 把残余的buffer记下来
oldBuffer = buffer;
})
let seq = 0;
/**
* 二进制包编码函数
* 在一段rpc调用里,客户端需要经常编码rpc调用时,业务数据的请求包
*/
function encode(data) {
// 正常情况下,这里应该是使用 protobuf 来encode一段代表业务数据的数据包
// 为了不要混淆重点,这个例子比较简单,就直接把课程id转buffer发送
const body = Buffer.alloc(4);
body.writeInt32BE(LESSON_IDS[data.id]);
// 一般来说,一个rpc调用的数据包会分为定长的包头和不定长的包体两部分
// 包头的作用就是用来记载包的序号和包的长度,以实现全双工通信
const header = Buffer.alloc(6);
header.writeInt16BE(seq)
header.writeInt32BE(body.length, 2);
// 包头和包体拼起来发送
const buffer = Buffer.concat([header, body])
console.log(`包${seq}传输的课程id为${LESSON_IDS[data.id]}`);
seq++;
return buffer;
}
/**
* 二进制包解码函数
* 在一段rpc调用里,客户端需要经常解码rpc调用时,业务数据的返回包
*/
function decode(buffer) {
const header = buffer.slice(0, 6);
const seq = header.readInt16BE();
const body = buffer.slice(6)
return {
seq,
data: body.toString()
}
}
/**
* 检查一段buffer是不是一个完整的数据包。
* 具体逻辑是:判断header的bodyLength字段,看看这段buffer是不是长于header和body的总长
* 如果是,则返回这个包长,意味着这个请求包是完整的。
* 如果不是,则返回0,意味着包还没接收完
* @param {} buffer
*/
function checkComplete(buffer) {
if (buffer.length < 6) {
return 0;
}
const bodyLength = buffer.readInt32BE(2);
return 6 + bodyLength
}
for (let k = 0; k < 100; k++) {
id = Math.floor(Math.random() * LESSON_IDS.length);
socket.write(encode({ id }));
}
服务端 Server
const net = require('net');
const server = net.createServer((socket) => {
let oldBuffer = null;
socket.on('data', function (buffer) {
// 把上一次data事件使用残余的buffer接上来
if (oldBuffer) {
buffer = Buffer.concat([oldBuffer, buffer]);
}
let packageLength = 0;
// 只要还存在可以解成完整包的包长
while (packageLength = checkComplete(buffer)) {
const package = buffer.slice(0, packageLength);
buffer = buffer.slice(packageLength);
// 把这个包解成数据和seq
const result = decode(package);
// 计算得到要返回的结果,并write返回
socket.write(
encode(LESSON_DATA[result.data], result.seq)
);
}
// 把残余的buffer记下来
oldBuffer = buffer;
})
});
server.listen(4000);
/**
* 二进制包编码函数
* 在一段rpc调用里,服务端需要经常编码rpc调用时,业务数据的返回包
*/
function encode(data, seq) {
// 正常情况下,这里应该是使用 protobuf 来encode一段代表业务数据的数据包
// 为了不要混淆重点,这个例子比较简单,就直接把课程标题转buffer返回
const body = Buffer.from(data)
// 一般来说,一个rpc调用的数据包会分为定长的包头和不定长的包体两部分
// 包头的作用就是用来记载包的序号和包的长度,以实现全双工通信
const header = Buffer.alloc(6);
header.writeInt16BE(seq)
header.writeInt32BE(body.length, 2);
const buffer = Buffer.concat([header, body])
return buffer;
}
/**
* 二进制包解码函数
* 在一段rpc调用里,服务端需要经常解码rpc调用时,业务数据的请求包
*/
function decode(buffer) {
const header = buffer.slice(0, 6);
const seq = header.readInt16BE();
// 正常情况下,这里应该是使用 protobuf 来decode一段代表业务数据的数据包
// 为了不要混淆重点,这个例子比较简单,就直接读一个Int32即可
const body = buffer.slice(6).readInt32BE()
// 这里把seq和数据返回出去
return {
seq,
data: body
}
}
/**
* 检查一段buffer是不是一个完整的数据包。
* 具体逻辑是:判断header的bodyLength字段,看看这段buffer是不是长于header和body的总长
* 如果是,则返回这个包长,意味着这个请求包是完整的。
* 如果不是,则返回0,意味着包还没接收完
* @param {} buffer
*/
function checkComplete(buffer) {
if (buffer.length < 6) {
return 0;
}
const bodyLength = buffer.readInt32BE(2);
return 6 + bodyLength
}
// 假数据
const LESSON_DATA = {
10001: '第 1 条数据',
10002: '第 2 条数据',
10003: '第 3 条数据',
10004: '第 4 条数据',
10005: '第 5 条数据',
10006: '第 6 条数据',
10007: '第 7 条数据',
10008: '第 8 条数据',
10009: '第 9 条数据',
10010: '第 10 条数据',
}
(完)