AI Large Models in Practice: AI Agent Design Patterns – LLM Compiler


By representing the dependencies between tasks as a directed acyclic graph (DAG), LLM Compiler can execute tasks in parallel and dramatically reduce total execution time. This article explains how LLM Compiler works, how to implement it, and the advantages it brings in practice.

In the previous article, "AI Large Models in Practice: AI Agent Design Patterns – Plan & Execute", 風(fēng)叔 walked through the Plan-and-Execute design pattern in detail, combining the underlying ideas with concrete source code. The limitation of Plan-and-Execute is that tasks are executed strictly in sequence, which can increase the total execution time.

An effective improvement is to represent the tasks as a directed acyclic graph (DAG), which allows multiple tasks to run in parallel and greatly reduces the total execution time.

That is the AI Agent design pattern 風(fēng)叔 introduces in this article: LLM Compiler.

01 The Concept of LLM Compiler

LLM Compiler is a project proposed by UC Berkeley's SqueezeAILab in December 2023. Building on the variable assignment introduced by ReWOO, it goes a step further and has the large language model generate a plan in the form of a directed acyclic graph (Directed Acyclic Graph, DAG, as shown in the figure below). The DAG makes the dependencies between the steps explicit, so independent tasks can be executed in parallel, achieving an effect similar to a processor's "out-of-order execution" and greatly speeding up how quickly the AI Agent completes its work.

For example, in the figure below the Agent is asked "How much does Microsoft's market cap need to increase to exceed Apple's market cap?". The Planner searches for Microsoft's market cap and Apple's market cap in parallel, and then combines the two results for the final calculation.
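
To make this concrete, the plan for this question can be pictured as a small task list in which the third task depends on the first two. The listing below is only an illustration of the idea, not the planner's literal output: tasks 1 and 2 have no dependencies and run in parallel, task 3 waits for their results (referenced as $1 and $2, in the ReWOO style of variable assignment), and join() hands everything to the Joiner.

1. search("Microsoft market cap")
2. search("Apple market cap")
3. math("How much must $1 increase to exceed $2?")   # depends on tasks 1 and 2
4. join()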

1. The LLM Compiler design pattern consists mainly of the following components:

  • Planner: streams out a DAG of tasks, where each task contains a tool, its arguments, and a list of dependencies. Compared with ReWOO's Planner, the dependency list is the biggest difference.
  • Task Fetching Unit: schedules and executes tasks. As soon as a task's dependencies are satisfied, this unit schedules it. Because many tools involve calls to a search engine or additional calls to the LLM, the extra parallelism can bring a significant speedup.
  • Joiner: an LLM that, based on the entire history (including the task execution results), decides whether to respond with the final answer or to pass the progress back to the Planner.

2. The figure below shows how LLM Compiler works:

  • The Planner receives the user input and streams out a DAG of tasks
  • The Task Fetching Unit reads tasks from the streamed task DAG and executes them in parallel with the corresponding tools
  • The Task Fetching Unit passes the state and results to the Joiner (or Replanner), which decides whether to return the result to the user or to hand additional tasks back to the Task Fetching Unit

02 Implementing LLM Compiler

Below, 風(fēng)叔 walks through the implementation of the LLM Compiler pattern using actual source code. You can follow 風(fēng)叔 and reply with the keyword 【LLMC源碼】 to get the complete source code for the LLM Compiler design pattern.

Step 1: Build the Tools

First, we define the tools the Agent will use. In this example, we use two tools: a search engine and a calculator.

import getpass
import os

from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_openai import ChatOpenAI
from math_tools import get_math_tool  # local helper module from the full source that builds an LLM-powered calculator tool


def _get_pass(var: str):
    # Prompt for the API key if it is not already set in the environment
    if var not in os.environ:
        os.environ[var] = getpass.getpass(f"{var}: ")


_get_pass("TAVILY_API_KEY")

calculate = get_math_tool(ChatOpenAI(model="gpt-4-turbo-preview"))
search = TavilySearchResults(
    max_results=1,
    description='tavily_search_results_json(query="the search query") - a search engine.',
)

tools = [search, calculate]
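
Before handing these tools to the Agent, it helps to call them once by hand. The snippet below is only a minimal sanity check: the query string is an illustration, and the calculator's "problem" argument follows the tutorial's math_tools helper, so adjust it if your local copy differs.

# Call the search tool directly with a plain query string
print(search.invoke("Apple market cap"))

# The calculator built by get_math_tool is assumed to take a "problem" argument
print(calculate.invoke({"problem": "3000 - 2800"}))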

Step 2: Build the Planner

The Planner receives the user input and generates a DAG of tasks to be executed.

The code below builds the Planner's prompt template and chains it together with the LLM and an output parser that turns the model output into a task list. Inside the Planner we also define the replanner prompt, which imposes three core constraints:

  • When starting the current plan, it should begin with a "Thought" that outlines the strategy for the next plan
  • The current plan must never repeat actions that were already executed in the previous plan
  • Task indices must continue from the end of the previous plan; indices must not be repeated

from typing import Sequence

from langchain import hub
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import FunctionMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableBranch
from langchain_core.tools import BaseTool

from output_parser import LLMCompilerPlanParser  # parser defined in the tutorial's output_parser module (included in the full source)


def create_planner(
    llm: BaseChatModel, tools: Sequence[BaseTool], base_prompt: ChatPromptTemplate
):
    tool_descriptions = "\n".join(
        f"{i+1}. {tool.description}\n"
        for i, tool in enumerate(tools)
    )  # +1 to offset the 0 starting index; we want it to count normally from 1.
    planner_prompt = base_prompt.partial(
        replan="",
        num_tools=len(tools)
        + 1,  # Add one because we're adding the join() tool at the end.
        tool_descriptions=tool_descriptions,
    )
    replanner_prompt = base_prompt.partial(
        replan=' - You are given "Previous Plan" which is the plan that the previous agent created along with the execution results '
        "(given as Observation) of each plan and a general thought (given as Thought) about the executed results. "
        'You MUST use these information to create the next plan under "Current Plan".\n'
        ' - When starting the Current Plan, you should start with "Thought" that outlines the strategy for the next plan.\n'
        " - In the Current Plan, you should NEVER repeat the actions that are already executed in the Previous Plan.\n"
        " - You must continue the task index from the end of the previous one. Do not repeat task indices.",
        num_tools=len(tools) + 1,
        tool_descriptions=tool_descriptions,
    )

    def should_replan(state: list):
        # Context is passed as a system message
        return isinstance(state[-1], SystemMessage)

    def wrap_messages(state: list):
        return {"messages": state}

    def wrap_and_get_last_index(state: list):
        next_task = 0
        for message in state[::-1]:
            if isinstance(message, FunctionMessage):
                next_task = message.additional_kwargs["idx"] + 1
                break
        state[-1].content = state[-1].content + f" - Begin counting at : {next_task}"
        return {"messages": state}

    return (
        RunnableBranch(
            (should_replan, wrap_and_get_last_index | replanner_prompt),
            wrap_messages | planner_prompt,
        )
        | llm
        | LLMCompilerPlanParser(tools=tools)
    )


# Base planner prompt; the original tutorial pulls it from LangChain Hub
prompt = hub.pull("wfh/llm-compiler")
llm = ChatOpenAI(model="gpt-4-turbo-preview")
planner = create_planner(llm, tools, prompt)
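
With the planner in place, we can stream a plan for the example question and watch the tasks, their arguments, and their dependency lists come out one by one. The following is a minimal sketch; the question string is simply the running example from earlier.

from langchain_core.messages import HumanMessage

example_question = "How much does Microsoft's market cap need to increase to exceed Apple's market cap?"

# Each streamed task carries a tool, its arguments, and the indices it depends on
for task in planner.stream([HumanMessage(content=example_question)]):
    print(task["tool"], task["args"])
    print("---")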

Step 3: Build the Task Fetching Unit

This component is responsible for scheduling tasks. It consumes a stream of tasks of the following form:

tool: BaseTool
dependencies: number[]

Its core idea is that a tool starts executing as soon as its dependencies are satisfied, which can be implemented with multithreading. The key to the code below is schedule_tasks, which organizes all the tasks into a directed acyclic graph: a task that still has unfinished dependencies is handed to schedule_pending_task to wait, and a task whose dependencies are all complete is executed immediately.
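
Concretely, each task streamed out of the planner is a plain dict. The sketch below shows the shape assumed by the scheduling code that follows; the field names match the keys the code reads (task["idx"], task["tool"], task["args"], task["dependencies"]), while the values are made up for illustration.

# Assumed shape of a single task emitted by the planner's output parser
task = {
    "idx": 3,                      # index of this task within the plan
    "tool": calculate,             # a BaseTool instance, or the string "join"
    "args": {"problem": "...", "context": ["$1", "$2"]},  # arguments for the tool
    "dependencies": [1, 2],        # indices of the tasks whose results this one needs
}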

import time
from typing import Any, Dict

from langchain_core.runnables import chain as as_runnable

# Task, _execute_task and SchedulerInput are defined in the complete source (tutorial helper modules)


@as_runnable
def schedule_task(task_inputs, config):
    task: Task = task_inputs["task"]
    observations: Dict[int, Any] = task_inputs["observations"]
    try:
        observation = _execute_task(task, observations, config)
    except Exception:
        import traceback

        # Record the traceback as the observation so the Joiner can see what went wrong
        observation = traceback.format_exc()
    observations[task["idx"]] = observation


def schedule_pending_task(
    task: Task, observations: Dict[int, Any], retry_after: float = 0.2
):
    while True:
        deps = task["dependencies"]
        if deps and (any([dep not in observations for dep in deps])):
            # Dependencies not yet satisfied
            time.sleep(retry_after)
            continue
        schedule_task.invoke({"task": task, "observations": observations})
        break

from concurrent.futures import ThreadPoolExecutor, wait
from typing import List


@as_runnable
def schedule_tasks(scheduler_input: SchedulerInput) -> List[FunctionMessage]:
    """Group the tasks into a DAG schedule."""
    tasks = scheduler_input["tasks"]
    args_for_tasks = {}
    messages = scheduler_input["messages"]

    observations = _get_observations(messages)
    task_names = {}
    originals = set(observations)

    futures = []
    retry_after = 0.25  # Retry every quarter second
    with ThreadPoolExecutor() as executor:
        for task in tasks:
            deps = task["dependencies"]
            task_names[task["idx"]] = (
                task["tool"] if isinstance(task["tool"], str) else task["tool"].name
            )
            args_for_tasks[task["idx"]] = task["args"]
            if (
                # Depends on other tasks that have not finished yet
                deps
                and (any([dep not in observations for dep in deps]))
            ):
                futures.append(
                    executor.submit(
                        schedule_pending_task, task, observations, retry_after
                    )
                )
            else:
                # No deps or all deps satisfied, can schedule now
                schedule_task.invoke(dict(task=task, observations=observations))

        # All tasks have been submitted or enqueued
        # Wait for them to complete
        wait(futures)
    # Convert observations to new tool messages to add to the state
    new_observations = {
        k: (task_names[k], args_for_tasks[k], observations[k])
        for k in sorted(observations.keys() - originals)
    }

    tool_messages = [
        FunctionMessage(
            name=name, content=str(obs), additional_kwargs={"idx": k, "args": task_args}
        )
        for k, (name, task_args, obs) in new_observations.items()
    ]

    return tool_messages

import itertools


@as_runnable
def plan_and_schedule(state):
    messages = state["messages"]
    tasks = planner.stream(messages)
    # Begin executing the planner immediately
    try:
        tasks = itertools.chain([next(tasks)], tasks)
    except StopIteration:
        # Handle the case where tasks is empty.
        tasks = iter([])
    scheduled_tasks = schedule_tasks.invoke(
        {
            "messages": messages,
            "tasks": tasks,
        }
    )
    # scheduled_tasks is already a list of FunctionMessages, so it can be appended to the state directly
    return {"messages": scheduled_tasks}

Step 4: Build the Joiner

We have now built the Planner and the Task Fetching Unit. The next step is to build the Joiner, which processes the tool outputs and decides whether a new plan is needed and a new cycle should begin.

from typing import Union

from langchain.chains.openai_functions import create_structured_output_runnable
from langchain_core.pydantic_v1 import BaseModel, Field


class FinalResponse(BaseModel):
    """The final response/answer."""

    response: str


class Replan(BaseModel):
    feedback: str = Field(
        description="Analysis of the previous attempts and recommendations on what needs to be fixed."
    )


class JoinOutputs(BaseModel):
    """Decide whether to replan or whether you can return the final response."""

    thought: str = Field(
        description="The chain of thought reasoning for the selected action"
    )
    action: Union[FinalResponse, Replan]


joiner_prompt = hub.pull("wfh/llm-compiler-joiner").partial(
    examples=""
)  # You can optionally add examples

llm = ChatOpenAI(model="gpt-4-turbo-preview")
runnable = create_structured_output_runnable(JoinOutputs, llm, joiner_prompt)

If the Agent needs to continue the loop, we select the most recent messages from the state and format the output the way the Planner expects.

from langchain_core.messages import AIMessage, BaseMessage, HumanMessage


def _parse_joiner_output(decision: JoinOutputs) -> List[BaseMessage]:
    response = [AIMessage(content=f"Thought: {decision.thought}")]
    if isinstance(decision.action, Replan):
        # Feed the feedback back into the state as context for the next planning round
        return {
            "messages": response
            + [
                SystemMessage(
                    content=f"Context from last attempt: {decision.action.feedback}"
                )
            ]
        }
    else:
        return {"messages": response + [AIMessage(content=decision.action.response)]}


def select_recent_messages(state) -> dict:
    messages = state["messages"]
    selected = []
    for msg in messages[::-1]:
        selected.append(msg)
        if isinstance(msg, HumanMessage):
            break
    return {"messages": selected[::-1]}


joiner = select_recent_messages | runnable | _parse_joiner_output

# example_question and tool_messages come from the earlier test run of the planner and scheduler
input_messages = [HumanMessage(content=example_question)] + tool_messages
joiner.invoke({"messages": input_messages})

Step 5: Build the Graph

Finally, we build the flow graph, adding the Planner/Task Fetching Unit and the Joiner as nodes, looping between them, and producing the final result.

from typing import Annotated, TypedDict

from langgraph.graph import END, StateGraph, START
from langgraph.graph.message import add_messages


class State(TypedDict):
    messages: Annotated[list, add_messages]


graph_builder = StateGraph(State)

# Define the two nodes: the planner/scheduler and the joiner
graph_builder.add_node("plan_and_schedule", plan_and_schedule)
graph_builder.add_node("join", joiner)
graph_builder.add_edge("plan_and_schedule", "join")


def should_continue(state):
    messages = state["messages"]
    # The Joiner emits an AIMessage only when it has a final answer
    if isinstance(messages[-1], AIMessage):
        return END
    return "plan_and_schedule"


graph_builder.add_conditional_edges(
    "join",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
)

graph_builder.add_edge(START, "plan_and_schedule")
chain = graph_builder.compile()
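
With the graph compiled, the whole Agent can be run end to end. The sketch below streams the intermediate states for the running example question; the graph alternates between plan_and_schedule and join until the Joiner returns a final AIMessage.

from langchain_core.messages import HumanMessage

for step in chain.stream(
    {
        "messages": [
            HumanMessage(
                content="How much does Microsoft's market cap need to increase to exceed Apple's market cap?"
            )
        ]
    }
):
    print(step)
    print("---")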

Summary

Over the past three articles, 風(fēng)叔 introduced, in order of increasing sophistication, REWOO, Plan-and-Execute, and LLM Compiler: three AI Agent design patterns that emphasize planning ability. Starting from the original ReAct pattern, adding planning gives REWOO; adding replanning on top of that gives Plan-and-Execute; and finally adding a DAG with parallel execution gives LLM Compiler.

In the articles that follow, 風(fēng)叔 will turn to several AI Agent patterns that emphasize reflection. In the next article, 風(fēng)叔 introduces the Agent's art of sparring with itself: Basic Reflection.

This article was written by 風(fēng)叔 (WeChat official account: 風(fēng)叔云), an author at 人人都是產(chǎn)品經(jīng)理, and published there as original/authorized content. Reproduction without permission is prohibited.

The header image is from Unsplash, under the CC0 license.
