From ea918df11d4eb6d9cd9f243b871542e710754eeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=8Dtalo=20Johnny?= Date: Wed, 20 Aug 2025 13:56:21 -0300 Subject: [PATCH] refactor(docling): extract processing logic to separate worker process (#9393) * refactor(docling): extract processing logic to separate worker process - Move Docling processing to dedicated worker function - Preserve all original pipeline configuration logic - Maintain support for standard and VLM pipelines - Keep complete OCR engine configuration - Add proper error handling for multiprocessing context * Update src/backend/base/langflow/components/docling/__init__.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * Update src/backend/base/langflow/components/docling/__init__.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * [autofix.ci] apply automated fixes * Update src/backend/base/langflow/components/docling/__init__.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * Update src/backend/base/langflow/components/docling/docling_inline.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * [autofix.ci] apply automated fixes * feat: add process monitoring and timeout handling * fix: ruff check * feat: add graceful signal handling to docling worker * friendlier error message * Swallow stack trace on interrupt * [autofix.ci] apply automated fixes * fix: ruff error * fix: mypy error --------- Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Jordan Frazier --- .../langflow/components/docling/__init__.py | 198 ++++++++++++++++++ .../components/docling/docling_inline.py | 162 ++++++++------ 2 files changed, 300 insertions(+), 60 deletions(-) diff --git 
def docling_worker(file_paths: list[str], queue, pipeline: str, ocr_engine: str):
    """Process *file_paths* with Docling inside a dedicated (spawned) subprocess.

    Results are reported back to the parent through *queue*: on success a list
    of per-file dicts (``{"document", "file_path", "status"}``, or ``None`` for
    failed conversions); on failure a dict with an ``"error"`` key (plus
    ``"shutdown": True`` when the worker was interrupted).

    Args:
        file_paths: Paths of the files to convert.
        queue: multiprocessing queue used to send the result to the parent.
        pipeline: ``"standard"`` or ``"vlm"`` pipeline selection.
        ocr_engine: OCR engine kind, or ``""`` to disable OCR.
    """
    # Signal handling for graceful shutdown
    shutdown_requested = False

    def signal_handler(signum: int, frame) -> None:  # noqa: ARG001
        """Handle shutdown signals gracefully."""
        nonlocal shutdown_requested
        signal_names: dict[int, str] = {signal.SIGTERM: "SIGTERM", signal.SIGINT: "SIGINT"}
        signal_name = signal_names.get(signum, f"signal {signum}")

        logger.debug(f"Docling worker received {signal_name}, initiating graceful shutdown...")
        shutdown_requested = True

        # Send shutdown notification to parent process; the queue may already
        # be closed, so swallow any error.
        with suppress(Exception):
            queue.put({"error": f"Worker interrupted by {signal_name}", "shutdown": True})

        # Exit gracefully
        sys.exit(0)

    def check_shutdown() -> None:
        """Check if shutdown was requested and exit if so."""
        if shutdown_requested:
            logger.info("Shutdown requested, exiting worker...")

            with suppress(Exception):
                queue.put({"error": "Worker shutdown requested", "shutdown": True})

            sys.exit(0)

    # Register signal handlers early, before the (slow) docling imports.
    try:
        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)
        logger.debug("Signal handlers registered for graceful shutdown")
    except (OSError, ValueError) as e:
        # Some signals might not be available on all platforms
        logger.warning(f"Warning: Could not register signal handlers: {e}")

    # Check for shutdown before heavy imports
    check_shutdown()

    try:
        from docling.datamodel.base_models import ConversionStatus, InputFormat
        from docling.datamodel.pipeline_options import (
            OcrOptions,
            PdfPipelineOptions,
            VlmPipelineOptions,
        )
        from docling.document_converter import DocumentConverter, FormatOption, PdfFormatOption
        from docling.models.factories import get_ocr_factory
        from docling.pipeline.vlm_pipeline import VlmPipeline

        # Check for shutdown after imports
        check_shutdown()
        logger.debug("Docling dependencies loaded successfully")

    except ModuleNotFoundError:
        # BUG FIX: the parent process re-raises this as ImportError only when
        # the message starts with "Docling is not installed" — keep that exact
        # prefix stable so the check matches.
        msg = (
            "Docling is not installed. It is an optional dependency of Langflow. "
            "Install with `uv pip install 'langflow[docling]'` "
            "or refer to the documentation"
        )
        queue.put({"error": msg})
        return
    except ImportError as e:
        # A different import failed (e.g., a transitive dependency); preserve details.
        queue.put({"error": f"Failed to import a Docling dependency: {e}"})
        return
    except KeyboardInterrupt:
        logger.warning("KeyboardInterrupt during imports, exiting...")
        queue.put({"error": "Worker interrupted during imports", "shutdown": True})
        return

    # Configure the standard PDF pipeline
    def _get_standard_opts() -> PdfPipelineOptions:
        check_shutdown()  # Check before heavy operations

        pipeline_options = PdfPipelineOptions()
        pipeline_options.do_ocr = ocr_engine != ""
        if pipeline_options.do_ocr:
            ocr_factory = get_ocr_factory(
                allow_external_plugins=False,
            )

            ocr_options: OcrOptions = ocr_factory.create_options(
                kind=ocr_engine,
            )
            pipeline_options.ocr_options = ocr_options
        return pipeline_options

    # Configure the VLM pipeline
    def _get_vlm_opts() -> VlmPipelineOptions:
        check_shutdown()  # Check before heavy operations
        return VlmPipelineOptions()

    # Configure the main format options and create the DocumentConverter()
    def _get_converter() -> DocumentConverter:
        check_shutdown()  # Check before heavy operations

        if pipeline == "standard":
            pdf_format_option = PdfFormatOption(
                pipeline_options=_get_standard_opts(),
            )
        elif pipeline == "vlm":
            pdf_format_option = PdfFormatOption(pipeline_cls=VlmPipeline, pipeline_options=_get_vlm_opts())
        else:
            msg = f"Unknown pipeline: {pipeline!r}"
            raise ValueError(msg)

        format_options: dict[InputFormat, FormatOption] = {
            InputFormat.PDF: pdf_format_option,
            InputFormat.IMAGE: pdf_format_option,
        }

        return DocumentConverter(format_options=format_options)

    try:
        # Check for shutdown before creating converter (can be slow)
        check_shutdown()
        logger.info(f"Initializing {pipeline} pipeline with OCR: {ocr_engine or 'disabled'}")

        converter = _get_converter()

        # Check for shutdown before processing files
        check_shutdown()
        logger.info(f"Starting to process {len(file_paths)} files...")

        # Process files one at a time with periodic shutdown checks
        results = []
        for i, file_path in enumerate(file_paths):
            # Check for shutdown before processing each file
            check_shutdown()

            logger.debug(f"Processing file {i + 1}/{len(file_paths)}: {file_path}")

            try:
                # Process single file (we can't easily interrupt convert_all)
                single_result = converter.convert_all([file_path])
                results.extend(single_result)

                # Check for shutdown after each file
                check_shutdown()

            except (OSError, ValueError, RuntimeError, ImportError) as file_error:
                # Handle specific file processing errors
                logger.error(f"Error processing file {file_path}: {file_error}")
                # Continue with other files, but check for shutdown
                check_shutdown()
            except Exception as file_error:  # noqa: BLE001
                # Catch any other unexpected errors to prevent worker crash
                logger.error(f"Unexpected error processing file {file_path}: {file_error}")
                # Continue with other files, but check for shutdown
                check_shutdown()

        # Final shutdown check before sending results
        check_shutdown()

        # Process the results while maintaining the original structure
        processed_data = [
            {"document": res.document, "file_path": str(res.input.file), "status": res.status.name}
            if res.status == ConversionStatus.SUCCESS
            else None
            for res in results
        ]

        logger.info(f"Successfully processed {len([d for d in processed_data if d])} files")
        queue.put(processed_data)

    except KeyboardInterrupt:
        logger.warning("KeyboardInterrupt during processing, exiting gracefully...")
        queue.put({"error": "Worker interrupted during processing", "shutdown": True})
        return
    except Exception as e:  # noqa: BLE001
        if shutdown_requested:
            logger.exception("Exception occurred during shutdown, exiting...")
            return

        # Send any processing error to the main process with traceback
        error_info = {"error": str(e), "traceback": traceback.format_exc()}
        logger.error(f"Error in worker: {error_info}")
        queue.put(error_info)
    finally:
        logger.info("Docling worker finishing...")
        # Ensure we don't leave any hanging processes
        if shutdown_requested:
            logger.debug("Worker shutdown completed")
        else:
            logger.debug("Worker completed normally")
