阐述 WebTransport (HTTP/3 基于 QUIC) 中 Datagrams (不可靠数据报) 和 Streams (可靠流) 的区别,以及它们在实时通信中的 JavaScript 应用场景。

咳咳,各位听众朋友们,早上/下午/晚上好!我是今天的讲师,很高兴能和大家一起聊聊WebTransport这个神奇的东西。今天的主题是:WebTransport的Datagrams和Streams,以及它们在实时通信中的JavaScript应用。

WebTransport,可以简单理解为WebSocket的升级版,它基于HTTP/3和QUIC协议,提供了更快的连接建立速度、更好的拥塞控制和更强大的多路复用能力。相比WebSocket,WebTransport最大的亮点就是引入了Datagrams和Streams两种数据传输方式,这两种方式各有千秋,适合不同的应用场景。接下来我们就来好好聊聊它们。

Datagrams:不可靠的“飞毛腿”

首先,我们来说说Datagrams。你可以把Datagrams想象成一个个独立的包裹,每个包裹里装着你需要发送的数据。这些包裹会被直接扔到网络上,然后各凭本事到达目的地。重点是:WebTransport不保证这些包裹一定能到达,也不保证它们到达的顺序。

  • 特点:

    • 不可靠: 数据可能会丢失。
    • 无序: 数据到达的顺序可能和发送的顺序不一致。
    • 无连接状态: 每次发送都是一个独立的行为,不需要建立持续的连接状态。
    • 低延迟: 因为不需要保证可靠性,所以延迟通常较低。
    • 高吞吐量: 适合大量数据的快速传输,即使偶尔丢包也能接受。
  • 适用场景:

    • 实时游戏: 游戏中玩家的位置、动作等数据,允许一定程度的丢包,但对延迟要求极高。
    • 传感器数据: 传感器数据流,可以容忍少量数据丢失,但需要快速传输。
    • 心跳检测: 定期发送心跳包,即使偶尔丢包也能判断连接是否存活。
    • 音视频流媒体(某些场景): 容忍一定程度的丢帧,以保证流畅播放。

Datagrams的JavaScript代码示例:

// 客户端
const connect = async () => {
  const transport = new WebTransport('https://example.com/webtransport');

  transport.addEventListener('sessionestablished', async () => {
    console.log('WebTransport session established!');

    const encoder = new TextEncoder();
    const datagram = encoder.encode('Hello from client!');

    try {
      transport.datagrams.writable.getWriter().then(writer => {
        writer.write(datagram);
        writer.releaseLock();
      });
      console.log('Datagram sent!');
    } catch (e) {
      console.error('Error sending datagram:', e);
    }
  });

  try {
    await transport.ready;

    transport.datagrams.readable.getReader().then(reader => {
      readDatagrams(reader);
    });

  } catch (e) {
    console.error('Failed to connect:', e);
  }
};

const readDatagrams = async (reader) => {
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        console.log("Datagram stream closed.");
        break;
      }

      const decoder = new TextDecoder();
      const message = decoder.decode(value);
      console.log('Received datagram:', message);
    }
  } catch (e) {
    console.error("Error reading datagrams:", e);
  } finally {
    reader.releaseLock();
  }
}

connect();

// 服务器端 (Node.js, 需要 @failsafe-ws/webtransport 库)
import { WebTransportServer } from '@failsafe-ws/webtransport';
import { createServer } from 'node:https';
import { readFileSync } from 'node:fs';

const key = readFileSync('./localhost.key');
const cert = readFileSync('./localhost.crt');

const server = createServer({ key, cert });

const transportServer = new WebTransportServer({ server });

transportServer.handleStream((stream) => {
    stream.readable.pipeTo(stream.writable);
});

transportServer.handleDatagram((datagram, session) => {
    const decoder = new TextDecoder();
    const message = decoder.decode(datagram);
    console.log('Received datagram:', message);

    const encoder = new TextEncoder();
    const response = encoder.encode(`Server received: ${message}`);

    session.datagrams.writable.getWriter().then(writer => {
        writer.write(response);
        writer.releaseLock();
    });
});

server.listen(4433, () => {
    console.log('WebTransport server listening on https://localhost:4433');
});

Streams:可靠的“快递员”

接下来,我们再来说说Streams。你可以把Streams想象成一条可靠的管道,你把数据放进管道的一端,数据会按照顺序、完整地到达管道的另一端。WebTransport保证数据一定能到达,并且保证它们到达的顺序和发送的顺序一致。

  • 特点:

    • 可靠: 数据不会丢失。
    • 有序: 数据到达的顺序和发送的顺序一致。
    • 连接状态: 需要建立持续的连接状态。
    • 较高延迟: 为了保证可靠性,延迟通常较高。
    • 较低吞吐量: 相比Datagrams,吞吐量较低。
  • 适用场景:

    • 文件传输: 确保文件完整无损地传输。
    • 聊天应用: 确保消息按照发送顺序到达,且不会丢失。
    • 远程控制: 确保控制指令可靠地发送到目标设备。
    • 事务处理: 确保事务的完整性和一致性。
    • 任何需要可靠数据传输的场景。

