
SocketIO Deep Customization Journey: Basic Customization, Part 3

3. Features

3.1 Client API Design

emitToSession
emitToUser
emitToGroup
emitToGroupExcludeSelf
broadcast

Goal: an API that is friendly to callers.
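This is a minimal sketch, under assumptions, of how the five methods above could sit behind one interface. A naive in-memory implementation makes the routing rules concrete: a user may have several sessions, and a group send can exclude the sender. ClientApi, InMemoryClientApi, Sink and register are hypothetical names, not the author's API. In a real server, Sink would enqueue into the target client's send queue described next.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical facade mirroring the emitTo* / broadcast method names listed above. */
interface ClientApi {
    void emitToSession(String sessionId, String event, Object data);
    void emitToUser(String userId, String event, Object data);
    void emitToGroup(String groupId, String event, Object data);
    void emitToGroupExcludeSelf(String groupId, String selfSessionId, String event, Object data);
    void broadcast(String event, Object data);
}

/** Naive in-memory routing table; only illustrates who receives what. */
class InMemoryClientApi implements ClientApi {
    /** Stands in for "enqueue to the target client's send queue". */
    interface Sink {
        void deliver(String sessionId, String event, Object data);
    }

    private final Map<String, Set<String>> userSessions = new ConcurrentHashMap<>();
    private final Map<String, Set<String>> groupSessions = new ConcurrentHashMap<>();
    private final Set<String> allSessions = ConcurrentHashMap.newKeySet();
    private final Sink sink;

    InMemoryClientApi(Sink sink) {
        this.sink = sink;
    }

    /** Hypothetical registration hook, e.g. called after a successful handshake. */
    void register(String sessionId, String userId, String groupId) {
        allSessions.add(sessionId);
        userSessions.computeIfAbsent(userId, k -> ConcurrentHashMap.newKeySet()).add(sessionId);
        groupSessions.computeIfAbsent(groupId, k -> ConcurrentHashMap.newKeySet()).add(sessionId);
    }

    @Override
    public void emitToSession(String sessionId, String event, Object data) {
        if (allSessions.contains(sessionId)) {
            sink.deliver(sessionId, event, data);
        }
    }

    @Override
    public void emitToUser(String userId, String event, Object data) {
        // One user may be online on several devices, i.e. several sessions
        for (String s : userSessions.getOrDefault(userId, Set.of())) {
            sink.deliver(s, event, data);
        }
    }

    @Override
    public void emitToGroup(String groupId, String event, Object data) {
        emitToGroupExcludeSelf(groupId, null, event, data);
    }

    @Override
    public void emitToGroupExcludeSelf(String groupId, String selfSessionId, String event, Object data) {
        for (String s : groupSessions.getOrDefault(groupId, Set.of())) {
            if (!s.equals(selfSessionId)) { // the sender does not get its own message back
                sink.deliver(s, event, data);
            }
        }
    }

    @Override
    public void broadcast(String event, Object data) {
        for (String s : allSessions) {
            sink.deliver(s, event, data);
        }
    }
}
```

Keeping the routing choice in the method name (rather than a flag parameter) is what makes the call sites self-describing.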


Sending uses an asynchronous event-notification model:

Send pseudocode 1:

Client.addQueue(msg);

Send pseudocode 2:

Client -> queue<Message>   // each client owns its own message queue
while (true) {
    msg = queue.poll();
    if (msg == null) {
        break;
    }
    channel.writeAndFlush(msg);
}
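Pseudocode 2 drains a per-client queue but leaves open who runs the loop. One common way, sketched here as an assumption rather than the author's implementation, is to have callers only enqueue. At most one drain task per client is then kept in flight, guarded by an atomic flag. ClientSender and its Consumer writer (standing in for channel.writeAndFlush) are hypothetical.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical per-client sender: callers enqueue, a single drain task writes in order. */
class ClientSender<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean draining = new AtomicBoolean(false);
    private final Executor executor;
    private final Consumer<T> writer; // stands in for channel.writeAndFlush(msg)

    ClientSender(Executor executor, Consumer<T> writer) {
        this.executor = executor;
        this.writer = writer;
    }

    /** Client.addQueue(msg): returns immediately, no network I/O on the caller's thread. */
    void addQueue(T msg) {
        queue.offer(msg);
        scheduleDrain();
    }

    private void scheduleDrain() {
        // Only one drain task per client may be in flight at a time
        if (draining.compareAndSet(false, true)) {
            executor.execute(this::drain);
        }
    }

    private void drain() {
        try {
            T msg;
            while ((msg = queue.poll()) != null) {
                writer.accept(msg);
            }
        } finally {
            draining.set(false);
        }
        // A message may have been enqueued after poll() returned null but before the
        // flag was cleared; its producer's CAS failed, so re-check to avoid stranding it.
        if (!queue.isEmpty()) {
            scheduleDrain();
        }
    }
}
```

Because only one drain task runs per client at a time, each client receives its messages in enqueue order.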

Zero-copy fan-out based on reference counting

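Goal: encode a message into one ByteBuf once and share that memory across all target channels instead of copying it per channel: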

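// Same pattern as Netty's DefaultChannelGroup: each channel gets a retained
// duplicate (shared memory and reference count, independent reader/writer
// indexes), because writing advances the readerIndex of the buffer written.
try {
    for (Channel channel : channels) {
        // Netty releases each outbound message once it has been written (or the
        // write failed), so an extra release() in a listener would over-release.
        channel.writeAndFlush(byteBuf.retainedDuplicate());
    }
} finally {
    // Drop the reference held since encoding; the memory is freed once every
    // duplicate has been written and released.
    byteBuf.release();
}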
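To make the reference-count bookkeeping concrete without a Netty dependency, here is a toy model. SharedBuffer and View are hypothetical classes, not Netty's implementation. One encoded byte array is shared by N views. As with retainedDuplicate(), each view adds one reference and has its own read position. The memory counts as freed only after the encoder's reference and every view's reference have been released.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of a reference-counted buffer; Netty's ByteBuf does this for real. */
class SharedBuffer {
    final byte[] data;                                         // encoded once, never copied
    private final AtomicInteger refCnt = new AtomicInteger(1); // the encoder's reference
    volatile boolean freed;

    SharedBuffer(byte[] data) {
        this.data = data;
    }

    int refCnt() {
        return refCnt.get();
    }

    /** Like retainedDuplicate(): one more reference plus an independent read position. */
    View retainedView() {
        refCnt.incrementAndGet();
        return new View(this);
    }

    void release() {
        int now = refCnt.decrementAndGet();
        if (now == 0) {
            freed = true; // a pooled allocator would return the memory here
        } else if (now < 0) {
            throw new IllegalStateException("over-released");
        }
    }

    static final class View {
        private final SharedBuffer parent;
        private int readerIndex; // independent per view, like a Netty duplicate

        View(SharedBuffer parent) {
            this.parent = parent;
        }

        int readableBytes() {
            return parent.data.length - readerIndex;
        }

        /** Simulates the transport writing all bytes out and then releasing the message. */
        void writeOutAndRelease() {
            readerIndex = parent.data.length;
            parent.release();
        }
    }
}
```

With three target channels the count goes 1, then 4 after the fan-out, 3 after the encoder's release, and 0 after the last write. That is the same arithmetic the Netty loop relies on.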

3.2 握手反馈改造#

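In H5 (browser) clients, the Socket.IO handshake is performed before the upgrade. If the handshake fails, the client enters the onError logic and cannot obtain any specific reason for the failure: in the code below, only xhr.status is passed to onError. This change therefore disables the native handshake feedback. Right after the Socket.IO upgrade completes, the server performs its own handshake, so that we control and design the entire handshake logic.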

Original handshake connection code:

Socket.IO v2.2.0

Handshake/upgrade pseudocode in socketio-client.js:

xhr.onreadystatechange = function () {
    console.log("onreadystatechange readyState:" + xhr.readyState)
    console.log("onreadystatechange getAllResponseHeaders:" + xhr.getAllResponseHeaders());
    if (xhr.readyState === 2) {
        try {
            var contentType = xhr.getResponseHeader('Content-Type');
            if (self.supportsBinary && contentType === 'application/octet-stream') {
                xhr.responseType = 'arraybuffer';
            }
        } catch (e) {}
    }
    if (4 !== xhr.readyState) return;
    console.log("status -> status:" + xhr.status)
    if (200 === xhr.status || 1223 === xhr.status) {
        self.onLoad();
    } else {
        // make sure the `error` event handler that's user-set
        // does not throw in the same tick and gets caught here
        setTimeout(function () {
            //401
            console.log("setTimeout status -> status:" + xhr.status)
            console.log("setTimeout contentText:" + xhr.responseText)
            console.log("setTimeout contentType:" + xhr.getAllResponseHeaders())

            self.onError(xhr.status);
        }, 0);
    }
};

+handshakeResultListener

Common handshake interceptors:

+IP blacklist

+Client version blacklist: reject clients with known bugs
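The following is a minimal sketch of how such interceptors could be chained so that the first rejection carries a concrete code and reason back to the client, which the native flow cannot do. HandshakeInterceptor, HandshakeRequest, HandshakeResult, HandshakeChain and the 4031/4032 codes are hypothetical illustrations, not the author's code.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical result, sent to the client as the first event after the upgrade. */
final class HandshakeResult {
    static final HandshakeResult OK = new HandshakeResult(true, 0, "ok");
    final boolean ok;
    final int code;
    final String reason;

    private HandshakeResult(boolean ok, int code, String reason) {
        this.ok = ok;
        this.code = code;
        this.reason = reason;
    }

    static HandshakeResult reject(int code, String reason) {
        return new HandshakeResult(false, code, reason);
    }
}

/** Minimal view of the handshake request; field names are illustrative. */
final class HandshakeRequest {
    final String remoteIp;
    final String clientVersion;

    HandshakeRequest(String remoteIp, String clientVersion) {
        this.remoteIp = remoteIp;
        this.clientVersion = clientVersion;
    }
}

interface HandshakeInterceptor {
    HandshakeResult check(HandshakeRequest req);
}

final class HandshakeChain {
    private final List<HandshakeInterceptor> interceptors;

    HandshakeChain(List<HandshakeInterceptor> interceptors) {
        this.interceptors = List.copyOf(interceptors);
    }

    HandshakeResult run(HandshakeRequest req) {
        for (HandshakeInterceptor interceptor : interceptors) {
            HandshakeResult r = interceptor.check(req);
            if (!r.ok) {
                return r; // first rejection wins and is reported to the client
            }
        }
        return HandshakeResult.OK;
    }
}

final class IpBlacklistInterceptor implements HandshakeInterceptor {
    private final Set<String> blocked;

    IpBlacklistInterceptor(Set<String> blocked) {
        this.blocked = Set.copyOf(blocked);
    }

    @Override
    public HandshakeResult check(HandshakeRequest req) {
        return blocked.contains(req.remoteIp)
                ? HandshakeResult.reject(4031, "ip blocked: " + req.remoteIp)
                : HandshakeResult.OK;
    }
}

final class ClientVersionBlacklistInterceptor implements HandshakeInterceptor {
    private final Set<String> buggyVersions;

    ClientVersionBlacklistInterceptor(Set<String> buggyVersions) {
        this.buggyVersions = Set.copyOf(buggyVersions);
    }

    @Override
    public HandshakeResult check(HandshakeRequest req) {
        return buggyVersions.contains(req.clientVersion)
                ? HandshakeResult.reject(4032, "client version " + req.clientVersion + " is not supported, please upgrade")
                : HandshakeResult.OK;
    }
}
```

On rejection the server would emit the result and then close the connection. The H5 client can then show the reason instead of a bare onError.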

Result:

[Figure: handshake result]

3.3 Rate Limiting Design and Large Packet Handling

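When Socket.IO forwards received signaling messages in broadcast or multicast scenarios, signaling storms occur very easily. Take a room of 100 people who all message each other: each message is forwarded to all 100 members, producing 100 × 100 = 10,000 forwards.

Large packets have a huge impact on the heap. Allocating large and humongous objects on the heap and forwarding them over the network can bring system resources to the point of collapse, so oversized messages must be discarded at the earliest possible point.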
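The text does not show how the rate limiting itself is implemented. One possible approach is a standard token bucket per session, shown here as an illustration and not as the author's code. It lets a client burst a few messages but not sustain more than a fixed rate, which caps the n × n fan-out at its source. TokenBucket and its injectable clock are hypothetical.

```java
import java.util.function.LongSupplier;

/**
 * Standard token bucket (illustrative, not the author's implementation):
 * allows bursts of up to `capacity` events, refilled at `ratePerSecond`.
 * Intended as one instance per session.
 */
final class TokenBucket {
    private final long capacity;
    private final double ratePerSecond;
    private final LongSupplier nanoClock; // System::nanoTime in production, fake clock in tests
    private double tokens;
    private long lastRefill;

    TokenBucket(long capacity, double ratePerSecond, LongSupplier nanoClock) {
        this.capacity = capacity;
        this.ratePerSecond = ratePerSecond;
        this.nanoClock = nanoClock;
        this.tokens = capacity; // start full
        this.lastRefill = nanoClock.getAsLong();
    }

    /** Returns true if the event may be forwarded, false if it should be dropped. */
    synchronized boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        double refill = (now - lastRefill) * ratePerSecond / 1_000_000_000.0;
        tokens = Math.min(capacity, tokens + refill);
        lastRefill = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```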

Socket.IO's built-in packet protection (defaults in netty-socketio's Configuration):

private int maxHttpContentLength = 64 * 1024;
private int maxFramePayloadLength = 64 * 1024;

Packet size validation, applied only at the handshake stage:

private void handshake(ChannelHandlerContext ctx, final UUID sessionId, String path, FullHttpRequest req) {
    final Channel channel = ctx.channel();

    WebSocketServerHandshakerFactory factory =
            new WebSocketServerHandshakerFactory(getWebSocketLocation(req), null, true, configuration.getMaxFramePayloadLength());
    WebSocketServerHandshaker handshaker = factory.newHandshaker(req);
    if (handshaker != null) {
        ChannelFuture f = handshaker.handshake(channel, req);
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    log.error("Can't handshake " + sessionId, future.cause());
                    return;
                }

                channel.pipeline().addBefore(SocketIOChannelInitializer.WEB_SOCKET_TRANSPORT, SocketIOChannelInitializer.WEB_SOCKET_AGGREGATOR,
                        new WebSocketFrameAggregator(configuration.getMaxFramePayloadLength()));
                connectClient(channel, sessionId);
            }
        });
    } else {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
    }
}

Custom protection for business packets:

+SocketIOFrameTooLargeException.java

+PacketDecoder.java

try {
    Event event = jsonSupport.readValue(packet.getNsp(), in, Event.class);
    if (Objects.isNull(event) || StringUtil.isNullOrEmpty(event.getName())) {
        handleNPEEvent(head, in);
        return; // without this, event.getName() below throws an NPE when event is null
    }
    packet.setName(event.getName());
    packet.setData(event.getArgs());
} catch (SocketIOFrameTooLargeException e) {
    // Flag in the client's store that an oversized frame was rejected
    head.getStore().set(SocketIOFrameTooLargeException.class.getName(), true);
    throw new SocketIOException(e);
}

+CustomJacksonJsonSupport.java

try {
    int available = src.available();
    // Large signaling messages are discarded before any parsing happens
    boolean bigEvent = false;
    if (discardByteSize > 0 && available > discardByteSize) {
        bigEvent = true;
        if (discard) {
            throw new SocketIOFrameTooLargeException(String.format(ERROR, available, discardByteSize));
        }
    }
    t = super.readValue(namespaceName, src, valueType);
    if (bigEvent) {
        // Discarding disabled: the oversized message is parsed, but logged
        log.warn("BeaconJacksonJsonSupport SocketIOFrameTooLargeException: readValue:{} > discardByteSize:{}", available, discardByteSize);
    }
} catch (SocketIOFrameTooLargeException e) {
    throw e;
}
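The core of the custom guard is to check the size before Jackson parses anything. Below is a stdlib-only sketch of that check. FrameTooLargeException and SizeGuard are hypothetical stand-ins for SocketIOFrameTooLargeException and the check inside CustomJacksonJsonSupport. InputStream.available() is only an estimate for general streams. It is exact for in-memory sources such as ByteArrayInputStream, or Netty's ByteBufInputStream over an already-aggregated frame, which is the situation in the decoder.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for SocketIOFrameTooLargeException. */
class FrameTooLargeException extends RuntimeException {
    FrameTooLargeException(String message) {
        super(message);
    }
}

final class SizeGuard {
    private final int discardByteSize; // <= 0 disables the check
    private final boolean discard;     // true: reject, false: only flag for logging

    SizeGuard(int discardByteSize, boolean discard) {
        this.discardByteSize = discardByteSize;
        this.discard = discard;
    }

    /**
     * Throws before any parsing if the payload is oversized and discarding is on.
     * Returns true if it is oversized but allowed through (caller should log).
     */
    boolean check(InputStream src) {
        if (discardByteSize <= 0) {
            return false;
        }
        int available;
        try {
            available = src.available();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (available <= discardByteSize) {
            return false;
        }
        if (discard) {
            throw new FrameTooLargeException(String.format(
                    "frame of %d bytes exceeds discardByteSize %d", available, discardByteSize));
        }
        return true;
    }
}
```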

3.4 Notes on the Reflection Changes

See: https://j360.me/2020/01/01/SocketIO%E9%AB%98%E6%80%A7%E8%83%BD%E4%BA%8B%E4%BB%B6%E9%A9%B1%E5%8A%A8%E8%AE%BE%E8%AE%A1%E6%8E%A2%E7%B4%A2/

3.5 Multi-Port Support (Optional)

Goal: support both ws and wss at the same time (modeled on Nginx).

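Current state: SSL support is an on/off switch by default. Unlike Nginx, the server cannot serve both at the same time while sharing rewrite information.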

3.6 SSL Certificate Changes

We use one-way SSL here: only the client verifies the legitimacy of the server through its certificate.

SSL one-way authentication flow:

[Figure: SSL one-way authentication flow]

Choosing an SSL implementation:

  • JDK SSL: JdkSslEngine is easy to integrate and is what SocketIO uses by default.
  • OpenSSL: better performance; SSL is called natively (through netty-tcnative).
/**
 * Returns OpenSSL if available, otherwise returns the JDK provider.
 */
private static SslProvider findSslProvider() {
    if (OpenSsl.isAvailable()) {
        logger.info("Using OPENSSL provider.");
        return SslProvider.OPENSSL;
    } else if (checkJdkProvider()) {
        logger.info("Using JDK provider.");
        return SslProvider.JDK;
    }
    throw new IllegalStateException(
            "Could not find any valid TLS provider, please check your dependency or deployment environment, " +
            "usually netty-tcnative, Conscrypt, or Jetty NPN/ALPN is needed.");
}

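JdkSslEngine performs poorly; in the author's experience OpenSslEngine was about 10 times faster.

  • By default Netty tries to load OpenSslEngine first and falls back to JdkSslEngine if that fails.
  • Use netty-tcnative-boringssl-: it bundles the OpenSSL dependency, so you do not need to care whether OpenSSL is installed on the target machine or which version it is.
  • Without boringssl, a suitable OpenSSL version (1.0.0.0) has to be installed on the target machine, which is a lot of trouble.
  • Our earlier attempt with boringssl failed because the Netty version did not match; we searched Maven Central for versions released around the same time.
  • Netty 4.1.33 and boringssl 4.0.20 turned out to have close release dates, and with that pairing the versions matched. (Each Netty release also records the netty-tcnative version it was built against in the tcnative.version property of its parent pom.)
  • Lesson learned: on our SVN server, OpenSSL was removed and then compiled and installed from source, which broke the environment; the server can no longer be reached.
  • Before compiling and installing anything from source, always test locally first or back up the system.
  • We went off in the wrong direction and spent a lot of time installing and compiling OpenSSL.
  • For SSL certificates, Netty needs PEM format, and the key file must be converted to PKCS#8 (.pk8); any simple password will do during the conversion: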
openssl pkcs8 -in my.key -topk8 -out my.pk8

If you are interested, see the Netty source io.netty.handler.ssl.OpenSsl.java, whose static initializer decides whether OpenSSL is available:

static {
    Throwable cause = null;

    if (SystemPropertyUtil.getBoolean("io.netty.handler.ssl.noOpenSsl", false)) {
        cause = new UnsupportedOperationException(
                "OpenSSL was explicit disabled with -Dio.netty.handler.ssl.noOpenSsl=true");

        logger.debug(
                "netty-tcnative explicit disabled; " +
                OpenSslEngine.class.getSimpleName() + " will be unavailable.", cause);
    } else {
        // Test if netty-tcnative is in the classpath first.
        try {
            Class.forName("io.netty.internal.tcnative.SSL", false, OpenSsl.class.getClassLoader());
        } catch (ClassNotFoundException t) {
            cause = t;
            logger.debug(
                    "netty-tcnative not in the classpath; " +
                    OpenSslEngine.class.getSimpleName() + " will be unavailable.");
        }

        // If in the classpath, try to load the native library and initialize netty-tcnative.
        if (cause == null) {
            try {
                // The JNI library was not already loaded. Load it now.
                loadTcNative();
            } catch (Throwable t) {
                cause = t;
                logger.debug(
                        "Failed to load netty-tcnative; " +
                        OpenSslEngine.class.getSimpleName() + " will be unavailable, unless the " +
                        "application has already loaded the symbols by some other means. " +
                        "See http://netty.io/wiki/forked-tomcat-native.html for more information.", t);
            }

            try {
                String engine = SystemPropertyUtil.get("io.netty.handler.ssl.openssl.engine", null);
                if (engine == null) {
                    logger.debug("Initialize netty-tcnative using engine: 'default'");
                } else {
                    logger.debug("Initialize netty-tcnative using engine: '{}'", engine);
                }
                initializeTcNative(engine);

                // The library was initialized successfully. If loading the library failed above,
                // reset the cause now since it appears that the library was loaded by some other
                // means.
                cause = null;
            } catch (Throwable t) {
                if (cause == null) {
                    cause = t;
                }
                logger.debug(
                        "Failed to initialize netty-tcnative; " +
                        OpenSslEngine.class.getSimpleName() + " will be unavailable. " +
                        "See http://netty.io/wiki/forked-tomcat-native.html for more information.", t);
            }
        }
    }
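The first step of that static block probes the classpath without initializing the class. That probe is handy on its own when diagnosing a deployment. Below is a small sketch using the same Class.forName(name, false, loader) call; the ClasspathProbe helper is hypothetical. Note that finding io.netty.internal.tcnative.SSL only proves the jar is present. Loading the native library can still fail, and Netty reports that through OpenSsl.isAvailable() and OpenSsl.unavailabilityCause().

```java
/** Hypothetical helper mirroring the classpath probe at the top of OpenSsl's static block. */
final class ClasspathProbe {
    private ClasspathProbe() {
    }

    static boolean isClassPresent(String className) {
        try {
            // initialize = false: locate and load the class without running its static initializer
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }
}
```

At startup, logging ClasspathProbe.isClassPresent("io.netty.internal.tcnative.SSL") tells you which of the two failure modes above you are in. The jar may be missing, or it may be present while the native library fails to load.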