Installation
引入 WaterBear SyncVar SDK
引入SDK - 代码示例
// 方式1: CDN 引入 - 推荐(无需 npm,直接使用)
<script src="https://unpkg.com/@xuzhiyang/syncvar/dist/syncvar.js"></script>
<script>
const { WebSocketClient, SyncVar, SSE } = window.WaterBear;
// 使用 SDK...
</script>
// 方式2: ESM 模块 - 现代前端项目
<script type="module">
import { WebSocketClient, SyncVar, SSE } from 'https://unpkg.com/@xuzhiyang/syncvar/dist/syncvar.mjs';
// 使用 SDK...
</script>
// 方式3: npm 安装 - Node.js 或打包工具项目
// npm install @xuzhiyang/syncvar
import { WebSocketClient, SyncVar, SSE } from '@xuzhiyang/syncvar';
Getting Started
连接服务器并初始化
连接服务器 - 代码示例
const { WebSocketClient, SyncVar } = WaterBear;
let varServer;
let position;
async function init() {
varServer ??= await WebSocketClient({
namespace: 'room1', // 命名空间 - 同一命名空间的变量会自动同步
guid: 'abc', // 用户唯一标识 - 用于区分不同客户端
timeout: 5000, // 连接超时时间(毫秒),默认 5000ms
maxReconnectCount: 100, // 最大重连次数,默认 100
onopen: (ws, event) => { // 连接建立回调
console.log('WebSocket 已连接');
},
onclose: (event) => { // 连接关闭回调
console.log('WebSocket 连接已关闭', event.code, event.reason);
},
onerror: (err) => { // 错误处理回调
console.error('WebSocket 错误:', err.message || err);
}
}).then(ws => ws.bind(new SyncVar()));
const id = varServer.id;
const strId = id.toString().padStart(4, '0');
console.log(`连接成功, 用户号码:${strId}`);
// 同步变量 'position' - 从服务器获取变量并监听变化
await varServer.sync('position');
position = varServer.position;
// 监听变量变化 - 当变量被修改时自动触发回调
position.watch((value) => {
console.log('position updated:', value);
});
}
Variable Operations
变量读写、安全写入
变量操作 - 代码示例
// 初始化数据
async function reset_var_demo() {
const { position, test, user } = await varServer.sync(
['position', 'test', 'user'], {
reset: { // 初始化变量值 - 仅在变量不存在时设置
position: { x: 100, y: 200, obj: { z: 1 } },
test: { a: 0 },
user: { name: '张三', age: 10 }
},
store: { user: true } // 持久化存储 - 将 user 变量保存到数据库
});
}
// 写数据 - 直接修改同步变量
async function write_var_demo() {
position.x ??= 0; // 空值合并运算符 - 仅在值为 null/undefined 时赋值
position.y ??= 0;
position.x++; // 修改变量 - 自动同步到所有连接的客户端
position.y++;
position.obj ??= { z: 100 };
position.obj.z++; // 嵌套对象属性修改也会同步
position.array ??= [1, 2, 3, 4, 5];
position.array[1]++; // 数组元素修改也会同步
}
// 安全写数据 - 使用分布式锁
async function lock_write_var_demo() {
const { user } = await varServer.sync('user');
user.watch((value) => {
console.log('user updated:', value);
});
await user.lock(); // 获取分布式锁 - 确保同一时间只有一个客户端能修改
user.age ??= 1;
user.age += 2; // 锁释放后,修改自动同步到所有客户端
}
// 通过 WebSocket 读数据
async function read_var_demo() {
const { position } = await varServer.sync('position');
console.log({ position });
}
// 等待写入确认 - 确保数据写入完成后再继续
async function wait_updated_demo() {
const { test } = await varServer.sync('test');
// 修改数据
test.a ??= 0;
test.a++;
// 等待服务端确认写入完成
await varServer.waitUpdated();
console.log('写入已确认,test.a=', test.a);
}
Real-time Sync
多变量同步、变化监听
实时同步 - 代码示例
// 同步多个数据
async function sync_multi_var_demo() {
const { position, test, user } = await varServer.sync(
['position', 'test', 'user'] // 批量同步多个变量
);
console.log({ position, test, user });
test.watch((value) => {
console.log('test updated:', value);
});
}
// 不同名字空间数据
let server1, server2, server11;
async function multi_namespace_demo() {
// 相同名字空间自动同步相同变量
server1 ??= await WebSocketClient({
namespace: 'room1' // 命名空间 - room1
}).then(ws => ws.bind(new SyncVar()));
server2 ??= await WebSocketClient({
namespace: 'room2' // 命名空间 - room2 (不同命名空间的变量相互独立)
}).then(ws => ws.bind(new SyncVar()));
server11 ??= await WebSocketClient({
namespace: 'room1' // 命名空间 - room1 (与 server1 相同)
}).then(ws => ws.bind(new SyncVar()));
// server1 和 server11 的 position 变量会相互同步
// server2 的 position 变量是独立的
await server1.sync('position', {
reset: { position: { x: 111, y: 222 } }
});
await server2.sync('position', {
reset: { position: { x: 333, y: 444 } }
});
await server11.sync('position');
server1.position.x++; // 修改 server1.position,server11.position 也会同步更新
server2.position.x++; // 修改 server2.position,对其他命名空间无影响
server11.position.x++; // 修改 server11.position,server1.position 也会同步更新
console.log({
position1: server1.position, // 与 position11 相同
position2: server2.position, // 独立
position11: server11.position // 与 position1 相同
});
}
Events & SSE
Server-Sent Events 订阅与事件触发
Events & SSE - 代码示例
const { SSE } = WaterBear;
let sse;
// SSE 读数据 - 订阅变量和事件
async function sse_read_var_demo() {
sse ??= new SSE({
namespace: 'room1' // 命名空间
});
await sse.watch({
vars: { // 订阅变量 - 当变量变化时触发回调
position: ({ value }) => { // 键名 = 变量名
console.log('position', value); // value - 变量的当前值
},
test: ({ value }) => {
console.log('test', value);
}
},
events: { // 订阅事件 - 当事件触发时执行回调
myevent: ({ data }) => { // 键名 = 事件名
const json = JSON.parse(data); // data - 事件数据(字符串)
console.log('收到myevent', json);
}
}
});
}
// 触发 SSE 事件
async function sse_event_var_demo() {
sse ??= new SSE({
namespace: 'room1' // 命名空间
});
const ret = await sse.emit('myevent', { // 事件名称
hello: 'abc' + Math.random() // 事件数据 - 传递给所有订阅该事件的客户端
});
}
// 动态添加/移除事件监听
async function sse_dynamic_event_demo() {
sse ??= new SSE({
namespace: 'room1'
});
// 动态添加事件监听器
function customEventHandler({ data }) {
console.log('动态监听到 customEvent:', JSON.parse(data));
}
sse.onEvent('customEvent', customEventHandler);
// 触发事件测试
await sse.emit('customEvent', { msg: '动态监听测试' });
// 移除事件监听器
sse.offEvent('customEvent', customEventHandler);
}
// 获取变量快照
async function sse_get_snapshot_demo() {
sse ??= new SSE({
namespace: 'room1'
});
// 获取所有变量的快照(Map)
const allVars = sse.getAll();
console.log('所有变量:', allVars);
// 获取单个变量的当前值
const position = sse.get('position');
console.log('position:', position);
}
// 关闭 SSE 连接
async function sse_close_demo() {
sse?.close(); // 关闭 SSE 连接并停止重连
sse = null;
console.log('SSE 连接已关闭');
}
RPC (Remote Procedure Call)
远程函数调用
RPC (远程函数调用) - 代码示例
// RPC 函数绑定 - 注册远程调用函数
async function rpc_bind_demo() {
await varServer.rpcBind(function rpc_test(params) {
// params - RPC 调用时传递的参数
console.log('rpc_test', params);
const { a, b } = params;
return a + b; // 返回结果会发送给调用方
});
console.log('绑定成功');
}
// RPC 调用 - 调用远程函数
async function rpc_call_demo() {
const a = Math.round(Math.random() * 100);
const b = Math.round(Math.random() * 100);
await varServer.rpcCall('rpc_test', { a, b }) // rpc_test - RPC 函数名,{ a, b } - 参数
.then(ret => {
console.log(ret); // ret - RPC 函数的返回值
})
.catch(err => {
console.error('rpc_call_demo error:', err.message || err);
});
}
Advanced Features
压力测试
压力测试 - 代码示例
// 压力测试 - 创建多个并发客户端
let client_count = 0;
let latest_number = null;
let latest_writer = null;
let startTime;
async function add_testing_client_demo(count = 1) {
startTime ??= new Date();
for (let i = 0; i < count; i++) {
const server = await WebSocketClient({
maxReconnectCount: 1 // 最大重连次数 - 压力测试时设置为1,避免无限重连
}).then(ws => ws.bind(new SyncVar()));
client_count++;
const id = server.ws.id;
const { test } = await server.sync('test');
const add_number = async () => {
try {
// 使用分布式锁确保时序正确
await test.lock({ timeout: 60000 }); // timeout - 锁超时时间(毫秒)
test.a ??= 0;
test.a++;
if (latest_number && latest_number >= test.a) {
throw new Error(
'出问题了!写入的数字小于等于之前的数字:' +
`${id}:a=${test.a},latest_number=${latest_number}`
);
}
latest_number = test.a;
latest_writer = id;
const useTime = ( new Date() - startTime ) / 1000;
console.log(`更新者:${i}, 值:${test.a}, 客户端数:${client_count}, 耗时:${Math.floor(useTime)}秒`);
setTimeout(add_number, 0);
} catch (error) {
console.error(error.message || error);
setTimeout(add_number, Math.random() * 1000);
}
};
add_number();
await new Promise(resolve => setTimeout(resolve, 10));
}
}
Console Output