前端fetch 实现流式接口
1.5k
类别: 
开发交流

什么是流式传输

先说说传统传输吧,我们日常接触的大多数是传统传输方式,就也是整段传输,前后端将数据一次性传送给对方;相比于传统传输方式,流式传输则采用分段的方式将要传输的数据分层n段,后端一次性传一段给前端,直到传输完成为止,当然在传输的过程中,前端也可以提前中断传输,后端收到中断传输的消息后,也不再继续往前端传输剩下没传完的数据段。 当然,前端流式传输需要在后端支持分块传输的情况下才能实现。

前端实现流式传输的几种主流方式

查阅了资料,目前前端实现流式传输主要有以下几种方式:fetch、SSE、websocket,详情移步另一位博主,没错我查阅的资料就是他的文章,链接chatGPT流式输出前端实现fetch、SSE、websocket_fetch sse-CSDN博客

我用的是fetch,fetch 本身不直接支持流式输出,但你可以使用fetch ai 中的 ReadableStream实现流式数据处理。

什么是fetch?

简单来说fetch 是一种 HTTP 数据请求的方式, XMLHttpRequest(以下简称 XHR)的一种替代方案。与基于回调的API的XMLHttpRequest不同,fetch是基于Promise的,可以链式分块化地处理数据,更重要的是Fetch API能够处理流式响应。

更多关于fetch 的文档可以参考以下文档,我们重点讲fetch 处理流式响应

fetch 官方文档: https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API

fetch处理流式响应

要实现 fetch 的流式输出,关键在于如何正确地处理返回的 ReadableStream 对象。ReadableStream 是 HTML 标准的一部分,它代表了一个可以从内部读取数据的源头。在 Fetch API 中,Response 对象的 body 属性就是一个 ReadableStream 实例。

  return fetch(input, { ...init, headers })
    .then((res) => {
      console.log('res.body', res.body);
      if (res.ok) return Promise.resolve(res);
    })

image.png

实现步骤:

1.创建一个阅读器并将流锁定到它。

 //创建一个阅读器并将流锁定到它。当流被锁定时,在这个阅读器发布之前,无法获得其他阅读器
      const reader = res_.body.getReader();
      return reader.read().then(({ value, done }) => {
        console.log('value', value);
      });

image.png
可以看到阅读器一次只接收一次响应数据,且接收到的数据为utf-8编码数据,这时我们需要让阅读器重复获取数据(发布这个阅读器),直到数据获取完或者手动中止为止,并且还要对 utf-8编码数据进行转码

2.发布阅读器我们 可以使用递归函数去重复发布阅读器

 const reader = res_.body.getReader();
      return reader.read().then(function push({ value, done }) {
        console.log('done, value', done, value);
        return reader.read().then(push);
 });

3.转码的话我们需要使用 文本解码器(TextDecoder)进行转码。

 const reader = res_.body.getReader();
      const utf8Decoder = new TextDecoder('utf-8');
      return reader.read().then(function push({ value, done }) {
        let _value = value ? utf8Decoder.decode(value, { stream: true }) : '';
        console.log('转码后的数据', _value);
        return reader.read().then(push);
      })

image.png

4.中止fetch要中止不完整的fetch()操作,请使用AbortController和AbortSignal接口。

定义

const controllerRef = useRef(null);
 controllerRef.current = new AbortController();
      const signal = controllerRef.current.signal;
      fetch(input, { headers, signal })

使用

 <button onClick={() => controllerRef.current.abort()}>停止生成 </button>

5.完整代码

//存储中断器
 const controllerRef = useRef(null);

 const getStreamData = () => {
   const input = 'xxxxxx';
   //fetch 其它配置项
   const token = utils.getToken();
   const headers = {
     ...(token ? { Authorization: 'Bearer ' + token } : null),
   };

   controllerRef.current = new AbortController();
   const signal = controllerRef.current.signal;

   //存储拿到的数据
   let outline = '';

   fetch(input, { headers, signal })
     .then((res) => {
       //res.ok 表示成功状态
       if (res.ok) return Promise.resolve(res);
     })
     .then((res_) => {
       //创建一个阅读器并将流锁定到它。当流被锁定时,在这个阅读器发布之前,无法获得其他阅读器
       const reader = res_.body.getReader();
       const utf8Decoder = new TextDecoder('utf-8');
       //定义一个对象存储每次拿到的数据
       return reader.read().then(function push({ value, done }) {
         if (done) {
           //表示数据流结束
           return outline;
         } else {
           let _value = value ? utf8Decoder.decode(value, { stream: true }) : '';
           let _data = _value?.split('\n\n') || [];
           for (const it of _data) {
             let _it = it.replace(/^(data: \[DONE\])|^(data:)/, '');
             if (_it) {
               let res: { code; data; msg } = JSON.parse(_it);
               if (res?.code == 0) {
                 outline = outline + res?.data?.text;
                 console.log('接收到的数据为:', outline);
               } else {
                 return Promise.reject(res);
               }
             }
           }
         }
         return reader.read().then(push);
       });
     });
 };
标签:
评论 0
/ 1000
0
0
收藏