Waterbear SyncVar SDK

实时变量同步 · WebSocket · SSE · RPC · 分布式锁

v0.0.0
📦
Installation
引入 WaterBear SyncVar SDK
引入SDK - 代码示例
// 方式1: CDN 引入 - 推荐(无需 npm,直接使用)
<script src="https://unpkg.com/@xuzhiyang/syncvar/dist/syncvar.js"></script>
<script>
    const { WebSocketClient, SyncVar, SSE } = window.WaterBear;
    // 使用 SDK...
</script>

// 方式2: ESM 模块 - 现代前端项目
<script type="module">
    import { WebSocketClient, SyncVar, SSE } from 'https://unpkg.com/@xuzhiyang/syncvar/dist/syncvar.mjs';
    // 使用 SDK...
</script>

// 方式3: npm 安装 - Node.js 或打包工具项目
// npm install @xuzhiyang/syncvar
import { WebSocketClient, SyncVar, SSE } from '@xuzhiyang/syncvar';
🚀
Getting Started
连接服务器并初始化
连接服务器 - 代码示例
const { WebSocketClient, SyncVar } = WaterBear;

let varServer;
let position;

async function init() {
    varServer ??= await WebSocketClient({
        namespace: 'room1',  // 命名空间 - 同一命名空间的变量会自动同步
        guid: 'abc',         // 用户唯一标识 - 用于区分不同客户端
        timeout: 5000,        // 连接超时时间(毫秒),默认 5000ms
        maxReconnectCount: 100, // 最大重连次数,默认 100
        onopen: (ws, event) => {    // 连接建立回调
            console.log('WebSocket 已连接');
        },
        onclose: (event) => {       // 连接关闭回调
            console.log('WebSocket 连接已关闭', event.code, event.reason);
        },
        onerror: (err) => {        // 错误处理回调
            console.error('WebSocket 错误:', err.message || err);
        }
    }).then(ws => ws.bind(new SyncVar()));

    const id = varServer.id;
    const strId = id.toString().padStart(4, '0');
    console.log(`连接成功, 用户号码:${strId}`);

    // 同步变量 'position' - 从服务器获取变量并监听变化
    await varServer.sync('position');
    position = varServer.position;

    // 监听变量变化 - 当变量被修改时自动触发回调
    position.watch((value) => {
        console.log('position updated:', value);
    });
}
📊
Variable Operations
变量读写、安全写入
变量操作 - 代码示例
// 初始化数据
async function reset_var_demo() {
    const { position, test, user } = await varServer.sync(
        ['position', 'test', 'user'], {
        reset: {  // 初始化变量值 - 仅在变量不存在时设置
            position: { x: 100, y: 200, obj: { z: 1 } },
            test: { a: 0 },
            user: { name: '张三', age: 10 }
        },
        store: { user: true }  // 持久化存储 - 将 user 变量保存到数据库
    });
}

// 写数据 - 直接修改同步变量
async function write_var_demo() {
    position.x ??= 0;  // 空值合并运算符 - 仅在值为 null/undefined 时赋值
    position.y ??= 0;
    position.x++;      // 修改变量 - 自动同步到所有连接的客户端
    position.y++;

    position.obj ??= { z: 100 };
    position.obj.z++;  // 嵌套对象属性修改也会同步

    position.array ??= [1, 2, 3, 4, 5];
    position.array[1]++;  // 数组元素修改也会同步
}

// 安全写数据 - 使用分布式锁
async function lock_write_var_demo() {
    const { user } = await varServer.sync('user');

    user.watch((value) => {
        console.log('user updated:', value);
    });

    await user.lock();  // 获取分布式锁 - 确保同一时间只有一个客户端能修改
    user.age ??= 1;
    user.age += 2;  // 锁释放后,修改自动同步到所有客户端
}

// 通过 WebSocket 读数据
async function read_var_demo() {
    const { position } = await varServer.sync('position');
    console.log({ position });
}

// 等待写入确认 - 确保数据写入完成后再继续
async function wait_updated_demo() {
    const { test } = await varServer.sync('test');

    // 修改数据
    test.a ??= 0;
    test.a++;

    // 等待服务端确认写入完成
    await varServer.waitUpdated();
    console.log('写入已确认,test.a=', test.a);
}
Real-time Sync
多变量同步、变化监听
实时同步 - 代码示例
// 同步多个数据
async function sync_multi_var_demo() {
    const { position, test, user } = await varServer.sync(
        ['position', 'test', 'user']  // 批量同步多个变量
    );
    console.log({ position, test, user });

    test.watch((value) => {
        console.log('test updated:', value);
    });
}

// 不同名字空间数据
let server1, server2, server11;