Streams的JavaScript代码示例:

// 客户端
const connectStream = async () => {
  const transport = new WebTransport('https://example.com/webtransport');

  transport.addEventListener('sessionestablished', async () => {
    console.log('WebTransport session established!');

    try {
      const stream = await transport.createUnidirectionalStream();
      const writer = stream.getWriter();
      const encoder = new TextEncoder();
      const data = encoder.encode('Hello from client via stream!');
      await writer.write(data);
      await writer.close();
      console.log('Stream sent!');

      //接收服务器流
      const incomingStream = await transport.createBidirectionalStream();
      const reader = incomingStream.readable.getReader();
      const decoder = new TextDecoder();

      try {
        while (true) {
          const { value, done } = await reader.read();
          if (done) {
            console.log("Stream closed by server");
            break;
          }
          console.log("Received from server via stream:", decoder.decode(value));
        }
      } catch (e) {
        console.error("Error reading from stream:", e);
      } finally {
        reader.releaseLock();
      }
      console.log('Bidirectional Stream received!');

    } catch (e) {
      console.error('Error creating stream:', e);
    }
  });

  try {
    await transport.ready;
  } catch (e) {
    console.error('Failed to connect:', e);
  }
};

connectStream();

// 服务器端 (Node.js, 需要 @failsafe-ws/webtransport 库)
import { WebTransportServer } from '@failsafe-ws/webtransport';
import { createServer } from 'node:https';
import { readFileSync } from 'node:fs';

const key = readFileSync('./localhost.key');
const cert = readFileSync('./localhost.crt');

const server = createServer({ key, cert });

const transportServer = new WebTransportServer({ server });

transportServer.handleStream((stream) => {
    stream.readable.pipeTo(stream.writable);
});

transportServer.handleUnidirectionalStream(async (stream) => {
    const reader = stream.readable.getReader();
    const decoder = new TextDecoder();
    try {
      while (true) {
        const { value, done } = await reader.read();
        if (done) {
          console.log("Unidirectional Stream closed by client");
          break;
        }
        console.log("Received from client via unidirectional stream:", decoder.decode(value));
      }
    } catch (e) {
      console.error("Error reading from unidirectional stream:", e);
    } finally {
      reader.releaseLock();
    }

});

transportServer.handleBidirectionalStream(async (stream) => {
    const writer = stream.writable.getWriter();
    const encoder = new TextEncoder();
    const data = encoder.encode('Hello from server via bidirectional stream!');
    await writer.write(data);
    await writer.close();
    console.log('Bidirectional Stream sent!');
});

transportServer.handleDatagram((datagram, session) => {
    const decoder = new TextDecoder();
    const message = decoder.decode(datagram);
    console.log('Received datagram:', message);

    const encoder = new TextEncoder();
    const response = encoder.encode(`Server received: ${message}`);

    session.datagrams.writable.getWriter().then(writer => {
        writer.write(response);
        writer.releaseLock();
    });
});

server.listen(4433, () => {
    console.log('WebTransport server listening on https://localhost:4433');
});

Datagrams vs. Streams:一个表格对比

为了更清晰地理解Datagrams和Streams的区别,我们用一个表格来总结一下:

特性 Datagrams (不可靠数据报) Streams (可靠流)
可靠性 不可靠 可靠
顺序 无序 有序
连接状态 无连接状态 连接状态
延迟 较高
吞吐量 较低
适用场景 实时游戏,传感器数据等 文件传输,聊天应用等

实时通信中的JavaScript应用场景:结合使用 Datagrams 和 Streams

在实际的实时通信应用中,我们通常会结合使用Datagrams和Streams,以达到最佳的效果。

  • 实时游戏:

    • Datagrams: 用于传输玩家的位置、动作等数据,对延迟要求极高,可以容忍少量丢包。
    • Streams: 用于传输游戏状态、玩家信息等重要数据,需要保证可靠性。
  • 音视频会议:

    • Datagrams: 用于传输音视频数据,对延迟要求较高,可以容忍一定程度的丢帧。
    • Streams: 用于传输控制信令、聊天消息等,需要保证可靠性。
  • 在线协作:

    • Datagrams: 用于传输光标位置、选择范围等实时数据,对延迟要求较高,可以容忍少量丢包。
    • Streams: 用于传输文档内容、协作状态等重要数据,需要保证可靠性。

更复杂的示例:实时游戏中的应用

假设我们要开发一个简单的实时游戏,玩家可以在地图上移动,并且可以互相发送聊天消息。

// 客户端

