refactor(docling): extract processing logic to separate worker process (#9393)

* refactor(docling): extract processing logic to separate worker process

- Move Docling processing to dedicated worker function
- Preserve all original pipeline configuration logic
- Maintain support for standard and VLM pipelines
- Keep complete OCR engine configuration
- Add proper error handling for multiprocessing context

* Update src/backend/base/langflow/components/docling/__init__.py

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* Update src/backend/base/langflow/components/docling/__init__.py

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* [autofix.ci] apply automated fixes

* Update src/backend/base/langflow/components/docling/__init__.py

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* Update src/backend/base/langflow/components/docling/docling_inline.py

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* [autofix.ci] apply automated fixes

* feat: add process monitoring and timeout handling

* fix: ruff check

* feat: add graceful signal handling to docling worker

* friendlier error message

* Swallow stack trace on interrupt

* [autofix.ci] apply automated fixes

* fix: ruff error

* fix: mypy error

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Jordan Frazier <jordan.frazier@datastax.com>
This commit is contained in:
Ítalo Johnny 2025-08-20 13:56:21 -03:00 committed by GitHub
commit ea918df11d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 300 additions and 60 deletions

View file

@ -1,7 +1,13 @@
from __future__ import annotations
import signal
import sys
import traceback
from contextlib import suppress
from typing import TYPE_CHECKING, Any
from loguru import logger
from langflow.components._importing import import_mod
if TYPE_CHECKING:
@ -41,3 +47,195 @@ def __getattr__(attr_name: str) -> Any:
def __dir__() -> list[str]:
    """Return the module's public attribute names (mirrors ``__all__``)."""
    return [*__all__]
def docling_worker(file_paths: list[str], queue, pipeline: str, ocr_engine: str):
    """Worker function for processing files with Docling in a separate process.

    Converts each file in ``file_paths`` with the requested Docling pipeline and
    puts exactly one object on ``queue``:

    - on success: a list aligned with ``file_paths`` whose entries are dicts with
      ``document``, ``file_path`` and ``status`` keys, or ``None`` for files that
      failed to convert;
    - on failure/shutdown: a dict with an ``error`` key (optionally ``traceback``
      or ``shutdown``).

    Args:
        file_paths: Paths of the files to convert.
        queue: multiprocessing queue used to send the result to the parent.
        pipeline: Pipeline selector, ``"standard"`` or ``"vlm"``.
        ocr_engine: Docling OCR engine kind; empty string disables OCR.
    """
    # Signal handling for graceful shutdown
    shutdown_requested = False

    def signal_handler(signum: int, frame) -> None:  # noqa: ARG001
        """Handle shutdown signals gracefully."""
        nonlocal shutdown_requested
        signal_names: dict[int, str] = {signal.SIGTERM: "SIGTERM", signal.SIGINT: "SIGINT"}
        signal_name = signal_names.get(signum, f"signal {signum}")
        logger.debug(f"Docling worker received {signal_name}, initiating graceful shutdown...")
        shutdown_requested = True
        # Send shutdown notification to parent process
        with suppress(Exception):
            queue.put({"error": f"Worker interrupted by {signal_name}", "shutdown": True})
        # Exit gracefully
        sys.exit(0)

    def check_shutdown() -> None:
        """Check if shutdown was requested and exit if so."""
        if shutdown_requested:
            logger.info("Shutdown requested, exiting worker...")
            with suppress(Exception):
                queue.put({"error": "Worker shutdown requested", "shutdown": True})
            sys.exit(0)

    # Register signal handlers early so a terminate from the parent is handled
    # even while the heavy docling imports below are still running.
    try:
        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)
        logger.debug("Signal handlers registered for graceful shutdown")
    except (OSError, ValueError) as e:
        # Some signals might not be available on all platforms
        logger.warning(f"Warning: Could not register signal handlers: {e}")

    # Check for shutdown before heavy imports
    check_shutdown()

    try:
        from docling.datamodel.base_models import ConversionStatus, InputFormat
        from docling.datamodel.pipeline_options import (
            OcrOptions,
            PdfPipelineOptions,
            VlmPipelineOptions,
        )
        from docling.document_converter import DocumentConverter, FormatOption, PdfFormatOption
        from docling.models.factories import get_ocr_factory
        from docling.pipeline.vlm_pipeline import VlmPipeline

        # Check for shutdown after imports
        check_shutdown()
        logger.debug("Docling dependencies loaded successfully")
    except ModuleNotFoundError:
        msg = (
            "Docling is an optional dependency of Langflow. "
            "Install with `uv pip install 'langflow[docling]'` "
            "or refer to the documentation"
        )
        queue.put({"error": msg})
        return
    except ImportError as e:
        # A different import failed (e.g., a transitive dependency); preserve details.
        queue.put({"error": f"Failed to import a Docling dependency: {e}"})
        return
    except KeyboardInterrupt:
        logger.warning("KeyboardInterrupt during imports, exiting...")
        queue.put({"error": "Worker interrupted during imports", "shutdown": True})
        return

    # Configure the standard PDF pipeline
    def _get_standard_opts() -> PdfPipelineOptions:
        check_shutdown()  # Check before heavy operations
        pipeline_options = PdfPipelineOptions()
        pipeline_options.do_ocr = ocr_engine != ""
        if pipeline_options.do_ocr:
            ocr_factory = get_ocr_factory(
                allow_external_plugins=False,
            )
            ocr_options: OcrOptions = ocr_factory.create_options(
                kind=ocr_engine,
            )
            pipeline_options.ocr_options = ocr_options
        return pipeline_options

    # Configure the VLM pipeline
    def _get_vlm_opts() -> VlmPipelineOptions:
        check_shutdown()  # Check before heavy operations
        return VlmPipelineOptions()

    # Configure the main format options and create the DocumentConverter()
    def _get_converter() -> DocumentConverter:
        check_shutdown()  # Check before heavy operations
        if pipeline == "standard":
            pdf_format_option = PdfFormatOption(
                pipeline_options=_get_standard_opts(),
            )
        elif pipeline == "vlm":
            pdf_format_option = PdfFormatOption(pipeline_cls=VlmPipeline, pipeline_options=_get_vlm_opts())
        else:
            msg = f"Unknown pipeline: {pipeline!r}"
            raise ValueError(msg)
        format_options: dict[InputFormat, FormatOption] = {
            InputFormat.PDF: pdf_format_option,
            InputFormat.IMAGE: pdf_format_option,
        }
        return DocumentConverter(format_options=format_options)

    try:
        # Check for shutdown before creating converter (can be slow)
        check_shutdown()
        logger.info(f"Initializing {pipeline} pipeline with OCR: {ocr_engine or 'disabled'}")
        converter = _get_converter()

        # Check for shutdown before processing files
        check_shutdown()
        logger.info(f"Starting to process {len(file_paths)} files...")

        # Process files one at a time with periodic shutdown checks.  Results
        # are recorded per file, with a None placeholder when conversion raises,
        # so the list stays aligned with file_paths for the parent's rollup.
        processed_data: list[dict | None] = []
        for i, file_path in enumerate(file_paths):
            # Check for shutdown before processing each file
            check_shutdown()
            logger.debug(f"Processing file {i + 1}/{len(file_paths)}: {file_path}")
            try:
                # Process single file (we can't easily interrupt convert_all)
                for res in converter.convert_all([file_path]):
                    processed_data.append(
                        {"document": res.document, "file_path": str(res.input.file), "status": res.status.name}
                        if res.status == ConversionStatus.SUCCESS
                        else None
                    )
                # Check for shutdown after each file
                check_shutdown()
            except (OSError, ValueError, RuntimeError, ImportError) as file_error:
                # Handle specific file processing errors
                logger.error(f"Error processing file {file_path}: {file_error}")
                # Placeholder keeps processed_data aligned with file_paths
                processed_data.append(None)
                # Continue with other files, but check for shutdown
                check_shutdown()
            except Exception as file_error:  # noqa: BLE001
                # Catch any other unexpected errors to prevent worker crash
                logger.error(f"Unexpected error processing file {file_path}: {file_error}")
                # Placeholder keeps processed_data aligned with file_paths
                processed_data.append(None)
                # Continue with other files, but check for shutdown
                check_shutdown()

        # Final shutdown check before sending results
        check_shutdown()
        logger.info(f"Successfully processed {len([d for d in processed_data if d])} files")
        queue.put(processed_data)
    except KeyboardInterrupt:
        logger.warning("KeyboardInterrupt during processing, exiting gracefully...")
        queue.put({"error": "Worker interrupted during processing", "shutdown": True})
        return
    except Exception as e:  # noqa: BLE001
        if shutdown_requested:
            logger.exception("Exception occurred during shutdown, exiting...")
            return
        # Send any processing error to the main process with traceback
        error_info = {"error": str(e), "traceback": traceback.format_exc()}
        logger.error(f"Error in worker: {error_info}")
        queue.put(error_info)
    finally:
        logger.info("Docling worker finishing...")
        # Ensure we don't leave any hanging processes
        if shutdown_requested:
            logger.debug("Worker shutdown completed")
        else:
            logger.debug("Worker completed normally")

