🔨 refactor(types.py): move extract_input_variables_from_prompt import to interface.utils module

🔨 refactor(custom.py, loading.py, prompts/custom.py, run.py): update import statements to use extract_input_variables_from_prompt from interface.utils module
🔨 refactor(run.py): remove unused imports and functions
🔨 refactor(utils.py): add type hinting to extract_input_variables_from_prompt function and remove unused imports
The extract_input_variables_from_prompt function has been moved to the interface.utils module to improve code organization, and the import statements in the affected modules have been updated to match. Unused imports and functions have been removed from run.py, and the relocated function has gained type hints in interface.utils.

🚀 feat(processing): add processing module with get_result_and_steps and fix_memory_inputs functions
The processing module was added to the project with two functions: get_result_and_steps and fix_memory_inputs. get_result_and_steps runs a LangChain object on a message and returns the result together with the formatted intermediate steps (the "thought"). fix_memory_inputs checks whether a LangChain object has a memory attribute whose memory key appears in the object's input variables; if not, it derives a candidate key with get_memory_key and rewrites the keys with update_memory_keys.
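As an illustration (not part of the commit): a minimal sketch of how the two functions compose, assuming a chain already built from a flow. The module path langflow.processing.base for get_result_and_steps is an assumption (this view omits file names), and the message and omitted callback kwargs are illustrative.

from langflow.processing.base import get_result_and_steps  # assumed path
from langflow.processing.process import fix_memory_inputs

async def answer(langchain_object, message: str):
    # Align the chain's memory key with its input variables so the incoming
    # message is routed correctly (get_result_and_steps also does this
    # internally, so the explicit call is a harmless no-op here).
    fix_memory_inputs(langchain_object)
    # Run the chain (async call with a sync fallback) and return the result
    # plus the formatted intermediate steps (the "thought"). Extra kwargs are
    # forwarded to the streaming callback handlers, which usually expect a
    # websocket; they are omitted here for brevity.
    return await get_result_and_steps(langchain_object, message)

# Given a chain built from a flow:
# import asyncio
# result, thought = asyncio.run(answer(chain, "Hello!"))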
Gabriel Luiz Freitas Almeida 2023-06-06 10:05:46 -03:00
commit 228f938cd8
9 changed files with 238 additions and 226 deletions

View file

@@ -1,7 +1,8 @@
from typing import Any, Dict, List, Optional, Union
from langflow.graph.vertex.base import Vertex
from langflow.graph.utils import extract_input_variables_from_prompt, flatten_list
from langflow.graph.utils import flatten_list
from langflow.interface.utils import extract_input_variables_from_prompt
class AgentVertex(Vertex):

View file

@@ -5,7 +5,7 @@ from langchain.memory.buffer import ConversationBufferMemory
from langchain.schema import BaseMemory
from pydantic import Field, root_validator
from langflow.graph.utils import extract_input_variables_from_prompt
from langflow.interface.utils import extract_input_variables_from_prompt
DEFAULT_SUFFIX = """"
Current conversation:

View file

@@ -12,7 +12,6 @@ from langchain.agents.load_tools import (
_LLM_TOOLS,
)
from langchain.agents.loading import load_agent_from_config
from langflow.graph import Graph
from langchain.agents.tools import Tool
from langchain.base_language import BaseLanguageModel
from langchain.callbacks.base import BaseCallbackManager
@@ -22,7 +21,6 @@ from pydantic import ValidationError
from langflow.interface.agents.custom import CUSTOM_AGENTS
from langflow.interface.importing.utils import get_function, import_by_type
from langflow.interface.run import fix_memory_inputs
from langflow.interface.toolkits.base import toolkits_creator
from langflow.interface.types import get_type_list
from langflow.interface.utils import load_file_into_dict
@@ -163,37 +161,6 @@ def instantiate_utility(node_type, class_object, params):
return class_object(**params)
def load_flow_from_json(path: str, build=True):
"""Load flow from json file"""
# This is done to avoid circular imports
with open(path, "r", encoding="utf-8") as f:
flow_graph = json.load(f)
data_graph = flow_graph["data"]
nodes = data_graph["nodes"]
# Substitute ZeroShotPrompt with PromptTemplate
# nodes = replace_zero_shot_prompt_with_prompt_template(nodes)
# Add input variables
# nodes = payload.extract_input_variables(nodes)
# Nodes, edges and root node
edges = data_graph["edges"]
graph = Graph(nodes, edges)
if build:
langchain_object = graph.build()
if hasattr(langchain_object, "verbose"):
langchain_object.verbose = True
if hasattr(langchain_object, "return_intermediate_steps"):
# https://github.com/hwchase17/langchain/issues/2068
# Deactivating until we have a frontend solution
# to display intermediate steps
langchain_object.return_intermediate_steps = False
fix_memory_inputs(langchain_object)
return langchain_object
return graph
def replace_zero_shot_prompt_with_prompt_template(nodes):
"""Replace ZeroShotPrompt with PromptTemplate"""
for node in nodes:

View file

@@ -3,7 +3,7 @@ from typing import Dict, List, Optional, Type
from langchain.prompts import PromptTemplate
from pydantic import root_validator
from langflow.graph.utils import extract_input_variables_from_prompt
from langflow.interface.utils import extract_input_variables_from_prompt
# Steps to create a BaseCustomPrompt:
# 1. Create a prompt template that ends with:

View file

@@ -1,10 +1,3 @@
import contextlib
import io
from typing import Any, Dict, List, Tuple
from langchain.schema import AgentAction
from langflow.api.callback import AsyncStreamingLLMCallbackHandler, StreamingLLMCallbackHandler # type: ignore
from langflow.cache.base import compute_dict_hash, load_cache, memoize_dict
from langflow.graph import Graph
from langflow.utils.logger import logger
@@ -24,15 +17,6 @@ def load_langchain_object(data_graph, is_first_message=False):
return computed_hash, langchain_object
def load_or_build_langchain_object(data_graph, is_first_message=False):
"""
Load langchain object from cache if it exists, otherwise build it.
"""
if is_first_message:
build_langchain_object_with_caching.clear_cache()
return build_langchain_object_with_caching(data_graph)
@memoize_dict(maxsize=10)
def build_langchain_object_with_caching(data_graph):
"""
@@ -40,16 +24,10 @@ def build_langchain_object_with_caching(data_graph):
"""
logger.debug("Building langchain object")
graph = build_graph(data_graph)
graph = Graph.from_payload(data_graph)
return graph.build()
def build_graph(data_graph):
nodes = data_graph["nodes"]
edges = data_graph["edges"]
return Graph(nodes, edges)
def build_langchain_object(data_graph):
"""
Build langchain object from data_graph.
@@ -66,29 +44,6 @@ def build_langchain_object(data_graph):
return graph.build()
def process_graph_cached(data_graph: Dict[str, Any], message: str):
"""
Process graph by extracting input variables and replacing ZeroShotPrompt
with PromptTemplate, then run the graph and return the result and thought.
"""
# Load langchain object
is_first_message = len(data_graph.get("chatHistory", [])) == 0
langchain_object = load_or_build_langchain_object(data_graph, is_first_message)
logger.debug("Loaded langchain object")
if langchain_object is None:
# Raise user facing error
raise ValueError(
"There was an error loading the langchain_object. Please, check all the nodes and try again."
)
# Generate result and thought
logger.debug("Generating result and thought")
result, thought = get_result_and_thought(langchain_object, message)
logger.debug("Generated result and thought")
return {"result": str(result), "thought": thought.strip()}
def get_memory_key(langchain_object):
"""
Given a LangChain object, this function retrieves the current memory key from the object's memory attribute.
@@ -124,147 +79,3 @@ def update_memory_keys(langchain_object, possible_new_mem_key):
langchain_object.memory.input_key = input_key
langchain_object.memory.output_key = output_key
langchain_object.memory.memory_key = possible_new_mem_key
def fix_memory_inputs(langchain_object):
"""
Given a LangChain object, this function checks if it has a memory attribute and if that memory key exists in the
object's input variables. If so, it does nothing. Otherwise, it gets a possible new memory key using the
get_memory_key function and updates the memory keys using the update_memory_keys function.
"""
if hasattr(langchain_object, "memory") and langchain_object.memory is not None:
try:
if langchain_object.memory.memory_key in langchain_object.input_variables:
return
except AttributeError:
input_variables = (
langchain_object.prompt.input_variables
if hasattr(langchain_object, "prompt")
else langchain_object.input_keys
)
if langchain_object.memory.memory_key in input_variables:
return
possible_new_mem_key = get_memory_key(langchain_object)
if possible_new_mem_key is not None:
update_memory_keys(langchain_object, possible_new_mem_key)
async def get_result_and_steps(langchain_object, message: str, **kwargs):
"""Get result and thought from extracted json"""
try:
if hasattr(langchain_object, "verbose"):
langchain_object.verbose = True
chat_input = None
memory_key = ""
if hasattr(langchain_object, "memory") and langchain_object.memory is not None:
memory_key = langchain_object.memory.memory_key
if hasattr(langchain_object, "input_keys"):
for key in langchain_object.input_keys:
if key not in [memory_key, "chat_history"]:
chat_input = {key: message}
else:
chat_input = message # type: ignore
if hasattr(langchain_object, "return_intermediate_steps"):
# https://github.com/hwchase17/langchain/issues/2068
# Deactivating until we have a frontend solution
# to display intermediate steps
langchain_object.return_intermediate_steps = True
fix_memory_inputs(langchain_object)
try:
async_callbacks = [AsyncStreamingLLMCallbackHandler(**kwargs)]
output = await langchain_object.acall(chat_input, callbacks=async_callbacks)
except Exception as exc:
# make the error message more informative
logger.debug(f"Error: {str(exc)}")
sync_callbacks = [StreamingLLMCallbackHandler(**kwargs)]
output = langchain_object(chat_input, callbacks=sync_callbacks)
intermediate_steps = (
output.get("intermediate_steps", []) if isinstance(output, dict) else []
)
result = (
output.get(langchain_object.output_keys[0])
if isinstance(output, dict)
else output
)
thought = format_actions(intermediate_steps) if intermediate_steps else ""
except Exception as exc:
raise ValueError(f"Error: {str(exc)}") from exc
return result, thought
def get_result_and_thought(langchain_object, message: str):
"""Get result and thought from extracted json"""
try:
if hasattr(langchain_object, "verbose"):
langchain_object.verbose = True
chat_input = None
memory_key = ""
if hasattr(langchain_object, "memory") and langchain_object.memory is not None:
memory_key = langchain_object.memory.memory_key
if hasattr(langchain_object, "input_keys"):
for key in langchain_object.input_keys:
if key not in [memory_key, "chat_history"]:
chat_input = {key: message}
else:
chat_input = message # type: ignore
if hasattr(langchain_object, "return_intermediate_steps"):
# https://github.com/hwchase17/langchain/issues/2068
# Deactivating until we have a frontend solution
# to display intermediate steps
langchain_object.return_intermediate_steps = False
fix_memory_inputs(langchain_object)
with io.StringIO() as output_buffer, contextlib.redirect_stdout(output_buffer):
try:
# if hasattr(langchain_object, "acall"):
# output = await langchain_object.acall(chat_input)
# else:
output = langchain_object(chat_input)
except ValueError as exc:
# make the error message more informative
logger.debug(f"Error: {str(exc)}")
output = langchain_object.run(chat_input)
intermediate_steps = (
output.get("intermediate_steps", []) if isinstance(output, dict) else []
)
result = (
output.get(langchain_object.output_keys[0])
if isinstance(output, dict)
else output
)
if intermediate_steps:
thought = format_actions(intermediate_steps)
else:
thought = output_buffer.getvalue()
except Exception as exc:
raise ValueError(f"Error: {str(exc)}") from exc
return result, thought
def format_actions(actions: List[Tuple[AgentAction, str]]) -> str:
"""Format a list of (AgentAction, answer) tuples into a string."""
output = []
for action, answer in actions:
log = action.log
tool = action.tool
tool_input = action.tool_input
output.append(f"Log: {log}")
if "Action" not in log and "Action Input" not in log:
output.append(f"Tool: {tool}")
output.append(f"Tool Input: {tool_input}")
output.append(f"Answer: {answer}")
output.append("") # Add a blank line
return "\n".join(output)

View file

@@ -2,6 +2,7 @@ import base64
import json
import os
from io import BytesIO
import re
import yaml
from langchain.base_language import BaseLanguageModel
@@ -48,3 +49,8 @@ def try_setting_streaming_options(langchain_object, websocket):
llm.streaming = True
return langchain_object
def extract_input_variables_from_prompt(prompt: str) -> list[str]:
"""Extract input variables from prompt."""
return re.findall(r"{(.*?)}", prompt)
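As a quick illustration (not part of the diff), the relocated helper collects every brace-delimited placeholder in order; the prompt below is made up:

from langflow.interface.utils import extract_input_variables_from_prompt

# The non-greedy pattern r"{(.*?)}" captures each {name} separately,
# ignoring the text between placeholders.
prompt = "Answer in {language}: what is {topic}?"
print(extract_input_variables_from_prompt(prompt))  # ['language', 'topic']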

View file

@@ -0,0 +1,55 @@
from langflow.api.v1.callback import (
AsyncStreamingLLMCallbackHandler,
StreamingLLMCallbackHandler,
)
from langflow.processing.process import fix_memory_inputs, format_actions
from langflow.utils.logger import logger
async def get_result_and_steps(langchain_object, message: str, **kwargs):
"""Get result and thought from extracted json"""
try:
if hasattr(langchain_object, "verbose"):
langchain_object.verbose = True
chat_input = None
memory_key = ""
if hasattr(langchain_object, "memory") and langchain_object.memory is not None:
memory_key = langchain_object.memory.memory_key
if hasattr(langchain_object, "input_keys"):
for key in langchain_object.input_keys:
if key not in [memory_key, "chat_history"]:
chat_input = {key: message}
else:
chat_input = message # type: ignore
if hasattr(langchain_object, "return_intermediate_steps"):
# https://github.com/hwchase17/langchain/issues/2068
# Deactivating until we have a frontend solution
# to display intermediate steps
langchain_object.return_intermediate_steps = True
fix_memory_inputs(langchain_object)
try:
async_callbacks = [AsyncStreamingLLMCallbackHandler(**kwargs)]
output = await langchain_object.acall(chat_input, callbacks=async_callbacks)
except Exception as exc:
# make the error message more informative
logger.debug(f"Error: {str(exc)}")
sync_callbacks = [StreamingLLMCallbackHandler(**kwargs)]
output = langchain_object(chat_input, callbacks=sync_callbacks)
intermediate_steps = (
output.get("intermediate_steps", []) if isinstance(output, dict) else []
)
result = (
output.get(langchain_object.output_keys[0])
if isinstance(output, dict)
else output
)
thought = format_actions(intermediate_steps) if intermediate_steps else ""
except Exception as exc:
raise ValueError(f"Error: {str(exc)}") from exc
return result, thought

View file

@@ -0,0 +1,172 @@
import contextlib
import io
from langchain.schema import AgentAction
import json
from langflow.interface.run import (
build_langchain_object_with_caching,
get_memory_key,
update_memory_keys,
)
from langflow.utils.logger import logger
from langflow.graph import Graph
from typing import Any, Dict, List, Tuple
def fix_memory_inputs(langchain_object):
"""
Given a LangChain object, this function checks if it has a memory attribute and if that memory key exists in the
object's input variables. If so, it does nothing. Otherwise, it gets a possible new memory key using the
get_memory_key function and updates the memory keys using the update_memory_keys function.
"""
if hasattr(langchain_object, "memory") and langchain_object.memory is not None:
try:
if langchain_object.memory.memory_key in langchain_object.input_variables:
return
except AttributeError:
input_variables = (
langchain_object.prompt.input_variables
if hasattr(langchain_object, "prompt")
else langchain_object.input_keys
)
if langchain_object.memory.memory_key in input_variables:
return
possible_new_mem_key = get_memory_key(langchain_object)
if possible_new_mem_key is not None:
update_memory_keys(langchain_object, possible_new_mem_key)
def format_actions(actions: List[Tuple[AgentAction, str]]) -> str:
"""Format a list of (AgentAction, answer) tuples into a string."""
output = []
for action, answer in actions:
log = action.log
tool = action.tool
tool_input = action.tool_input
output.append(f"Log: {log}")
if "Action" not in log and "Action Input" not in log:
output.append(f"Tool: {tool}")
output.append(f"Tool Input: {tool_input}")
output.append(f"Answer: {answer}")
output.append("") # Add a blank line
return "\n".join(output)
def get_result_and_thought(langchain_object, message: str):
"""Get result and thought from extracted json"""
try:
if hasattr(langchain_object, "verbose"):
langchain_object.verbose = True
chat_input = None
memory_key = ""
if hasattr(langchain_object, "memory") and langchain_object.memory is not None:
memory_key = langchain_object.memory.memory_key
if hasattr(langchain_object, "input_keys"):
for key in langchain_object.input_keys:
if key not in [memory_key, "chat_history"]:
chat_input = {key: message}
else:
chat_input = message # type: ignore
if hasattr(langchain_object, "return_intermediate_steps"):
# https://github.com/hwchase17/langchain/issues/2068
# Deactivating until we have a frontend solution
# to display intermediate steps
langchain_object.return_intermediate_steps = False
fix_memory_inputs(langchain_object)
with io.StringIO() as output_buffer, contextlib.redirect_stdout(output_buffer):
try:
# if hasattr(langchain_object, "acall"):
# output = await langchain_object.acall(chat_input)
# else:
output = langchain_object(chat_input)
except ValueError as exc:
# make the error message more informative
logger.debug(f"Error: {str(exc)}")
output = langchain_object.run(chat_input)
intermediate_steps = (
output.get("intermediate_steps", []) if isinstance(output, dict) else []
)
result = (
output.get(langchain_object.output_keys[0])
if isinstance(output, dict)
else output
)
if intermediate_steps:
thought = format_actions(intermediate_steps)
else:
thought = output_buffer.getvalue()
except Exception as exc:
raise ValueError(f"Error: {str(exc)}") from exc
return result, thought
def load_or_build_langchain_object(data_graph, is_first_message=False):
"""
Load langchain object from cache if it exists, otherwise build it.
"""
if is_first_message:
build_langchain_object_with_caching.clear_cache()
return build_langchain_object_with_caching(data_graph)
def process_graph_cached(data_graph: Dict[str, Any], message: str):
"""
Process graph by extracting input variables and replacing ZeroShotPrompt
with PromptTemplate, then run the graph and return the result and thought.
"""
# Load langchain object
is_first_message = len(data_graph.get("chatHistory", [])) == 0
langchain_object = load_or_build_langchain_object(data_graph, is_first_message)
logger.debug("Loaded langchain object")
if langchain_object is None:
# Raise user facing error
raise ValueError(
"There was an error loading the langchain_object. Please, check all the nodes and try again."
)
# Generate result and thought
logger.debug("Generating result and thought")
result, thought = get_result_and_thought(langchain_object, message)
logger.debug("Generated result and thought")
return {"result": str(result), "thought": thought.strip()}
def load_flow_from_json(path: str, build=True):
"""Load flow from json file"""
# This is done to avoid circular imports
with open(path, "r", encoding="utf-8") as f:
flow_graph = json.load(f)
data_graph = flow_graph["data"]
nodes = data_graph["nodes"]
# Substitute ZeroShotPrompt with PromptTemplate
# nodes = replace_zero_shot_prompt_with_prompt_template(nodes)
# Add input variables
# nodes = payload.extract_input_variables(nodes)
# Nodes, edges and root node
edges = data_graph["edges"]
graph = Graph(nodes, edges)
if build:
langchain_object = graph.build()
if hasattr(langchain_object, "verbose"):
langchain_object.verbose = True
if hasattr(langchain_object, "return_intermediate_steps"):
# https://github.com/hwchase17/langchain/issues/2068
# Deactivating until we have a frontend solution
# to display intermediate steps
langchain_object.return_intermediate_steps = False
fix_memory_inputs(langchain_object)
return langchain_object
return graph
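To close, a hedged end-to-end sketch of the entry points above; the module path is inferred from the langflow.processing.process import earlier in this commit, and the flow file path and message are illustrative:

import json

from langflow.processing.process import load_flow_from_json, process_graph_cached

# Option 1: build and run a chain directly from an exported flow file.
chain = load_flow_from_json("my_flow.json", build=True)  # illustrative path

# Option 2: hand the raw graph payload plus a message to the cached pipeline;
# an empty (or missing) chatHistory marks the first message and clears the
# build cache before the graph is rebuilt.
with open("my_flow.json", encoding="utf-8") as f:
    data_graph = json.load(f)["data"]
response = process_graph_cached(data_graph, "Hello!")
print(response["result"], response["thought"])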