async function multi_namespace_demo() {
    // 相同名字空间自动同步相同变量
    server1 ??= await WebSocketClient({
        namespace: 'room1'  // 命名空间 - room1
    }).then(ws => ws.bind(new SyncVar()));

    server2 ??= await WebSocketClient({
        namespace: 'room2'  // 命名空间 - room2 (不同命名空间的变量相互独立)
    }).then(ws => ws.bind(new SyncVar()));

    server11 ??= await WebSocketClient({
        namespace: 'room1'  // 命名空间 - room1 (与 server1 相同)
    }).then(ws => ws.bind(new SyncVar()));

    // server1 和 server11 的 position 变量会相互同步
    // server2 的 position 变量是独立的
    await server1.sync('position', {
        reset: { position: { x: 111, y: 222 } }
    });
    await server2.sync('position', {
        reset: { position: { x: 333, y: 444 } }
    });
    await server11.sync('position');

    server1.position.x++;  // 修改 server1.position,server11.position 也会同步更新
    server2.position.x++;  // 修改 server2.position,对其他命名空间无影响
    server11.position.x++; // 修改 server11.position,server1.position 也会同步更新
    console.log({
        position1: server1.position,   // 与 position11 相同
        position2: server2.position,   // 独立
        position11: server11.position  // 与 position1 相同
    });
}
📡
Events & SSE
Server-Sent Events 订阅与事件触发
Events & SSE - 代码示例
const { SSE } = WaterBear;
let sse;

// SSE 读数据 - 订阅变量和事件
async function sse_read_var_demo() {
    sse ??= new SSE({
        namespace: 'room1'  // 命名空间
    });

    await sse.watch({
        vars: {  // 订阅变量 - 当变量变化时触发回调
            position: ({ value }) => {  // 键名 = 变量名
                console.log('position', value);  // value - 变量的当前值
            },
            test: ({ value }) => {
                console.log('test', value);
            }
        },
        events: {  // 订阅事件 - 当事件触发时执行回调
            myevent: ({ data }) => {  // 键名 = 事件名
                const json = JSON.parse(data);  // data - 事件数据(字符串)
                console.log('收到myevent', json);
            }
        }
    });
}

// 触发 SSE 事件
async function sse_event_var_demo() {
    sse ??= new SSE({
        namespace: 'room1'  // 命名空间
    });
    const ret = await sse.emit('myevent', {  // 事件名称
        hello: 'abc' + Math.random()  // 事件数据 - 传递给所有订阅该事件的客户端
    });
}

// 动态添加/移除事件监听
async function sse_dynamic_event_demo() {
    sse ??= new SSE({
        namespace: 'room1'
    });

    // 动态添加事件监听器
    function customEventHandler({ data }) {
        console.log('动态监听到 customEvent:', JSON.parse(data));
    }
    sse.onEvent('customEvent', customEventHandler);

    // 触发事件测试
    await sse.emit('customEvent', { msg: '动态监听测试' });

    // 移除事件监听器
    sse.offEvent('customEvent', customEventHandler);
}

// 获取变量快照
async function sse_get_snapshot_demo() {
    sse ??= new SSE({
        namespace: 'room1'
    });

    // 获取所有变量的快照(Map)
    const allVars = sse.getAll();
    console.log('所有变量:', allVars);

    // 获取单个变量的当前值
    const position = sse.get('position');
    console.log('position:', position);
}

// 关闭 SSE 连接
async function sse_close_demo() {
    sse?.close();  // 关闭 SSE 连接并停止重连
    sse = null;
    console.log('SSE 连接已关闭');
}
🔌
RPC (Remote Procedure Call)
远程函数调用
RPC (远程函数调用) - 代码示例
// RPC 函数绑定 - 注册远程调用函数
async function rpc_bind_demo() {
    await varServer.rpcBind(function rpc_test(params) {
        // params - RPC 调用时传递的参数
        console.log('rpc_test', params);
        const { a, b } = params;
        return a + b;  // 返回结果会发送给调用方
    });
    console.log('绑定成功');
}

// RPC 调用 - 调用远程函数
async function rpc_call_demo() {
    const a = Math.round(Math.random() * 100);
    const b = Math.round(Math.random() * 100);

    await varServer.rpcCall('rpc_test', { a, b })  // rpc_test - RPC 函数名,{ a, b } - 参数
        .then(ret => {
            console.log(ret);  // ret - RPC 函数的返回值
        })
        .catch(err => {
            console.error('rpc_call_demo error:', err.message || err);
        });
}
🔧
Advanced Features
压力测试
压力测试 - 代码示例
// 压力测试 - 创建多个并发客户端
let client_count = 0;
let latest_number = null;
let latest_writer = null;
let startTime;

async function add_testing_client_demo(count = 1) {
    startTime ??= new Date();

    for (let i = 0; i < count; i++) {
        const server = await WebSocketClient({
            maxReconnectCount: 1  // 最大重连次数 - 压力测试时设置为1,避免无限重连
        }).then(ws => ws.bind(new SyncVar()));

        client_count++;
        const id = server.ws.id;
        const { test } = await server.sync('test');

        const add_number = async () => {
            try {
                // 使用分布式锁确保时序正确
                await test.lock({ timeout: 60000 });  // timeout - 锁超时时间(毫秒)
                test.a ??= 0;
                test.a++;

                if (latest_number && latest_number >= test.a) {
                    throw new Error(
                        '出问题了!写入的数字小于等于之前的数字:' +
                        `${id}:a=${test.a},latest_number=${latest_number}`
                    );
                }

                latest_number = test.a;
                latest_writer = id;
                const useTime = ( new Date() - startTime ) / 1000;
                console.log(`更新者:${i}, 值:${test.a}, 客户端数:${client_count}, 耗时:${Math.floor(useTime)}秒`);
                setTimeout(add_number, 0);
            } catch (error) {
                console.error(error.message || error);
                setTimeout(add_number, Math.random() * 1000);
            }
        };
        add_number();
        await new Promise(resolve => setTimeout(resolve, 10));
    }
}
Console Output