🔧 chore(base.py): refactor setup_callbacks function to improve readability and maintainability
🔧 chore(base.py): refactor get_langfuse_callback function to improve readability and maintainability 🔧 chore(base.py): refactor get_result_and_steps function to improve readability and maintainability 🔧 chore(settings.py): add LANGFUSE_SECRET_KEY and LANGFUSE_PUBLIC_KEY optional settings
This commit is contained in:
parent
2d2641c285
commit
6fe486b164
2 changed files with 33 additions and 2 deletions
|
|
@ -8,6 +8,34 @@ from langflow.utils.logger import logger
|
|||
from langchain.agents.agent import AgentExecutor
|
||||
|
||||
|
||||
def setup_callbacks(sync, **kwargs):
|
||||
"""Setup callbacks for langchain object"""
|
||||
callbacks = []
|
||||
if sync:
|
||||
callbacks.append(StreamingLLMCallbackHandler(**kwargs))
|
||||
else:
|
||||
callbacks.append(AsyncStreamingLLMCallbackHandler(**kwargs))
|
||||
|
||||
if langfuse_callback := get_langfuse_callback():
|
||||
callbacks.append(langfuse_callback)
|
||||
return callbacks
|
||||
|
||||
|
||||
def get_langfuse_callback():
|
||||
from langflow.settings import settings
|
||||
|
||||
if settings.LANGFUSE_PUBLIC_KEY and settings.LANGFUSE_SECRET_KEY:
|
||||
try:
|
||||
from langfuse.callback import CallbackHandler
|
||||
|
||||
return CallbackHandler(
|
||||
settings.LANGFUSE_PUBLIC_KEY, settings.LANGFUSE_SECRET_KEY
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.error(f"Error initializing langfuse callback: {exc}")
|
||||
return None
|
||||
|
||||
|
||||
async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwargs):
|
||||
"""Get result and thought from extracted json"""
|
||||
|
||||
|
|
@ -27,12 +55,12 @@ async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwa
|
|||
logger.error(f"Error fixing memory inputs: {exc}")
|
||||
|
||||
try:
|
||||
async_callbacks = [AsyncStreamingLLMCallbackHandler(**kwargs)]
|
||||
async_callbacks = setup_callbacks(sync=False, **kwargs)
|
||||
output = await langchain_object.acall(inputs, callbacks=async_callbacks)
|
||||
except Exception as exc:
|
||||
# make the error message more informative
|
||||
logger.debug(f"Error: {str(exc)}")
|
||||
sync_callbacks = [StreamingLLMCallbackHandler(**kwargs)]
|
||||
sync_callbacks = setup_callbacks(sync=True, **kwargs)
|
||||
output = langchain_object(inputs, callbacks=sync_callbacks)
|
||||
|
||||
intermediate_steps = (
|
||||
|
|
|
|||
|
|
@ -35,6 +35,9 @@ class Settings(BaseSettings):
|
|||
REMOVE_API_KEYS: bool = False
|
||||
COMPONENTS_PATH: List[str] = []
|
||||
|
||||
LANGFUSE_SECRET_KEY: Optional[str] = None
|
||||
LANGFUSE_PUBLIC_KEY: Optional[str] = None
|
||||
|
||||
@validator("DATABASE_URL", pre=True)
|
||||
def set_database_url(cls, value):
|
||||
if not value:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue