-
Notifications
You must be signed in to change notification settings - Fork 0
Use thread-local MarkItDown; add doc classifier #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
63bd356
f6e00c3
8fea2dd
4293a6e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -69,8 +69,9 @@ | |
|
|
||
| _MARKITDOWN_UNSET = object() # sentinel: init never attempted | ||
| _markitdown_instance = _MARKITDOWN_UNSET | ||
| _markitdown_instances = threading.local() | ||
| _markitdown_generation = 0 | ||
| _markitdown_lock = threading.Lock() | ||
| _markitdown_convert_lock = threading.Lock() | ||
|
|
||
|
|
||
| def _build_markitdown_kwargs() -> Dict[str, Any]: | ||
|
|
@@ -110,42 +111,50 @@ def _build_markitdown_kwargs() -> Dict[str, Any]: | |
|
|
||
| def get_markitdown_instance() -> Optional[MarkItDown]: | ||
| """ | ||
| Create and configure a MarkItDown instance (thread-safe). | ||
| Create and configure a MarkItDown instance (thread-safe, thread-local). | ||
|
|
||
| Uses a module-level cache protected by a lock so concurrent threads | ||
| in batch processing don't race on initialization. A failed init is | ||
| remembered (instance set to ``None``) so subsequent calls return | ||
| immediately without retrying or logging duplicate errors. | ||
| Uses a thread-local cache so concurrent threads in batch processing | ||
| don't serialize on conversion tasks. | ||
| """ | ||
| global _markitdown_instance | ||
|
|
||
| cached = _markitdown_instance | ||
| if cached is not _MARKITDOWN_UNSET: | ||
| return cached # either a live instance or None (failed init) | ||
| if ( | ||
| not hasattr(_markitdown_instances, "instance") | ||
| or getattr(_markitdown_instances, "generation", None) != _markitdown_generation | ||
| ): | ||
| with _markitdown_lock: | ||
| current_generation = _markitdown_generation | ||
|
|
||
| with _markitdown_lock: | ||
| if _markitdown_instance is not _MARKITDOWN_UNSET: | ||
| return _markitdown_instance # pragma: no cover | ||
|
|
||
| if MarkItDown is None: | ||
| logger.error("MarkItDown not installed. Install with: pip install markitdown") | ||
| _markitdown_instance = None | ||
| return None | ||
| if MarkItDown is None: | ||
| logger.error("MarkItDown not installed. Install with: pip install markitdown") | ||
| _markitdown_instances.instance = None | ||
| _markitdown_instances.generation = current_generation | ||
| _markitdown_instance = None | ||
| return None | ||
|
|
||
| try: | ||
| _markitdown_instance = MarkItDown(**_build_markitdown_kwargs()) | ||
| return _markitdown_instance | ||
| try: | ||
| instance = MarkItDown(**_build_markitdown_kwargs()) | ||
| _markitdown_instances.instance = instance | ||
| _markitdown_instances.generation = current_generation | ||
| _markitdown_instance = instance | ||
| except Exception as e: | ||
| logger.error("Error initializing MarkItDown: %s", e) | ||
| _markitdown_instances.instance = None | ||
| _markitdown_instances.generation = current_generation | ||
| _markitdown_instance = None | ||
|
|
||
| except Exception as e: | ||
| logger.error("Error initializing MarkItDown: %s", e) | ||
| _markitdown_instance = None # remember the failure | ||
| return None | ||
| return _markitdown_instances.instance | ||
|
|
||
|
|
||
| def reset_markitdown_instance(): | ||
| def reset_markitdown_instance() -> None: | ||
| """Reset the cached MarkItDown instance so the next call retries initialization.""" | ||
| global _markitdown_instance | ||
| global _markitdown_generation, _markitdown_instance | ||
| with _markitdown_lock: | ||
| if hasattr(_markitdown_instances, "instance"): | ||
| delattr(_markitdown_instances, "instance") | ||
|
cursor[bot] marked this conversation as resolved.
|
||
| if hasattr(_markitdown_instances, "generation"): | ||
| delattr(_markitdown_instances, "generation") | ||
| _markitdown_generation += 1 | ||
| _markitdown_instance = _MARKITDOWN_UNSET | ||
|
|
||
|
|
||
|
|
@@ -177,10 +186,8 @@ def convert_with_markitdown( | |
| try: | ||
| logger.info("Converting with MarkItDown: %s", file_path.name) | ||
|
|
||
| # Serialize convert() — MarkItDown is not documented as thread-safe for | ||
| # concurrent converts while we share one cached instance across workers. | ||
| with _markitdown_convert_lock: | ||
| result = md.convert(str(file_path)) | ||
| # Convert concurrently using thread-local MarkItDown instance | ||
| result = md.convert(str(file_path)) | ||
|
|
||
| if result and hasattr(result, "markdown"): | ||
| markdown_content = result.markdown | ||
|
|
@@ -273,17 +280,16 @@ def convert_stream_with_markitdown( | |
|
|
||
| try: | ||
| logger.info("Converting stream with MarkItDown: %s", filename) | ||
| with _markitdown_convert_lock: | ||
| stream_info = None | ||
| if StreamInfo is not None: | ||
| stream_info = StreamInfo( | ||
| extension=Path(filename).suffix, | ||
| filename=filename, | ||
| ) | ||
| if stream_info is not None: | ||
| result = md.convert_stream(stream, stream_info=stream_info) | ||
| else: | ||
| result = md.convert_stream(stream, file_extension=Path(filename).suffix) | ||
| stream_info = None | ||
| if StreamInfo is not None: | ||
| stream_info = StreamInfo( | ||
| extension=Path(filename).suffix, | ||
| filename=filename, | ||
| ) | ||
| if stream_info is not None: | ||
| result = md.convert_stream(stream, stream_info=stream_info) | ||
| else: | ||
| result = md.convert_stream(stream, file_extension=Path(filename).suffix) | ||
|
|
||
| if result and hasattr(result, "markdown"): | ||
| return True, result.markdown, None | ||
|
|
@@ -770,7 +776,7 @@ def convert_pdf_to_images( | |
| # Configure poppler path for Windows | ||
| poppler_path = (config.POPPLER_PATH or None) if sys.platform == "win32" else None | ||
|
|
||
| # Build conversion parameters | ||
| # Build conversion parameters (using paths_only to avoid RAM spikes on large PDFs) | ||
| convert_params = { | ||
| "pdf_path": str(pdf_path), | ||
| "dpi": dpi, | ||
|
|
@@ -779,28 +785,24 @@ def convert_pdf_to_images( | |
| "poppler_path": poppler_path, | ||
| "thread_count": max(1, thread_count), | ||
| "use_pdftocairo": config.PDF_IMAGE_USE_PDFTOCAIRO, | ||
| "output_file": "page", | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Setting Useful? React with 👍 / 👎. |
||
| "paths_only": True, | ||
| } | ||
|
|
||
| # Convert PDF to images | ||
| images = convert_from_path(**convert_params) | ||
| # Convert PDF to images directly on disk | ||
| temp_paths = convert_from_path(**convert_params) | ||
|
|
||
| # Save images | ||
| # Rename output files to preserve expected page_001.ext structure | ||
| image_paths = [] | ||
| file_extension = "jpg" if config.PDF_IMAGE_FORMAT == "jpeg" else config.PDF_IMAGE_FORMAT | ||
|
|
||
| for i, image in enumerate(images, 1): | ||
| image_path = output_dir / f"page_{i:03d}.{file_extension}" | ||
|
|
||
| # Save with format-specific options | ||
| if config.PDF_IMAGE_FORMAT == "jpeg": | ||
| image.save(str(image_path), "JPEG", quality=85, optimize=True, progressive=True) | ||
| elif config.PDF_IMAGE_FORMAT == "png": | ||
| image.save(str(image_path), "PNG", optimize=True) | ||
| else: | ||
| image.save(str(image_path), config.PDF_IMAGE_FORMAT.upper()) | ||
|
|
||
| image_paths.append(image_path) | ||
| logger.debug("Saved page %d to %s", i, image_path.name) | ||
| for i, temp_path_str in enumerate(temp_paths, 1): | ||
| temp_path = Path(temp_path_str) | ||
| target_path = output_dir / f"page_{i:03d}.{file_extension}" | ||
| if temp_path.exists() and temp_path != target_path: | ||
| temp_path.replace(target_path) | ||
| image_paths.append(target_path) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. PDF page path appended even when file missing — Low Severity. When … Reviewed by Cursor Bugbot for commit 4293a6e. Configure here. |
||
| logger.debug("Saved page %d to %s", i, target_path.name) | ||
|
|
||
| logger.info( | ||
| "Converted %d pages to %s images", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -953,11 +953,124 @@ def _cleanup_temp_files(temp_files: List[Path]) -> None: | |
| logger.warning("Could not delete temporary file %s: %s", temp_file.name, e) | ||
|
|
||
|
|
||
| def _ocr_shared_optional_params() -> Dict[str, Any]: | ||
| def _filename_has_keyword(name: str, keywords: List[str]) -> bool: | ||
| """Return True when a keyword appears as a filename token.""" | ||
| return any(re.search(rf"(?<![a-z0-9]){re.escape(keyword)}(?![a-z0-9])", name) for keyword in keywords) | ||
|
|
||
|
|
||
| def classify_document_type(file_path: Path) -> str: | ||
| """Classify document type into generic, invoice, financial_statement, contract, or form. | ||
|
|
||
| Uses a hybrid approach: | ||
| 1. Regex checks on filename/path. | ||
| 2. Parsing first-page text (if PDF or text file) for key identifiers. | ||
| 3. Cheap LLM classification fallback (using ministral-8b-latest or mistral-small-latest). | ||
| """ | ||
| name = file_path.name.lower() | ||
|
|
||
| # 1. Filename heuristic | ||
| if _filename_has_keyword(name, ["invoice", "receipt", "bill"]): | ||
| logger.debug("Classified %s as 'invoice' via filename", file_path.name) | ||
| return "invoice" | ||
| if _filename_has_keyword(name, ["contract", "agreement", "nda", "lease"]): | ||
| logger.debug("Classified %s as 'contract' via filename", file_path.name) | ||
| return "contract" | ||
| if _filename_has_keyword(name, ["statement", "financial", "balance_sheet", "income"]): | ||
| logger.debug("Classified %s as 'financial_statement' via filename", file_path.name) | ||
| return "financial_statement" | ||
| if _filename_has_keyword(name, ["form", "w9", "w2", "tax"]): | ||
| logger.debug("Classified %s as 'form' via filename", file_path.name) | ||
| return "form" | ||
|
cursor[bot] marked this conversation as resolved.
|
||
|
|
||
| # 2. First-page text content check (for text PDFs/txt files) | ||
| ext = file_path.suffix.lower().lstrip(".") | ||
| first_text = "" | ||
|
|
||
| if ext == "pdf": | ||
| try: | ||
| import pdfplumber | ||
|
|
||
| with pdfplumber.open(file_path) as pdf: | ||
| if pdf.pages: | ||
| first_text = (pdf.pages[0].extract_text() or "")[:1000] | ||
| except Exception as e: | ||
| logger.debug("Could not extract PDF text for classification: %s", e) | ||
| elif ext == "txt": | ||
| try: | ||
| first_text = file_path.read_text(errors="ignore")[:1000] | ||
| except Exception as e: | ||
| logger.debug("Could not read text file for classification: %s", e) | ||
|
|
||
| if first_text: | ||
| first_text_lower = first_text.lower() | ||
| if any(w in first_text_lower for w in ["invoice", "receipt", "purchase order", "amount due"]): | ||
| logger.debug("Classified %s as 'invoice' via page 1 text", file_path.name) | ||
| return "invoice" | ||
| if any(w in first_text_lower for w in ["agreement", "contract", "parties", "hereby", "undersigned"]): | ||
| logger.debug("Classified %s as 'contract' via page 1 text", file_path.name) | ||
| return "contract" | ||
| if any( | ||
| w in first_text_lower | ||
| for w in ["statement of", "balance sheet", "cash flow", "income statement", "assets", "liabilities"] | ||
| ): | ||
| logger.debug("Classified %s as 'financial_statement' via page 1 text", file_path.name) | ||
| return "financial_statement" | ||
| if any(w in first_text_lower for w in ["form ", "w-9", "w-2", "tax return", "filer"]): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Text heuristic "form " matches inside common words — Medium Severity. The substring check … Reviewed by Cursor Bugbot for commit 4293a6e. Configure here. |
||
| logger.debug("Classified %s as 'form' via page 1 text", file_path.name) | ||
| return "form" | ||
|
|
||
| # 3. LLM fallback check (if API key is present) | ||
| if config.MISTRAL_API_KEY: | ||
| try: | ||
| client = get_mistral_client() | ||
| if client: | ||
| prompt = ( | ||
| f"Classify the document '{file_path.name}' into one of these types: " | ||
| "invoice, financial_statement, contract, form, generic.\n" | ||
| "Reply with only the lowercase name of the type.\n" | ||
| ) | ||
| if first_text: | ||
| prompt += f"First page excerpt:\n{first_text[:500]}\n" | ||
|
|
||
| model = config.MISTRAL_DOCUMENT_QNA_MODEL or "ministral-8b-latest" | ||
| response = client.chat.complete( | ||
| model=model, | ||
| messages=[{"role": "user", "content": prompt}], | ||
| max_tokens=10, | ||
| ) | ||
| if response and response.choices: | ||
| result = response.choices[0].message.content.strip().lower() | ||
| for t in ["invoice", "financial_statement", "contract", "form"]: | ||
| if t in result: | ||
| logger.debug("Classified %s as '%s' via LLM", file_path.name, t) | ||
| return t | ||
| except Exception as e: | ||
| logger.debug("LLM classification check failed: %s", e) | ||
|
|
||
| logger.debug("Classified %s as 'generic'", file_path.name) | ||
| return "generic" | ||
|
|
||
|
|
||
| def _ocr_shared_optional_params(file_path: Optional[Path] = None, doc_type: str = "auto") -> Dict[str, Any]: | ||
| """OCR fields shared by sync ``ocr.process`` and batch JSONL ``body``.""" | ||
| fields: Dict[str, Any] = {} | ||
| bbox_format = get_bbox_annotation_format() | ||
| doc_format = get_document_annotation_format() | ||
|
|
||
| doc_format = None | ||
| if config.MISTRAL_ENABLE_STRUCTURED_OUTPUT and config.MISTRAL_ENABLE_DOCUMENT_ANNOTATION: | ||
| # Resolve dynamic classification only when document annotation can use it. | ||
| resolved_doc_type = doc_type | ||
| if resolved_doc_type == "auto": | ||
| nested = config.MISTRAL_DOCUMENT_SCHEMA_TYPE | ||
| if nested == "auto": | ||
| if file_path is not None: | ||
| resolved_doc_type = classify_document_type(file_path) | ||
| else: | ||
| resolved_doc_type = "generic" | ||
| else: | ||
| resolved_doc_type = nested | ||
|
|
||
| doc_format = get_document_annotation_format(doc_type=resolved_doc_type) | ||
| if bbox_format is not None: | ||
| fields["bbox_annotation_format"] = bbox_format | ||
| if doc_format is not None: | ||
|
|
@@ -982,6 +1095,8 @@ def build_ocr_process_kwargs( | |
| include_retries: bool, | ||
| pages: Optional[List[int]] = None, | ||
| request_id: Optional[str] = None, | ||
| file_path: Optional[Path] = None, | ||
| doc_type: str = "auto", | ||
| ) -> Dict[str, Any]: | ||
| """Build kwargs for ``client.ocr.process`` or a batch JSONL line ``body``.""" | ||
| ocr_params: Dict[str, Any] = { | ||
|
|
@@ -995,7 +1110,7 @@ def build_ocr_process_kwargs( | |
| ocr_params["id"] = request_id | ||
| if pages is not None: | ||
| ocr_params["pages"] = pages | ||
| ocr_params.update(_ocr_shared_optional_params()) | ||
| ocr_params.update(_ocr_shared_optional_params(file_path=file_path, doc_type=doc_type)) | ||
| return ocr_params | ||
|
|
||
|
|
||
|
|
@@ -1130,6 +1245,7 @@ def _report_progress(message: str, progress: float = 0.0): | |
| include_retries=True, | ||
| pages=pages, | ||
| request_id=ocr_id, | ||
| file_path=file_path, | ||
| ) | ||
|
|
||
| response = client.ocr.process(**ocr_params) | ||
|
|
@@ -2521,6 +2637,7 @@ def _prepare_batch_entries( | |
| include_retries=False, | ||
| pages=None, | ||
| request_id=custom_id, | ||
| file_path=file_path, | ||
| ) | ||
| body["include_image_base64"] = include_image_base64 | ||
|
|
||
|
|
||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`reset_markitdown_instance()` only deletes `instance` from the current thread’s `threading.local()` storage, so worker threads in a long-lived thread pool retain stale cached instances (or cached `None` after an init failure). After calling reset, those threads will continue using old state and may never retry initialization, which breaks the reset API’s expected behavior in multithreaded runs. Useful? React with 👍 / 👎.