使用langchain异步获取网络信息

2024-06-17 1607阅读

文章目录

  • 前言
  • 发现问题
  • 分析问题
  • 阻塞?
  • 阻塞!
  • 附:用什么阻塞streamlit进程

    前言

    从来没想过在接入通义千问的时候还会遇到NotImplementedError。实在难以理解,处理过后才明白问题。现在总结后给出结果。

    使用langchain异步获取网络信息
    (图片来源网络,侵删)

    发现问题

    我们来看个例子。就比如这段代码:

    摘自ranying666/langchain_tongyi_practical中的5-langchain-search-document_loaders.py

    loader = AsyncChromiumLoader(["https://dataea.cn/okr-keyresult-checklist/"])
    html = loader.load()
    html2text = Html2TextTransformer()
    docs_transformed = html2text.transform_documents(html)
    

    如果把这段直接接入streamlit,那就直接报错:

    NotImplementedError

    比较离谱的是,只有这么一个错误,没有其他信息。

    分析问题

    很难理解为什么是这个错。

    随着排查的进行,发现问题好像出在AsyncChromiumLoader中。

    AsyncChromiumLoader继承自BaseLoader,而BaseLoader的load方法中,调用的却是lazy_load。先不用看这个方法的具体内容,就看这个方法的名字,你大概也能猜出来什么问题了:

    懒加载带来的未能实例化的问题。

    简单地说,就是:streamlit已在前面飞,loader.load()还在后面追。终于,追不上了,就爆炸了。而刚好的是,lazy_load中抛出的异常就是这个NotImplementedError。

    众所周知,streamlit构建网页的过程是单线程的。

    所以,当我们需要请求内容的时候,使用异步请求的AsyncChromiumLoader就会出现这种问题。

    那么,该怎么办呢?

    阻塞?

    怎么办呢?阻塞,对吧?很容易想到。

    于是会想当然的这么用:

    loader = AsyncChromiumLoader(["https://dataea.cn/okr-keyresult-checklist/"])
    html = loader.load()
    while html is None:
      time.sleep(1)
    html2text = Html2TextTransformer()
    docs_transformed = html2text.transform_documents(html)
    

    看着很直观,检测html是否有返回值。

    如果真这么简单的话我也不会把它写在这里(🤷‍♂️)。

    结果就是,还是报错。这又是为什么呢?逻辑没问题呀?

    逻辑是没问题,问题出在底层。作为一个异步函数,他怎么可能没有返回值呢?

    我们来回顾一下load方法:

    # Sub-classes should not implement this method directly. Instead, they should implement the lazy load method.
    def load(self) -> List[Document]:
      """Load data into Document objects."""
      return list(self.lazy_load())
    

    那么,lazy_load是怎么回事呢?

    def lazy_load(self) -> Iterator[Document]:
      """A lazy loader for Documents."""
      if type(self).load != BaseLoader.load:
        return iter(self.load())
      raise NotImplementedError(
        f"{self.__class__.__name__} does not implement lazy_load()"
      )
    

    这里比较有意思的是,对实例化的AsyncChromiumLoader对象(就是这个self),判断AsyncChromiumLoader.load与BaseLoader.load是否一致。

    其实这里比较的是地址信息,因为子类如果重写了这个load方法,那么地址就会被改变。如果不一致的话,就会返回一个迭代器,这个迭代器就是为了后续过程中无论返回的是否是list,都能够迭代。

    听起来没问题。

    但是,懒加载呢?

    async def aload(self) -> List[Document]:
      """Load data into Document objects."""
      return [document async for document in self.alazy_load()]
    async def alazy_load(self) -> AsyncIterator[Document]:
      """A lazy loader for Documents."""
      iterator = await run_in_executor(None, self.lazy_load)
      done = object()
      while True:
        doc = await run_in_executor(None, next, iterator, done)  # type: ignore[call-arg, arg-type]
        if doc is done:
          break
        yield doc  # type: ignore[misc]
    

    比较神奇的就出现在这里了。alazy_load方法最终给出来的就是一个Document类的迭代器,然后最终通过yield给到调用方,直到doc在迭代过程中达到了done。

    但是呢,doc变量的结果是await run_in_executor(None, next, iterator, done),即使run_in_executor返回的是一个迭代器对象,最终由await进行处理,所以是有返回值的,但是返回的是未来需要返回的,是asyncio.futures.Future类。这一点完全可以类比Java中的Future对象。

    所以,最终而言,AsyncChromiumLoader.load并不是直到结束才返回值,而是在执行的过程中不断地通过yield给出返回值,只是在await最终处理为AsyncIterator[Document]类型。

    阻塞!

    为了让streamlit等待异步请求,就需要主线程停下来,直到请求结束了才能继续执行。

    那这回该怎么办呢?直接用asyncio与playwright给阻塞掉。

    首先,我们需要利用asyncio创建一个阻塞事件,并设置所有的事件都需要在阻塞事件结束后执行。

    其次,在执行这个阻塞之间的时候,我们依然使用异步请求,只不过是所有的事件都在等我们。

    于是,可以给出代码如下:

    import asyncio
    import platform
    from playwright.async_api import async_playwright
    # 阻塞事件
    async def fetch_page_content(url):
      async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        await page.goto(url)
        content = await page.content()
        await browser.close()
        return content
    # 阻塞主进程
    @st.cache_data
    def load_documents(url):
      loop = None
      if platform.system() == 'Windows':
        loop = asyncio.ProactorEventLoop()
      elif platform.system() == 'Linux':
        loop = asyncio.new_event_loop()
      elif platform.system() == 'Darwin':
        loop = asyncio.SelectorEventLoop()
      else:
        return None
      html_content = loop.run_until_complete(fetch_page_content(url))
      html2text = Html2TextTransformer()
      document = Document(page_content=html_content)
      docs_transformed = list(html2text.transform_documents([document]))
      return docs_transformed
    

    其实这里面最核心的就是asyncio下的run_until_complete函数:

    def run_until_complete(self, future):
      """Run until the Future is done.
      If the argument is a coroutine, it is wrapped in a Task.
      WARNING: It would be disastrous to call run_until_complete()
      with the same coroutine twice -- it would wrap it in two
      different Tasks and that can't be good.
      Return the Future's result, or raise its exception.
      """
      self._check_closed()
      self._check_running()
      new_task = not futures.isfuture(future)
      future = tasks.ensure_future(future, loop=self)
      if new_task:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False
      future.add_done_callback(_run_until_complete_cb)
      try:
        self.run_forever()
      except:
        if new_task and future.done() and not future.cancelled():
          # The coroutine raised a BaseException. Consume the exception
          # to not log a warning, the caller doesn't have access to the
          # local task.
          future.exception()
        raise
      finally:
        future.remove_done_callback(_run_until_complete_cb)
      if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')
      return future.result()
    

    这个函数最大的特点就是会创建一个任务并执行。直到任务执行完成或者报错中断之前,其他所有任务都得等着这个任务的回调函数。

    于是,这个函数就阻塞了streamlit的进程,直到异步任务完成。

    附:用什么阻塞streamlit进程

    其实这段文字本来应该接在上面这段的。但是这个坑实在太神奇了,单独拉出来说明。

    这里面还有一个很神奇的坑:用什么东西阻塞。

    就像上面这段代码,针对Windows、Linux、Darwin,分别采用了ProactorEventLoop、new_event_loop、SelectorEventLoop阻塞streamlit进程。

    如果在Linux平台中使用ProactorEventLoop,那么streamlit进程依然不会阻塞,因为他们都只能在各自的操作系统中起作用。

VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]