fix: Run docling processing in subprocess (#9541)
* fix: Run docling processing in subprocess * [autofix.ci] apply automated fixes * Update file.py * [autofix.ci] apply automated fixes * Update file.py * [autofix.ci] apply automated fixes * [autofix.ci] apply automated fixes (attempt 2/3) * Update file.py * [autofix.ci] apply automated fixes --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Carlos Coelho <80289056+carlosrcoelho@users.noreply.github.com>
This commit is contained in:
parent
234ff867b9
commit
0118012a43
5 changed files with 312 additions and 386 deletions
|
|
@ -1,9 +1,21 @@
|
|||
"""Enhanced file component v2 with mypy and ruff compliance."""
|
||||
"""Enhanced file component with clearer structure and Docling isolation.
|
||||
|
||||
Notes:
|
||||
-----
|
||||
- Functionality is preserved with minimal behavioral changes.
|
||||
- ALL Docling parsing/export runs in a separate OS process to prevent memory
|
||||
growth and native library state from impacting the main Langflow process.
|
||||
- Standard text/structured parsing continues to use existing BaseFileComponent
|
||||
utilities (and optional threading via `parallel_load_data`).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
import sys
|
||||
import textwrap
|
||||
from copy import deepcopy
|
||||
from enum import Enum
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from langflow.base.data.base_file import BaseFileComponent
|
||||
|
|
@ -24,51 +36,8 @@ if TYPE_CHECKING:
|
|||
from langflow.schema import DataFrame
|
||||
|
||||
|
||||
class MockConversionStatus(Enum):
|
||||
"""Mock ConversionStatus for fallback compatibility."""
|
||||
|
||||
SUCCESS = "success"
|
||||
FAILURE = "failure"
|
||||
|
||||
|
||||
class MockInputFormat(Enum):
|
||||
"""Mock InputFormat for fallback compatibility."""
|
||||
|
||||
PDF = "pdf"
|
||||
IMAGE = "image"
|
||||
|
||||
|
||||
class MockImageRefMode(Enum):
|
||||
"""Mock ImageRefMode for fallback compatibility."""
|
||||
|
||||
PLACEHOLDER = "placeholder"
|
||||
EMBEDDED = "embedded"
|
||||
|
||||
|
||||
class DoclingImports:
|
||||
"""Container for docling imports with type information."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
conversion_status: type[Enum],
|
||||
input_format: type[Enum],
|
||||
document_converter: type,
|
||||
image_ref_mode: type[Enum],
|
||||
strategy: str,
|
||||
) -> None:
|
||||
self.conversion_status = conversion_status
|
||||
self.input_format = input_format
|
||||
self.document_converter = document_converter
|
||||
self.image_ref_mode = image_ref_mode
|
||||
self.strategy = strategy
|
||||
|
||||
|
||||
class FileComponent(BaseFileComponent):
|
||||
"""Enhanced file component v2 that combines standard file loading with optional Docling processing and export.
|
||||
|
||||
This component supports all features of the standard File component, plus an advanced mode
|
||||
that enables Docling document processing and export to various formats (Markdown, HTML, etc.).
|
||||
"""
|
||||
"""File component with optional Docling processing (isolated in a subprocess)."""
|
||||
|
||||
display_name = "File"
|
||||
description = "Loads content from files with optional advanced document processing and export using Docling."
|
||||
|
|
@ -76,7 +45,7 @@ class FileComponent(BaseFileComponent):
|
|||
icon = "file-text"
|
||||
name = "File"
|
||||
|
||||
# Docling supported formats from original component
|
||||
# Docling-supported/compatible extensions; TEXT_FILE_TYPES are supported by the base loader.
|
||||
VALID_EXTENSIONS = [
|
||||
"adoc",
|
||||
"asciidoc",
|
||||
|
|
@ -110,12 +79,12 @@ class FileComponent(BaseFileComponent):
|
|||
*TEXT_FILE_TYPES,
|
||||
]
|
||||
|
||||
# Fixed export settings
|
||||
# Fixed export settings used when markdown export is requested.
|
||||
EXPORT_FORMAT = "Markdown"
|
||||
IMAGE_MODE = "placeholder"
|
||||
|
||||
# ---- Inputs / Outputs (kept as close to original as possible) -------------------
|
||||
_base_inputs = deepcopy(BaseFileComponent._base_inputs)
|
||||
|
||||
for input_item in _base_inputs:
|
||||
if isinstance(input_item, FileInput) and input_item.name == "path":
|
||||
input_item.real_time_refresh = True
|
||||
|
|
@ -175,6 +144,7 @@ class FileComponent(BaseFileComponent):
|
|||
advanced=True,
|
||||
show=False,
|
||||
),
|
||||
# Deprecated input retained for backward-compatibility.
|
||||
BoolInput(
|
||||
name="use_multithreading",
|
||||
display_name="[Deprecated] Use Multithreading",
|
||||
|
|
@ -202,8 +172,10 @@ class FileComponent(BaseFileComponent):
|
|||
Output(display_name="Raw Content", name="message", method="load_files_message"),
|
||||
]
|
||||
|
||||
def _path_value(self, template) -> list[str]:
|
||||
# Get current path value
|
||||
# ------------------------------ UI helpers --------------------------------------
|
||||
|
||||
def _path_value(self, template: dict) -> list[str]:
|
||||
"""Return the list of currently selected file paths from the template."""
|
||||
return template.get("path", {}).get("file_path", [])
|
||||
|
||||
def update_build_config(
|
||||
|
|
@ -212,65 +184,41 @@ class FileComponent(BaseFileComponent):
|
|||
field_value: Any,
|
||||
field_name: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Update build configuration to show/hide fields based on file count and advanced_mode."""
|
||||
"""Show/hide Advanced Parser and related fields based on selection context."""
|
||||
if field_name == "path":
|
||||
# Get current path value
|
||||
path_value = self._path_value(build_config)
|
||||
file_path = path_value[0] if len(path_value) > 0 else ""
|
||||
|
||||
# Show/hide Advanced Parser based on file count (only for single files)
|
||||
paths = self._path_value(build_config)
|
||||
file_path = paths[0] if paths else ""
|
||||
file_count = len(field_value) if field_value else 0
|
||||
if file_count == 1 and not file_path.endswith((".csv", ".xlsx", ".parquet")):
|
||||
build_config["advanced_mode"]["show"] = True
|
||||
else:
|
||||
build_config["advanced_mode"]["show"] = False
|
||||
build_config["advanced_mode"]["value"] = False # Reset to False when hidden
|
||||
|
||||
# Hide all advanced fields when Advanced Parser is not available
|
||||
advanced_fields = [
|
||||
"pipeline",
|
||||
"ocr_engine",
|
||||
"doc_key",
|
||||
"md_image_placeholder",
|
||||
"md_page_break_placeholder",
|
||||
]
|
||||
for field in advanced_fields:
|
||||
if field in build_config:
|
||||
build_config[field]["show"] = False
|
||||
# Advanced mode only for single (non-tabular) file
|
||||
allow_advanced = file_count == 1 and not file_path.endswith((".csv", ".xlsx", ".parquet"))
|
||||
build_config["advanced_mode"]["show"] = allow_advanced
|
||||
if not allow_advanced:
|
||||
build_config["advanced_mode"]["value"] = False
|
||||
for f in ("pipeline", "ocr_engine", "doc_key", "md_image_placeholder", "md_page_break_placeholder"):
|
||||
if f in build_config:
|
||||
build_config[f]["show"] = False
|
||||
|
||||
elif field_name == "advanced_mode":
|
||||
# Show/hide advanced fields based on advanced_mode (only if single file)
|
||||
advanced_fields = [
|
||||
"pipeline",
|
||||
"ocr_engine",
|
||||
"doc_key",
|
||||
"md_image_placeholder",
|
||||
"md_page_break_placeholder",
|
||||
]
|
||||
|
||||
for field in advanced_fields:
|
||||
if field in build_config:
|
||||
build_config[field]["show"] = field_value
|
||||
for f in ("pipeline", "ocr_engine", "doc_key", "md_image_placeholder", "md_page_break_placeholder"):
|
||||
if f in build_config:
|
||||
build_config[f]["show"] = bool(field_value)
|
||||
|
||||
return build_config
|
||||
|
||||
def update_outputs(self, frontend_node: dict[str, Any], field_name: str, field_value: Any) -> dict[str, Any]: # noqa: ARG002
|
||||
"""Dynamically show outputs based on the number of files and their types."""
|
||||
"""Dynamically show outputs based on file count/type and advanced mode."""
|
||||
if field_name not in ["path", "advanced_mode"]:
|
||||
return frontend_node
|
||||
|
||||
# Add outputs based on the number of files in the path
|
||||
template = frontend_node.get("template", {})
|
||||
path_value = self._path_value(template)
|
||||
if len(path_value) == 0:
|
||||
paths = self._path_value(template)
|
||||
if not paths:
|
||||
return frontend_node
|
||||
|
||||
# Clear existing outputs
|
||||
frontend_node["outputs"] = []
|
||||
|
||||
if len(path_value) == 1:
|
||||
# We need to check if the file is structured content
|
||||
file_path = path_value[0] if field_name == "path" else frontend_node["template"]["path"]["file_path"][0]
|
||||
if len(paths) == 1:
|
||||
file_path = paths[0] if field_name == "path" else frontend_node["template"]["path"]["file_path"][0]
|
||||
if file_path.endswith((".csv", ".xlsx", ".parquet")):
|
||||
frontend_node["outputs"].append(
|
||||
Output(display_name="Structured Content", name="dataframe", method="load_files_structured"),
|
||||
|
|
@ -280,11 +228,8 @@ class FileComponent(BaseFileComponent):
|
|||
Output(display_name="Structured Content", name="json", method="load_files_json"),
|
||||
)
|
||||
|
||||
# Add outputs based on advanced mode
|
||||
advanced_mode = frontend_node.get("template", {}).get("advanced_mode", {}).get("value", False)
|
||||
|
||||
if advanced_mode:
|
||||
# Advanced mode: Structured Output, Markdown, and File Path
|
||||
frontend_node["outputs"].append(
|
||||
Output(display_name="Structured Output", name="advanced", method="load_files_advanced"),
|
||||
)
|
||||
|
|
@ -295,7 +240,6 @@ class FileComponent(BaseFileComponent):
|
|||
Output(display_name="File Path", name="path", method="load_files_path"),
|
||||
)
|
||||
else:
|
||||
# Normal mode: Raw Content and File Path
|
||||
frontend_node["outputs"].append(
|
||||
Output(display_name="Raw Content", name="message", method="load_files_message"),
|
||||
)
|
||||
|
|
@ -303,130 +247,16 @@ class FileComponent(BaseFileComponent):
|
|||
Output(display_name="File Path", name="path", method="load_files_path"),
|
||||
)
|
||||
else:
|
||||
# For multiple files, we show the files output (DataFrame format)
|
||||
# Advanced Parser is not available for multiple files
|
||||
frontend_node["outputs"].append(
|
||||
Output(display_name="Files", name="dataframe", method="load_files"),
|
||||
)
|
||||
# Multiple files => DataFrame output; advanced parser disabled
|
||||
frontend_node["outputs"].append(Output(display_name="Files", name="dataframe", method="load_files"))
|
||||
|
||||
return frontend_node
|
||||
|
||||
def _try_import_docling(self) -> DoclingImports | None:
|
||||
"""Try different import strategies for docling components."""
|
||||
# Try strategy 1: Latest docling structure
|
||||
try:
|
||||
from docling.datamodel.base_models import ConversionStatus, InputFormat # type: ignore[import-untyped]
|
||||
from docling.document_converter import DocumentConverter # type: ignore[import-untyped]
|
||||
from docling_core.types.doc import ImageRefMode # type: ignore[import-untyped]
|
||||
|
||||
self.log("Using latest docling import structure")
|
||||
return DoclingImports(
|
||||
conversion_status=ConversionStatus,
|
||||
input_format=InputFormat,
|
||||
document_converter=DocumentConverter,
|
||||
image_ref_mode=ImageRefMode,
|
||||
strategy="latest",
|
||||
)
|
||||
except ImportError as e:
|
||||
self.log(f"Latest docling structure failed: {e}")
|
||||
|
||||
# Try strategy 2: Alternative import paths
|
||||
try:
|
||||
from docling.document_converter import DocumentConverter # type: ignore[import-untyped]
|
||||
from docling_core.types.doc import ImageRefMode # type: ignore[import-untyped]
|
||||
|
||||
# Try to get ConversionStatus from different locations
|
||||
conversion_status: type[Enum] = MockConversionStatus
|
||||
input_format: type[Enum] = MockInputFormat
|
||||
|
||||
try:
|
||||
from docling_core.types import ConversionStatus, InputFormat # type: ignore[import-untyped]
|
||||
|
||||
conversion_status = ConversionStatus
|
||||
input_format = InputFormat
|
||||
except ImportError:
|
||||
try:
|
||||
from docling.datamodel import ConversionStatus, InputFormat # type: ignore[import-untyped]
|
||||
|
||||
conversion_status = ConversionStatus
|
||||
input_format = InputFormat
|
||||
except ImportError:
|
||||
# Use mock enums if we can't find them
|
||||
pass
|
||||
|
||||
self.log("Using alternative docling import structure")
|
||||
return DoclingImports(
|
||||
conversion_status=conversion_status,
|
||||
input_format=input_format,
|
||||
document_converter=DocumentConverter,
|
||||
image_ref_mode=ImageRefMode,
|
||||
strategy="alternative",
|
||||
)
|
||||
except ImportError as e:
|
||||
self.log(f"Alternative docling structure failed: {e}")
|
||||
|
||||
# Try strategy 3: Basic converter only
|
||||
try:
|
||||
from docling.document_converter import DocumentConverter # type: ignore[import-untyped]
|
||||
|
||||
self.log("Using basic docling import structure with mocks")
|
||||
return DoclingImports(
|
||||
conversion_status=MockConversionStatus,
|
||||
input_format=MockInputFormat,
|
||||
document_converter=DocumentConverter,
|
||||
image_ref_mode=MockImageRefMode,
|
||||
strategy="basic",
|
||||
)
|
||||
except ImportError as e:
|
||||
self.log(f"Basic docling structure failed: {e}")
|
||||
|
||||
# Strategy 4: Complete fallback - return None to indicate failure
|
||||
return None
|
||||
|
||||
def _create_advanced_converter(self, docling_imports: DoclingImports) -> Any:
|
||||
"""Create advanced converter with pipeline options if available."""
|
||||
try:
|
||||
from docling.datamodel.pipeline_options import PdfPipelineOptions # type: ignore[import-untyped]
|
||||
from docling.document_converter import PdfFormatOption # type: ignore[import-untyped]
|
||||
|
||||
document_converter = docling_imports.document_converter
|
||||
input_format = docling_imports.input_format
|
||||
|
||||
# Create basic pipeline options
|
||||
pipeline_options = PdfPipelineOptions()
|
||||
|
||||
# Configure OCR if specified and available
|
||||
if self.ocr_engine:
|
||||
try:
|
||||
from docling.models.factories import get_ocr_factory # type: ignore[import-untyped]
|
||||
|
||||
pipeline_options.do_ocr = True
|
||||
ocr_factory = get_ocr_factory(allow_external_plugins=False)
|
||||
ocr_options = ocr_factory.create_options(kind=self.ocr_engine)
|
||||
pipeline_options.ocr_options = ocr_options
|
||||
self.log(f"Configured OCR with engine: {self.ocr_engine}")
|
||||
except Exception as e: # noqa: BLE001
|
||||
self.log(f"Could not configure OCR: {e}, proceeding without OCR")
|
||||
pipeline_options.do_ocr = False
|
||||
|
||||
# Create format options
|
||||
pdf_format_option = PdfFormatOption(pipeline_options=pipeline_options)
|
||||
format_options = {}
|
||||
if hasattr(input_format, "PDF"):
|
||||
format_options[input_format.PDF] = pdf_format_option
|
||||
if hasattr(input_format, "IMAGE"):
|
||||
format_options[input_format.IMAGE] = pdf_format_option
|
||||
|
||||
return document_converter(format_options=format_options)
|
||||
|
||||
except Exception as e: # noqa: BLE001
|
||||
self.log(f"Could not create advanced converter: {e}, using basic converter")
|
||||
return docling_imports.document_converter()
|
||||
# ------------------------------ Core processing ----------------------------------
|
||||
|
||||
def _is_docling_compatible(self, file_path: str) -> bool:
|
||||
"""Check if file is compatible with Docling processing."""
|
||||
# All VALID_EXTENSIONS are Docling compatible (except for TEXT_FILE_TYPES which may overlap)
|
||||
docling_extensions = [
|
||||
"""Lightweight extension gate for Docling-compatible types."""
|
||||
docling_exts = (
|
||||
".adoc",
|
||||
".asciidoc",
|
||||
".asc",
|
||||
|
|
@ -456,102 +286,296 @@ class FileComponent(BaseFileComponent):
|
|||
".xhtml",
|
||||
".xml",
|
||||
".webp",
|
||||
]
|
||||
return any(file_path.lower().endswith(ext) for ext in docling_extensions)
|
||||
)
|
||||
return file_path.lower().endswith(docling_exts)
|
||||
|
||||
def _process_docling_in_subprocess(self, file_path: str) -> Data | None:
|
||||
"""Run Docling in a separate OS process and map the result to a Data object.
|
||||
|
||||
We avoid multiprocessing pickling by launching `python -c "<script>"` and
|
||||
passing JSON config via stdin. The child prints a JSON result to stdout.
|
||||
"""
|
||||
if not file_path:
|
||||
return None
|
||||
|
||||
args: dict[str, Any] = {
|
||||
"file_path": file_path,
|
||||
"markdown": bool(self.markdown),
|
||||
"image_mode": str(self.IMAGE_MODE),
|
||||
"md_image_placeholder": str(self.md_image_placeholder),
|
||||
"md_page_break_placeholder": str(self.md_page_break_placeholder),
|
||||
"pipeline": str(self.pipeline),
|
||||
"ocr_engine": str(self.ocr_engine) if getattr(self, "ocr_engine", "") else None,
|
||||
}
|
||||
|
||||
# The child is a tiny, self-contained script to keep memory/state isolated.
|
||||
child_script = textwrap.dedent(
|
||||
r"""
|
||||
import json, sys
|
||||
|
||||
def try_imports():
|
||||
# Strategy 1: latest layout
|
||||
try:
|
||||
from docling.datamodel.base_models import ConversionStatus, InputFormat # type: ignore
|
||||
from docling.document_converter import DocumentConverter # type: ignore
|
||||
from docling_core.types.doc import ImageRefMode # type: ignore
|
||||
return ConversionStatus, InputFormat, DocumentConverter, ImageRefMode, "latest"
|
||||
except Exception:
|
||||
pass
|
||||
# Strategy 2: alternative layout
|
||||
try:
|
||||
from docling.document_converter import DocumentConverter # type: ignore
|
||||
try:
|
||||
from docling_core.types import ConversionStatus, InputFormat # type: ignore
|
||||
except Exception:
|
||||
try:
|
||||
from docling.datamodel import ConversionStatus, InputFormat # type: ignore
|
||||
except Exception:
|
||||
class ConversionStatus: SUCCESS = "success"
|
||||
class InputFormat:
|
||||
PDF="pdf"; IMAGE="image"
|
||||
try:
|
||||
from docling_core.types.doc import ImageRefMode # type: ignore
|
||||
except Exception:
|
||||
class ImageRefMode:
|
||||
PLACEHOLDER="placeholder"; EMBEDDED="embedded"
|
||||
return ConversionStatus, InputFormat, DocumentConverter, ImageRefMode, "alternative"
|
||||
except Exception:
|
||||
pass
|
||||
# Strategy 3: basic converter only
|
||||
try:
|
||||
from docling.document_converter import DocumentConverter # type: ignore
|
||||
class ConversionStatus: SUCCESS = "success"
|
||||
class InputFormat:
|
||||
PDF="pdf"; IMAGE="image"
|
||||
class ImageRefMode:
|
||||
PLACEHOLDER="placeholder"; EMBEDDED="embedded"
|
||||
return ConversionStatus, InputFormat, DocumentConverter, ImageRefMode, "basic"
|
||||
except Exception as e:
|
||||
raise ImportError(f"Docling imports failed: {e}") from e
|
||||
|
||||
def create_converter(strategy, input_format, DocumentConverter, pipeline, ocr_engine):
|
||||
if strategy == "latest" and pipeline == "standard":
|
||||
try:
|
||||
from docling.datamodel.pipeline_options import PdfPipelineOptions # type: ignore
|
||||
from docling.document_converter import PdfFormatOption # type: ignore
|
||||
pipe = PdfPipelineOptions()
|
||||
if ocr_engine:
|
||||
try:
|
||||
from docling.models.factories import get_ocr_factory # type: ignore
|
||||
pipe.do_ocr = True
|
||||
fac = get_ocr_factory(allow_external_plugins=False)
|
||||
pipe.ocr_options = fac.create_options(kind=ocr_engine)
|
||||
except Exception:
|
||||
pipe.do_ocr = False
|
||||
fmt = {}
|
||||
if hasattr(input_format, "PDF"):
|
||||
fmt[getattr(input_format, "PDF")] = PdfFormatOption(pipeline_options=pipe)
|
||||
if hasattr(input_format, "IMAGE"):
|
||||
fmt[getattr(input_format, "IMAGE")] = PdfFormatOption(pipeline_options=pipe)
|
||||
return DocumentConverter(format_options=fmt)
|
||||
except Exception:
|
||||
return DocumentConverter()
|
||||
return DocumentConverter()
|
||||
|
||||
def export_markdown(document, ImageRefMode, image_mode, img_ph, pg_ph):
|
||||
try:
|
||||
mode = getattr(ImageRefMode, image_mode.upper(), image_mode)
|
||||
return document.export_to_markdown(
|
||||
image_mode=mode,
|
||||
image_placeholder=img_ph,
|
||||
page_break_placeholder=pg_ph,
|
||||
)
|
||||
except Exception:
|
||||
try:
|
||||
return document.export_to_text()
|
||||
except Exception:
|
||||
return str(document)
|
||||
|
||||
def to_rows(doc_dict):
|
||||
rows = []
|
||||
for t in doc_dict.get("texts", []):
|
||||
prov = t.get("prov") or []
|
||||
page_no = None
|
||||
if prov and isinstance(prov, list) and isinstance(prov[0], dict):
|
||||
page_no = prov[0].get("page_no")
|
||||
rows.append({
|
||||
"page_no": page_no,
|
||||
"label": t.get("label"),
|
||||
"text": t.get("text"),
|
||||
"level": t.get("level"),
|
||||
})
|
||||
return rows
|
||||
|
||||
def main():
|
||||
cfg = json.loads(sys.stdin.read())
|
||||
file_path = cfg["file_path"]
|
||||
markdown = cfg["markdown"]
|
||||
image_mode = cfg["image_mode"]
|
||||
img_ph = cfg["md_image_placeholder"]
|
||||
pg_ph = cfg["md_page_break_placeholder"]
|
||||
pipeline = cfg["pipeline"]
|
||||
ocr_engine = cfg.get("ocr_engine")
|
||||
meta = {"file_path": file_path}
|
||||
|
||||
try:
|
||||
ConversionStatus, InputFormat, DocumentConverter, ImageRefMode, strategy = try_imports()
|
||||
converter = create_converter(strategy, InputFormat, DocumentConverter, pipeline, ocr_engine)
|
||||
try:
|
||||
res = converter.convert(file_path)
|
||||
except Exception as e:
|
||||
print(json.dumps({"ok": False, "error": f"Docling conversion error: {e}", "meta": meta}))
|
||||
return
|
||||
|
||||
ok = False
|
||||
if hasattr(res, "status"):
|
||||
try:
|
||||
ok = (res.status == ConversionStatus.SUCCESS) or (str(res.status).lower() == "success")
|
||||
except Exception:
|
||||
ok = (str(res.status).lower() == "success")
|
||||
if not ok and hasattr(res, "document"):
|
||||
ok = getattr(res, "document", None) is not None
|
||||
if not ok:
|
||||
print(json.dumps({"ok": False, "error": "Docling conversion failed", "meta": meta}))
|
||||
return
|
||||
|
||||
doc = getattr(res, "document", None)
|
||||
if doc is None:
|
||||
print(json.dumps({"ok": False, "error": "Docling produced no document", "meta": meta}))
|
||||
return
|
||||
|
||||
if markdown:
|
||||
text = export_markdown(doc, ImageRefMode, image_mode, img_ph, pg_ph)
|
||||
print(json.dumps({"ok": True, "mode": "markdown", "text": text, "meta": meta}))
|
||||
return
|
||||
|
||||
# structured
|
||||
try:
|
||||
doc_dict = doc.export_to_dict()
|
||||
except Exception as e:
|
||||
print(json.dumps({"ok": False, "error": f"Docling export_to_dict failed: {e}", "meta": meta}))
|
||||
return
|
||||
|
||||
rows = to_rows(doc_dict)
|
||||
print(json.dumps({"ok": True, "mode": "structured", "doc": rows, "meta": meta}))
|
||||
except Exception as e:
|
||||
print(
|
||||
json.dumps({
|
||||
"ok": False,
|
||||
"error": f"Docling processing error: {e}",
|
||||
"meta": {"file_path": file_path},
|
||||
})
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
"""
|
||||
)
|
||||
|
||||
# Validate file_path to avoid command injection or unsafe input
|
||||
if not isinstance(args["file_path"], str) or any(c in args["file_path"] for c in [";", "|", "&", "$", "`"]):
|
||||
return Data(data={"error": "Unsafe file path detected.", "file_path": args["file_path"]})
|
||||
|
||||
proc = subprocess.run( # noqa: S603
|
||||
[sys.executable, "-u", "-c", child_script],
|
||||
input=json.dumps(args).encode("utf-8"),
|
||||
capture_output=True,
|
||||
check=False,
|
||||
)
|
||||
|
||||
if not proc.stdout:
|
||||
err_msg = proc.stderr.decode("utf-8", errors="replace") or "no output from child process"
|
||||
return Data(data={"error": f"Docling subprocess error: {err_msg}", "file_path": file_path})
|
||||
|
||||
try:
|
||||
result = json.loads(proc.stdout.decode("utf-8"))
|
||||
except Exception as e: # noqa: BLE001
|
||||
err_msg = proc.stderr.decode("utf-8", errors="replace")
|
||||
return Data(
|
||||
data={"error": f"Invalid JSON from Docling subprocess: {e}. stderr={err_msg}", "file_path": file_path},
|
||||
)
|
||||
|
||||
if not result.get("ok"):
|
||||
return Data(data={"error": result.get("error", "Unknown Docling error"), **result.get("meta", {})})
|
||||
|
||||
meta = result.get("meta", {})
|
||||
if result.get("mode") == "markdown":
|
||||
exported_content = str(result.get("text", ""))
|
||||
return Data(
|
||||
text=exported_content,
|
||||
data={"exported_content": exported_content, "export_format": self.EXPORT_FORMAT, **meta},
|
||||
)
|
||||
|
||||
rows = list(result.get("doc", []))
|
||||
return Data(data={"doc": rows, "export_format": self.EXPORT_FORMAT, **meta})
|
||||
|
||||
def process_files(
|
||||
self,
|
||||
file_list: list[BaseFileComponent.BaseFile],
|
||||
) -> list[BaseFileComponent.BaseFile]:
|
||||
"""Process files using standard parsing or Docling based on advanced_mode and file type."""
|
||||
|
||||
def process_file_standard(file_path: str, *, silent_errors: bool = False) -> Data | None:
|
||||
"""Process a single file using standard text parsing."""
|
||||
try:
|
||||
return parse_text_file_to_data(file_path, silent_errors=silent_errors)
|
||||
except FileNotFoundError as e:
|
||||
msg = f"File not found: {file_path}. Error: {e}"
|
||||
self.log(msg)
|
||||
if not silent_errors:
|
||||
raise
|
||||
return None
|
||||
except Exception as e:
|
||||
msg = f"Unexpected error processing {file_path}: {e}"
|
||||
self.log(msg)
|
||||
if not silent_errors:
|
||||
raise
|
||||
return None
|
||||
|
||||
def process_file_docling(file_path: str, *, silent_errors: bool = False) -> Data | None:
|
||||
"""Process a single file using Docling if compatible, otherwise standard processing."""
|
||||
# Try Docling first if file is compatible and advanced mode is enabled
|
||||
try:
|
||||
return self._process_with_docling_and_export(file_path)
|
||||
except Exception as e: # noqa: BLE001
|
||||
self.log(f"Docling processing failed for {file_path}: {e}, falling back to standard processing")
|
||||
if not silent_errors:
|
||||
# Return error data instead of raising
|
||||
return Data(data={"error": f"Docling processing failed: {e}", "file_path": file_path})
|
||||
|
||||
return None
|
||||
"""Process input files.
|
||||
|
||||
- Single file + advanced_mode => Docling in a separate process.
|
||||
- Otherwise => standard parsing in current process (optionally threaded).
|
||||
"""
|
||||
if not file_list:
|
||||
msg = "No files to process."
|
||||
raise ValueError(msg)
|
||||
|
||||
file_path = str(file_list[0].path)
|
||||
if self.advanced_mode and self._is_docling_compatible(file_path):
|
||||
processed_data = process_file_docling(file_path)
|
||||
if not processed_data:
|
||||
msg = f"Failed to process file with Docling: {file_path}"
|
||||
raise ValueError(msg)
|
||||
|
||||
# Serialize processed data to match Data structure
|
||||
serialized_data = processed_data.serialize_model()
|
||||
|
||||
# Now, if doc is nested, we need to unravel it
|
||||
clean_data: list[Data | None] = [processed_data]
|
||||
|
||||
# This is where we've manually processed the data
|
||||
def process_file_standard(file_path: str, *, silent_errors: bool = False) -> Data | None:
|
||||
try:
|
||||
if "exported_content" not in serialized_data:
|
||||
clean_data = [
|
||||
return parse_text_file_to_data(file_path, silent_errors=silent_errors)
|
||||
except FileNotFoundError as e:
|
||||
self.log(f"File not found: {file_path}. Error: {e}")
|
||||
if not silent_errors:
|
||||
raise
|
||||
return None
|
||||
except Exception as e:
|
||||
self.log(f"Unexpected error processing {file_path}: {e}")
|
||||
if not silent_errors:
|
||||
raise
|
||||
return None
|
||||
|
||||
# Advanced path: only for a single Docling-compatible file
|
||||
if len(file_list) == 1:
|
||||
file_path = str(file_list[0].path)
|
||||
if self.advanced_mode and self._is_docling_compatible(file_path):
|
||||
advanced_data: Data | None = self._process_docling_in_subprocess(file_path)
|
||||
|
||||
# --- UNNEST: expand each element in `doc` to its own Data row
|
||||
payload = getattr(advanced_data, "data", {}) or {}
|
||||
doc_rows = payload.get("doc")
|
||||
if isinstance(doc_rows, list):
|
||||
rows: list[Data | None] = [
|
||||
Data(
|
||||
data={
|
||||
"file_path": file_path,
|
||||
**(
|
||||
item["element"]
|
||||
if "element" in item
|
||||
else {k: v for k, v in item.items() if k != "file_path"}
|
||||
),
|
||||
}
|
||||
**(item if isinstance(item, dict) else {"value": item}),
|
||||
},
|
||||
)
|
||||
for item in serialized_data["doc"]
|
||||
for item in doc_rows
|
||||
]
|
||||
except Exception as _: # noqa: BLE001
|
||||
raise ValueError(serialized_data) from None
|
||||
return self.rollup_data(file_list, rows)
|
||||
|
||||
# Repeat file_list to match the number of processed data elements
|
||||
final_data: list[Data | None] = clean_data
|
||||
return self.rollup_data(file_list, final_data)
|
||||
# If not structured, keep as-is (e.g., markdown export or error dict)
|
||||
return self.rollup_data(file_list, [advanced_data])
|
||||
|
||||
# Standard multi-file (or single non-advanced) path
|
||||
concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)
|
||||
file_count = len(file_list)
|
||||
|
||||
self.log(f"Starting parallel processing of {file_count} files with concurrency: {concurrency}.")
|
||||
file_paths = [str(file.path) for file in file_list]
|
||||
file_paths = [str(f.path) for f in file_list]
|
||||
self.log(f"Starting parallel processing of {len(file_paths)} files with concurrency: {concurrency}.")
|
||||
my_data = parallel_load_data(
|
||||
file_paths,
|
||||
silent_errors=self.silent_errors,
|
||||
load_function=process_file_standard,
|
||||
max_concurrency=concurrency,
|
||||
)
|
||||
|
||||
return self.rollup_data(file_list, my_data)
|
||||
|
||||
# ------------------------------ Output helpers -----------------------------------
|
||||
|
||||
def load_files_advanced(self) -> DataFrame:
|
||||
"""Load files using advanced Docling processing and export to an advanced format."""
|
||||
# TODO: Update
|
||||
self.markdown = False
|
||||
return self.load_files()
|
||||
|
||||
|
|
@ -560,101 +584,3 @@ class FileComponent(BaseFileComponent):
|
|||
self.markdown = True
|
||||
result = self.load_files()
|
||||
return Message(text=str(result.text[0]))
|
||||
|
||||
def _process_with_docling_and_export(self, file_path: str) -> Data:
|
||||
"""Process a single file with Docling and export to the specified format."""
|
||||
# Import docling components only when needed
|
||||
docling_imports = self._try_import_docling()
|
||||
|
||||
if docling_imports is None:
|
||||
msg = "Docling not available for advanced processing"
|
||||
raise ImportError(msg)
|
||||
|
||||
conversion_status = docling_imports.conversion_status
|
||||
document_converter = docling_imports.document_converter
|
||||
image_ref_mode = docling_imports.image_ref_mode
|
||||
|
||||
try:
|
||||
# Create converter based on strategy and pipeline setting
|
||||
if docling_imports.strategy == "latest" and self.pipeline == "standard":
|
||||
converter = self._create_advanced_converter(docling_imports)
|
||||
else:
|
||||
# Use basic converter for compatibility
|
||||
converter = document_converter()
|
||||
self.log("Using basic DocumentConverter for Docling processing")
|
||||
|
||||
# Process single file
|
||||
result = converter.convert(file_path)
|
||||
|
||||
# Check if conversion was successful
|
||||
success = False
|
||||
if hasattr(result, "status"):
|
||||
if hasattr(conversion_status, "SUCCESS"):
|
||||
success = result.status == conversion_status.SUCCESS
|
||||
else:
|
||||
success = str(result.status).lower() == "success"
|
||||
elif hasattr(result, "document"):
|
||||
# If no status but has document, assume success
|
||||
success = result.document is not None
|
||||
|
||||
if not success:
|
||||
return Data(data={"error": "Docling conversion failed", "file_path": file_path})
|
||||
|
||||
if self.markdown:
|
||||
self.log("Exporting document to Markdown format")
|
||||
# Export the document to the specified format
|
||||
exported_content = self._export_document(result.document, image_ref_mode)
|
||||
|
||||
return Data(
|
||||
text=exported_content,
|
||||
data={
|
||||
"exported_content": exported_content,
|
||||
"export_format": self.EXPORT_FORMAT,
|
||||
"file_path": file_path,
|
||||
},
|
||||
)
|
||||
|
||||
return Data(
|
||||
data={
|
||||
"doc": self.docling_to_dataframe_simple(result.document.export_to_dict()),
|
||||
"export_format": self.EXPORT_FORMAT,
|
||||
"file_path": file_path,
|
||||
}
|
||||
)
|
||||
|
||||
except Exception as e: # noqa: BLE001
|
||||
return Data(data={"error": f"Docling processing error: {e!s}", "file_path": file_path})
|
||||
|
||||
def docling_to_dataframe_simple(self, doc):
|
||||
"""Extract all text elements into a simple DataFrame."""
|
||||
return [
|
||||
{
|
||||
"page_no": text["prov"][0]["page_no"] if text["prov"] else None,
|
||||
"label": text["label"],
|
||||
"text": text["text"],
|
||||
"level": text.get("level", None), # for headers
|
||||
}
|
||||
for text in doc["texts"]
|
||||
]
|
||||
|
||||
def _export_document(self, document: Any, image_ref_mode: type[Enum]) -> str:
|
||||
"""Export document to Markdown format with placeholder images."""
|
||||
try:
|
||||
image_mode = (
|
||||
image_ref_mode(self.IMAGE_MODE) if hasattr(image_ref_mode, self.IMAGE_MODE) else self.IMAGE_MODE
|
||||
)
|
||||
|
||||
# Always export to Markdown since it's fixed
|
||||
return document.export_to_markdown(
|
||||
image_mode=image_mode,
|
||||
image_placeholder=self.md_image_placeholder,
|
||||
page_break_placeholder=self.md_page_break_placeholder,
|
||||
)
|
||||
|
||||
except Exception as e: # noqa: BLE001
|
||||
self.log(f"Markdown export failed: {e}, using basic text export")
|
||||
# Fallback to basic text export
|
||||
try:
|
||||
return document.export_to_text()
|
||||
except Exception: # noqa: BLE001
|
||||
return str(document)
|
||||
|
|
|
|||
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Loading…
Add table
Add a link
Reference in a new issue