在之前的文章中,我详细的介绍了站点的实时人数是如何实现的?,在方案中,我留下了一个悬念,如何使用 SharedWorker 在页面之间共享 Socket 实例,实现跨页面单例 WebSocket。
探索这个问题的背景是无意间在知乎看到了 WebSocket 的一个问题,其中有回答提到 WebSocket 连接过于占用服务器资源。当页面重复打开很多个时,每个页面都会建立 WebSocket 连接,确实无意间的增大了服务器压力。虽说这个问题我之前也有考虑过,但是由于个人站点并不会出现大量的用户同时在线,所以并没有深入研究。我们使用的是 Socket.IO 的方案建立 WebSocket 连接,Socket.IO 本身就是对 WebSocket 的封装,所以从开销上来说,Socket.IO 会比原生 WebSocket 多一些开销。关于 Socket.IO 连接对服务器侧的内存开销,文档中有所提及:Memory Usage | Socket.IO。
虽然这点性能优化不痛不痒也没有任何提升,但是既然有这个问题,那我们这次就试着去解决这个问题。
我们要解决的问题很明确,就是当一个浏览器打开两个或以上的我站的页面时(多个 Tab),复用同一个 Socket 实例。
通过搜索关键字,我们了解到一个 API,SharedWorker,SharedWorker 是一个在多个浏览上下文(例如多个窗口、标签或 iframe)之间共享的 Worker。SharedWorker 有一个全局作用域,可以在多个浏览上下文中使用,这样就可以实现我们的目标。
我们只需要把原本在 Socket 实例的代码放到 SharedWorker 中,然后在页面中与 Worker 中的 Socket 实例通信即可。由于之前的 SocketClient 对 Socket 进行了抽象,在此次重构中并不需要修改太多代码,而是只需要实现通信层即可。
先来看看 SharedWorker 是如何使用的。
上面的例子中,我们创建了一个 SharedWorker,然后在页面中与 Worker 通信。在 Worker 中,我们监听了 onconnect
事件,当 Worker 连接时,我们监听了 port 的 message 事件,然后在页面中,我们监听了 port 的 message 事件,worker 中的消息在页面中进行进一步处理。
SharedWorker 同 Worker 一样,消息通过 MessageChannel 进行传递。而 worker.port
本质就是对 MessageChannel 的封装。
现在我们运行这个基本的例子,看看效果。
https://codesandbox.io/p/sandbox/ecstatic-jerry-6xsgvs
当我们同时打开两个 Tab 时,我们可以看到两个 Tab 都会收到 Worker 发送的消息。
通过 chrome://inspect/#workers
我们可以看到 Worker 的信息。当打开多个 Tab 时,SharedWorker 会在多个 Tab 之间共享;当多个页面都被关闭时,Worker 会被销毁。
上面的例子中,我们把 worker 的实现放在了 public 目录下,这样的方式并不适用于工程化的项目中,因为在工程化项目中,我们不应该直接引用 public 目录下的文件,而是应该通过 import 的方式引入。第二,我们或许需要在 Worker 实现中引用外部库,或者使用 TypeScript 编写 Worker 实现。而对于这些需求,直接裸写 js 的方式是无法实现的。
根据 Sukka 大佬指出,Next.js 可以直接用 URL 导入 Worker。
所以我们可以这样去导入一个 Worker。
现在我们在 Worker 中实现 Socket 的逻辑。
大概梳理一下流程:
emit
方法。具体的代码实现如下:
Note下面的例子中,我们使用了 TypeScript 去编写 Worker,并且可以直接在 Worker 中使用 import 引用外部库,为了更好的支持 Worker Scope 的类型,所以在顶部我们使用
/// <reference lib="webworker" />
增加相关的类型支持。
Worker 写完了,那么现在就要写主线程和 Worker 通信的代码了。
先看看流程:
具体的代码实现如下:
Important下面的例子中,我们在 constructor 中异步加载去初始化 SharedWorker 是为了避免在 Server Side 不存在 Worker 的情况下报错。
那么就大功告成了,SocketWorker
基本还是从原有的 SocketClient
中抽象出来的,基本实现了相同的方法,所以在业务中使用没有太大的变化,迁移过程也非常平滑。
对了,这次的重构位于 Shiro/550abd。可供参考。
完成的实现位于:
let i = 0
const ports = []
onconnect = (e) => {
const port = e.ports[0];
ports.push(port)
port.addEventListener("message", (e) => {
const workerResult = `Result: ${e.data[0] * e.data[1]}`;
port.postMessage(workerResult);
});
port.start();
};
setInterval(() => {
i++;
ports.forEach(port => port.postMessage("Message from worker, time: " + i))
}, 1000);
const worker = new SharedWorker(new URL('./io.worker', import.meta.url), {
name: 'shiro-ws-worker',
})
import { io } from 'socket.io-client'
import type { Socket } from 'socket.io-client'
/// <reference lib="webworker" />
let ws: Socket | null = null
function setupIo(config: { url: string }) {
if (ws) return
// 使用 socket.io
console.log('Connecting to io, url: ', config.url)
ws = io(config.url, {
timeout: 10000,
reconnectionDelay: 3000,
autoConnect: false,
reconnectionAttempts: 3,
transports: ['websocket'],
})
if (!ws) return
ws.on('disconnect', () => {
boardcast({
type: 'disconnect',
})
})
/**
* @param {any} payload
*/
ws.on('message', (payload) => {
console.log('ws', payload)
boardcast({
type: 'message',
payload,
})
})
ws.on('connect', () => {
console.log('Connected to ws.io server from SharedWorker')
if (waitingEmitQueue.length > 0) {
waitingEmitQueue.forEach((payload) => {
if (!ws) return
ws.emit('message', payload)
})
waitingEmitQueue.length = 0
}
boardcast({
type: 'connect',
// @ts-expect-error
payload: ws.id,
})
})
ws.open()
boardcast({
type: 'sid',
payload: ws.id,
})
}
const ports = [] as MessagePort[]
self.addEventListener('connect', (ev: any) => {
const event = ev as MessageEvent
const port = event.ports[0]
ports.push(port)
port.onmessage = (event) => {
const { type, payload } = event.data
console.log('get message from main', event.data)
switch (type) {
case 'config':
setupIo(payload)
break
case 'emit':
if (ws) {
if (ws.connected) ws.emit('message', payload)
else waitingEmitQueue.push(payload)
}
break
case 'reconnect':
if (ws) ws.open()
break
case 'init':
port.postMessage({ type: 'ping' })
if (ws) {
if (ws.connected) port.postMessage({ type: 'connect' })
port.postMessage({ type: 'sid', payload: ws.id })
}
break
default:
console.log('Unknown message type:', type)
}
}
port.start()
})
function boardcast(payload: any) {
console.log('[ws] boardcast', payload)
ports.forEach((port) => {
port.postMessage(payload)
})
}
const waitingEmitQueue: any[] = []
interface WorkerSocket {
sid: string
}
class SocketWorker {
private socket: WorkerSocket | null = null
worker: SharedWorker | null = null
constructor() {
if (isServerSide) return
// @ts-expect-error
import('./io.worker').then(({ default: SharedWorker }) => {
if (isServerSide) return
const worker = new SharedWorker()
this.prepare(worker)
this.worker = worker
})
}
async getSid() {
return this.socket?.sid
}
private setSid(sid: string) {
this.socket = {
...this.socket,
sid,
}
}
bindMessageHandler = (worker: SharedWorker) => {
worker.port.onmessage = (event: MessageEvent) => {
const { data } = event
const { type, payload } = data
switch (type) {
case 'ping': {
worker?.port.postMessage({
type: 'pong',
})
console.log('[ws worker] pong')
break
}
case 'connect': {
window.dispatchEvent(new SocketConnectedEvent())
setSocketIsConnect(true)
const sid = payload
this.setSid(sid)
break
}
case 'disconnect': {
window.dispatchEvent(new SocketDisconnectedEvent())
setSocketIsConnect(false)
break
}
case 'sid': {
const sid = payload
this.setSid(sid)
break
}
case 'message': {
const typedPayload = payload as string | Record<'type' | 'data', any>
if (typeof typedPayload !== 'string') {
return this.handleEvent(
typedPayload.type,
camelcaseKeys(typedPayload.data),
)
}
const { data, type } = JSON.parse(typedPayload) as {
data: any
type: EventTypes
}
this.handleEvent(type, camelcaseKeys(data))
}
}
}
}
prepare(worker: SharedWorker) {
const gatewayUrlWithoutTrailingSlash = GATEWAY_URL.replace(/\/$/, '')
this.bindMessageHandler(worker)
worker.port.postMessage({
type: 'config',
payload: {
url: `${gatewayUrlWithoutTrailingSlash}/web`,
},
})
worker.port.start()
worker.port.postMessage({
type: 'init',
})
}
handleEvent(type: EventTypes, data: any) {
// Handle biz event
}
emit(event: SocketEmitEnum, payload: any) {
this.worker?.port.postMessage({
type: 'emit',
payload: { type: event, payload },
})
}
reconnect() {
this.worker?.port.postMessage({
type: 'reconnect',
})
}
static shared = new SocketWorker()
}
export const socketWorker = SocketWorker.shared
export type TSocketClient = SocketWorker