Exit code: {proc.exitcode}" + raise RuntimeError(msg) from None + else: + self.log("Process completed and result retrieved") + return result + + # Poll the queue instead of blocking + try: + result = queue.get(timeout=1) + except Empty: + # No result yet, continue monitoring + continue + else: + self.log("Result received from worker process") + return result + + # Overall timeout reached + msg = f"Process timed out after {timeout} seconds" + raise TimeoutError(msg) + + def _terminate_process_gracefully(self, proc, timeout_terminate: int = 10, timeout_kill: int = 5): + """Terminate process gracefully with escalating signals. + + First tries SIGTERM, then SIGKILL if needed. + """ + if not proc.is_alive(): + return + + self.log("Attempting graceful process termination with SIGTERM") + proc.terminate() # Send SIGTERM + proc.join(timeout=timeout_terminate) + + if proc.is_alive(): + self.log("Process didn't respond to SIGTERM, using SIGKILL") + proc.kill() # Send SIGKILL + proc.join(timeout=timeout_kill) + + if proc.is_alive(): + self.log("Warning: Process still alive after SIGKILL") + def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]: - try: - from docling.datamodel.base_models import ConversionStatus, InputFormat - from docling.datamodel.pipeline_options import ( - OcrOptions, - PdfPipelineOptions, - VlmPipelineOptions, - ) - from docling.document_converter import DocumentConverter, FormatOption, PdfFormatOption - from docling.models.factories import get_ocr_factory - from docling.pipeline.vlm_pipeline import VlmPipeline - except ImportError as e: - msg = ( - "Docling is not installed. Please install it with `uv pip install docling` or" - " `uv pip install langflow[docling]`." 
def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
    """Convert the given files with Docling in an isolated spawned subprocess.

    The heavy Docling work runs in a separate process (spawn context) so a
    crash or hang cannot take down the main process; results come back over a
    multiprocessing queue with timeout and health monitoring.

    Args:
        file_list: files to process; entries without a path are skipped.

    Returns:
        The input files rolled up with their parsed documents.

    Raises:
        ImportError: Docling is not installed in the worker environment.
        RuntimeError: the worker reported an error or crashed.
        TimeoutError: the worker did not finish within the time budget.
    """
    file_paths = [file.path for file in file_list if file.path]

    if not file_paths:
        self.log("No files to process.")
        return file_list

    # "spawn" gives a clean interpreter for the worker regardless of platform.
    ctx = get_context("spawn")
    queue: Queue = ctx.Queue()
    proc = ctx.Process(
        target=docling_worker,
        args=(file_paths, queue, self.pipeline, self.ocr_engine),
    )

    result = None
    proc.start()
    try:
        result = self._wait_for_result_with_process_monitoring(queue, proc, timeout=300)
    except KeyboardInterrupt:
        self.log("Docling process cancelled by user")
        result = []
    except Exception as e:
        self.log(f"Error during processing: {e}")
        raise
    finally:
        # Improved cleanup with graceful termination
        try:
            self._terminate_process_gracefully(proc)
        finally:
            # Always close and cleanup queue resources
            try:
                queue.close()
                queue.join_thread()
            except Exception as e:  # noqa: BLE001
                # Ignore cleanup errors, but log them
                self.log(f"Warning: Error during queue cleanup - {e}")

    # Check if there was an error in the worker
    if isinstance(result, dict) and "error" in result:
        msg = result["error"]
        # BUG FIX: the worker's missing-install message reads "Docling is an
        # optional dependency of Langflow...", which the original
        # startswith("Docling is not installed") check never matched, so the
        # condition surfaced as RuntimeError instead of ImportError. Accept
        # both message forms.
        if msg.startswith("Docling is not installed") or "optional dependency" in msg:
            raise ImportError(msg)
        # Handle interrupt gracefully - return empty result instead of raising error
        if "Worker interrupted by SIGINT" in msg or "shutdown" in result:
            self.log("Docling process cancelled by user")
            result = []
        else:
            raise RuntimeError(msg)

    processed_data = [Data(data={"doc": r["document"], "file_path": r["file_path"]}) if r else None for r in result]
    return self.rollup_data(file_list, processed_data)