ref: Propagate traceback in exception logs (#4030)

Propagate traceback in exception logs
This commit is contained in:
Christophe Bornet 2024-10-07 14:16:58 +02:00 committed by GitHub
commit 32b5da8d1f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
57 changed files with 202 additions and 217 deletions

View file

@ -72,8 +72,8 @@ def run_migrations_online() -> None:
initialize_settings_service()
service_manager.register_factory(DatabaseServiceFactory())
connectable = get_db_service().engine
except Exception as e:
logger.error(f"Error getting database engine: {e}")
except Exception:
logger.exception("Error getting database engine")
url = os.getenv("LANGFLOW_DATABASE_URL")
url = url or config.get_main_option("sqlalchemy.url")
if url:

View file

@ -74,8 +74,8 @@ class AsyncStreamingLLMCallbackHandleSIO(AsyncCallbackHandler):
# This is to emulate the stream of tokens
for resp in resps:
await self.socketio_service.emit_token(to=self.sid, data=resp.model_dump())
except Exception as exc:
logger.error(f"Error sending response: {exc}")
except Exception:
logger.exception("Error sending response")
async def on_tool_error(
self,

View file

@ -60,8 +60,8 @@ async def try_running_celery_task(vertex, user_id):
task = build_vertex.delay(vertex)
vertex.task_id = task.id
except Exception as exc:
logger.debug(f"Error running task in celery: {exc}")
except Exception:
logger.opt(exception=True).debug("Error running task in celery")
vertex.task_id = None
await vertex.build(user_id=user_id)
return vertex
@ -135,8 +135,7 @@ async def retrieve_vertices_order(
)
if "stream or streaming set to True" in str(exc):
raise HTTPException(status_code=400, detail=str(exc)) from exc
logger.error(f"Error checking build status: {exc}")
logger.exception(exc)
logger.exception("Error checking build status")
raise HTTPException(status_code=500, detail=str(exc)) from exc
@ -169,7 +168,7 @@ async def build_flow(
try:
first_layer = graph.sort_vertices(stop_component_id, start_component_id)
except Exception as exc:
logger.error(exc)
logger.exception(exc)
first_layer = graph.sort_vertices()
else:
first_layer = graph.sort_vertices()
@ -203,8 +202,7 @@ async def build_flow(
)
if "stream or streaming set to True" in str(exc):
raise HTTPException(status_code=400, detail=str(exc)) from exc
logger.error(f"Error checking build status: {exc}")
logger.exception(exc)
logger.exception("Error checking build status")
raise HTTPException(status_code=500, detail=str(exc)) from exc
async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManager) -> VertexBuildResponse:
@ -241,7 +239,7 @@ async def build_flow(
tb = exc.formatted_traceback
else:
tb = traceback.format_exc()
logger.exception(f"Error building Component: {exc}")
logger.exception("Error building Component")
params = format_exception_message(exc)
message = {"errorMessage": params, "stackTrace": tb}
valid = False
@ -315,8 +313,7 @@ async def build_flow(
componentErrorMessage=str(exc),
),
)
logger.error(f"Error building Component: \n\n{exc}")
logger.exception(exc)
logger.exception("Error building Component")
message = parse_exception(exc)
raise HTTPException(status_code=500, detail=message) from exc
@ -524,7 +521,7 @@ async def build_vertex(
tb = exc.formatted_traceback
else:
tb = traceback.format_exc()
logger.exception(f"Error building Component: {exc}")
logger.exception("Error building Component")
params = format_exception_message(exc)
message = {"errorMessage": params, "stackTrace": tb}
valid = False
@ -602,8 +599,7 @@ async def build_vertex(
componentErrorMessage=str(exc),
),
)
logger.error(f"Error building Component: \n\n{exc}")
logger.exception(exc)
logger.exception("Error building Component")
message = parse_exception(exc)
raise HTTPException(status_code=500, detail=message) from exc
@ -695,7 +691,7 @@ async def build_vertex_stream(
raise ValueError(msg)
except Exception as exc:
logger.exception(f"Error building Component: {exc}")
logger.exception("Error building Component")
exc_message = parse_exception(exc)
if exc_message == "The message must be an iterator or an async iterator.":
exc_message = "This stream has already been closed."

View file

@ -168,8 +168,8 @@ async def simple_run_flow_task(
api_key_user=api_key_user,
)
except Exception as exc:
logger.exception(f"Error running flow {flow.id} task: {exc}")
except Exception:
logger.exception(f"Error running flow {flow.id} task")
@router.post("/run/{flow_id_or_name}", response_model=RunResponse, response_model_exclude_none=True)
@ -280,7 +280,7 @@ async def simplified_run_flow(
logger.exception(exc)
raise APIException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, exception=exc, flow=flow) from exc
except InvalidChatInputException as exc:
logger.error(exc)
logger.exception(exc)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
except Exception as exc:
logger.exception(exc)
@ -293,7 +293,6 @@ async def simplified_run_flow(
runErrorMessage=str(exc),
),
)
logger.exception(exc)
raise APIException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, exception=exc, flow=flow) from exc
@ -478,15 +477,15 @@ async def experimental_run_flow(
except sa.exc.StatementError as exc:
# StatementError('(builtins.ValueError) badly formed hexadecimal UUID string')
if "badly formed hexadecimal UUID string" in str(exc):
logger.error(f"Flow ID {flow_id_str} is not a valid UUID")
logger.exception(f"Flow ID {flow_id_str} is not a valid UUID")
# This means the Flow ID is not a valid UUID which means it can't find the flow
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
except ValueError as exc:
if f"Flow {flow_id_str} not found" in str(exc):
logger.error(f"Flow {flow_id_str} not found")
logger.exception(f"Flow {flow_id_str} not found")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
if f"Session {session_id} not found" in str(exc):
logger.error(f"Session {session_id} not found")
logger.exception(f"Session {session_id} not found")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
logger.exception(exc)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
@ -573,7 +572,7 @@ async def create_upload_file(
file_path=file_path,
)
except Exception as exc:
logger.error(f"Error saving file: {exc}")
logger.exception("Error saving file")
raise HTTPException(status_code=500, detail=str(exc)) from exc

View file

@ -170,7 +170,7 @@ def read_flows(
if example_flow.id not in flow_ids:
flows.append(example_flow) # type: ignore
except Exception as e:
logger.error(e)
logger.exception(e)
if remove_example_flows:
flows = [flow for flow in flows if flow.folder_id != folder.id]

View file

@ -19,5 +19,5 @@ def get_starter_projects(
try:
return get_starter_projects_dump()
except Exception as exc:
logger.error(exc)
logger.exception(exc)
raise HTTPException(status_code=500, detail=str(exc)) from exc

View file

@ -42,8 +42,8 @@ def get_optional_user_store_api_key(
return None
try:
return auth_utils.decrypt_api_key(user.store_api_key, settings_service)
except Exception as e:
logger.error(f"Failed to decrypt API key: {e}")
except Exception:
logger.exception("Failed to decrypt API key")
return user.store_api_key

View file

@ -133,9 +133,9 @@ def validate_prompt(prompt_template: str, silent_errors: bool = False) -> list[s
try:
PromptTemplate(template=prompt_template, input_variables=input_variables)
except Exception as exc:
logger.error(f"Invalid prompt: {exc}")
msg = f"Invalid prompt: {exc}"
logger.exception(msg)
if not silent_errors:
msg = f"Invalid prompt: {exc}"
raise ValueError(msg) from exc
return input_variables

View file

@ -103,8 +103,8 @@ class FlowTool(BaseTool):
tweaks = self.build_tweaks_dict(args, kwargs)
try:
run_id = self.graph.run_id if self.graph else None
except Exception as e:
logger.warning(f"Failed to set run_id: {e}")
except Exception:
logger.opt(exception=True).warning("Failed to set run_id")
run_id = None
run_outputs = await run_flow(
tweaks={key: {"input_value": value} for key, value in tweaks.items()},

View file

@ -77,7 +77,7 @@ class NotionPageUpdate(LCToolComponent):
parsed_properties = json.loads(properties)
except json.JSONDecodeError as e:
error_message = f"Invalid JSON format for properties: {e}"
logger.error(error_message)
logger.exception(error_message)
return error_message
else:
@ -98,15 +98,15 @@ class NotionPageUpdate(LCToolComponent):
if e.response is not None:
error_message += f"\nStatus code: {e.response.status_code}"
error_message += f"\nResponse body: {e.response.text}"
logger.error(error_message)
logger.exception(error_message)
return error_message
except requests.exceptions.RequestException as e:
error_message = f"An error occurred while making the request: {e}"
logger.error(error_message)
logger.exception(error_message)
return error_message
except Exception as e:
error_message = f"An unexpected error occurred: {e}"
logger.error(error_message)
logger.exception(error_message)
return error_message
def __call__(self, *args, **kwargs):

View file

@ -87,13 +87,13 @@ class APIRequestComponent(Component):
try:
json_data = json.loads(parsed.data)
build_config["body"]["value"] = json_data
except json.JSONDecodeError as e:
logger.error(f"Error decoding JSON data: {e}")
except json.JSONDecodeError:
logger.exception("Error decoding JSON data")
else:
build_config["body"]["value"] = {}
except Exception as exc:
logger.error(f"Error parsing curl: {exc}")
msg = f"Error parsing curl: {exc}"
logger.exception(msg)
raise ValueError(msg) from exc
return build_config
@ -120,9 +120,9 @@ class APIRequestComponent(Component):
try:
body = json.loads(body)
except Exception as e:
logger.error(f"Error decoding JSON data: {e}")
body = None
msg = f"Error decoding JSON data: {e}"
logger.exception(msg)
body = None
raise ValueError(msg) from e
data = body if body else None

View file

@ -54,8 +54,8 @@ class SubFlowComponent(CustomComponent):
inputs = get_flow_inputs(graph)
# Add inputs to the build config
build_config = self.add_inputs_to_build_config(inputs, build_config)
except Exception as e:
logger.error(f"Error getting flow {field_value}: {e}")
except Exception:
logger.exception(f"Error getting flow {field_value}")
return build_config

View file

@ -36,8 +36,8 @@ class AIMLEmbeddingsImpl(BaseModel, Embeddings):
httpx.RequestError,
json.JSONDecodeError,
KeyError,
) as e:
logger.error(f"Error occurred: {e}")
):
logger.exception("Error occurred")
raise
return embeddings # type: ignore

View file

@ -85,8 +85,8 @@ class FlowToolComponent(LCToolComponent):
graph = Graph.from_payload(flow_data.data["data"])
try:
graph.set_run_id(self.graph.run_id)
except Exception as e:
logger.warning(f"Failed to set run_id: {e}")
except Exception:
logger.opt(exception=True).warning("Failed to set run_id")
inputs = get_flow_inputs(graph)
tool = FlowTool(
name=self.name,

View file

@ -46,8 +46,8 @@ class SubFlowComponent(Component):
inputs = get_flow_inputs(graph)
# Add inputs to the build config
build_config = self.add_inputs_to_build_config(inputs, build_config)
except Exception as e:
logger.error(f"Error getting flow {field_value}: {e}")
except Exception:
logger.exception(f"Error getting flow {field_value}")
return build_config

View file

@ -85,8 +85,8 @@ class ComposioAPIComponent(LCToolComponent):
if auth_schemes[0].auth_mode == "API_KEY":
return self._process_api_key_auth(entity, app)
return self._initiate_default_connection(entity, app)
except Exception as exc:
logger.error(f"Authorization error: {exc}")
except Exception:
logger.exception("Authorization error")
return "Error"
def _process_api_key_auth(self, entity: Any, app: str) -> str:

View file

@ -198,11 +198,11 @@ class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
msg = f"Invalid search type: {self.search_type}"
raise ValueError(msg)
except Exception as e:
logger.error(f"Search query failed: {e}")
msg = (
"Error occurred while querying the Elasticsearch VectorStore,"
" there is no Data into the VectorStore."
)
logger.exception(msg)
raise ValueError(msg) from e
return [
{"page_content": doc.page_content, "metadata": doc.metadata, "score": score} for doc, score in results

View file

@ -1,5 +1,4 @@
import json
import traceback
from typing import Any
from langchain_community.vectorstores import OpenSearchVectorSearch
@ -125,7 +124,7 @@ class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
from langchain_community.vectorstores import OpenSearchVectorSearch
except ImportError as e:
error_message = f"Failed to import required modules: {e}"
logger.error(error_message)
logger.exception(error_message)
raise ImportError(error_message) from e
try:
@ -141,7 +140,7 @@ class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
)
except Exception as e:
error_message = f"Failed to create OpenSearchVectorSearch instance: {e}"
logger.error(error_message)
logger.exception(error_message)
raise RuntimeError(error_message) from e
if self.ingest_data:
@ -168,8 +167,7 @@ class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
vector_store.add_documents(documents)
except Exception as e:
error_message = f"Error adding documents to Vector Store: {e}"
logger.error(error_message)
logger.error(f"Traceback: {traceback.format_exc()}")
logger.exception(error_message)
raise RuntimeError(error_message) from e
else:
logger.debug("No documents to add to the Vector Store.")
@ -188,7 +186,7 @@ class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
hybrid_query = json.loads(self.hybrid_search_query)
except json.JSONDecodeError as e:
error_message = f"Invalid hybrid search query JSON: {e}"
logger.error(error_message)
logger.exception(error_message)
raise ValueError(error_message) from e
results = vector_store.client.search(index=self.index_name, body=hybrid_query)
@ -232,13 +230,12 @@ class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]
error_message = f"Invalid search type:: {self.search_type}"
logger.error(error_message)
logger.exception(error_message)
raise ValueError(error_message)
except Exception as e:
error_message = f"Error during search: {e}"
logger.error(error_message)
logger.error(f"Traceback: {traceback.format_exc()}")
logger.exception(error_message)
raise RuntimeError(error_message) from e
def search_documents(self) -> list[Data]:
@ -260,6 +257,5 @@ class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
return retrieved_data
except Exception as e:
error_message = f"Error during document search: {e}"
logger.error(error_message)
logger.error(f"Traceback: {traceback.format_exc()}")
logger.exception(error_message)
raise RuntimeError(error_message) from e

View file

@ -340,8 +340,8 @@ class CodeParser:
for import_node in import_nodes:
self.parse_imports(import_node)
nodes.append(class_node)
except Exception as exc:
logger.error(f"Error finding base class node: {exc}")
except Exception:
logger.exception("Error finding base class node")
nodes.insert(0, node)
class_details = ClassCodeDetails(
name=node.name,

View file

@ -215,9 +215,8 @@ class DirectoryReader:
"""
try:
file_content = self.read_file_content(file_path)
except Exception as exc:
logger.exception(exc)
logger.error(f"Error while reading file {file_path}: {exc}")
except Exception:
logger.exception(f"Error while reading file {file_path}")
return False, f"Could not read {file_path}"
if file_content is None:
@ -292,9 +291,8 @@ class DirectoryReader:
async def process_file_async(self, file_path):
try:
file_content = self.read_file_content(file_path)
except Exception as exc:
logger.exception(exc)
logger.error(f"Error while reading file {file_path}: {exc}")
except Exception:
logger.exception(f"Error while reading file {file_path}")
return False, f"Could not read {file_path}"
if file_content is None:
@ -346,8 +344,8 @@ class DirectoryReader:
if validation_result:
try:
output_types = await self.get_output_types_from_code_async(result_content)
except Exception as exc:
logger.error(f"Error while getting output types from code: {exc}")
except Exception:
logger.exception("Error while getting output types from code")
output_types = [component_name_camelcase]
else:
output_types = [component_name_camelcase]

View file

@ -132,8 +132,8 @@ def build_invalid_menu_items(menu_item):
component_name, component_template = build_invalid_component(component)
menu_items[component_name] = component_template
logger.debug(f"Added {component_name} to invalid menu.")
except Exception as exc:
logger.exception(f"Error while creating custom component [{component_name}]: {exc}")
except Exception:
logger.exception(f"Error while creating custom component [{component_name}]")
return menu_items
@ -165,7 +165,6 @@ def build_menu_items(menu_item):
for component_name, component_template, component in menu_item["components"]:
try:
menu_items[component_name] = component_template
except Exception as exc:
logger.error(f"Error loading Component: {component['output_types']}")
logger.exception(f"Error while building custom component {component['output_types']}: {exc}")
except Exception:
logger.exception(f"Error while building custom component {component['output_types']}")
return menu_items

View file

@ -258,7 +258,7 @@ def run_build_inputs(
return custom_component.build_inputs(user_id=user_id)
# add_extra_fields(frontend_node, field_config, field_config.values())
except Exception as exc:
logger.error(f"Error running build inputs: {exc}")
logger.exception("Error running build inputs")
raise HTTPException(status_code=500, detail=str(exc)) from exc
@ -273,7 +273,7 @@ def get_component_instance(custom_component: CustomComponent, user_id: str | UUI
msg = "Invalid code type"
raise ValueError(msg)
except Exception as exc:
logger.error(f"Error while evaluating custom component code: {exc}")
logger.exception("Error while evaluating custom component code")
raise HTTPException(
status_code=400,
detail={
@ -285,7 +285,7 @@ def get_component_instance(custom_component: CustomComponent, user_id: str | UUI
try:
return custom_class(_user_id=user_id, _code=custom_component._code)
except Exception as exc:
logger.error(f"Error while instantiating custom component: {exc}")
logger.exception("Error while instantiating custom component")
if hasattr(exc, "detail") and "traceback" in exc.detail:
logger.error(exc.detail["traceback"])
@ -308,7 +308,7 @@ def run_build_config(
msg = "Invalid code type"
raise ValueError(msg)
except Exception as exc:
logger.error(f"Error while evaluating custom component code: {exc}")
logger.exception("Error while evaluating custom component code")
raise HTTPException(
status_code=400,
detail={
@ -333,7 +333,7 @@ def run_build_config(
return build_config, custom_instance
except Exception as exc:
logger.error(f"Error while building field config: {exc}")
logger.exception("Error while building field config")
if hasattr(exc, "detail") and "traceback" in exc.detail:
logger.error(exc.detail["traceback"])
@ -521,8 +521,8 @@ def update_field_dict(
)
build_config = dd_build_config
except Exception as exc:
logger.error(f"Error while running update_build_config: {exc}")
msg = f"Error while running update_build_config: {exc}"
logger.exception(msg)
raise UpdateBuildConfigError(msg) from exc
return build_config

View file

@ -121,8 +121,8 @@ class Graph:
self._snapshots: list[dict[str, Any]] = []
try:
self.tracing_service: TracingService | None = get_tracing_service()
except Exception as exc:
logger.error(f"Error getting tracing service: {exc}")
except Exception:
logger.exception("Error getting tracing service")
self.tracing_service = None
if start is not None and end is not None:
self._set_start_and_end(start, end)
@ -1407,8 +1407,8 @@ class Graph:
)
except Exception as exc:
if not isinstance(exc, ComponentBuildException):
logger.exception(f"Error building Component: \n\n{exc}")
raise exc
logger.exception("Error building Component")
raise
def get_vertex_edges(
self,
@ -1473,9 +1473,9 @@ class Graph:
logger.debug(f"Running layer {layer_index} with {len(tasks)} tasks, {current_batch}")
try:
next_runnable_vertices = await self._execute_tasks(tasks, lock=lock)
except Exception as e:
logger.error(f"Error executing tasks in layer {layer_index}: {e}")
raise e
except Exception:
logger.exception(f"Error executing tasks in layer {layer_index}")
raise
if not next_runnable_vertices:
break
to_process.extend(next_runnable_vertices)
@ -1748,7 +1748,7 @@ class Graph:
try:
first_layer = self.sort_vertices(stop_component_id, start_component_id)
except Exception as exc:
logger.error(exc)
logger.exception(exc)
first_layer = self.sort_vertices()
else:
first_layer = self.sort_vertices()

View file

@ -16,8 +16,8 @@ class GraphStateManager:
def __init__(self):
try:
self.state_service: StateService = get_state_service()
except Exception as e:
logger.debug(f"Error getting state service. Defaulting to InMemoryStateService: {e}")
except Exception:
logger.opt(exception=True).debug("Error getting state service. Defaulting to InMemoryStateService")
from langflow.services.state.service import InMemoryStateService
self.state_service = InMemoryStateService(get_settings_service())
@ -42,6 +42,6 @@ class GraphStateManager:
for callback in self.observers[key]:
try:
callback(key, new_state, append=True)
except Exception as e:
logger.error(f"Error in observer {callback} for key {key}: {e}")
except Exception:
logger.exception(f"Error in observer {callback} for key {key}")
logger.warning("Callbacks not implemented yet")

View file

@ -155,8 +155,8 @@ async def log_transaction(
with session_getter(get_db_service()) as session:
inserted = crud_log_transaction(session, transaction)
logger.debug(f"Logged transaction: {inserted.id}")
except Exception as e:
logger.error(f"Error logging transaction: {e}")
except Exception:
logger.exception("Error logging transaction")
def log_vertex_build(
@ -183,8 +183,8 @@ def log_vertex_build(
with session_getter(get_db_service()) as session:
inserted = crud_log_vertex_build(session, vertex_build)
logger.debug(f"Logged vertex build: {inserted.build_id}")
except Exception as e:
logger.exception(f"Error logging vertex build: {e}")
except Exception:
logger.exception("Error logging vertex build")
def rewrite_file_path(file_path: str):

View file

@ -383,8 +383,8 @@ def copy_profile_pictures():
shutil.copytree(origin, target, dirs_exist_ok=True)
logger.debug(f"Folder copied from '{origin}' to '{target}'")
except Exception as e:
logger.error(f"Error copying the folder: {e}")
except Exception:
logger.exception("Error copying the folder")
def get_project_data(project):
@ -578,9 +578,9 @@ def find_existing_flow(session, flow_id, flow_endpoint_name):
async def create_or_update_starter_projects(get_all_components_coro: Awaitable[dict]):
try:
all_types_dict = await get_all_components_coro
except Exception as e:
logger.exception(f"Error loading components: {e}")
raise e
except Exception:
logger.exception("Error loading components")
raise
with session_scope() as session:
new_folder = create_starter_folder(session)
starter_projects = load_starter_projects()
@ -603,7 +603,7 @@ async def create_or_update_starter_projects(get_all_components_coro: Awaitable[d
try:
Graph.from_payload(updated_project_data)
except Exception as e:
logger.error(e)
logger.exception(e)
if updated_project_data != project_data:
project_data = updated_project_data
# We also need to update the project data in the file

View file

@ -137,9 +137,8 @@ def update_params_with_load_from_db_fields(
except TypeError as exc:
raise exc
except Exception as exc:
logger.error(f"Failed to get value for {field} from custom component. Setting it to None. Error: {exc}")
except Exception:
logger.exception(f"Failed to get value for {field} from custom component. Setting it to None.")
params[field] = None
return params

View file

@ -94,8 +94,8 @@ def setup_llm_caching():
set_langchain_cache(settings_service.settings)
except ImportError:
logger.warning(f"Could not import {settings_service.settings.cache_type}. ")
except Exception as exc:
logger.warning(f"Could not setup LLM caching. Error: {exc}")
except Exception:
logger.opt(exception=True).warning("Could not setup LLM caching.")
def set_langchain_cache(settings):

View file

@ -111,8 +111,8 @@ def run_flow_from_json(
import nest_asyncio # type: ignore
nest_asyncio.apply()
except Exception as e:
logger.warning(f"Could not apply nest_asyncio: {e}")
except Exception:
logger.opt(exception=True).warning("Could not apply nest_asyncio")
if tweaks is None:
tweaks = {}
tweaks["stream"] = False

View file

@ -195,8 +195,8 @@ def configure(
rotation="10 MB", # Log rotation based on file size
serialize=True,
)
except Exception as exc:
logger.error(f"Error setting up log file: {exc}")
except Exception:
logger.exception("Error setting up log file")
if log_buffer.enabled():
logger.add(sink=log_buffer.write, format="{time} {level} {message}", serialize=True)

View file

@ -100,7 +100,7 @@ def get_lifespan(fix_migration=False, socketio_server=None, version=None):
yield
except Exception as exc:
if "langflow migration --fix" not in str(exc):
logger.error(exc)
logger.exception(exc)
raise
# Shutdown message
rprint("[bold red]Shutting down Langflow...[/bold red]")

View file

@ -30,8 +30,8 @@ def get_langfuse_callback(trace_id):
try:
trace = langfuse.trace(name="langflow-" + trace_id, id=trace_id)
return trace.getNewHandler()
except Exception as exc:
logger.error(f"Error initializing langfuse callback: {exc}")
except Exception:
logger.exception("Error initializing langfuse callback")
return None

View file

@ -132,8 +132,7 @@ async def get_current_user_by_jwt(
headers={"WWW-Authenticate": "Bearer"},
)
except JWTError as e:
logger.error(f"JWT decoding error: {e}")
logger.exception(e)
logger.exception("JWT decoding error")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
@ -318,7 +317,7 @@ def create_refresh_token(refresh_token: str, db: Session = Depends(get_session))
return create_user_tokens(user_id, db)
except JWTError as e:
logger.error(f"JWT decoding error: {e}")
logger.exception("JWT decoding error")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token",

View file

@ -236,8 +236,8 @@ class RedisCache(AsyncBaseCacheService, Generic[LockType]): # type: ignore
try:
self._client.ping()
return True
except redis.exceptions.ConnectionError as exc:
logger.error(f"RedisCache could not connect to the Redis server: {exc}")
except redis.exceptions.ConnectionError:
logger.exception("RedisCache could not connect to the Redis server")
return False
async def get(self, key, lock=None):

View file

@ -94,8 +94,8 @@ class DatabaseService(Service):
for pragma in pragmas_list:
try:
cursor.execute(pragma)
except OperationalError as oe:
logger.error(f"Failed to set PRAGMA {pragma}: ", {oe})
except OperationalError:
logger.exception(f"Failed to set PRAGMA {pragma}")
finally:
cursor.close()
@ -195,8 +195,8 @@ class DatabaseService(Service):
try:
self.init_alembic(alembic_cfg)
except Exception as exc:
logger.error(f"Error initializing alembic: {exc}")
msg = "Error initializing alembic"
logger.exception(msg)
raise RuntimeError(msg) from exc
else:
logger.info("Alembic already initialized")
@ -215,7 +215,7 @@ class DatabaseService(Service):
buffer.write(f"{datetime.now(tz=timezone.utc).astimezone()}: Checking migrations\n")
command.check(alembic_cfg)
except util.exc.AutogenerateDiffsDetected as exc:
logger.error(f"AutogenerateDiffsDetected: {exc}")
logger.exception("AutogenerateDiffsDetected")
if not fix:
msg = f"There's a mismatch between the models and the database.\n{exc}"
raise RuntimeError(msg) from exc
@ -230,9 +230,9 @@ class DatabaseService(Service):
try:
command.check(alembic_cfg)
break
except util.exc.AutogenerateDiffsDetected as exc:
except util.exc.AutogenerateDiffsDetected:
# downgrade to base and upgrade again
logger.warning(f"AutogenerateDiffsDetected: {exc}")
logger.opt(exception=True).warning("AutogenerateDiffsDetected")
command.downgrade(alembic_cfg, f"-{i}")
# wait for the database to be ready
time.sleep(3)
@ -258,7 +258,7 @@ class DatabaseService(Service):
available_columns = [col["name"] for col in inspector.get_columns(table_name)]
results.append(Result(name=table_name, type="table", success=True))
except sa.exc.NoSuchTableError:
logger.error(f"Missing table: {table_name}")
logger.exception(f"Missing table: {table_name}")
results.append(Result(name=table_name, type="table", success=False))
for column in expected_columns:
@ -288,8 +288,8 @@ class DatabaseService(Service):
except OperationalError as oe:
logger.warning(f"Table {table} already exists, skipping. Exception: {oe}")
except Exception as exc:
logger.error(f"Error creating table {table}: {exc}")
msg = f"Error creating table {table}"
logger.exception(msg)
raise RuntimeError(msg) from exc
# Now check if the required tables exist, if not, something went wrong.
@ -313,7 +313,7 @@ class DatabaseService(Service):
with self.with_session() as session:
teardown_superuser(settings_service, session)
except Exception as exc:
logger.error(f"Error tearing down database: {exc}")
except Exception:
logger.exception("Error tearing down database")
self.engine.dispose()

View file

@ -23,14 +23,14 @@ def initialize_database(fix_migration: bool = False):
# if the exception involves tables already existing
# we can ignore it
if "already exists" not in str(exc):
logger.error(f"Error creating DB and tables: {exc}")
msg = "Error creating DB and tables"
logger.exception(msg)
raise RuntimeError(msg) from exc
try:
database_service.check_schema_health()
except Exception as exc:
logger.error(f"Error checking schema health: {exc}")
msg = "Error checking schema health"
logger.exception(msg)
raise RuntimeError(msg) from exc
try:
database_service.run_migrations(fix=fix_migration)
@ -52,8 +52,8 @@ def initialize_database(fix_migration: bool = False):
# if the exception involves tables already existing
# we can ignore it
if "already exists" not in str(exc):
logger.error(exc)
raise exc
logger.exception(exc)
raise
logger.debug("Database initialized")
@ -62,8 +62,8 @@ def session_getter(db_service: DatabaseService):
try:
session = Session(db_service.engine)
yield session
except Exception as e:
logger.error("Session rollback because of exception:", e)
except Exception:
logger.exception("Session rollback because of exception")
session.rollback()
raise
finally:

View file

@ -192,8 +192,8 @@ def session_scope() -> Generator[Session, None, None]:
try:
yield session
session.commit()
except Exception as e:
logger.exception("An error occurred during the session scope.", e)
except Exception:
logger.exception("An error occurred during the session scope.")
session.rollback()
raise

View file

@ -34,9 +34,8 @@ class ServiceManager:
for factory in self.get_factories():
try:
self.register_factory(factory)
except Exception as exc:
logger.exception(exc)
logger.error(f"Error initializing {factory}: {exc}")
except Exception:
logger.exception(f"Error initializing {factory}")
def register_factory(
self,

View file

@ -80,7 +80,7 @@ class LangfusePlugin(CallbackPlugin):
if trace:
return trace.getNewHandler()
except Exception as exc:
logger.error(f"Error initializing langfuse callback: {exc}")
except Exception:
logger.exception("Error initializing langfuse callback")
return None

View file

@ -40,8 +40,8 @@ class PluginService(Service):
and attr not in [CallbackPlugin, BasePlugin]
):
self.register_plugin(plugin_name, attr())
except Exception as exc:
logger.error(f"Error loading plugin {plugin_name}: {exc}")
except Exception:
logger.exception(f"Error loading plugin {plugin_name}")
def register_plugin(self, plugin_name, plugin_instance):
self.plugins[plugin_name] = plugin_instance

View file

@ -272,7 +272,7 @@ class Settings(BaseSettings):
copy2("./{db_file_name}", new_path)
logger.debug(f"Copied existing database to {new_path}")
except Exception:
logger.error("Failed to copy database, using default path")
logger.exception("Failed to copy database, using default path")
new_path = "./{db_file_name}"
else:
final_path = new_path

View file

@ -35,7 +35,7 @@ def write_secret_to_file(path: Path, value: str) -> None:
try:
set_secure_permissions(path)
except Exception:
logger.error("Failed to set secure permissions on secret key")
logger.exception("Failed to set secure permissions on secret key")
def read_secret_from_file(path: Path) -> str:

View file

@ -69,6 +69,6 @@ class InMemoryStateService(StateService):
for callback in self.observers[key]:
try:
callback(key, new_state, append=True)
except Exception as e:
logger.error(f"Error in observer {callback} for key {key}: {e}")
except Exception:
logger.exception(f"Error in observer {callback} for key {key}")
logger.warning("Callbacks not implemented yet")

View file

@ -41,9 +41,9 @@ class LocalStorageService(StorageService):
try:
await asyncio.get_event_loop().run_in_executor(None, write_file, file_path, data)
logger.info(f"File {file_name} saved successfully in flow {flow_id}.")
except Exception as e:
logger.error(f"Error saving file {file_name} in flow {flow_id}: {e}")
raise e
except Exception:
logger.exception(f"Error saving file {file_name} in flow {flow_id}")
raise
async def get_file(self, flow_id: str, file_name: str) -> bytes:
"""

View file

@ -28,10 +28,10 @@ class S3StorageService(StorageService):
self.s3_client.put_object(Bucket=self.bucket, Key=f"{folder}/{file_name}", Body=data)
logger.info(f"File {file_name} saved successfully in folder {folder}.")
except NoCredentialsError:
logger.error("Credentials not available for AWS S3.")
logger.exception("Credentials not available for AWS S3.")
raise
except ClientError as e:
logger.error(f"Error saving file {file_name} in folder {folder}: {e}")
except ClientError:
logger.exception(f"Error saving file {file_name} in folder {folder}")
raise
async def get_file(self, folder: str, file_name: str):
@ -47,8 +47,8 @@ class S3StorageService(StorageService):
response = self.s3_client.get_object(Bucket=self.bucket, Key=f"{folder}/{file_name}")
logger.info(f"File {file_name} retrieved successfully from folder {folder}.")
return response["Body"].read()
except ClientError as e:
logger.error(f"Error retrieving file {file_name} from folder {folder}: {e}")
except ClientError:
logger.exception(f"Error retrieving file {file_name} from folder {folder}")
raise
async def list_files(self, folder: str):
@ -64,8 +64,8 @@ class S3StorageService(StorageService):
files = [item["Key"] for item in response.get("Contents", []) if "/" not in item["Key"][len(folder) :]]
logger.info(f"{len(files)} files listed in folder {folder}.")
return files
except ClientError as e:
logger.error(f"Error listing files in folder {folder}: {e}")
except ClientError:
logger.exception(f"Error listing files in folder {folder}")
raise
async def delete_file(self, folder: str, file_name: str):
@ -79,8 +79,8 @@ class S3StorageService(StorageService):
try:
self.s3_client.delete_object(Bucket=self.bucket, Key=f"{folder}/{file_name}")
logger.info(f"File {file_name} deleted successfully from folder {folder}.")
except ClientError as e:
logger.error(f"Error deleting file {file_name} from folder {folder}: {e}")
except ClientError:
logger.exception(f"Error deleting file {file_name} from folder {folder}")
raise
async def teardown(self):

View file

@ -161,8 +161,8 @@ class StoreService(Service):
return response.json()
except HTTPError as exc:
raise exc
except Exception as exc:
logger.debug(f"Webhook failed: {exc}")
except Exception:
logger.opt(exception=True).debug("Webhook failed")
def build_tags_filter(self, tags: list[str]):
tags_filter: dict[str, Any] = {"tags": {"_and": []}}

View file

@ -72,8 +72,8 @@ class AnyIOBackend(TaskBackend):
self.tasks[task_id] = task_result
logger.info(f"Task {task_id} started.")
return task_id, task_result
except Exception as e:
logger.error(f"An error occurred while launching the task: {e}")
except Exception:
logger.exception("An error occurred while launching the task")
return None, None
def get_task(self, task_id: str) -> Any:

View file

@ -20,8 +20,8 @@ def check_celery_availability():
status = get_celery_worker_status(celery_app)
logger.debug(f"Celery status: {status}")
except Exception as exc:
logger.debug(f"Celery not available: {exc}")
except Exception:
logger.opt(exception=True).debug("Celery not available")
status = {"availability": None}
return status

View file

@ -51,8 +51,8 @@ class TelemetryService(Service):
func, payload, path = await self.telemetry_queue.get()
try:
await func(payload, path)
except Exception as e:
logger.error(f"Error sending telemetry data: {e}")
except Exception:
logger.exception("Error sending telemetry data")
finally:
self.telemetry_queue.task_done()
@ -71,12 +71,12 @@ class TelemetryService(Service):
logger.error(f"Failed to send telemetry data: {response.status_code} {response.text}")
else:
logger.debug("Telemetry data sent successfully.")
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error occurred: {e}")
except httpx.RequestError as e:
logger.error(f"Request error occurred: {e}")
except Exception as e:
logger.error(f"Unexpected error occurred: {e}")
except httpx.HTTPStatusError:
logger.exception("HTTP error occurred")
except httpx.RequestError:
logger.exception("Request error occurred")
except Exception:
logger.exception("Unexpected error occurred")
async def log_package_run(self, payload: RunPayload):
await self._queue_event((self.send_telemetry_data, payload, "run"))
@ -120,16 +120,16 @@ class TelemetryService(Service):
self._start_time = datetime.now(timezone.utc)
self.worker_task = asyncio.create_task(self.telemetry_worker())
asyncio.create_task(self.log_package_version())
except Exception as e:
logger.error(f"Error starting telemetry service: {e}")
except Exception:
logger.exception("Error starting telemetry service")
async def flush(self):
if self.do_not_track:
return
try:
await self.telemetry_queue.join()
except Exception as e:
logger.error(f"Error flushing logs: {e}")
except Exception:
logger.exception("Error flushing logs")
async def stop(self):
if self.do_not_track or self._stopping:
@ -144,8 +144,8 @@ class TelemetryService(Service):
with contextlib.suppress(asyncio.CancelledError):
await self.worker_task
await self.client.aclose()
except Exception as e:
logger.error(f"Error stopping tracing service: {e}")
except Exception:
logger.exception("Error stopping tracing service")
async def teardown(self):
await self.stop()

View file

@ -57,11 +57,11 @@ class LangFuseTracer(BaseTracer):
self._callback = LangchainCallbackHandler(**config)
except ImportError:
logger.error("Could not import langfuse. Please install it with `pip install langfuse`.")
logger.exception("Could not import langfuse. Please install it with `pip install langfuse`.")
return False
except Exception as e:
logger.debug(f"Error setting up LangSmith tracer: {e}")
except Exception:
logger.opt(exception=True).debug("Error setting up LangSmith tracer")
return False
return True

View file

@ -41,8 +41,8 @@ class LangSmithTracer(BaseTracer):
)
self._run_tree.add_event({"name": "Start", "time": datetime.now(timezone.utc).isoformat()})
self._children: dict[str, RunTree] = {}
except Exception as e:
logger.debug(f"Error setting up LangSmith tracer: {e}")
except Exception:
logger.opt(exception=True).debug("Error setting up LangSmith tracer")
self._ready = False
@property
@ -57,7 +57,7 @@ class LangSmithTracer(BaseTracer):
self._client = Client()
except ImportError:
logger.error("Could not import langsmith. Please install it with `pip install langsmith`.")
logger.exception("Could not import langsmith. Please install it with `pip install langsmith`.")
return False
os.environ["LANGCHAIN_TRACING_V2"] = "true"
return True

View file

@ -46,8 +46,8 @@ class LangWatchTracer(BaseTracer):
name=name_without_id,
type="workflow",
)
except Exception as e:
logger.debug(f"Error setting up LangWatch tracer: {e}")
except Exception:
logger.opt(exception=True).debug("Error setting up LangWatch tracer")
self._ready = False
@property
@ -60,7 +60,7 @@ class LangWatchTracer(BaseTracer):
self._client = langwatch
except ImportError:
logger.error("Could not import langwatch. Please install it with `pip install langwatch`.")
logger.exception("Could not import langwatch. Please install it with `pip install langwatch`.")
return False
return True

View file

@ -63,8 +63,8 @@ class TracingService(Service):
log_func, args = await self.logs_queue.get()
try:
await log_func(*args)
except Exception as e:
logger.error(f"Error processing log: {e}")
except Exception:
logger.exception("Error processing log")
finally:
self.logs_queue.task_done()
@ -74,14 +74,14 @@ class TracingService(Service):
try:
self.running = True
self.worker_task = asyncio.create_task(self.log_worker())
except Exception as e:
logger.error(f"Error starting tracing service: {e}")
except Exception:
logger.exception("Error starting tracing service")
async def flush(self):
try:
await self.logs_queue.join()
except Exception as e:
logger.error(f"Error flushing logs: {e}")
except Exception:
logger.exception("Error flushing logs")
async def stop(self):
try:
@ -94,8 +94,8 @@ class TracingService(Service):
self.worker_task.cancel()
self.worker_task = None
except Exception as e:
logger.error(f"Error stopping tracing service: {e}")
except Exception:
logger.exception("Error stopping tracing service")
def _reset_io(self):
self.inputs = defaultdict(dict)
@ -109,8 +109,8 @@ class TracingService(Service):
self._initialize_langsmith_tracer()
self._initialize_langwatch_tracer()
self._initialize_langfuse_tracer()
except Exception as e:
logger.debug(f"Error initializing tracers: {e}")
except Exception:
logger.opt(exception=True).debug("Error initializing tracers")
def _initialize_langsmith_tracer(self):
project_name = os.getenv("LANGCHAIN_PROJECT", "Langflow")
@ -168,8 +168,8 @@ class TracingService(Service):
continue
try:
tracer.add_trace(trace_id, trace_name, trace_type, inputs, metadata, vertex)
except Exception as e:
logger.error(f"Error starting trace {trace_name}: {e}")
except Exception:
logger.exception(f"Error starting trace {trace_name}")
def _end_traces(self, trace_id: str, trace_name: str, error: Exception | None = None):
for tracer in self._tracers.values():
@ -183,8 +183,8 @@ class TracingService(Service):
error=error,
logs=self._logs[trace_name],
)
except Exception as e:
logger.error(f"Error ending trace {trace_name}: {e}")
except Exception:
logger.exception(f"Error ending trace {trace_name}")
def _end_all_traces(self, outputs: dict, error: Exception | None = None):
for tracer in self._tracers.values():
@ -192,8 +192,8 @@ class TracingService(Service):
continue
try:
tracer.end(self.inputs, outputs=self.outputs, error=error, metadata=outputs)
except Exception as e:
logger.error(f"Error ending all traces: {e}")
except Exception:
logger.exception("Error ending all traces")
async def end(self, outputs: dict, error: Exception | None = None):
self._end_all_traces(outputs, error)

View file

@ -171,6 +171,6 @@ def initialize_services(fix_migration: bool = False, socketio_server=None):
try:
get_db_service().migrate_flows_if_auto_login()
except Exception as exc:
logger.error(f"Error migrating flows: {exc}")
msg = "Error migrating flows"
logger.exception(msg)
raise RuntimeError(msg) from exc

View file

@ -47,8 +47,8 @@ class KubernetesSecretService(VariableService, Service):
name=secret_name,
data=variables,
)
except Exception as e:
logger.error(f"Error creating {var} variable: {e}")
except Exception:
logger.exception(f"Error creating {var} variable")
else:
logger.info("Skipping environment variable storage.")

View file

@ -74,7 +74,7 @@ class KubernetesSecretManager:
if e.status == 404:
# Secret doesn't exist, create a new one
return self.create_secret(secret_name, data)
logger.error(f"Error upserting secret {secret_name}: {e}")
logger.exception(f"Error upserting secret {secret_name}")
raise
def get_secret(self, name: str) -> dict | None:

View file

@ -62,8 +62,8 @@ class DatabaseVariableService(VariableService, Service):
_type=CREDENTIAL_TYPE,
session=session,
)
except Exception as e:
logger.error(f"Error creating {var} variable: {e}")
except Exception:
logger.exception(f"Error creating {var} variable")
else:
logger.info("Skipping environment variable storage.")