站点的实时人数是如何实现的?
经常光顾我站的读者应该都看到过我站底部有显示当前在线的人数。在不久之前我还为其增加了显示当前正在阅读具体哪篇文章的人数排行,并且在具体的某篇文章左侧增加时间线,用于展示当前所有正在阅读此文章的读者阅读进度。
如果你看过我站的开源项目的话大概知道此站是由 Mix Space 驱动的,既然已经需要有服务器驱动,那么我基本没有其他平台提供的服务了,能自己去实现的基本都是自己去实现。比如这个功能,如果你用 SasS 服务,则可以选择 Liveblocks。当然这里不做讨论,因为此文章的目的是如何自己去实现这样一个功能,服务端以及前端。
设计与实现
实时人数的统计,需要长连接。这里可以使用 WebSocket,当然如果你不需要特别实时的情况也可以使用轮询的方式。这里我们还是以 Socket.IO 实现,Socket.IO 是 WebSocket 的上层封装。
下面对上面的功能进行拆解分析。
站点的实时人数
所有的实时人数是比较好统计的。我们可以考虑用所有的 WebSocket 连接去统计当前的在线人数。
如果使用这个方法,那么没有连接到 WebSocket 的读者,不计入在线人数。
因为站点不需要登录就能访问,而每个访问都会建立一条 WebSocket 连接,如果同一个用户同时打开多个页面访问站点就会重复计算在线人数。
对于上面的第一点,我们不作考虑。对于第二点,我们可以稍作分析,在一些场景进行限制。
每个打开的页面都要建立 WebSocket 连接是可以接受的,但是需要对这些同用户的进行去重。
可以采用下面的办法:
- 去重同一个连接 WebSocket 的 IP(如果在大内网或者 CDN 的情况下会过于激进)
- 在客户端生成唯一的连接的 SessionId 在浏览器内部共享(可采用 Local Storage)
- 根据用户的登录状态去重,或者本地记录的历史评论数据
- 可以使用 SharedWorker 在页面之间共享 Socket 实例,该方法较为复杂,后续我可以再出一篇文章讲解。
综上,最后我们选择 2,3 作为最后的实现。
那么,在前端我们需要在没有 SessionId 的时候生成一个,如果在 Local Storage 存在则复用。
import { customAlphabet } from 'nanoid'
const alphabet = `1234567890abcdefghijklmnopqrstuvwxyz`
const nanoid = customAlphabet(alphabet)
const defaultSessionId = nanoid(8)
const storageKey = buildNSKey('web-session')
const getSocketWebSessionId = () => {
if (!isClientSide) {
return ''
}
const sessionId = localStorage.getItem(storageKey)
if (sessionId) return sessionId
localStorage.setItem(storageKey, defaultSessionId)
return defaultSessionId
}
首先使用 nanoid 生成一个默认的 sessionId,我们需要把已经存在的 SessionId 传递给 Socket Client。至于上面第三点的 Session Id 因为设计到登录态的异步,所以我这里只能选择后续更新当前 Socket Client 的 Session Id。
const client = io(GATEWAY_URL, {
timeout: 10000,
reconnectionDelay: 3000,
autoConnect: false,
reconnectionAttempts: 3,
transports: ['websocket'],
query: {
socket_session_id: getSocketWebSessionId(),
},
})
此处把生成的 WebSessionId 通过 query 传递。
ImportantSocket.IO client 会有一个sid
的值,这是每个 client 的唯一 id,请不要和我们生成的 session id 混淆
在服务端消费传递的 sessionId。
async handleConnection(socket: SocketIO.Socket) {
const webSessionId =
socket.handshake.query['socket_session_id'] ||
// fallback sid
socket.id
await this.gatewayService.setSocketMetadata(socket, {
sessionId: webSessionId,
})
}
这里我使用了 setSocketMetadata
去存储 socket 上的元数据,以便后续获取和更新这些值。这些 Socket 附加的元数据是存储在 Redis 上,具体实现可以参考 https://github.com/mx-space/core/blob/c82cb8ff54dc7b98bc411e03de81ede932aa612b/apps/core/src/processors/gateway/gateway.service.ts 。
那么,我们在服务端的统计站点总人数,实现如下:
async getCurrentClientCount() {
const server = this.namespace.server
const socketsMeta = await Promise.all(
await server
.of(`/${namespace}`)
.fetchSockets()
.then((sockets) => {
return sockets.map((socket) =>
this.gatewayService.getSocketMetadata(socket),
)
}),
)
return uniqBy(socketsMeta, (x) => x?.sessionId).length
}
在有其他 WebSocket 连接时,广播给所有的 Socket Client 当前的人数。
async handleConnection(socket: SocketIO.Socket) {
this.whenUserOnline()
}
whenUserOnline = async () => {
this.broadcast(
BusinessEvents.VISITOR_ONLINE,
await this.sendOnlineNumber(),
)
}
更新 SessionId
当前端用户登录态完成之后,我们需要更新 SocketId,以获取更加精确的实时人数。
某些状态可能存在于 Hooks 中,这里也封装一个 Hooks 取得最后的 SocketId。
export const useSocketSessionId = () => {
const user = useUser()
const owner = useOwner()
const ownerIsLogin = useIsLogged()
return useMemo((): string => {
const fallbackSid = getSocketWebSessionId()
if (ownerIsLogin) {
if (!owner) return fallbackSid
return `owner-${owner.id}`
} else if (user && user.isSignedIn) {
return user.user.id.toLowerCase()
}
return fallbackSid
}, [owner, ownerIsLogin, user])
}
然后 Emit 更新事件。
const webSocketSessionId = useSocketSessionId()
const previousWebSocketSessionIdRef = useRef(webSocketSessionId)
const socketIsConnected = useSocketIsConnect()
useEffect(() => {
const previousWebSocketSessionId = previousWebSocketSessionIdRef.current
previousWebSocketSessionIdRef.current = webSocketSessionId
if (!socketIsConnected) return
socketClient.emit(SocketEmitEnum.UpdateSid, {
sessionId: webSocketSessionId,
})
}, [socketIsConnected, webSocketSessionId])
那么,在服务端上,对此事件处理。
@SubscribeMessage('message')
async handleMessageEvent(
@MessageBody() data: MessageEventDto,
@ConnectedSocket() socket: SocketIO.Socket,
) {
const { payload, type } = data
switch (type) {
case SupportedMessageEvent.UpdateSid: {
const { sessionId } = payload as { sessionId: string }
if (sessionId) {
await this.gatewayService.setSocketMetadata(socket, { sessionId })
this.whenUserOnline() // broadcast all client online count
}
}
}
}
文章的实时阅读人数
每篇文章我们都需要统计当前的在线人数,可以借助 Socket.IO 的 Room 特征。如果没有使用 Socket.IO 则可以自己实现一套 Channel,也就是把一个 Socket 拆分成多次 Scope,在每个 Scope 可以订阅不同类型的消息推送。
那么,每一篇文章就是对应一个 Room,在同一个 Room 下的 Socket Client 可以接受到相互的消息。在读者滚动页面的时候,Emit 事件,在服务端向整个 Room 广播一位读者的阅读进度即可。
在文章进入后,立即发出加入 Room 的请求,在切换文章或者关闭页面,退出 Room。
那么在服务端首先实现 Join 和 Leave 事件的处理。
@SubscribeMessage('message')
async handleMessageEvent(
@MessageBody() data: MessageEventDto,
@ConnectedSocket() socket: SocketIO.Socket,
) {
const { payload, type } = data
switch (type) {
case SupportedMessageEvent.Join: {
const { roomName } = payload as { roomName: string }
if (roomName) {
socket.join(roomName)
}
break
}
case SupportedMessageEvent.Leave: {
const { roomName } = payload as { roomName: string }
if (roomName) {
socket.leave(roomName)
const socketMeta = await this.gatewayService.getSocketMetadata(socket)
if (socketMeta.presence) {
this.webGateway.broadcast(
BusinessEvents.ACTIVITY_LEAVE_PRESENCE,
{
identity: socketMeta.presence.identity,
roomName,
},
{
rooms: [roomName],
},
)
handlePresencePersistToDb(socket)
}
const roomJoinedAtMap = await this.getSocketRoomJoinedAtMap(socket)
delete roomJoinedAtMap[roomName]
await this.gatewayService.setSocketMetadata(socket, {
roomJoinedAtMap,
})
}
break
}
}
}
RoomName
我们可以根据文章的 id 去生成一个唯一的 key。然后使用 socket.join()
去加入一个 room,这是 SocketIO 的内部实现,我们无需关注,我们只需要在 Join 或者 Leave 时候附加或者修改一些元数据,或者持久化一些数据到 DB 即可。
接下来,前端这边,当读者滚动了页面,那么我们将此时的阅读进度通过接口发送到服务器,然后在服务器端广播给所有处于这个 Room 下的 Socket Client。所以在请求接口时我们需要发送当前的 RoomName
和 socket.sid
和前端生成的 sessionId
以及阅读位置。
假设前端发送的数据结构是这样的。
interface Persence {
roomName: string;
position: number;
identity: string;
sid: string;
displayName?: string;
}
后端的关键实现为:
async updatePresence(data: UpdatePresenceDto) {
const roomName = data.roomName
if (!isValidRoomName(roomName)) {
throw new BadRequestException('invalid room_name')
}
const roomSockets = await this.webGateway.getSocketsOfRoom(roomName)
const presenceData: ActivityPresence = {
...data,
operationTime: data.ts,
updatedAt: Date.now(),
connectedAt: +new Date(socket.handshake.time),
}
const roomJoinedAtMap =
await this.webGateway.getSocketRoomJoinedAtMap(socket)
Reflect.set(serializedPresenceData, 'joinedAt', roomJoinedAtMap[roomName])
this.webGateway.broadcast(
BusinessEvents.ACTIVITY_UPDATE_PRESENCE,
serializedPresenceData,
{
rooms: [roomName],
},
)
await this.gatewayService.setSocketMetadata(socket, {
presence: presenceData,
})
return serializedPresenceData
}
当接口到达后,服务端将广播请求接口用户的阅读状态。
当前站内文章阅读排名
这个功能其实是建立在上面的基础上的,所以当我们实现完了上面一个功能之后,这个功能就迎刃而解了。我们只需要获取现有的所有的 Room,然后获取每个 Room 中的 Socket Client 数量就可以了。
这里比较好理解,就不做展开了。
写在后面
上述所有代码均开源。具体实现位于:mx-space/core,Shiro
- https://github.com/mx-space/core/blob/310480f7b48d6460728a12a847575edd350c10c5/apps/core/src/modules/activity/activity.service.ts
- https://github.com/Innei/shiro/blob/c4bb476ac0fbc9f517d07bfdfc2e33b3890ef94a/src/components/modules/activity/Presence.tsx
- https://github.com/mx-space/core/blob/147441c2c99b163106c5fd02f0433510d1cea1b9/apps/core/src/processors/gateway/web/events.gateway.ts