View file

@ -1,4 +1,9 @@
import time
from multiprocessing import Queue, get_context
from queue import Empty
from langflow.base.data import BaseFileComponent
from langflow.components.docling import docling_worker
from langflow.inputs import DropdownInput
from langflow.schema import Data
@ -69,73 +74,110 @@ class DoclingInlineComponent(BaseFileComponent):
*BaseFileComponent._base_outputs,
]
def _wait_for_result_with_process_monitoring(self, queue: Queue, proc, timeout: int = 300):
"""Wait for result from queue while monitoring process health.
Handles cases where process crashes without sending result.
"""
start_time = time.time()
while time.time() - start_time < timeout:
# Check if process is still alive
if not proc.is_alive():
# Process died, try to get any result it might have sent
try:
result = queue.get_nowait()
except Empty:
# Process died without sending result
msg = f"Worker process crashed unexpectedly without producing result. Exit code: {proc.exitcode}"
raise RuntimeError(msg) from None
else:
self.log("Process completed and result retrieved")
return result
# Poll the queue instead of blocking
try:
result = queue.get(timeout=1)
except Empty:
# No result yet, continue monitoring
continue
else:
self.log("Result received from worker process")
return result
# Overall timeout reached
msg = f"Process timed out after {timeout} seconds"
raise TimeoutError(msg)
def _terminate_process_gracefully(self, proc, timeout_terminate: int = 10, timeout_kill: int = 5):
"""Terminate process gracefully with escalating signals.
First tries SIGTERM, then SIGKILL if needed.
"""
if not proc.is_alive():
return
self.log("Attempting graceful process termination with SIGTERM")
proc.terminate() # Send SIGTERM
proc.join(timeout=timeout_terminate)
if proc.is_alive():
self.log("Process didn't respond to SIGTERM, using SIGKILL")
proc.kill() # Send SIGKILL
proc.join(timeout=timeout_kill)
if proc.is_alive():
self.log("Warning: Process still alive after SIGKILL")
def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
try:
from docling.datamodel.base_models import ConversionStatus, InputFormat
from docling.datamodel.pipeline_options import (
OcrOptions,
PdfPipelineOptions,
VlmPipelineOptions,
)
from docling.document_converter import DocumentConverter, FormatOption, PdfFormatOption
from docling.models.factories import get_ocr_factory
from docling.pipeline.vlm_pipeline import VlmPipeline
except ImportError as e:
msg = (
"Docling is not installed. Please install it with `uv pip install docling` or"
" `uv pip install langflow[docling]`."
)
raise ImportError(msg) from e
# Configure the standard PDF pipeline
def _get_standard_opts() -> PdfPipelineOptions:
pipeline_options = PdfPipelineOptions()
pipeline_options.do_ocr = self.ocr_engine != ""
if pipeline_options.do_ocr:
ocr_factory = get_ocr_factory(
allow_external_plugins=False,
)
ocr_options: OcrOptions = ocr_factory.create_options(
kind=self.ocr_engine,
)
pipeline_options.ocr_options = ocr_options
return pipeline_options
# Configure the VLM pipeline
def _get_vlm_opts() -> VlmPipelineOptions:
return VlmPipelineOptions()
# Configure the main format options and create the DocumentConverter()
def _get_converter() -> DocumentConverter:
if self.pipeline == "standard":
pdf_format_option = PdfFormatOption(
pipeline_options=_get_standard_opts(),
)
elif self.pipeline == "vlm":
pdf_format_option = PdfFormatOption(pipeline_cls=VlmPipeline, pipeline_options=_get_vlm_opts())
format_options: dict[InputFormat, FormatOption] = {
InputFormat.PDF: pdf_format_option,
InputFormat.IMAGE: pdf_format_option,
}
return DocumentConverter(format_options=format_options)
file_paths = [file.path for file in file_list if file.path]
if not file_paths:
self.log("No files to process.")
return file_list
converter = _get_converter()
results = converter.convert_all(file_paths)
ctx = get_context("spawn")
queue: Queue = ctx.Queue()
proc = ctx.Process(
target=docling_worker,
args=(file_paths, queue, self.pipeline, self.ocr_engine),
)
processed_data: list[Data | None] = [
Data(data={"doc": res.document, "file_path": str(res.input.file)})
if res.status == ConversionStatus.SUCCESS
else None
for res in results
]
result = None
proc.start()
try:
result = self._wait_for_result_with_process_monitoring(queue, proc, timeout=300)
except KeyboardInterrupt:
self.log("Docling process cancelled by user")
result = []
except Exception as e:
self.log(f"Error during processing: {e}")
raise
finally:
# Improved cleanup with graceful termination
try:
self._terminate_process_gracefully(proc)
finally:
# Always close and cleanup queue resources
try:
queue.close()
queue.join_thread()
except Exception as e: # noqa: BLE001
# Ignore cleanup errors, but log them
self.log(f"Warning: Error during queue cleanup - {e}")
# Check if there was an error in the worker
if isinstance(result, dict) and "error" in result:
msg = result["error"]
if msg.startswith("Docling is not installed"):
raise ImportError(msg)
# Handle interrupt gracefully - return empty result instead of raising error
if "Worker interrupted by SIGINT" in msg or "shutdown" in result:
self.log("Docling process cancelled by user")
result = []
else:
raise RuntimeError(msg)
processed_data = [Data(data={"doc": r["document"], "file_path": r["file_path"]}) if r else None for r in result]
return self.rollup_data(file_list, processed_data)