const connectGame = async () => {
  const transport = new WebTransport('https://example.com/game');

  transport.addEventListener('sessionestablished', async () => {
    console.log('WebTransport session established!');

    // 使用 Datagrams 发送玩家位置
    setInterval(() => {
      const playerPosition = { x: Math.random() * 100, y: Math.random() * 100 };
      const encoder = new TextEncoder();
      const datagram = encoder.encode(JSON.stringify(playerPosition));
      try {
        transport.datagrams.writable.getWriter().then(writer => {
          writer.write(datagram);
          writer.releaseLock();
        });

        //console.log('Player position sent:', playerPosition);
      } catch (e) {
        console.error('Error sending player position:', e);
      }
    }, 50); // 每50毫秒发送一次位置

    // 使用 Streams 接收聊天消息
    try {
        const stream = await transport.createBidirectionalStream();
        const reader = stream.readable.getReader();
        const decoder = new TextDecoder();
        readChatMessages(reader);

        // 发送聊天消息的函数
        window.sendMessage = async (message) => {
            const writer = stream.writable.getWriter();
            const encoder = new TextEncoder();
            const data = encoder.encode(message);
            await writer.write(data);
           // await writer.close(); //保持连接
            console.log('Sent chat message:', message);
        };

    } catch (e) {
      console.error('Error creating stream:', e);
    }

    //读取其他玩家的位置信息
    transport.datagrams.readable.getReader().then(reader => {
      readOtherPlayersPosition(reader);
    });

  });

  try {
    await transport.ready;
  } catch (e) {
    console.error('Failed to connect:', e);
  }
};

const readChatMessages = async (reader) => {
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        console.log("Stream closed by server");
        break;
      }
      const decoder = new TextDecoder();
      const message = decoder.decode(value);
      console.log("Received chat message:", message);
      // 在游戏中显示聊天消息
    }
  } catch (e) {
    console.error("Error reading from stream:", e);
  } finally {
    reader.releaseLock();
  }
}

const readOtherPlayersPosition = async (reader) => {
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        console.log("Datagram stream closed.");
        break;
      }
      const decoder = new TextDecoder();
      const message = decoder.decode(value);
      const position = JSON.parse(message);
      // 在游戏中更新其他玩家的位置
      //console.log('Received other player position:', position);
      updateOtherPlayerPosition(position);
    }
  } catch (e) {
    console.error("Error reading datagrams:", e);
  } finally {
    reader.releaseLock();
  }
}

//模拟更新其他玩家位置的函数
const updateOtherPlayerPosition = (position) => {
  //假设这里有渲染其他玩家位置的代码
  //console.log("更新其他玩家位置", position)
}
connectGame();

// 服务器端 (Node.js, 需要 @failsafe-ws/webtransport 库)
import { WebTransportServer } from '@failsafe-ws/webtransport';
import { createServer } from 'node:https';
import { readFileSync } from 'node:fs';

const key = readFileSync('./localhost.key');
const cert = readFileSync('./localhost.crt');

const server = createServer({ key, cert });

const transportServer = new WebTransportServer({ server });

const connectedClients = new Set(); //保存所有连接的客户端会话

transportServer.handleStream((stream) => {
  stream.readable.pipeTo(stream.writable);
});

transportServer.handleBidirectionalStream(async (stream, session) => {
    connectedClients.add(session);
    const reader = stream.readable.getReader();
    const decoder = new TextDecoder();

    try {
        while (true) {
            const { value, done } = await reader.read();
            if (done) {
                console.log("Stream closed by client");
                connectedClients.delete(session);
                break;
            }
            const message = decoder.decode(value);
            console.log("Received chat message:", message);

            // 将聊天消息广播给所有其他客户端
            for (const client of connectedClients) {
                if (client !== session) {
                   try {
                      const writer = client.createBidirectionalStream().then(s=>{
                        const writer = s.writable.getWriter();
                        const encoder = new TextEncoder();
                        const data = encoder.encode(message);
                        writer.write(data);
                        writer.releaseLock();
                      });
                   }catch(e){
                     console.log("Error sending chat message to client", e);
                   }

                }
            }
        }
    } catch (e) {
        console.error("Error reading from stream:", e);
    } finally {
        reader.releaseLock();
    }
});

transportServer.handleDatagram((datagram, session) => {
    // 广播玩家位置给所有其他客户端
    for (const client of connectedClients) {
        if (client.sessionId !== session.sessionId) {
            client.datagrams.writable.getWriter().then(writer => {
                writer.write(datagram);
                writer.releaseLock();
            });
        }
    }
});

transportServer.handleSession(session => {
  connectedClients.add(session);

  session.addEventListener('close', () => {
      connectedClients.delete(session);
      console.log("Session closed.");
  });

  console.log("New session established", session.sessionId)

});

server.listen(4433, () => {
    console.log('WebTransport server listening on https://localhost:4433');
});

在这个示例中:

  • 客户端使用Datagrams以高频率发送玩家的位置信息,即使偶尔丢包,也能保证玩家的移动体验基本流畅。
  • 客户端使用Streams发送和接收聊天消息,确保消息的可靠性和顺序。
  • 服务器接收到玩家的位置信息后,将信息广播给其他客户端。
  • 服务器接收到聊天消息后,将消息广播给所有其他客户端。

总结

WebTransport的Datagrams和Streams为实时通信提供了更灵活、更强大的数据传输方式。Datagrams适用于对延迟要求极高、可以容忍少量丢包的场景,而Streams适用于需要保证可靠性和顺序的场景。在实际应用中,我们可以根据不同的需求,灵活地结合使用这两种方式,以达到最佳的效果。

好了,今天的讲座就到这里,希望大家有所收获!如果有什么问题,欢迎随时提问。下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注