diff --git a/CHANGELOG.md b/CHANGELOG.md index 60d5daa..3d2a419 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,19 @@ All notable changes to OpenCR are documented here. The format follows [Keep a Ch ## [Unreleased] +### Changed + +- **Breaking:** OpenCR is GPU-first again. The in-process Apple Silicon / CPU + `MODEL_BACKEND=local` path, CPU Docker profile, and local `transformers` + dependency file were removed. +- Default OCR model is now `deepseek-ai/DeepSeek-OCR-2`. +- `docker compose up -d` now starts the NVIDIA/vLLM stack directly; no compose + profile is required. + +## [v1.0.0] + ### Added + - Apache-2.0 license (`LICENSE`). - English-first README with Turkish sibling at `README.tr.md`. - `CONTRIBUTING.md`, GitHub Actions CI workflow, project `Makefile`. @@ -15,11 +27,13 @@ All notable changes to OpenCR are documented here. The format follows [Keep a Ch - Publish modal now prefills `username/run-name` and adds the `opencr` discoverability tag to dataset cards. ### Changed + - **Breaking:** `docker compose up` no longer starts services without an explicit profile. Use `--profile gpu` (vLLM, NVIDIA) or `--profile cpu` (in-process transformers). - `INPUT_DIR` / `OUTPUT_DIR` default to `./input` / `./output` outside Docker, `/data/...` inside. - OpenAPI metadata now declares Apache-2.0; UI footer no longer claims "All rights reserved". ### Fixed + - `.gitignore` now covers `.DS_Store`, IDE folders, lint caches, and HF caches. --- @@ -34,6 +48,7 @@ All notable changes to OpenCR are documented here. The format follows [Keep a Ch 6. GitHub auto-creates a release page from the tag; paste the changelog entry into it. Bump rules: + - **PATCH** for bug fixes that don't change behavior. - **MINOR** for backwards-compatible features. - **MAJOR** for breaking changes (env var renames, removed endpoints, behavior shifts users have to adapt to). diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f429790..b25067a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -19,8 +19,8 @@ expected text is one of the highest-leverage contributions. PaddleOCR / Marker on a Turkish corpus and post the table — even informal numbers are useful. -- **Model-backend ports.** MLX, llama.cpp, ONNX, or any other runtime -that improves throughput on a target platform. +- **Deployment recipes.** vLLM, hosted GPU endpoints, and reproducible +benchmark environments that improve throughput or quality. - **Translations.** README and dataset cards in additional languages. @@ -33,7 +33,7 @@ make install make test ``` -`make run` starts a local dev server on http://localhost:39672 with the `local` model backend (no GPU needed; ~5–30 s/page on M-series Macs). +`make run` starts a local dev server on http://localhost:39672 and points it at `MODEL_SERVER_URL` (default: `http://localhost:39671`). Start the bundled GPU stack with `docker compose up -d`, or provide another OpenAI-compatible GPU endpoint. ## Code style diff --git a/Makefile b/Makefile index 7455fb2..c2d6779 100644 --- a/Makefile +++ b/Makefile @@ -1,19 +1,20 @@ -.PHONY: help install install-local run run-remote test lint format docker-up docker-down clean +.PHONY: help install run run-remote test lint format docker-up docker-down clean PY ?= python3 VENV ?= .venv PIP := $(VENV)/bin/pip PYBIN := $(VENV)/bin/python +MODEL_SERVER_URL ?= http://localhost:39671 help: @echo "OpenCR developer targets:" - @echo " make install # venv + base deps + local-backend deps (Mac/CPU friendly)" - @echo " make run # start dev server on http://localhost:39672 with the local backend" + @echo " make install # venv + base/dev deps" + @echo " make run # start dev server on http://localhost:39672, using MODEL_SERVER_URL" @echo " make run-remote # start dev server pointing at MODEL_SERVER_URL" @echo " make test # run pytest suite" @echo " make lint # ruff check" @echo " make format # ruff format" - @echo " make docker-up # docker compose up (NVIDIA GPU profile)" + @echo " make docker-up # docker compose up (NVIDIA GPU stack)" @echo " make docker-down # docker compose down" $(VENV): @@ -21,14 +22,14 @@ $(VENV): $(PIP) install -U pip install: $(VENV) - $(PIP) install -r ocr_pipeline/requirements.txt -r requirements-local.txt - $(PIP) install pytest pytest-asyncio ruff + $(PIP) install -r ocr_pipeline/requirements.txt + $(PIP) install -r requirements-dev.txt run: $(VENV) - MODEL_BACKEND=local $(PYBIN) -m uvicorn ocr_pipeline.main:app --host 0.0.0.0 --port 39672 --reload + MODEL_BACKEND=remote MODEL_SERVER_URL=$(MODEL_SERVER_URL) $(PYBIN) -m uvicorn ocr_pipeline.main:app --host 0.0.0.0 --port 39672 --reload run-remote: $(VENV) - MODEL_BACKEND=remote $(PYBIN) -m uvicorn ocr_pipeline.main:app --host 0.0.0.0 --port 39672 --reload + MODEL_BACKEND=remote MODEL_SERVER_URL=$(MODEL_SERVER_URL) $(PYBIN) -m uvicorn ocr_pipeline.main:app --host 0.0.0.0 --port 39672 --reload test: $(VENV) PYTHONPATH=. $(PYBIN) -m pytest -q @@ -40,7 +41,7 @@ format: $(VENV) $(VENV)/bin/ruff format ocr_pipeline tests scripts docker-up: - docker compose --profile gpu up -d + docker compose up -d docker-down: docker compose down diff --git a/README.md b/README.md index cc85549..ffd3169 100644 --- a/README.md +++ b/README.md @@ -10,16 +10,16 @@ For Turkish documents, see: [README.tr.md](./README.tr.md) ## Why OpenCR? -- **Turkish-first accuracy.** Built around DeepSeek-OCR, it handles Turkish characters and difficult page layouts better than off-the-shelf OCR. +- **Turkish-first accuracy.** Built around DeepSeek-OCR-2, it handles Turkish characters and difficult page layouts better than off-the-shelf OCR. - **Dataset factory.** Outputs are packaged directly as `pages.parquet` + `documents.parquet` with deterministic train/validation/test splits and a HuggingFace dataset card. - **Operator console.** A single-page web UI to monitor runs, page-by-page validate quality, retry, and publish to HuggingFace. -- **Pluggable backends.** Production-grade NVIDIA + vLLM by default; runs in-process on Apple Silicon / CPU for development; or talk to any OpenAI-compatible model server. +- **GPU-first backend.** Production-grade NVIDIA + vLLM by default, with an optional remote mode for any OpenAI-compatible GPU model server. --- ## Quickstart -### Option 1 — Docker (NVIDIA GPU, fastest path to inference) +### Option 1 — Docker (NVIDIA GPU, primary path) Requires Docker, an NVIDIA GPU, and the NVIDIA Container Toolkit. @@ -29,27 +29,10 @@ docker compose up -d Open http://localhost:39672. Drop PDFs in `./input/`, hit **Start OCR run**. -### Option 2 — Apple Silicon / CPU (in-process inference, no GPU needed) - -For local development, demos, and small jobs on a Mac or Linux box with no GPU. - -```bash -git clone https://github.com/cdliai/opencr.git -cd opencr -python3 -m venv .venv && source .venv/bin/activate -pip install -r ocr_pipeline/requirements.txt -r requirements-local.txt -MODEL_BACKEND=local ./scripts/start.sh -``` - -Open http://localhost:39672. The DeepSeek-OCR model (~6 GB) downloads -on first request and runs in-process via `transformers` on MPS (Apple Silicon) -or CPU. Expect **5–30 seconds per page on M-series, much slower on CPU** — -fine for development, not for production batch jobs. - -### Option 3 — Remote model server (point at any OpenAI-compatible endpoint) +### Option 2 — Remote model server (point at any OpenAI-compatible endpoint) If you already run vLLM somewhere, or use OpenRouter, or another endpoint -serving DeepSeek-OCR: +serving DeepSeek-OCR-2: ```bash pip install -r ocr_pipeline/requirements.txt @@ -64,11 +47,10 @@ Configurable via environment variables (or a `.env` file): | Variable | Default | Description | | -------------------- | -------------------------------- | ------------------------------------------------------------------------------------------------- | -| `MODEL_BACKEND` | `vllm` | `vllm` (NVIDIA, OpenAI-compatible server), `local` (in-process transformers), `remote` (alias). | +| `MODEL_BACKEND` | `vllm` | `vllm` for the bundled NVIDIA model server, or `remote` for another OpenAI-compatible endpoint. | | `MODEL_SERVER_URL` | `http://ocr-model:39671` | Base URL for `vllm` / `remote` backends. | -| `MODEL_NAME` | `deepseek-ai/DeepSeek-OCR` | Model identifier. | +| `MODEL_NAME` | `deepseek-ai/DeepSeek-OCR-2` | Model identifier. | | `MODEL_API_KEY` | `EMPTY` | API key for remote endpoints. | -| `LOCAL_DEVICE` | auto | `auto`, `mps`, `cuda`, or `cpu` for the `local` backend. | | `INPUT_DIR` | `./input` (or `/data/input`) | Where to read PDFs from. | | `OUTPUT_DIR` | `./output` (or `/data/output`) | Where artifacts and the SQLite DB land. | | `HOST` / `PORT` | `0.0.0.0` / `39672` | Where the web console serves. | @@ -116,9 +98,8 @@ Published datasets are tagged `opencr` so they're discoverable via [HuggingFace' ┌───────────────────────────────┐ │ Model backend │ │ ┌─────────────────────────┐ │ - │ │ vllm (NVIDIA, prod) │ │ - │ │ local (MPS/CPU, dev) │ │ - │ │ remote (any OpenAI URL) │ │ + │ │ vLLM (NVIDIA, default) │ │ + │ │ remote (OpenAI URL) │ │ │ └─────────────────────────┘ │ └───────────────────────────────┘ ``` @@ -145,8 +126,8 @@ Tests live under `tests/`. UI is plain HTML + Alpine.js — no build step. ## Contributing Contributions are welcome — bug reports, Turkish-language -test fixtures, benchmarks against other OCR engines, model-backend -ports (MLX, llama.cpp), and documentation translations are +test fixtures, benchmarks against other OCR engines, deployment +recipes, and documentation translations are especially useful. See [CONTRIBUTING.md](./CONTRIBUTING.md). diff --git a/README.tr.md b/README.tr.md index 7c7ef0d..6905ff8 100644 --- a/README.tr.md +++ b/README.tr.md @@ -4,19 +4,19 @@ OpenCR, özellikle Türkçe metinler, arşiv dökümanları ve karmaşık sayfa ## Neden OpenCR? -- **Türkçe Odaklı Doğruluk:** DeepSeek-OCR tabanlı yapısıyla, standart OCR araçlarının zorlandığı Türkçe karakterlerde ve karmaşık sayfa düzenlerinde üstün performans sağlar. +- **Türkçe Odaklı Doğruluk:** DeepSeek-OCR-2 tabanlı yapısıyla, standart OCR araçlarının zorlandığı Türkçe karakterlerde ve karmaşık sayfa düzenlerinde güçlü bir başlangıç noktası sağlar. - **Veri Seti Fabrikası:** Çıkarılan metinleri doğrudan `.parquet` formatında paketler ve tek tıkla HuggingFace'e yüklemeye hazır hale getirir. - **Operatör Konsolu:** İşlemleri izlemek, sayfa sayfa kontrol etmek ve hataları düzeltmek için modern bir web arayüzü sunar. ## Kurulum -### Docker ile Çalıştırma (GPU Gerekir) +### Docker ile Çalıştırma (NVIDIA GPU Gerekir) ```bash -docker-compose up -d +docker compose up -d ``` -### Lokal Geliştirme ve Web Arayüzü (Apple Silicon / CPU) -Pipeline arayüzünü Apple bilgisayarınızda veya CPU üzerinde denemek için: +### Harici Model Sunucusu ile Geliştirme +Zaten çalışan OpenAI-compatible bir vLLM / GPU endpoint'iniz varsa: 1. **Klasör ve Ortam Hazırlığı:** ```bash @@ -30,13 +30,12 @@ Pipeline arayüzünü Apple bilgisayarınızda veya CPU üzerinde denemek için: ```bash export INPUT_DIR="./input" export OUTPUT_DIR="./output" - export PYTHONPATH=$PYTHONPATH:. - python3 ocr_pipeline/main.py + MODEL_BACKEND=remote MODEL_SERVER_URL="https://your-endpoint" ./scripts/start.sh ``` Erişim: **http://localhost:39672** ## Mimari -- **Backend:** vLLM tabanlı DeepSeek-OCR (Ağır iş yükü). +- **Backend:** vLLM tabanlı DeepSeek-OCR-2 (GPU-first). - **Frontend/API:** FastAPI & Alpine.js (Yönetim konsolu). --- diff --git a/docker-compose.yml b/docker-compose.yml index f5c3577..509f8c2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,15 +1,11 @@ -# Two profiles ship out of the box: -# -# docker compose --profile gpu up -d # production: vLLM model server + pipeline (NVIDIA) -# docker compose --profile cpu up -d # CPU/Mac: pipeline only, in-process transformers backend -# -# Without an explicit --profile, no services run. Always pick one. +# GPU-first OpenCR stack: vLLM model server + pipeline. +# Requires Docker, an NVIDIA GPU, and the NVIDIA Container Toolkit. services: ocr-model: - profiles: ["gpu"] build: ./ocr-model runtime: nvidia + ipc: host restart: unless-stopped environment: - NVIDIA_VISIBLE_DEVICES=all @@ -39,7 +35,6 @@ services: start_period: 600s ocr-pipeline: - profiles: ["gpu"] build: ./ocr_pipeline restart: unless-stopped ports: @@ -50,30 +45,12 @@ services: environment: - MODEL_BACKEND=vllm - MODEL_SERVER_URL=http://ocr-model:39671 + - MODEL_NAME=deepseek-ai/DeepSeek-OCR-2 - INPUT_DIR=/data/input - OUTPUT_DIR=/data/output depends_on: ocr-model: condition: service_healthy - ocr-pipeline-cpu: - profiles: ["cpu"] - build: - context: . - dockerfile: ocr_pipeline/Dockerfile.cpu - restart: unless-stopped - ports: - - "39672:39672" - volumes: - - ./input:/data/input - - ./output:/data/output - - hf-cache:/root/.cache/huggingface - environment: - - MODEL_BACKEND=local - - LOCAL_DEVICE=cpu - - INPUT_DIR=/data/input - - OUTPUT_DIR=/data/output - - HF_HOME=/root/.cache/huggingface - volumes: hf-cache: diff --git a/docs/ottoman-turkish-ocr-research-brief.md b/docs/ottoman-turkish-ocr-research-brief.md new file mode 100644 index 0000000..37813bd --- /dev/null +++ b/docs/ottoman-turkish-ocr-research-brief.md @@ -0,0 +1,218 @@ +# Ottoman and Turkish OCR Research Brief + +This note describes the current OpenCR pipeline and frames the research question we need help with: how to get more reliable OCR and structured text from Turkish, old Turkish, Latinized Ottoman, and Ottoman-script material without making the operator workflow heavy or over-engineered. + +## Current System + +OpenCR is a GPU-first OCR workbench. A user uploads or registers PDF documents, groups them, edits document-level metadata, runs OCR, inspects page images beside extracted text, and exports the result for corpus work or model training. + +The current OCR path is: + +1. Register PDFs in a document catalog. +2. Store document metadata such as title, author, work, book, date label, date precision, language, script, license, citation, notes, and group path. +3. Render each PDF page to an image. +4. Send the page image to DeepSeek-OCR-2 through the vLLM/OpenAI-compatible backend. +5. Clean the OCR text conservatively. +6. Validate the page for obvious extraction failures and corpus-quality warnings. +7. Store per-page metadata, quality flags, raw text, clean text, markdown, source PDFs, OCR image/text pairs, and text bundles. +8. Export HuggingFace-friendly datasets. + +The system intentionally keeps the pipeline small. It does not try to silently "fix" historical text into modern Turkish. The goal is to preserve historical orthography, diacritics, transliteration choices, and document provenance. + +Relevant implementation areas: + +- `ocr_pipeline/services/batch_processor.py`: page rendering, OCR calls, retry strategy, validation, metadata collection. +- `ocr_pipeline/services/ocr_engine.py`: OpenAI-compatible client for vLLM or another GPU model server. +- `ocr_pipeline/services/output_validator.py`: page-level validation and quality flags. +- `ocr_pipeline/services/text_cleaner.py`: conservative OCR cleanup. +- `ocr_pipeline/services/text_normalizer.py`: optional NLP-oriented normalization for exported text only. +- `ocr_pipeline/services/text_bundle_exporter.py`: raw, clean, and normalized text bundles. +- `ocr_pipeline/services/ocr_pair_exporter.py`: page image plus text pairs for OCR model fine-tuning. +- `ocr_pipeline/services/dataset_exporter.py`: HuggingFace-style page/document dataset export. + +## Model Context + +OpenCR now uses `deepseek-ai/DeepSeek-OCR-2` as the base model. The model is distributed on HuggingFace with Apache-2.0 license metadata, is listed as a 3B BF16 image-text-to-text model, and supports Free OCR plus grounded markdown prompts. The DeepSeek-OCR-2 paper introduces DeepEncoder V2 / Visual Causal Flow, where visual tokens can be reordered by document semantics instead of being forced through a fixed raster-scan order. That makes it a better first candidate for complex pages, but not a substitute for domain-specific Ottoman/Turkish benchmarking. + +The vLLM recipe for DeepSeek-OCR-2 documents OpenAI-compatible online serving with `vllm serve deepseek-ai/DeepSeek-OCR-2`, the custom `NGramPerReqLogitsProcessor`, disabled prefix caching, and multimodal image prompts. That matches OpenCR's GPU-first runtime direction. + +For Ottoman Turkish, the research risk is larger than plain OCR accuracy. Arabic-script Ottoman has right-to-left script behavior, ligatures, weak or ambiguous vowel representation, historical fonts, and no clean one-to-one mapping into modern Latin Turkish. Prior work on Ottoman periodicals emphasizes that transcription into a Latin writing system is itself a modeling choice, not merely a character-recognition task. + +## What We Already Preserve + +OpenCR keeps separate text layers: + +- `raw`: the model output after only minimal capture. +- `clean`: conservative cleaned text for reading and corpus publication. +- `normalized`: optional NLP-oriented text, currently used only in text-bundle exports. + +This split matters. For scholarly work, `clean` should remain close to what OCR produced. `normalized` can join broken line hyphenation, remove simple markup leaks, and make tokenization easier, but it must not replace the archive-facing text. + +OpenCR also stores: + +- source PDF SHA256, +- source filename, +- model name, +- pipeline version, +- extraction mode, +- extraction attempt, +- DPI, +- page status, +- validation issues, +- quality flags, +- language/script metadata, +- project attribution through OpenCR/cdli.ai metadata. + +Current quality flags include visible corpus warnings such as line-break hyphenation and markup leakage. These are not the same as "OCR is wrong"; they mean the page should not be treated as ground truth without review. + +## Main Research Question + +How can we improve OCR accuracy and corpus usefulness for Turkish, old Turkish, Latinized Ottoman, and Ottoman-script documents while preserving historically meaningful forms and keeping the OpenCR workflow benchmarkable and repeatable? + +## Questions To Investigate + +1. Which DeepSeek-OCR-2 prompt and image settings work best for our material? + + Compare Free OCR, grounded markdown, crop mode on/off, image sizes, and DPI values. Measure separately for Latinized Ottoman, modern Turkish Latin, Arabic-script Ottoman, tables, title pages, and degraded scans. + +2. When should we use OCR, HTR/ATR, or a specialist engine? + + DeepSeek-OCR-2 is useful as a general vision-language OCR model. Kraken/eScriptorium-style ATR may be better for historical and non-Latin scripts when trained or fine-tuned on our domain. We should compare rather than assume one model is best. + +3. What is the right target text? + + For Ottoman-script documents, there are at least three possible targets: + + - diplomatic transcription preserving script-level details, + - scholarly transliteration, + - modern Turkish normalization. + + These should be separate dataset columns or export layers, not one overwritten text field. + +4. Which errors are dangerous for scholarship? + + Aggregate CER/WER is not enough. Historical OCR can silently modernize orthography, normalize rare forms, drop diacritics, or turn historically meaningful spelling into more common contemporary spelling. We need an error taxonomy. + +5. How much ground truth is enough? + + Start with a small reviewed set: 20-50 pages chosen across document types, scan quality, script, date, and layout. Manually correct at page or line level, then calculate CER/WER and error categories. + +6. What should be corrected automatically? + + Only low-risk transformations should be automatic in `normalized`: line-break hyphen joining, obvious markup removal, whitespace normalization, and page header/footer handling if confidence is high. Orthographic modernization should remain opt-in and separately labeled. + +## Evaluation Protocol + +Use a small gold set first, then expand only after the measurements are useful. + +Recommended first benchmark: + +- 10 pages: clean modern Turkish Latin print. +- 10 pages: Latinized Ottoman / early Republican Turkish with extended Latin characters. +- 10 pages: Arabic-script Ottoman print. +- 5 pages: tables, treaties, or structured forms. +- 5 pages: degraded scans or unusual typography. + +For every page, store: + +- page image, +- source PDF, +- OCR raw text, +- OCR clean text, +- human-reviewed text, +- target convention used by the reviewer, +- reviewer notes, +- CER, +- WER, +- quality flags, +- error categories. + +Important error categories: + +- character substitution, +- dropped diacritic, +- inserted diacritic, +- word split, +- word merge, +- line-break hyphenation, +- layout-order error, +- header/footer leakage, +- table structure loss, +- script confusion, +- Ottoman-to-modern normalization, +- hallucinated word, +- omitted text. + +## Practical Improvements Worth Trying + +These are useful without making OpenCR heavy. + +1. Add an "evaluation set" mode. + + Let users mark selected pages as evaluation pages and attach reviewed text later. The pipeline can then calculate CER/WER and export a small benchmark bundle. + +2. Add extraction profiles. + + Instead of many UI controls, define a few named profiles: + + - `latin_print_fast` + - `latin_print_careful` + - `ottoman_arabic_print` + - `tables_and_forms` + - `fine_tune_pairs` + + A profile can choose DPI, prompt, crop mode, and validation thresholds. + +3. Keep image/text pairs export central. + + Fine-tuning needs page or line images matched with reviewed text. Current OCR pairs are useful, but the strongest version should include `reviewed_text`, `review_status`, and `target_convention`. + +4. Add script-aware validation. + + If metadata says `script=latin_extended`, warn when the page is mostly Arabic script. If metadata says Ottoman Arabic script, warn when the extracted text is mostly Latin unless transliteration is the intended target. + +5. Add a no-silent-modernization rule. + + Any step that changes historical spelling or transliteration must write to a new layer, not mutate `clean`. + +6. Add simple page-level layout labels. + + `prose`, `title_page`, `table`, `mixed`, `index`, `blank`, and `image_only` would help researchers filter outputs and compare OCR modes. + +7. Store per-run extraction profile. + + HuggingFace exports should say not only which model was used, but also which extraction profile, DPI, prompt mode, crop mode, and cleanup mode were used. + +## What Not To Do Yet + +Do not add a large automatic correction pipeline before we have ground truth. It would make the text look cleaner while hiding errors. + +Do not collapse Ottoman-script transcription, transliteration, and modern Turkish normalization into one field. They answer different scholarly questions. + +Do not judge model quality only from pages that "look readable." A page can be readable and still be bad for named entities, dates, legal terms, diacritics, or rare Ottoman forms. + +Do not publish a dataset as research-grade unless it has a reviewed subset and clear quality metadata. + +## Suggested Research Deliverable + +Ask the researcher to produce: + +1. A short survey of OCR/ATR options for Turkish, old Turkish, Latinized Ottoman, and Arabic-script Ottoman print. +2. A recommended transcription target schema. +3. A 20-50 page benchmark design. +4. A CER/WER plus error-taxonomy evaluation plan. +5. Recommended DeepSeek-OCR-2 profile settings to test. +6. A proposal for when to use DeepSeek-OCR-2 versus Kraken/eScriptorium-style specialist ATR. +7. A minimal metadata schema for HuggingFace publication that preserves source, script, date, model, pipeline, and review state. + +## Source Notes + +- DeepSeek-OCR-2 model card: https://huggingface.co/deepseek-ai/DeepSeek-OCR-2 +- DeepSeek-OCR-2 paper: https://arxiv.org/abs/2601.20552 +- vLLM DeepSeek-OCR-2 recipe: https://docs.vllm.ai/projects/recipes/en/latest/DeepSeek/DeepSeek-OCR-2.html +- Ottoman Turkish periodical transcription case study: https://arxiv.org/abs/2011.01139 +- Kraken documentation: https://kraken.re/main/ +- Arabic-script OCR with Kraken case study: https://arxiv.org/abs/2402.10943 +- Historical OCR error-pattern study: https://arxiv.org/abs/2602.14524 +- Historical newspaper OCR ground-truth example: https://lab.kb.nl/dataset/historical-newspapers-ocr-ground-truth +- Printed Ottoman Turkish OCR study: https://ideas.repec.org/a/tec/techni/v18y2023i1p47-64.html diff --git a/ocr-model/Dockerfile b/ocr-model/Dockerfile index 6cbbdde..809f827 100644 --- a/ocr-model/Dockerfile +++ b/ocr-model/Dockerfile @@ -1,7 +1,7 @@ -FROM vllm/vllm-openai:latest +FROM vllm/vllm-openai:cu129-nightly ENTRYPOINT ["python3", "-m", "vllm.entrypoints.openai.api_server"] CMD [ \ - "--model", "deepseek-ai/DeepSeek-OCR", \ + "--model", "deepseek-ai/DeepSeek-OCR-2", \ "--trust-remote-code", \ "--logits-processors", "vllm.model_executor.models.deepseek_ocr:NGramPerReqLogitsProcessor", \ "--no-enable-prefix-caching", \ diff --git a/ocr_pipeline/Dockerfile b/ocr_pipeline/Dockerfile index 71e5ba0..53824e5 100644 --- a/ocr_pipeline/Dockerfile +++ b/ocr_pipeline/Dockerfile @@ -20,4 +20,4 @@ ENV PYTHONPATH=/app EXPOSE 39672 -CMD ["uvicorn", "ocr_pipeline.main:app", "--host", "0.0.0.0", "--port", "39672"] +CMD ["uvicorn", "ocr_pipeline.main:app", "--host", "0.0.0.0", "--port", "39672", "--no-access-log"] diff --git a/ocr_pipeline/Dockerfile.cpu b/ocr_pipeline/Dockerfile.cpu deleted file mode 100644 index 7b1aedb..0000000 --- a/ocr_pipeline/Dockerfile.cpu +++ /dev/null @@ -1,34 +0,0 @@ -# CPU-only image: pipeline + in-process transformers backend. -# No vLLM, no NVIDIA runtime needed. Builds on any host. -# -# Build context is the repo root (set by docker-compose.yml) so we can pull in -# requirements-local.txt alongside the pipeline source. -# -# The image is ~3 GB because it bundles torch + transformers; the model -# weights themselves (~6 GB) download on first request and cache to the -# hf-cache volume. -FROM python:3.12-slim - -RUN apt-get update && apt-get install -y --no-install-recommends \ - poppler-utils \ - git \ - && rm -rf /var/lib/apt/lists/* - -WORKDIR /app - -COPY ocr_pipeline/requirements.txt /tmp/requirements.txt -COPY requirements-local.txt /tmp/requirements-local.txt -# `--extra-index-url` pulls the CPU-only torch wheel. -RUN pip install --no-cache-dir \ - --extra-index-url https://download.pytorch.org/whl/cpu \ - -r /tmp/requirements.txt -r /tmp/requirements-local.txt - -COPY ocr_pipeline /app/ocr_pipeline/ - -ENV PYTHONPATH=/app -ENV MODEL_BACKEND=local -ENV LOCAL_DEVICE=cpu - -EXPOSE 39672 - -CMD ["uvicorn", "ocr_pipeline.main:app", "--host", "0.0.0.0", "--port", "39672"] diff --git a/ocr_pipeline/config.py b/ocr_pipeline/config.py index 4e4a019..831459c 100644 --- a/ocr_pipeline/config.py +++ b/ocr_pipeline/config.py @@ -19,24 +19,19 @@ def _default_output_dir() -> Path: class Settings(BaseSettings): # Model backend selection - # - "vllm" / "remote": call any OpenAI-compatible /v1/chat/completions server - # - "local" / "transformers": load DeepSeek-OCR in-process via transformers (Mac/CPU) - model_backend: Literal["vllm", "remote", "local", "transformers"] = "vllm" + # - "vllm" / "remote": call any OpenAI-compatible /v1/chat/completions server. + # OpenCR is GPU-first; local Apple/CPU transformers inference is not shipped. + model_backend: Literal["vllm", "remote"] = "vllm" model_server_url: str = "http://ocr-model:39671" - model_name: str = "deepseek-ai/DeepSeek-OCR" + model_name: str = "deepseek-ai/DeepSeek-OCR-2" model_api_key: str = "EMPTY" model_timeout: float = 120.0 - # Local backend (Apple Silicon / CPU) - local_device: Literal["auto", "mps", "cuda", "cpu"] = "auto" - local_dtype: Literal["auto", "float16", "bfloat16", "float32"] = "auto" - local_model_cache: Path = Path.home() / ".cache" / "huggingface" - # Startup readiness (used by the remote backend) model_ready_timeout: int = 300 model_ready_interval: int = 5 - # NGram processor defaults (vLLM-only feature; ignored by local backend) + # NGram processor defaults for DeepSeek-OCR-2 on vLLM. ngram_size: int = 30 window_size: int = 90 whitelist_token_ids: list[int] = [128821, 128822] # , @@ -59,6 +54,7 @@ class Settings(BaseSettings): # Server host: str = "0.0.0.0" port: int = 39672 + log_level: str = "INFO" # Pipeline pipeline_version: str = "2.0.0" @@ -76,9 +72,5 @@ class Settings(BaseSettings): model_config = {"env_prefix": "", "case_sensitive": False, "extra": "ignore"} - @property - def is_local_backend(self) -> bool: - return self.model_backend in ("local", "transformers") - settings = Settings() diff --git a/ocr_pipeline/main.py b/ocr_pipeline/main.py index 8a69d6f..dc13370 100644 --- a/ocr_pipeline/main.py +++ b/ocr_pipeline/main.py @@ -9,29 +9,45 @@ from starlette.middleware.sessions import SessionMiddleware from ocr_pipeline.config import settings -from ocr_pipeline.routers import auth, health, extract, jobs, metrics, runs, ui +from ocr_pipeline.routers import ( + auth, + documents, + health, + extract, + jobs, + metrics, + runs, + ui, +) from ocr_pipeline.services.db import init_database from ocr_pipeline.services.run_orchestrator import init_orchestrator from ocr_pipeline.services.run_storage import RunStorage from ocr_pipeline.services.startup import wait_for_model_server logging.basicConfig( - level=logging.INFO, - format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", + level=getattr(logging, settings.log_level.upper(), logging.INFO), + format="%(asctime)s %(levelname)-7s %(name)s :: %(message)s", ) +for noisy_logger in ("httpx", "httpcore", "urllib3", "huggingface_hub"): + logging.getLogger(noisy_logger).setLevel(logging.WARNING) logger = logging.getLogger("ocr_pipeline") @asynccontextmanager async def lifespan(app: FastAPI): logger.info("OpenCR v%s starting (cdli.ai)", settings.pipeline_version) - logger.info("Model server: %s | Model: %s", settings.model_server_url, settings.model_name) + logger.info( + "Model server: %s | Model: %s", settings.model_server_url, settings.model_name + ) db = init_database(settings.db_path) await db.connect() orphans = await db.fail_orphan_runs() if orphans: logger.warning("Marked %d orphan run(s) as failed (process restart).", orphans) + failed_docs = await db.fail_documents_for_failed_runs() + if failed_docs: + logger.warning("Marked %d incomplete document(s) in failed runs.", failed_docs) storage = RunStorage(output_root=settings.output_dir, runs_root=settings.runs_dir) init_orchestrator(db, storage) @@ -39,7 +55,9 @@ async def lifespan(app: FastAPI): if await wait_for_model_server(): logger.info("Pipeline ready to accept requests.") else: - logger.warning("Model server not ready — extraction requests will 503 until it is available.") + logger.warning( + "Model server not ready — extraction requests will 503 until it is available." + ) yield @@ -55,7 +73,10 @@ async def lifespan(app: FastAPI): ), version=settings.pipeline_version, contact={"name": "cdli.ai", "url": "https://cdli.ai"}, - license_info={"name": "Apache-2.0", "url": "https://www.apache.org/licenses/LICENSE-2.0"}, + license_info={ + "name": "Apache-2.0", + "url": "https://www.apache.org/licenses/LICENSE-2.0", + }, lifespan=lifespan, ) @@ -71,7 +92,7 @@ async def lifespan(app: FastAPI): https_only=False, ) -for r in (health, extract, jobs, runs, metrics, ui, auth): +for r in (health, extract, jobs, runs, documents, metrics, ui, auth): app.include_router(r.router) _static_dir = Path(__file__).parent / "static" @@ -86,4 +107,7 @@ async def serve_index(): if __name__ == "__main__": import uvicorn - uvicorn.run("ocr_pipeline.main:app", host=settings.host, port=settings.port, reload=False) + + uvicorn.run( + "ocr_pipeline.main:app", host=settings.host, port=settings.port, reload=False + ) diff --git a/ocr_pipeline/models/metadata.py b/ocr_pipeline/models/metadata.py index 9724c27..59eee67 100644 --- a/ocr_pipeline/models/metadata.py +++ b/ocr_pipeline/models/metadata.py @@ -44,6 +44,7 @@ class PageMetadata: page_height: float image_count: int estimated_complexity: str + quality_flags: list[str] = field(default_factory=list) @dataclass diff --git a/ocr_pipeline/models/schemas.py b/ocr_pipeline/models/schemas.py index 635cc3f..8907197 100644 --- a/ocr_pipeline/models/schemas.py +++ b/ocr_pipeline/models/schemas.py @@ -1,17 +1,25 @@ from pydantic import BaseModel, Field -from typing import Any, Optional +from typing import Optional class ExtractRequest(BaseModel): """Single PDF extraction request.""" + file_path: str = Field(description="Path to the PDF file") - output_dir: Optional[str] = Field(None, description="Output directory override (deprecated)") - strip_refs: bool = Field(False, description="Strip model reference blocks from output") - export_parquet: bool = Field(False, description="Export trainable Parquet artifacts") + output_dir: Optional[str] = Field( + None, description="Output directory override (deprecated)" + ) + strip_refs: bool = Field( + False, description="Strip model reference blocks from output" + ) + export_parquet: bool = Field( + False, description="Export trainable Parquet artifacts" + ) class ExtractResponse(BaseModel): """Single PDF extraction response.""" + run_id: str document_id: str filename: str @@ -30,10 +38,17 @@ class ExtractResponse(BaseModel): class JobRequest(BaseModel): """Batch extraction job request (compatibility wrapper around runs).""" + file_paths: list[str] = Field(description="List of PDF file paths to process") - output_dir: Optional[str] = Field(None, description="Output directory override (deprecated)") - strip_refs: bool = Field(False, description="Strip model reference blocks (bounding boxes) from output") - export_parquet: bool = Field(False, description="Export a trainable Parquet bundle for the job") + output_dir: Optional[str] = Field( + None, description="Output directory override (deprecated)" + ) + strip_refs: bool = Field( + False, description="Strip model reference blocks (bounding boxes) from output" + ) + export_parquet: bool = Field( + False, description="Export a trainable Parquet bundle for the job" + ) name: Optional[str] = Field(None, description="Optional human-friendly name") @@ -68,6 +83,54 @@ class FileInfo(BaseModel): path: str +class DocumentUpdate(BaseModel): + display_title: Optional[str] = None + group_path: Optional[str] = None + author: Optional[str] = None + work: Optional[str] = None + book: Optional[str] = None + document_date_label: Optional[str] = None + document_date_precision: Optional[str] = None + language: Optional[str] = None + script: Optional[str] = None + license: Optional[str] = None + source_citation: Optional[str] = None + notes: Optional[str] = None + tags_json: Optional[str] = None + + +class BulkDocumentUpdate(BaseModel): + document_ids: list[str] + group_path: Optional[str] = None + + +class DocumentSummary(BaseModel): + id: str + filename: str + display_title: str + group_path: Optional[str] = None + source_path: str + file_sha256: str + file_size_bytes: int + total_pages: Optional[int] = None + pdf_title: Optional[str] = None + pdf_author: Optional[str] = None + author: Optional[str] = None + work: Optional[str] = None + book: Optional[str] = None + document_date_label: Optional[str] = None + document_date_precision: Optional[str] = None + language: Optional[str] = None + script: Optional[str] = None + license: Optional[str] = None + source_citation: Optional[str] = None + notes: Optional[str] = None + tags_json: Optional[str] = None + metadata_complete: bool = False + latest_run_id: Optional[str] = None + latest_run_status: Optional[str] = None + + class StagedDocumentInfo(BaseModel): document_id: str filename: str @@ -146,6 +209,7 @@ class PageSummary(BaseModel): page_num: int status: str validation_issues: list[str] = Field(default_factory=list) + quality_flags: list[str] = Field(default_factory=list) script_direction: Optional[str] = None primary_script: Optional[str] = None detected_languages: list[str] = Field(default_factory=list) @@ -165,9 +229,13 @@ class RunDocumentDetail(RunDocumentSummary): class HFPublishRequest(BaseModel): - repo_id: str = Field(description="HuggingFace dataset repo (e.g. user/my-ocr-dataset)") + repo_id: str = Field( + description="HuggingFace dataset repo (e.g. user/my-ocr-dataset)" + ) private: bool = False - token: Optional[str] = Field(None, description="HF token; if absent, uses HF_TOKEN env") + token: Optional[str] = Field( + None, description="HF token; if absent, uses HF_TOKEN env" + ) commit_message: Optional[str] = None diff --git a/ocr_pipeline/routers/documents.py b/ocr_pipeline/routers/documents.py new file mode 100644 index 0000000..df4d731 --- /dev/null +++ b/ocr_pipeline/routers/documents.py @@ -0,0 +1,80 @@ +from fastapi import APIRouter, HTTPException, Path as PathParam, Query + +from ocr_pipeline.models.schemas import ( + BulkDocumentUpdate, + DocumentSummary, + DocumentUpdate, + RunSummary, +) +from ocr_pipeline.routers.runs import _run_summary +from ocr_pipeline.services.db import get_db + + +router = APIRouter() + +ID = PathParam(..., pattern=r"^[A-Za-z0-9_\-]{1,64}$") + + +def _document_summary(row: dict) -> DocumentSummary: + data = dict(row) + data["display_title"] = ( + data.get("display_title") or data.get("pdf_title") or data["filename"] + ) + data["metadata_complete"] = bool(data.get("metadata_complete")) + return DocumentSummary(**data) + + +@router.get("/api/documents", response_model=list[DocumentSummary]) +async def list_documents(limit: int = Query(500, ge=1, le=1000)): + return [_document_summary(d) for d in await get_db().list_documents(limit=limit)] + + +@router.patch("/api/documents/bulk", response_model=list[DocumentSummary]) +async def update_documents_bulk(payload: BulkDocumentUpdate): + if not payload.document_ids: + raise HTTPException(status_code=400, detail="document_ids must not be empty") + db = get_db() + try: + await db.update_documents_metadata( + payload.document_ids, + group_path=payload.group_path, + ) + except KeyError as exc: + raise HTTPException( + status_code=404, detail=f"Document not found: {exc.args[0]}" + ) + documents = await db.list_documents(limit=1000) + by_id = {doc["id"]: doc for doc in documents} + return [ + _document_summary(by_id[document_id]) for document_id in payload.document_ids + ] + + +@router.get("/api/documents/{document_id}", response_model=DocumentSummary) +async def get_document(document_id: str = ID): + doc = await get_db().get_document(document_id) + if not doc: + raise HTTPException(status_code=404, detail="Document not found") + listed = [ + d for d in await get_db().list_documents(limit=1000) if d["id"] == document_id + ] + return _document_summary(listed[0] if listed else doc) + + +@router.patch("/api/documents/{document_id}", response_model=DocumentSummary) +async def update_document(payload: DocumentUpdate, document_id: str = ID): + try: + await get_db().update_document_metadata( + document_id, + **payload.model_dump(exclude_unset=True), + ) + except KeyError: + raise HTTPException(status_code=404, detail="Document not found") + return await get_document(document_id) + + +@router.get("/api/documents/{document_id}/runs", response_model=list[RunSummary]) +async def list_document_runs(document_id: str = ID): + if not await get_db().get_document(document_id): + raise HTTPException(status_code=404, detail="Document not found") + return [_run_summary(r) for r in await get_db().list_document_runs(document_id)] diff --git a/ocr_pipeline/routers/health.py b/ocr_pipeline/routers/health.py index 13878ad..cad059c 100644 --- a/ocr_pipeline/routers/health.py +++ b/ocr_pipeline/routers/health.py @@ -12,7 +12,7 @@ async def health_check(): status = "ready" if model_readiness.ready else "waiting" - + resp = HealthResponse( status=status, pipeline_version=settings.pipeline_version, diff --git a/ocr_pipeline/routers/runs.py b/ocr_pipeline/routers/runs.py index 5f3e4df..daeedab 100644 --- a/ocr_pipeline/routers/runs.py +++ b/ocr_pipeline/routers/runs.py @@ -1,23 +1,42 @@ import asyncio +import hashlib import json from io import BytesIO from pathlib import Path from fastapi import APIRouter, HTTPException, Path as PathParam, Query, Request -from fastapi.responses import FileResponse, PlainTextResponse, Response, StreamingResponse +from fastapi.responses import ( + FileResponse, + PlainTextResponse, + Response, + StreamingResponse, +) from ocr_pipeline.config import settings from ocr_pipeline.models.schemas import ( - HFPublishRequest, HFPublishResponse, PageSummary, RunCreateRequest, - RunCreateResponse, RunDetail, RunDocumentDetail, RunDocumentSummary, - RunSummary, StagedDocumentInfo, + HFPublishRequest, + HFPublishResponse, + PageSummary, + RunCreateRequest, + RunCreateResponse, + RunDetail, + RunDocumentDetail, + RunDocumentSummary, + RunSummary, + StagedDocumentInfo, +) +from ocr_pipeline.services.auth_session import ( + is_oauth_enabled, + session_token, + session_user, ) -from ocr_pipeline.services.auth_session import is_oauth_enabled, session_token, session_user from ocr_pipeline.services.db import get_db from ocr_pipeline.services.hf_publisher import publish_run_to_hf +from ocr_pipeline.services.ocr_pair_exporter import OCRPairExporter from ocr_pipeline.services.pdf_renderer import PDFRenderer from ocr_pipeline.services.run_orchestrator import get_orchestrator from ocr_pipeline.services.startup import model_readiness +from ocr_pipeline.services.text_bundle_exporter import TextBundleExporter router = APIRouter() @@ -33,9 +52,12 @@ "source": ("artifact_source_pdf", "application/pdf"), } TEXT_MODES = { - "raw": "artifact_raw_txt", "raw_txt": "artifact_raw_txt", - "txt": "artifact_clean_txt", "clean": "artifact_clean_txt", - "md": "artifact_markdown", "markdown": "artifact_markdown", + "raw": "artifact_raw_txt", + "raw_txt": "artifact_raw_txt", + "txt": "artifact_clean_txt", + "clean": "artifact_clean_txt", + "md": "artifact_markdown", + "markdown": "artifact_markdown", } @@ -103,10 +125,12 @@ def _doc_summary(row: dict) -> RunDocumentSummary: def _page_summary(row: dict) -> PageSummary: def _bool(v): return bool(v) if v is not None else None + return PageSummary( page_num=row["page_num"], status=row["status"], validation_issues=_parse_str_list(row.get("validation_issues")), + quality_flags=_parse_str_list(row.get("quality_flags")), script_direction=row.get("script_direction"), primary_script=row.get("primary_script"), detected_languages=_parse_str_list(row.get("detected_languages")), @@ -149,7 +173,9 @@ def _existing_path(rd: dict, field: str) -> Path: @router.post("/api/runs", response_model=RunCreateResponse) async def create_run(request: RunCreateRequest): if not model_readiness.ready: - raise HTTPException(status_code=503, detail=f"Model server not ready: {model_readiness.status}") + raise HTTPException( + status_code=503, detail=f"Model server not ready: {model_readiness.status}" + ) if not request.file_paths: raise HTTPException(status_code=400, detail="file_paths must not be empty") @@ -164,7 +190,40 @@ async def create_run(request: RunCreateRequest): except FileNotFoundError as exc: raise HTTPException(status_code=404, detail=str(exc)) - orchestrator.start(result, strip_refs=request.strip_refs, export_parquet=request.export_parquet) + orchestrator.start( + result, strip_refs=request.strip_refs, export_parquet=request.export_parquet + ) + + return RunCreateResponse( + run_id=result.run_id, + status="queued", + documents_total=len(result.documents), + pages_total_estimate=result.pages_total_estimate, + documents=[ + StagedDocumentInfo( + document_id=d.document_id, + filename=d.filename, + file_sha256=d.file_sha256, + deduped=d.deduped, + estimated_pages=d.estimated_pages, + ) + for d in result.documents + ], + ) + + +@router.post("/api/runs/{run_id}/retry", response_model=RunCreateResponse) +async def retry_run(run_id: str = ID): + if not model_readiness.ready: + raise HTTPException( + status_code=503, detail=f"Model server not ready: {model_readiness.status}" + ) + try: + result = await get_orchestrator().retry_incomplete_run(run_id) + except KeyError: + raise HTTPException(status_code=404, detail="Run not found") + except ValueError as exc: + raise HTTPException(status_code=409, detail=str(exc)) return RunCreateResponse( run_id=result.run_id, @@ -193,29 +252,37 @@ async def list_runs(limit: int = Query(50, ge=1, le=500)): async def get_run(run_id: str = ID): run = await _require_run(run_id) documents = await get_db().list_run_documents(run_id) - return RunDetail(**_run_summary(run).model_dump(), - documents=[_doc_summary(d) for d in documents]) + return RunDetail( + **_run_summary(run).model_dump(), documents=[_doc_summary(d) for d in documents] + ) @router.delete("/api/runs/{run_id}") async def delete_run(run_id: str = ID): run = await _require_run(run_id) if run["status"] == "processing": - raise HTTPException(status_code=409, detail="Cannot delete a run that is still processing") + raise HTTPException( + status_code=409, detail="Cannot delete a run that is still processing" + ) await get_db().delete_run(run_id) return {"deleted": run_id} -@router.get("/api/runs/{run_id}/documents/{document_id}", response_model=RunDocumentDetail) +@router.get( + "/api/runs/{run_id}/documents/{document_id}", response_model=RunDocumentDetail +) async def get_run_document(run_id: str = ID, document_id: str = ID): rd = await _require_doc(run_id, document_id) pages = await get_db().list_pages(run_id, document_id) - return RunDocumentDetail(**_doc_summary(rd).model_dump(), - pages=[_page_summary(p) for p in pages]) + return RunDocumentDetail( + **_doc_summary(rd).model_dump(), pages=[_page_summary(p) for p in pages] + ) @router.get("/api/runs/{run_id}/documents/{document_id}/text") -async def get_run_document_text(run_id: str = ID, document_id: str = ID, mode: str = "txt"): +async def get_run_document_text( + run_id: str = ID, document_id: str = ID, mode: str = "txt" +): field = TEXT_MODES.get(mode) if not field: raise HTTPException(status_code=400, detail="Unsupported mode") @@ -267,7 +334,138 @@ async def download_dataset_bundle(run_id: str = ID): bundle = run.get("dataset_bundle") if not bundle or not Path(bundle).exists(): raise HTTPException(status_code=404, detail="Dataset bundle not available") - return FileResponse(bundle, media_type="application/zip", filename=Path(bundle).name) + return FileResponse( + bundle, media_type="application/zip", filename=Path(bundle).name + ) + + +@router.get("/api/runs/{run_id}/ocr-pairs/download") +async def download_ocr_pairs( + run_id: str = ID, + dpi: int = Query(160, ge=50, le=400), + text_mode: str = Query("clean", pattern="^(clean|raw)$"), + document_ids: str | None = Query(None), +): + db = get_db() + run = await _require_run(run_id) + if run["status"] != "completed": + raise HTTPException(status_code=409, detail="Run is not yet completed") + + documents = await db.list_run_documents(run_id) + selected_ids = { + part.strip() for part in (document_ids or "").split(",") if part.strip() + } or None + if selected_ids: + available_ids = {doc["document_id"] for doc in documents} + missing = selected_ids - available_ids + if missing: + raise HTTPException( + status_code=404, + detail=f"Document not found in run: {sorted(missing)[0]}", + ) + pages_by_document = { + doc["document_id"]: await db.list_pages(run_id, doc["document_id"]) + for doc in documents + } + catalog_by_document = { + doc["document_id"]: await db.get_document(doc["document_id"]) or {} + for doc in documents + } + scope = "all" + if selected_ids: + scope = hashlib.sha256( + ",".join(sorted(selected_ids)).encode("utf-8") + ).hexdigest()[:12] + export_dir = ( + settings.runs_dir / run_id / "dataset" / f"ocr_pairs_{text_mode}_{dpi}_{scope}" + ) + cached_bundle = export_dir.with_suffix(".zip") + if cached_bundle.exists(): + return FileResponse( + cached_bundle, + media_type="application/zip", + filename=f"{run_id}-ocr-pairs.zip", + ) + exporter = OCRPairExporter(export_dir) + result = await asyncio.to_thread( + exporter.export_run, + run=run, + documents=documents, + pages_by_document=pages_by_document, + catalog_by_document=catalog_by_document, + document_ids=selected_ids, + dpi=dpi, + text_mode=text_mode, + ) + if result.pages_count == 0: + raise HTTPException(status_code=404, detail="No completed OCR pages to export") + return FileResponse( + result.bundle, + media_type="application/zip", + filename=f"{run_id}-ocr-pairs.zip", + ) + + +@router.get("/api/runs/{run_id}/text-bundle/download") +async def download_text_bundle( + run_id: str = ID, + document_ids: str | None = Query(None), +): + db = get_db() + run = await _require_run(run_id) + if run["status"] != "completed": + raise HTTPException(status_code=409, detail="Run is not yet completed") + + documents = await db.list_run_documents(run_id) + selected_ids = { + part.strip() for part in (document_ids or "").split(",") if part.strip() + } or None + if selected_ids: + available_ids = {doc["document_id"] for doc in documents} + missing = selected_ids - available_ids + if missing: + raise HTTPException( + status_code=404, + detail=f"Document not found in run: {sorted(missing)[0]}", + ) + + pages_by_document = { + doc["document_id"]: await db.list_pages(run_id, doc["document_id"]) + for doc in documents + } + catalog_by_document = { + doc["document_id"]: await db.get_document(doc["document_id"]) or {} + for doc in documents + } + scope = "all" + if selected_ids: + scope = hashlib.sha256( + ",".join(sorted(selected_ids)).encode("utf-8") + ).hexdigest()[:12] + export_dir = settings.runs_dir / run_id / "dataset" / f"text_bundle_{scope}" + cached_bundle = export_dir.with_suffix(".zip") + if cached_bundle.exists(): + return FileResponse( + cached_bundle, + media_type="application/zip", + filename=f"{run_id}-text-bundle.zip", + ) + + result = await asyncio.to_thread( + TextBundleExporter(export_dir).export_run, + run=run, + documents=documents, + pages_by_document=pages_by_document, + catalog_by_document=catalog_by_document, + document_ids=selected_ids, + ) + if result.pages_count == 0: + raise HTTPException(status_code=404, detail="No completed text to export") + return FileResponse( + result.bundle, + media_type="application/zip", + filename=f"{run_id}-text-bundle.zip", + ) @router.get("/api/runs/{run_id}/stream") @@ -276,14 +474,18 @@ async def stream_run(run_id: str = ID, after_event_id: int = 0): orchestrator = get_orchestrator() async def gen(): - async for event in orchestrator.subscribe(run_id, after_event_id=after_event_id): + async for event in orchestrator.subscribe( + run_id, after_event_id=after_event_id + ): yield f"data: {json.dumps(event, ensure_ascii=False, default=str)}\n\n" return StreamingResponse(gen(), media_type="text/event-stream") @router.post("/api/runs/{run_id}/publish/hf", response_model=HFPublishResponse) -async def publish_to_hf(payload: HFPublishRequest, http_request: Request, run_id: str = ID): +async def publish_to_hf( + payload: HFPublishRequest, http_request: Request, run_id: str = ID +): db = get_db() run = await _require_run(run_id) if run["status"] != "completed": @@ -293,7 +495,9 @@ async def publish_to_hf(payload: HFPublishRequest, http_request: Request, run_id # 1. signed-in user's HF OAuth token (preferred — tied to a real user) # 2. token explicitly passed in the request body (paste-token mode) # 3. HF_TOKEN env var (single-user / dev fallback, resolved inside publisher) - user = session_user(http_request.session) if hasattr(http_request, "session") else None + user = ( + session_user(http_request.session) if hasattr(http_request, "session") else None + ) sess_tok = session_token(http_request.session) if user else None # If OAuth is enabled and the user is signed in, ignore the body token — @@ -302,7 +506,9 @@ async def publish_to_hf(payload: HFPublishRequest, http_request: Request, run_id # publishes entirely so the panel acts as a true gate. if is_oauth_enabled(): if not sess_tok: - raise HTTPException(status_code=401, detail="Sign in with HuggingFace to publish.") + raise HTTPException( + status_code=401, detail="Sign in with HuggingFace to publish." + ) token = sess_tok else: token = payload.token diff --git a/ocr_pipeline/routers/ui.py b/ocr_pipeline/routers/ui.py index 451264e..dfe7ada 100644 --- a/ocr_pipeline/routers/ui.py +++ b/ocr_pipeline/routers/ui.py @@ -1,11 +1,15 @@ """Static-friendly endpoints for input file management. Output/dataset listing moved to /api/runs.""" + +import hashlib from pathlib import Path from fastapi import APIRouter, HTTPException, UploadFile from ocr_pipeline.config import settings from ocr_pipeline.models.schemas import FileInfo +from ocr_pipeline.services.db import get_db +from ocr_pipeline.services.document_catalog import catalog_pdf router = APIRouter() @@ -22,11 +26,18 @@ async def upload_pdf(file: UploadFile): raise HTTPException(status_code=400, detail="Invalid filename") settings.input_dir.mkdir(parents=True, exist_ok=True) - dest = settings.input_dir / safe_name content = await file.read() + digest = hashlib.sha256(content).hexdigest() + dest = settings.input_dir / f"{digest[:16]}__{safe_name}" dest.write_bytes(content) + await catalog_pdf(get_db(), dest, filename=safe_name) - return {"filename": safe_name, "size": len(content), "path": str(dest)} + return { + "filename": safe_name, + "stored_filename": dest.name, + "size": len(content), + "path": str(dest), + } @router.get("/api/files/input", response_model=list[FileInfo]) @@ -36,14 +47,20 @@ async def list_input_files(): if not input_dir.exists(): return [] + documents_by_path = { + doc["source_path"]: doc for doc in await get_db().list_documents(limit=1000) + } files = [] for p in sorted(input_dir.iterdir()): if p.is_file() and p.suffix.lower() == ".pdf": stat = p.stat() - files.append(FileInfo( - name=p.name, - size=stat.st_size, - modified=stat.st_mtime, - path=str(p), - )) + document = documents_by_path.get(str(p)) + files.append( + FileInfo( + name=document["filename"] if document else p.name, + size=stat.st_size, + modified=stat.st_mtime, + path=str(p), + ) + ) return files diff --git a/ocr_pipeline/services/batch_processor.py b/ocr_pipeline/services/batch_processor.py index e810a0d..ac8bd88 100644 --- a/ocr_pipeline/services/batch_processor.py +++ b/ocr_pipeline/services/batch_processor.py @@ -33,7 +33,7 @@ PAGE_DB_FIELDS = ( "validation_issues", "script_direction", "primary_script", "detected_languages", "token_count_cl100k", "text_length_chars", "text_length_words", - "extraction_mode", "extraction_attempt", "dpi_used", + "extraction_mode", "extraction_attempt", "dpi_used", "quality_flags", "has_embedded_text", "is_image_only", ) @@ -188,9 +188,10 @@ async def process_document( run_id: str, document_id: str, file_sha256: str, + filename: str | None = None, artifact_paths: ArtifactPaths, ) -> DocumentMetadata: - filename = pdf_path.name + filename = filename or pdf_path.name file_size = (await asyncio.to_thread(pdf_path.stat)).st_size started_at = datetime.now(timezone.utc).isoformat() diff --git a/ocr_pipeline/services/dataset_exporter.py b/ocr_pipeline/services/dataset_exporter.py index 67394b4..66db49b 100644 --- a/ocr_pipeline/services/dataset_exporter.py +++ b/ocr_pipeline/services/dataset_exporter.py @@ -1,7 +1,7 @@ import hashlib import json import zipfile -from dataclasses import asdict, dataclass +from dataclasses import asdict, dataclass, field from pathlib import Path import pyarrow as pa @@ -12,6 +12,14 @@ from ocr_pipeline.services.run_storage import ArtifactPaths +PROJECT_METADATA = { + "project": "opencr", + "generator": "OpenCR", + "organization": "cdli.ai", + "organization_url": "https://cdli.ai", +} + + @dataclass class DatasetExportResult: export_id: str @@ -29,6 +37,7 @@ class DocumentExport: metadata: DocumentMetadata document_id: str artifact_paths: ArtifactPaths + catalog_metadata: dict = field(default_factory=dict) class DatasetExporter: @@ -46,13 +55,27 @@ def _split_pages(text: str, total_pages: int) -> list[str]: @staticmethod def _split_name(stable_key: str) -> str: - bucket = int(hashlib.sha256(stable_key.encode("utf-8")).hexdigest()[:8], 16) % 100 + bucket = ( + int(hashlib.sha256(stable_key.encode("utf-8")).hexdigest()[:8], 16) % 100 + ) if bucket < 90: return "train" if bucket < 95: return "validation" return "test" + @staticmethod + def _language_list(value) -> list[str]: + if isinstance(value, list): + return [str(v).strip() for v in value if str(v).strip()] + if not value: + return [] + return [part.strip() for part in str(value).split(",") if part.strip()] + + @staticmethod + def _text_sha256(text: str) -> str: + return hashlib.sha256(text.encode("utf-8")).hexdigest() + def export_run( self, run_id: str, @@ -66,10 +89,23 @@ def export_run( for entry in documents: doc_meta = entry.metadata + catalog = entry.catalog_metadata or {} paths = entry.artifact_paths - raw_text = paths.raw_txt.read_text(encoding="utf-8") if paths.raw_txt.exists() else "" - clean_text = paths.clean_txt.read_text(encoding="utf-8") if paths.clean_txt.exists() else "" - markdown_text = paths.markdown.read_text(encoding="utf-8") if paths.markdown.exists() else "" + raw_text = ( + paths.raw_txt.read_text(encoding="utf-8") + if paths.raw_txt.exists() + else "" + ) + clean_text = ( + paths.clean_txt.read_text(encoding="utf-8") + if paths.clean_txt.exists() + else "" + ) + markdown_text = ( + paths.markdown.read_text(encoding="utf-8") + if paths.markdown.exists() + else "" + ) raw_pages = self._split_pages(raw_text, doc_meta.total_pages) clean_pages = self._split_pages(clean_text, doc_meta.total_pages) split = self._split_name(doc_meta.file_sha256) @@ -77,16 +113,36 @@ def export_run( for page_meta, page_raw_text, page_clean_text in zip( doc_meta.pages, raw_pages, clean_pages ): + page_id = f"{entry.document_id}_page_{page_meta.page_num:04d}" page_rows.append( { "dataset_export_id": export_id, "run_id": run_id, + "page_id": page_id, "document_id": entry.document_id, "document_name": doc_meta.filename, + "title": catalog.get("display_title") + or catalog.get("title") + or doc_meta.pdf_title, + "group_path": catalog.get("group_path"), + "author": catalog.get("author") or doc_meta.pdf_author, + "work": catalog.get("work"), + "book": catalog.get("book"), + "document_date_label": catalog.get("document_date_label"), + "document_date_precision": catalog.get( + "document_date_precision" + ), + "language": self._language_list(catalog.get("language")) + or page_meta.detected_languages, + "script": catalog.get("script") or page_meta.primary_script, + "license": catalog.get("license"), + "source_citation": catalog.get("source_citation"), "page_number": page_meta.page_num, "source_pdf_sha256": doc_meta.file_sha256, "raw_text": page_raw_text, "clean_text": page_clean_text, + "raw_text_sha256": self._text_sha256(page_raw_text), + "clean_text_sha256": self._text_sha256(page_clean_text), "validation_status": page_meta.validation_status, "validation_issues": page_meta.validation_issues, "script_direction": page_meta.script_direction, @@ -110,10 +166,27 @@ def export_run( "run_id": run_id, "document_id": entry.document_id, "document_name": doc_meta.filename, + "title": catalog.get("display_title") + or catalog.get("title") + or doc_meta.pdf_title, + "group_path": catalog.get("group_path"), + "author": catalog.get("author") or doc_meta.pdf_author, + "work": catalog.get("work"), + "book": catalog.get("book"), + "document_date_label": catalog.get("document_date_label"), + "document_date_precision": catalog.get("document_date_precision"), + "language": self._language_list(catalog.get("language")) + or doc_meta.languages_detected, + "script": catalog.get("script") or doc_meta.dominant_script, + "license": catalog.get("license"), + "source_citation": catalog.get("source_citation"), + "notes": catalog.get("notes"), "source_pdf_sha256": doc_meta.file_sha256, "page_count": doc_meta.total_pages, "raw_text": raw_text, "clean_text": clean_text, + "raw_text_sha256": self._text_sha256(raw_text), + "clean_text_sha256": self._text_sha256(clean_text), "markdown": markdown_text, "pages_pass": doc_meta.pages_pass, "pages_warn": doc_meta.pages_warn, @@ -140,6 +213,7 @@ def export_run( manifest_payload = { "export_id": export_id, "run_id": run_id, + "created_by": PROJECT_METADATA, "documents_count": len(document_rows), "pages_count": len(page_rows), "artifacts": { diff --git a/ocr_pipeline/services/db.py b/ocr_pipeline/services/db.py index 9b6d3eb..44bd0a1 100644 --- a/ocr_pipeline/services/db.py +++ b/ocr_pipeline/services/db.py @@ -10,6 +10,44 @@ logger = logging.getLogger("ocr_pipeline.db") +DOCUMENT_METADATA_FIELDS = { + "display_title", + "group_path", + "author", + "work", + "book", + "document_date_label", + "document_date_precision", + "language", + "script", + "license", + "source_citation", + "notes", + "tags_json", +} + +DOCUMENT_METADATA_COLUMNS = { + "display_title": "TEXT", + "group_path": "TEXT", + "author": "TEXT", + "work": "TEXT", + "book": "TEXT", + "document_date_label": "TEXT", + "document_date_precision": "TEXT", + "language": "TEXT", + "script": "TEXT", + "license": "TEXT", + "source_citation": "TEXT", + "notes": "TEXT", + "tags_json": "TEXT", + "catalog_updated_at": "TEXT", +} + +PAGE_METADATA_COLUMNS = { + "quality_flags": "TEXT", +} + + SCHEMA = """ CREATE TABLE IF NOT EXISTS runs ( id TEXT PRIMARY KEY, @@ -43,6 +81,20 @@ pdf_author TEXT, pdf_creation_date TEXT, pdf_producer TEXT, + display_title TEXT, + group_path TEXT, + author TEXT, + work TEXT, + book TEXT, + document_date_label TEXT, + document_date_precision TEXT, + language TEXT, + script TEXT, + license TEXT, + source_citation TEXT, + notes TEXT, + tags_json TEXT, + catalog_updated_at TEXT, first_seen_at TEXT NOT NULL, last_seen_at TEXT NOT NULL ); @@ -90,6 +142,7 @@ extraction_mode TEXT, extraction_attempt INTEGER, dpi_used INTEGER, + quality_flags TEXT, has_embedded_text INTEGER, is_image_only INTEGER, PRIMARY KEY (run_id, document_id, page_num), @@ -139,6 +192,7 @@ async def connect(self) -> None: await self._conn.execute("PRAGMA journal_mode=WAL;") await self._conn.execute("PRAGMA foreign_keys=ON;") await self._conn.executescript(SCHEMA) + await self._migrate() await self._conn.commit() logger.info("Database ready at %s", self.db_path) @@ -153,6 +207,20 @@ def conn(self) -> aiosqlite.Connection: raise RuntimeError("Database not connected; call connect() first.") return self._conn + async def _migrate(self) -> None: + """Apply additive migrations for existing local SQLite catalogs.""" + await self._ensure_columns("documents", DOCUMENT_METADATA_COLUMNS) + await self._ensure_columns("pages", PAGE_METADATA_COLUMNS) + + async def _ensure_columns(self, table: str, columns: dict[str, str]) -> None: + async with self.conn.execute(f"PRAGMA table_info({table})") as cur: + existing = {row["name"] for row in await cur.fetchall()} + for name, column_type in columns.items(): + if name not in existing: + await self.conn.execute( + f"ALTER TABLE {table} ADD COLUMN {name} {column_type}" + ) + @asynccontextmanager async def cursor(self) -> AsyncIterator[aiosqlite.Cursor]: async with self.conn.cursor() as cur: @@ -209,7 +277,9 @@ async def update_run(self, run_id: str, **fields: Any) -> None: await self.conn.commit() async def get_run(self, run_id: str) -> Optional[dict[str, Any]]: - async with self.conn.execute("SELECT * FROM runs WHERE id = ?", (run_id,)) as cur: + async with self.conn.execute( + "SELECT * FROM runs WHERE id = ?", (run_id,) + ) as cur: row = await cur.fetchone() return _row_to_dict(row) @@ -227,6 +297,12 @@ async def delete_run(self, run_id: str) -> None: async def fail_orphan_runs(self) -> int: """Mark any run still in `processing`/`queued` as failed. Called once on startup so a crashed process does not leave runs visibly live.""" + async with self.conn.execute( + "SELECT id FROM runs WHERE status IN ('queued', 'processing')" + ) as cur: + run_ids = [row["id"] for row in await cur.fetchall()] + for run_id in run_ids: + await self.fail_incomplete_run_documents(run_id) cur = await self.conn.execute( """ UPDATE runs @@ -243,6 +319,35 @@ async def fail_orphan_runs(self) -> int: await cur.close() return affected + async def fail_incomplete_run_documents(self, run_id: str) -> None: + await self.conn.execute( + """ + UPDATE run_documents + SET status = 'failed', + completed_at = COALESCE(completed_at, ?) + WHERE run_id = ? + AND status != 'completed' + """, + (_now(), run_id), + ) + await self.conn.commit() + + async def fail_documents_for_failed_runs(self) -> int: + cur = await self.conn.execute( + """ + UPDATE run_documents + SET status = 'failed', + completed_at = COALESCE(completed_at, ?) + WHERE status != 'completed' + AND run_id IN (SELECT id FROM runs WHERE status = 'failed') + """, + (_now(),), + ) + await self.conn.commit() + affected = cur.rowcount or 0 + await cur.close() + return affected + # ---------- documents (content-addressed) ---------- async def upsert_document( @@ -269,6 +374,8 @@ async def upsert_document( ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET filename = excluded.filename, + source_path = excluded.source_path, + file_size_bytes = excluded.file_size_bytes, last_seen_at = excluded.last_seen_at, total_pages = COALESCE(excluded.total_pages, documents.total_pages), pdf_title = COALESCE(excluded.pdf_title, documents.pdf_title), @@ -300,6 +407,122 @@ async def get_document(self, document_id: str) -> Optional[dict[str, Any]]: ) as cur: return _row_to_dict(await cur.fetchone()) + async def list_documents(self, limit: int = 500) -> list[dict[str, Any]]: + async with self.conn.execute( + """ + SELECT d.id, d.filename, d.source_path, d.file_sha256, d.file_size_bytes, + d.total_pages, d.pdf_title, d.pdf_author, d.pdf_creation_date, d.pdf_producer, + d.group_path, d.author, d.work, d.book, d.document_date_label, d.document_date_precision, + d.language, d.script, d.license, d.source_citation, d.notes, d.tags_json, + d.catalog_updated_at, d.first_seen_at, d.last_seen_at, + COALESCE(NULLIF(d.display_title, ''), NULLIF(d.pdf_title, ''), d.filename) + AS display_title, + CASE + WHEN COALESCE(d.author, '') != '' + AND COALESCE(d.work, '') != '' + AND COALESCE(d.document_date_label, '') != '' + AND COALESCE(d.document_date_precision, '') != '' + AND COALESCE(d.language, '') != '' + AND COALESCE(d.script, '') != '' + AND COALESCE(d.license, '') != '' + THEN 1 ELSE 0 + END AS metadata_complete, + ( + SELECT r.id + FROM run_documents rd + JOIN runs r ON r.id = rd.run_id + WHERE rd.document_id = d.id + ORDER BY r.created_at DESC + LIMIT 1 + ) AS latest_run_id, + ( + SELECT r.status + FROM run_documents rd + JOIN runs r ON r.id = rd.run_id + WHERE rd.document_id = d.id + ORDER BY r.created_at DESC + LIMIT 1 + ) AS latest_run_status + FROM documents d + ORDER BY d.last_seen_at DESC + LIMIT ? + """, + (limit,), + ) as cur: + rows = await cur.fetchall() + return [_row_to_dict(r) for r in rows] # type: ignore[misc] + + async def update_document_metadata( + self, document_id: str, **fields: Any + ) -> dict[str, Any]: + clean = {k: v for k, v in fields.items() if k in DOCUMENT_METADATA_FIELDS} + if clean: + clean["catalog_updated_at"] = _now() + cols = ", ".join(f"{k} = ?" for k in clean) + values = [*clean.values(), document_id] + cur = await self.conn.execute( + f"UPDATE documents SET {cols} WHERE id = ?", + values, + ) + await self.conn.commit() + affected = cur.rowcount or 0 + await cur.close() + if not affected: + raise KeyError(document_id) + + doc = await self.get_document(document_id) + if not doc: + raise KeyError(document_id) + return doc + + async def update_documents_metadata( + self, document_ids: list[str], **fields: Any + ) -> list[dict[str, Any]]: + if not document_ids: + return [] + clean = {k: v for k, v in fields.items() if k in DOCUMENT_METADATA_FIELDS} + placeholders = ", ".join("?" for _ in document_ids) + async with self.conn.execute( + f"SELECT id FROM documents WHERE id IN ({placeholders})", + document_ids, + ) as cur: + existing = {row["id"] for row in await cur.fetchall()} + missing = [ + document_id for document_id in document_ids if document_id not in existing + ] + if missing: + raise KeyError(missing[0]) + if clean: + clean["catalog_updated_at"] = _now() + cols = ", ".join(f"{k} = ?" for k in clean) + values = [*clean.values()] + for document_id in document_ids: + await self.conn.execute( + f"UPDATE documents SET {cols} WHERE id = ?", + [*values, document_id], + ) + await self.conn.commit() + docs = [] + for document_id in document_ids: + doc = await self.get_document(document_id) + if doc: + docs.append(doc) + return docs + + async def list_document_runs(self, document_id: str) -> list[dict[str, Any]]: + async with self.conn.execute( + """ + SELECT r.*, rd.status AS document_status, rd.pages_pass, rd.pages_warn, rd.pages_fail + FROM run_documents rd + JOIN runs r ON r.id = rd.run_id + WHERE rd.document_id = ? + ORDER BY r.created_at DESC + """, + (document_id,), + ) as cur: + rows = await cur.fetchall() + return [_row_to_dict(r) for r in rows] # type: ignore[misc] + async def get_document_by_sha(self, file_sha256: str) -> Optional[dict[str, Any]]: async with self.conn.execute( "SELECT * FROM documents WHERE file_sha256 = ?", (file_sha256,) @@ -328,7 +551,9 @@ async def link_run_document( ) await self.conn.commit() - async def update_run_document(self, run_id: str, document_id: str, **fields: Any) -> None: + async def update_run_document( + self, run_id: str, document_id: str, **fields: Any + ) -> None: if not fields: return cols = ", ".join(f"{k} = ?" for k in fields) @@ -392,11 +617,12 @@ async def upsert_page( "extraction_mode": None, "extraction_attempt": None, "dpi_used": None, + "quality_flags": None, "has_embedded_text": None, "is_image_only": None, } defaults.update(fields) - for list_key in ("validation_issues", "detected_languages"): + for list_key in ("validation_issues", "detected_languages", "quality_flags"): v = defaults.get(list_key) if isinstance(v, list): defaults[list_key] = json.dumps(v, ensure_ascii=False) @@ -431,11 +657,18 @@ async def list_pages(self, run_id: str, document_id: str) -> list[dict[str, Any] # ---------- events ---------- - async def append_event(self, run_id: str, event_type: str, payload: dict[str, Any]) -> int: + async def append_event( + self, run_id: str, event_type: str, payload: dict[str, Any] + ) -> int: now = _now() cur = await self.conn.execute( "INSERT INTO run_events (run_id, event_type, payload, created_at) VALUES (?, ?, ?, ?)", - (run_id, event_type, json.dumps(payload, ensure_ascii=False, default=str), now), + ( + run_id, + event_type, + json.dumps(payload, ensure_ascii=False, default=str), + now, + ), ) await self.conn.commit() last_id = cur.lastrowid or 0 diff --git a/ocr_pipeline/services/document_catalog.py b/ocr_pipeline/services/document_catalog.py new file mode 100644 index 0000000..6cc94a3 --- /dev/null +++ b/ocr_pipeline/services/document_catalog.py @@ -0,0 +1,36 @@ +import asyncio +import hashlib +from pathlib import Path + +import fitz + +from ocr_pipeline.services.db import Database + + +def _hash_file_sync(path: Path) -> str: + digest = hashlib.sha256() + with path.open("rb") as handle: + for chunk in iter(lambda: handle.read(1024 * 1024), b""): + digest.update(chunk) + return digest.hexdigest() + + +def _count_pages_sync(path: Path) -> int: + with fitz.open(str(path)) as doc: + return len(doc) + + +async def catalog_pdf(db: Database, path: Path, *, filename: str | None = None) -> dict: + sha = await asyncio.to_thread(_hash_file_sync, path) + try: + page_count = await asyncio.to_thread(_count_pages_sync, path) + except Exception: + page_count = 0 + return await db.upsert_document( + sha[:16], + filename=filename or path.name, + source_path=str(path), + file_sha256=sha, + file_size_bytes=(await asyncio.to_thread(path.stat)).st_size, + total_pages=page_count or None, + ) diff --git a/ocr_pipeline/services/hf_publisher.py b/ocr_pipeline/services/hf_publisher.py index 5be5609..f172a47 100644 --- a/ocr_pipeline/services/hf_publisher.py +++ b/ocr_pipeline/services/hf_publisher.py @@ -47,7 +47,7 @@ def _build_dataset_card( f"# OpenCR Dataset — Run `{run['id']}`", "", f"Generated by **[OpenCR](https://cdli.ai)** (cdli.ai) using " - f"`{run.get('model_used', 'DeepSeek-OCR')}` on " + f"`{run.get('model_used', 'DeepSeek-OCR-2')}` on " f"{run.get('completed_at') or run.get('created_at')}.", "", "## Summary", diff --git a/ocr_pipeline/services/local_ocr_engine.py b/ocr_pipeline/services/local_ocr_engine.py deleted file mode 100644 index 2305b1b..0000000 --- a/ocr_pipeline/services/local_ocr_engine.py +++ /dev/null @@ -1,198 +0,0 @@ -"""In-process OCR engine using HuggingFace `transformers`. - -Used by the `local` model backend so OpenCR runs on Apple Silicon, CPU-only -boxes, and any environment without a GPU model server. Trades throughput for -zero-deployment-friction: a single Python process boots the web UI and serves -inference. - -Caveats: -- DeepSeek-OCR is ~3B params + a vision tower; on M-series Macs expect - 5–30 s/page, on CPU much slower. Production batch jobs should use vLLM. -- The model loads lazily on the first extraction request so server startup - stays fast. -- `transformers` and `torch` are intentionally optional — they only get - imported when this module is instantiated. Install them via - `requirements-local.txt`. -""" -from __future__ import annotations - -import asyncio -import logging -import tempfile -from pathlib import Path -from typing import Any - -from PIL import Image - -from ocr_pipeline.config import settings - -logger = logging.getLogger("ocr_pipeline.local_engine") - -# Maps the same `mode` strings the remote backend uses to the prompt strings -# DeepSeek-OCR's reference inference helper expects. -LOCAL_PROMPTS = { - "markdown": "\n<|grounding|>Convert the document to markdown.", - "free_ocr": "\nFree OCR.", - "figure": "\nParse the figure.", -} - - -def _resolve_device(requested: str) -> str: - if requested != "auto": - return requested - try: - import torch - except ImportError as exc: - raise RuntimeError( - "MODEL_BACKEND=local requires `torch`. Install with: " - "pip install -r requirements-local.txt" - ) from exc - if torch.cuda.is_available(): - return "cuda" - if getattr(torch.backends, "mps", None) and torch.backends.mps.is_available(): - return "mps" - return "cpu" - - -def _resolve_dtype(requested: str, device: str): - import torch - if requested == "float16": - return torch.float16 - if requested == "bfloat16": - return torch.bfloat16 - if requested == "float32": - return torch.float32 - # auto — bf16 on CUDA, fp16 on MPS, fp32 on CPU (mps doesn't love bf16, cpu hates fp16) - if device == "cuda": - return torch.bfloat16 - if device == "mps": - return torch.float16 - return torch.float32 - - -class LocalOCREngine: - """In-process DeepSeek-OCR inference via `transformers`. - - Only one instance is loaded per process; concurrent requests serialize on - the same model object via an asyncio lock since GPU/MPS memory makes - parallel calls impractical at this size. - """ - - _instance: "LocalOCREngine | None" = None - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance - - def __init__(self, model_name: str | None = None) -> None: - if getattr(self, "_initialized", False): - return - self.model_name = model_name or settings.model_name - self._model: Any = None - self._tokenizer: Any = None - self._device: str | None = None - self._dtype: Any = None - self._lock = asyncio.Lock() - self._initialized = True - - async def _ensure_loaded(self) -> None: - if self._model is not None: - return - async with self._lock: - if self._model is not None: - return - await asyncio.to_thread(self._load_blocking) - - def _load_blocking(self) -> None: - try: - import torch - from transformers import AutoModel, AutoTokenizer - except ImportError as exc: - raise RuntimeError( - "MODEL_BACKEND=local requires `transformers` and `torch`. " - "Install with: pip install -r requirements-local.txt" - ) from exc - - device = _resolve_device(settings.local_device) - dtype = _resolve_dtype(settings.local_dtype, device) - logger.info( - "Loading %s on %s (%s). First boot downloads ~6 GB.", - self.model_name, device, dtype, - ) - - # eager attention works everywhere; flash-attn-2 is CUDA-only and would - # break MPS/CPU loads. - attn_impl = "flash_attention_2" if device == "cuda" else "eager" - - tokenizer = AutoTokenizer.from_pretrained( - self.model_name, trust_remote_code=True, - cache_dir=str(settings.local_model_cache), - ) - model = AutoModel.from_pretrained( - self.model_name, - trust_remote_code=True, - use_safetensors=True, - attn_implementation=attn_impl, - cache_dir=str(settings.local_model_cache), - ) - model = model.eval().to(dtype) - if device != "cpu": - model = model.to(device) - - self._tokenizer = tokenizer - self._model = model - self._device = device - self._dtype = dtype - logger.info("Local OCR engine ready on %s", device) - - async def extract_page( - self, - image: Image.Image, - mode: str = "markdown", - ngram_size: int | None = None, # noqa: ARG002 (vLLM-only knob) - window_size: int | None = None, # noqa: ARG002 - ) -> str: - await self._ensure_loaded() - prompt = LOCAL_PROMPTS.get(mode, LOCAL_PROMPTS["markdown"]) - - async with self._lock: - return await asyncio.to_thread(self._infer_blocking, image, prompt) - - def _infer_blocking(self, image: Image.Image, prompt: str) -> str: - # DeepSeek-OCR's `model.infer` (registered via trust_remote_code) expects a - # path on disk for the image and writes its result alongside it. We feed it - # a temp dir so nothing leaks into the output volume. - with tempfile.TemporaryDirectory() as tmpdir: - tmp = Path(tmpdir) - image_path = tmp / "page.png" - image.save(image_path, format="PNG") - - try: - result = self._model.infer( - self._tokenizer, - prompt=prompt, - image_file=str(image_path), - output_path=str(tmp), - base_size=1024, - image_size=640, - crop_mode=True, - save_results=False, - test_compress=False, - ) - except TypeError: - # Older variants of the remote-code helper had a slightly - # different signature; fall back to the minimal kwargs. - result = self._model.infer( - self._tokenizer, - prompt=prompt, - image_file=str(image_path), - output_path=str(tmp), - ) - - if isinstance(result, str): - return result - # Some forks return a dict / list; prefer a 'text' key, else stringify. - if isinstance(result, dict) and "text" in result: - return str(result["text"]) - return str(result) if result is not None else "" diff --git a/ocr_pipeline/services/metadata_collector.py b/ocr_pipeline/services/metadata_collector.py index 859dd14..5c8e30c 100644 --- a/ocr_pipeline/services/metadata_collector.py +++ b/ocr_pipeline/services/metadata_collector.py @@ -3,7 +3,7 @@ import fitz import tiktoken -from ocr_pipeline.models.metadata import PageMetadata, DocumentMetadata +from ocr_pipeline.models.metadata import PageMetadata from ocr_pipeline.services.script_detector import ScriptAnalysis from ocr_pipeline.services.output_validator import ValidationResult from ocr_pipeline.services.page_analyzer import PageProfile @@ -12,7 +12,7 @@ class MetadataCollector: """Builds metadata during extraction.""" - def __init__(self, model_name: str = "deepseek-ai/DeepSeek-OCR"): + def __init__(self, model_name: str = "deepseek-ai/DeepSeek-OCR-2"): self.model_name = model_name try: self._tokenizer = tiktoken.get_encoding("cl100k_base") @@ -23,7 +23,7 @@ def count_tokens(self, text: str) -> int: """Count tokens using tiktoken cl100k_base encoding.""" if self._tokenizer: return len(self._tokenizer.encode(text)) - return int(len(text.split()) * 1.3)# Fallback: rough estimate + return int(len(text.split()) * 1.3) # Fallback: rough estimate def build_page_metadata( self, @@ -38,7 +38,7 @@ def build_page_metadata( page_profile: PageProfile, ) -> PageMetadata: words = text.split() - lines = [l for l in text.split("\n") if l.strip()] + lines = [line for line in text.split("\n") if line.strip()] token_count = self.count_tokens(text) return PageMetadata( @@ -64,11 +64,13 @@ def build_page_metadata( validation_issues=validation_result.issues, repetition_ratio=validation_result.metrics.get("repetition_ratio", 0), has_embedded_text=page_profile.has_embedded_text, - is_image_only=not page_profile.has_embedded_text and page_profile.has_images, + is_image_only=not page_profile.has_embedded_text + and page_profile.has_images, page_width=page_profile.width, page_height=page_profile.height, image_count=page_profile.image_count, estimated_complexity=page_profile.estimated_complexity, + quality_flags=validation_result.metrics.get("quality_flags", []), ) def extract_pdf_metadata(self, pdf_path: Path) -> dict: diff --git a/ocr_pipeline/services/ocr_engine.py b/ocr_pipeline/services/ocr_engine.py index 494334d..3afa010 100644 --- a/ocr_pipeline/services/ocr_engine.py +++ b/ocr_pipeline/services/ocr_engine.py @@ -1,15 +1,9 @@ """OCR engine abstraction. -Two backends ship today: - -- `RemoteOCREngine` calls any OpenAI-compatible `/v1/chat/completions` endpoint - (vLLM serving DeepSeek-OCR is the production target; remote endpoints like - OpenRouter or a self-hosted shim work the same way). -- `LocalOCREngine` runs DeepSeek-OCR in-process via `transformers`. Slow but - needs no GPU server — used for Apple Silicon / CPU development. - -Pick a backend with the `MODEL_BACKEND` env var. `OCREngine()` returns the -right instance based on `settings.model_backend`. +OpenCR is GPU-first: `RemoteOCREngine` calls an OpenAI-compatible +`/v1/chat/completions` endpoint, with vLLM serving DeepSeek-OCR-2 as the +default production target. `MODEL_BACKEND=remote` can point the same client at +another compatible GPU service. """ from __future__ import annotations @@ -108,13 +102,5 @@ async def extract_page( def OCREngine(*args, **kwargs) -> _OCREngineProtocol: - """Factory. Returns the engine matching `settings.model_backend`. - - Existing callers do `OCREngine()` so we keep this name as a callable. - """ - if settings.is_local_backend: - # Imported lazily so projects without the local extras (transformers, - # torch) can still use the remote backend without import errors. - from ocr_pipeline.services.local_ocr_engine import LocalOCREngine - return LocalOCREngine(*args, **kwargs) + """Factory kept for existing callers.""" return RemoteOCREngine(*args, **kwargs) diff --git a/ocr_pipeline/services/ocr_pair_exporter.py b/ocr_pipeline/services/ocr_pair_exporter.py new file mode 100644 index 0000000..17e3a13 --- /dev/null +++ b/ocr_pipeline/services/ocr_pair_exporter.py @@ -0,0 +1,246 @@ +import hashlib +import json +import shutil +import tempfile +import zipfile +from dataclasses import dataclass +from pathlib import Path + +from ocr_pipeline.config import settings +from ocr_pipeline.services.dataset_exporter import PROJECT_METADATA +from ocr_pipeline.services.output_writer import PAGE_BREAK +from ocr_pipeline.services.pdf_renderer import PDFRenderer + + +@dataclass(frozen=True) +class OCRPairExportResult: + export_dir: Path + bundle: Path + pages_count: int + + +class OCRPairExporter: + """Builds image/text pairs for OCR model fine-tuning.""" + + def __init__(self, export_dir: Path, renderer: PDFRenderer | None = None): + self.export_dir = export_dir + self.renderer = renderer or PDFRenderer() + + @staticmethod + def _split_pages(text: str, total_pages: int) -> list[str]: + pages = text.split(PAGE_BREAK) if text else [""] + if len(pages) < total_pages: + pages.extend([""] * (total_pages - len(pages))) + return pages[:total_pages] + + @staticmethod + def _split_name(stable_key: str) -> str: + bucket = ( + int(hashlib.sha256(stable_key.encode("utf-8")).hexdigest()[:8], 16) % 100 + ) + if bucket < 90: + return "train" + if bucket < 95: + return "validation" + return "test" + + @staticmethod + def _json_list(raw: str | None) -> list[str]: + if isinstance(raw, list): + return [str(item) for item in raw] + if not raw: + return [] + try: + value = json.loads(raw) + except json.JSONDecodeError: + return [part.strip() for part in raw.split(",") if part.strip()] + return [str(item) for item in value] if isinstance(value, list) else [] + + @staticmethod + def _language_list(value) -> list[str]: + if isinstance(value, list): + return [str(item).strip() for item in value if str(item).strip()] + if not value: + return [] + return [part.strip() for part in str(value).split(",") if part.strip()] + + def export_run( + self, + *, + run: dict, + documents: list[dict], + pages_by_document: dict[str, list[dict]], + catalog_by_document: dict[str, dict], + document_ids: set[str] | None = None, + dpi: int = 160, + text_mode: str = "clean", + ) -> OCRPairExportResult: + if text_mode not in {"clean", "raw"}: + raise ValueError("text_mode must be clean or raw") + + tmp_parent = self.export_dir.parent + tmp_parent.mkdir(parents=True, exist_ok=True) + tmp_path = Path( + tempfile.mkdtemp(prefix=f"{self.export_dir.name}.", dir=tmp_parent) + ) + images_dir = tmp_path / "images" + images_dir.mkdir(parents=True, exist_ok=True) + + split_rows: dict[str, list[dict]] = {"train": [], "validation": [], "test": []} + pages_count = 0 + + for doc in documents: + if doc.get("status") != "completed": + continue + document_id = doc["document_id"] + if document_ids is not None and document_id not in document_ids: + continue + catalog = catalog_by_document.get(document_id, {}) + pdf_path_str = doc.get("artifact_source_pdf") or doc.get( + "document_source_path" + ) + if not pdf_path_str: + continue + pdf_path = Path(pdf_path_str) + if not pdf_path.exists(): + continue + + total_pages = int( + doc.get("total_pages") + or len(pages_by_document.get(document_id, [])) + or 0 + ) + raw_pages = self._split_pages( + self._read_text(doc.get("artifact_raw_txt")), total_pages + ) + clean_pages = self._split_pages( + self._read_text(doc.get("artifact_clean_txt")), total_pages + ) + page_rows = { + row["page_num"]: row for row in pages_by_document.get(document_id, []) + } + + for page_num in range(1, total_pages + 1): + page_id = f"{document_id}_page_{page_num:04d}" + image_rel = f"images/{page_id}.png" + image = self.renderer.render_page(pdf_path, page_num, dpi) + image.save(images_dir / f"{page_id}.png", format="PNG") + + page_meta = page_rows.get(page_num, {}) + raw_text = raw_pages[page_num - 1] + clean_text = clean_pages[page_num - 1] + text = clean_text if text_mode == "clean" else raw_text + split_key = doc.get("file_sha256") or document_id + split = self._split_name(split_key) + image_path = images_dir / f"{page_id}.png" + image_hash = hashlib.sha256(image_path.read_bytes()).hexdigest() + + split_rows[split].append( + { + "id": page_id, + "run_id": run["id"], + "image": image_rel, + "text": text, + "raw_text": raw_text, + "clean_text": clean_text, + "text_mode": text_mode, + "label_source": "cleaned_machine_ocr" + if text_mode == "clean" + else "machine_ocr", + "review_status": "unreviewed", + "document_id": document_id, + "document_name": doc.get("document_filename"), + "page": page_num, + "group_path": catalog.get("group_path"), + "title": catalog.get("display_title") + or catalog.get("pdf_title"), + "author": catalog.get("author") or catalog.get("pdf_author"), + "work": catalog.get("work"), + "book": catalog.get("book"), + "document_date_label": catalog.get("document_date_label"), + "document_date_precision": catalog.get( + "document_date_precision" + ), + "language": self._language_list(catalog.get("language")) + or self._json_list(page_meta.get("detected_languages")), + "script": catalog.get("script") + or page_meta.get("primary_script"), + "ocr_status": page_meta.get("status"), + "validation_issues": self._json_list( + page_meta.get("validation_issues") + ), + "extraction_mode": page_meta.get("extraction_mode"), + "extraction_attempt": page_meta.get("extraction_attempt"), + "dpi_used": page_meta.get("dpi_used"), + "render_dpi": dpi, + "image_width": image.width, + "image_height": image.height, + "image_sha256": image_hash, + "source_file": doc.get("document_filename"), + "source_pdf_sha256": doc.get("file_sha256"), + "ocr_model": run.get("model_used"), + "pipeline_version": run.get("pipeline_version"), + } + ) + pages_count += 1 + + self._write_jsonl(tmp_path, split_rows) + self._write_manifest(tmp_path, run, pages_count, dpi, text_mode) + if self.export_dir.exists(): + shutil.rmtree(self.export_dir) + tmp_path.replace(self.export_dir) + bundle = self.export_dir.with_suffix(".zip") + if bundle.exists(): + bundle.unlink() + with zipfile.ZipFile(bundle, "w", compression=zipfile.ZIP_DEFLATED) as archive: + for path in sorted(self.export_dir.rglob("*")): + if path.is_file(): + archive.write(path, arcname=path.relative_to(self.export_dir)) + return OCRPairExportResult(self.export_dir, bundle, pages_count) + + @staticmethod + def _read_text(path_str: str | None) -> str: + if not path_str: + return "" + path = Path(path_str) + return path.read_text(encoding="utf-8") if path.exists() else "" + + def _write_jsonl(self, export_dir: Path, split_rows: dict[str, list[dict]]) -> None: + for split, rows in split_rows.items(): + path = export_dir / f"{split}.jsonl" + path.write_text( + "".join(json.dumps(row, ensure_ascii=False) + "\n" for row in rows), + encoding="utf-8", + ) + + def _write_manifest( + self, export_dir: Path, run: dict, pages_count: int, dpi: int, text_mode: str + ) -> None: + payload = { + "export_type": "ocr_pairs", + "run_id": run["id"], + "created_by": PROJECT_METADATA, + "pages_count": pages_count, + "image_format": "png", + "dpi": dpi, + "text_mode": text_mode, + "dataset_purpose": "ocr_audit", + "label_source": "cleaned_machine_ocr" + if text_mode == "clean" + else "machine_ocr", + "review_status": "unreviewed", + "schema_version": 1, + "split_strategy": { + "method": "sha256_bucket", + "key": "source_pdf_sha256", + "ratios": {"train": 0.90, "validation": 0.05, "test": 0.05}, + }, + "ocr_model": run.get("model_used") or settings.model_name, + "pipeline_version": run.get("pipeline_version") + or settings.pipeline_version, + "splits": ["train", "validation", "test"], + } + (export_dir / "manifest.json").write_text( + json.dumps(payload, indent=2, ensure_ascii=False), + encoding="utf-8", + ) diff --git a/ocr_pipeline/services/output_validator.py b/ocr_pipeline/services/output_validator.py index dd0b1e1..9c4f662 100644 --- a/ocr_pipeline/services/output_validator.py +++ b/ocr_pipeline/services/output_validator.py @@ -25,15 +25,16 @@ class OutputValidator: """ # --- Thresholds — tune based on your corpus --- # - MAX_REPETITION_RATIO = 0.35 # If >35% of lines are duplicates -> WARN - MAX_REPETITION_RATIO_FAIL = 0.60 # If >60% -> FAIL - MIN_UNIQUE_CHARS = 10 # Minimum unique characters for non-empty - MAX_SINGLE_CHAR_RATIO = 0.50 # If >50% of text is one character -> FAIL - MIN_TEXT_LENGTH = 20 # Below this = probably blank page - MAX_CONSECUTIVE_DUPES = 5 # 5+ identical consecutive lines = WARN + MAX_REPETITION_RATIO = 0.35 # If >35% of lines are duplicates -> WARN + MAX_REPETITION_RATIO_FAIL = 0.60 # If >60% -> FAIL + MIN_UNIQUE_CHARS = 10 # Minimum unique characters for non-empty + MAX_SINGLE_CHAR_RATIO = 0.50 # If >50% of text is one character -> FAIL + MIN_TEXT_LENGTH = 20 # Below this = probably blank page + MAX_CONSECUTIVE_DUPES = 5 # 5+ identical consecutive lines = WARN def validate(self, text: str, page_num: int) -> ValidationResult: issues: list[str] = [] + quality_flags: list[str] = [] metrics: dict = {} # Check 1: Empty / near-empty output @@ -46,7 +47,7 @@ def validate(self, text: str, page_num: int) -> ValidationResult: ) # Check 2: Line-level repetition - lines = [l.strip() for l in stripped.split("\n") if l.strip()] + lines = [line.strip() for line in stripped.split("\n") if line.strip()] single_char_ratio = 0.0 if lines: @@ -110,6 +111,27 @@ def validate(self, text: str, page_num: int) -> ValidationResult: issues.append(f"Page {page_num}: model artifact detected") break + # Check 6: Corpus quality signals. These are usually still usable OCR, + # but researchers need them visible before treating text as ground truth. + hyphen_breaks = re.findall( + r"(?iu)[^\W\d_]{2,}-\s*\n\s*[^\W\d_]{2,}", stripped + ) + inline_hyphen_breaks = re.findall( + r"(?iu)[^\W\d_]{2,}-\s{1,3}[^\W\d_]{2,}", stripped + ) + hyphen_break_count = len(hyphen_breaks) + len(inline_hyphen_breaks) + if hyphen_break_count: + quality_flags.append("line_hyphenation") + metrics["line_hyphenation_count"] = hyphen_break_count + issues.append( + f"Page {page_num}: line-break hyphenation remains " + f"({hyphen_break_count} segment(s))" + ) + + if re.search(r"]*>", stripped, re.I): + quality_flags.append("markup_leak") + issues.append(f"Page {page_num}: markup tag leaked into clean text") + # Determine overall status if any("extreme repetition" in i or "model artifact" in i for i in issues): status = ValidationStatus.FAIL @@ -120,5 +142,6 @@ def validate(self, text: str, page_num: int) -> ValidationResult: else: status = ValidationStatus.PASS + metrics["quality_flags"] = quality_flags metrics["text_length"] = len(stripped) return ValidationResult(status=status, issues=issues, metrics=metrics) diff --git a/ocr_pipeline/services/run_orchestrator.py b/ocr_pipeline/services/run_orchestrator.py index 5561e04..f023bbf 100644 --- a/ocr_pipeline/services/run_orchestrator.py +++ b/ocr_pipeline/services/run_orchestrator.py @@ -74,6 +74,7 @@ async def _stage_document(self, file_path: Path) -> StagedDocument: document_id = sha[:16] canonical = self.storage.source_pdf_path(document_id) existing = await self.db.get_document_by_sha(sha) + filename = existing["filename"] if existing else file_path.name if not canonical.exists(): self.storage.sources_dir().mkdir(parents=True, exist_ok=True) @@ -87,7 +88,7 @@ async def _stage_document(self, file_path: Path) -> StagedDocument: await self.db.upsert_document( document_id, - filename=file_path.name, + filename=filename, source_path=str(canonical), file_sha256=sha, file_size_bytes=size, @@ -97,7 +98,7 @@ async def _stage_document(self, file_path: Path) -> StagedDocument: return StagedDocument( document_id=document_id, file_sha256=sha, - filename=file_path.name, + filename=filename, source_path=canonical, deduped=existing is not None, estimated_pages=page_count, @@ -133,12 +134,23 @@ async def create_run( for s in staged: await self.db.link_run_document(run_id, s.document_id, status="pending") + logger.info( + "run=%s queued docs=%d pages=%d name=%s", + run_id, + len(staged), + pages_total, + name or "-", + ) observability.job_created() - return CreateRunResult(run_id=run_id, documents=staged, pages_total_estimate=pages_total) + return CreateRunResult( + run_id=run_id, documents=staged, pages_total_estimate=pages_total + ) # ---------- execution ---------- - def start(self, result: CreateRunResult, *, strip_refs: bool, export_parquet: bool) -> asyncio.Task: + def start( + self, result: CreateRunResult, *, strip_refs: bool, export_parquet: bool + ) -> asyncio.Task: task = asyncio.create_task( self._run(result, strip_refs=strip_refs, export_parquet=export_parquet) ) @@ -146,6 +158,37 @@ def start(self, result: CreateRunResult, *, strip_refs: bool, export_parquet: bo task.add_done_callback(self._tasks.discard) return task + async def retry_incomplete_run(self, run_id: str) -> CreateRunResult: + run = await self.db.get_run(run_id) + if not run: + raise KeyError(run_id) + if run["status"] != "failed": + raise ValueError("Only failed runs can be retried") + + documents = await self.db.list_run_documents(run_id) + retry_paths = [ + doc["document_source_path"] + for doc in documents + if doc["status"] != "completed" and doc.get("document_source_path") + ] + if not retry_paths: + raise ValueError("No incomplete documents to retry") + + name = run.get("name") or run_id + logger.info("run=%s retry queued incomplete_docs=%d", run_id, len(retry_paths)) + result = await self.create_run( + retry_paths, + name=f"{name} retry", + strip_refs=bool(run.get("strip_refs")), + export_parquet=bool(run.get("export_parquet")), + ) + self.start( + result, + strip_refs=bool(run.get("strip_refs")), + export_parquet=bool(run.get("export_parquet")), + ) + return result + async def _run( self, result: CreateRunResult, @@ -155,12 +198,19 @@ async def _run( ) -> None: run_id = result.run_id started_at = _now() - await self.db.update_run(run_id, status="processing", stage="ocr", started_at=started_at) - await self._emit(run_id, "run_started", {"started_at": started_at}) - pages_total = result.pages_total_estimate pages_completed = 0 documents_meta: list = [] + await self.db.update_run( + run_id, status="processing", stage="ocr", started_at=started_at + ) + logger.info( + "run=%s started docs=%d pages=%d", + run_id, + len(result.documents), + pages_total, + ) + await self._emit(run_id, "run_started", {"started_at": started_at}) async def page_event(event: dict) -> None: nonlocal pages_completed @@ -169,69 +219,138 @@ async def page_event(event: dict) -> None: pages_completed += 1 progress = (pages_completed / pages_total) if pages_total else 0 await self.db.update_run( - run_id, pages_completed=pages_completed, progress=min(0.99, progress), + run_id, + pages_completed=pages_completed, + progress=min(0.99, progress), ) observability.page_completed( processing_time_ms=event.get("processing_time_ms", 0.0), token_count=event.get("token_count", 0), validation_status=event.get("validation_status", "pass"), ) + logger.info( + "run=%s page=%d/%d doc=%s status=%s time=%.1fms", + run_id, + pages_completed, + pages_total, + event.get("document"), + event.get("validation_status"), + event.get("processing_time_ms", 0.0), + ) elif etype == "page_retry": observability.page_retry() + logger.info( + "run=%s page=%s retry attempt=%s strategy=%s reason=%s", + run_id, + event.get("page"), + event.get("attempt"), + event.get("new_strategy"), + event.get("reason"), + ) await self._emit(run_id, etype, event) try: - for staged in result.documents: - paths = self.storage.artifact_paths(run_id, staged.document_id, staged.filename) - processor = BatchProcessor(self.db, event_callback=page_event, strip_refs=strip_refs) + for index, staged in enumerate(result.documents, start=1): + paths = self.storage.artifact_paths( + run_id, staged.document_id, staged.filename + ) + logger.info( + "run=%s doc=%d/%d started %s", + run_id, + index, + len(result.documents), + staged.filename, + ) + processor = BatchProcessor( + self.db, event_callback=page_event, strip_refs=strip_refs + ) doc_meta = await processor.process_document( staged.source_path, run_id=run_id, document_id=staged.document_id, file_sha256=staged.file_sha256, + filename=staged.filename, artifact_paths=paths, ) documents_meta.append((staged.document_id, paths, doc_meta)) observability.document_completed() - await self.db.update_run(run_id, documents_completed=len(documents_meta)) + await self.db.update_run( + run_id, documents_completed=len(documents_meta) + ) + logger.info( + "run=%s doc=%d/%d completed %s pass=%d warn=%d fail=%d", + run_id, + index, + len(result.documents), + staged.filename, + doc_meta.pages_pass, + doc_meta.pages_warn, + doc_meta.pages_fail, + ) - dataset_bundle = await self._maybe_export(run_id, documents_meta, export_parquet) + dataset_bundle = await self._maybe_export( + run_id, documents_meta, export_parquet + ) completed_at = _now() await self.db.update_run( run_id, - status="completed", stage="completed", progress=1.0, + status="completed", + stage="completed", + progress=1.0, pages_completed=pages_total, dataset_bundle=dataset_bundle, completed_at=completed_at, ) + logger.info("run=%s completed bundle=%s", run_id, dataset_bundle or "-") observability.job_completed() - await self._emit(run_id, "run_complete", { - "completed_at": completed_at, - "documents_total": len(result.documents), - "documents_completed": len(documents_meta), - "pages_total": pages_total, - "dataset_bundle": dataset_bundle, - **self._aggregate_totals(documents_meta), - }) + await self._emit( + run_id, + "run_complete", + { + "completed_at": completed_at, + "documents_total": len(result.documents), + "documents_completed": len(documents_meta), + "pages_total": pages_total, + "dataset_bundle": dataset_bundle, + **self._aggregate_totals(documents_meta), + }, + ) except Exception as exc: logger.exception("Run %s failed", run_id) + await self.db.fail_incomplete_run_documents(run_id) await self.db.update_run( - run_id, status="failed", stage="failed", error=str(exc), completed_at=_now(), + run_id, + status="failed", + stage="failed", + error=str(exc), + completed_at=_now(), ) observability.job_failed() await self._emit(run_id, "run_failed", {"error": str(exc)}) - async def _maybe_export(self, run_id: str, documents_meta: list, export_parquet: bool) -> str | None: + async def _maybe_export( + self, run_id: str, documents_meta: list, export_parquet: bool + ) -> str | None: if not (export_parquet and documents_meta): return None await self.db.update_run(run_id, stage="exporting") + logger.info("run=%s exporting dataset docs=%d", run_id, len(documents_meta)) await self._emit(run_id, "dataset_export_started", {}) - exports = [DocumentExport(metadata=m, document_id=did, artifact_paths=p) - for (did, p, m) in documents_meta] + exports = [] + for did, paths, meta in documents_meta: + exports.append( + DocumentExport( + metadata=meta, + document_id=did, + artifact_paths=paths, + catalog_metadata=await self.db.get_document(did) or {}, + ) + ) result = await asyncio.to_thread( DatasetExporter(self.storage.dataset_dir(run_id)).export_run, - run_id, exports, + run_id, + exports, ) return str(result.bundle) @@ -242,7 +361,9 @@ def _aggregate_totals(documents_meta) -> dict: "pages_warn": sum(m.pages_warn for _, _, m in documents_meta), "pages_fail": sum(m.pages_fail for _, _, m in documents_meta), "pages_empty": sum(m.pages_empty for _, _, m in documents_meta), - "total_time_ms": round(sum(m.total_processing_time_ms for _, _, m in documents_meta), 1), + "total_time_ms": round( + sum(m.total_processing_time_ms for _, _, m in documents_meta), 1 + ), } # ---------- events ---------- @@ -262,7 +383,9 @@ async def _emit(self, run_id: str, event_type: str, payload: dict) -> None: except asyncio.QueueFull: logger.warning("Listener queue full for run %s; dropping event", run_id) - async def subscribe(self, run_id: str, after_event_id: int = 0) -> AsyncIterator[dict]: + async def subscribe( + self, run_id: str, after_event_id: int = 0 + ) -> AsyncIterator[dict]: for ev in await self.db.list_events(run_id, after_id=after_event_id): yield {**ev["payload"], "event_id": ev["id"]} diff --git a/ocr_pipeline/services/script_detector.py b/ocr_pipeline/services/script_detector.py index 051fd3c..633c8fa 100644 --- a/ocr_pipeline/services/script_detector.py +++ b/ocr_pipeline/services/script_detector.py @@ -1,12 +1,11 @@ import unicodedata from dataclasses import dataclass from enum import Enum -from collections import Counter class ScriptDirection(str, Enum): - LTR = "ltr" # Latin, Cyrillic, etc. - RTL = "rtl" # Arabic, Hebrew, etc. + LTR = "ltr" # Latin, Cyrillic, etc. + RTL = "rtl" # Arabic, Hebrew, etc. MIXED = "mixed" # Both present significantly UNDETERMINED = "undetermined" @@ -23,15 +22,15 @@ class ScriptFamily(str, Enum): class ScriptAnalysis: direction: ScriptDirection primary_script: ScriptFamily - ltr_ratio: float # 0.0 to 1.0 - rtl_ratio: float # 0.0 to 1.0 + ltr_ratio: float # 0.0 to 1.0 + rtl_ratio: float # 0.0 to 1.0 arabic_char_count: int latin_char_count: int - extended_latin_count: int # Characters with diacritics beyond basic ASCII + extended_latin_count: int # Characters with diacritics beyond basic ASCII has_diacritics: bool - sample_rtl_chars: str # First few RTL characters found - sample_ltr_chars: str # First few LTR characters found - detected_languages: list[str] # Best-guess language hints + sample_rtl_chars: str # First few RTL characters found + sample_ltr_chars: str # First few LTR characters found + detected_languages: list[str] # Best-guess language hints class ScriptDetector: diff --git a/ocr_pipeline/services/startup.py b/ocr_pipeline/services/startup.py index cc42c34..527e2e2 100644 --- a/ocr_pipeline/services/startup.py +++ b/ocr_pipeline/services/startup.py @@ -34,26 +34,13 @@ async def wait_for_model_server() -> bool: """ Block until the model server is healthy and can list its model. Called once at pipeline startup. Returns True if ready, False if timed out. - - For the in-process `local` backend there is nothing to wait for — the model - loads lazily on the first request — so we mark ready immediately. """ - if settings.is_local_backend: - model_readiness.ready = True - model_readiness.model_name = settings.model_name - model_readiness.error = None - model_readiness.checked_at = time.time() - logger.info("Local backend selected; model will load on first request.") - return True - base = settings.model_server_url timeout = settings.model_ready_timeout interval = settings.model_ready_interval deadline = time.monotonic() + timeout - logger.info( - "Waiting for model server at %s (timeout %ds)...", base, timeout - ) + logger.info("Waiting for model server at %s (timeout %ds)...", base, timeout) async with httpx.AsyncClient(timeout=10) as client: while time.monotonic() < deadline: @@ -61,12 +48,16 @@ async def wait_for_model_server() -> bool: resp = await client.get(f"{base}/health") if resp.status_code != 200: model_readiness.error = f"health returned {resp.status_code}" - logger.info("Model server not healthy yet (%s)", model_readiness.error) + logger.info( + "Model server not healthy yet (%s)", model_readiness.error + ) await asyncio.sleep(interval) continue - except (httpx.ConnectError, httpx.ReadTimeout, httpx.ConnectTimeout) as exc: + except httpx.HTTPError as exc: model_readiness.error = f"connection failed ({type(exc).__name__})" - logger.info("Model server not reachable yet (%s)", model_readiness.error) + logger.info( + "Model server not reachable yet (%s)", model_readiness.error + ) await asyncio.sleep(interval) continue diff --git a/ocr_pipeline/services/text_bundle_exporter.py b/ocr_pipeline/services/text_bundle_exporter.py new file mode 100644 index 0000000..be98ead --- /dev/null +++ b/ocr_pipeline/services/text_bundle_exporter.py @@ -0,0 +1,281 @@ +import hashlib +import json +import re +import shutil +import tempfile +import zipfile +from dataclasses import dataclass +from pathlib import Path + +from ocr_pipeline.config import settings +from ocr_pipeline.services.dataset_exporter import PROJECT_METADATA +from ocr_pipeline.services.output_writer import PAGE_BREAK +from ocr_pipeline.services.text_normalizer import TextNormalizer + + +@dataclass(frozen=True) +class TextBundleExportResult: + export_dir: Path + bundle: Path + documents_count: int + pages_count: int + + +class TextBundleExporter: + """Builds plain-text exports for corpus and NLP work.""" + + def __init__(self, export_dir: Path): + self.export_dir = export_dir + self.normalizer = TextNormalizer() + + @staticmethod + def _read_text(path_str: str | None) -> str: + if not path_str: + return "" + path = Path(path_str) + return path.read_text(encoding="utf-8") if path.exists() else "" + + @staticmethod + def _split_pages(text: str, total_pages: int) -> list[str]: + pages = text.split(PAGE_BREAK) if text else [""] + if len(pages) < total_pages: + pages.extend([""] * (total_pages - len(pages))) + return pages[:total_pages] + + @staticmethod + def _json_list(raw) -> list[str]: + if isinstance(raw, list): + return [str(item) for item in raw] + if not raw: + return [] + try: + value = json.loads(raw) + except (TypeError, json.JSONDecodeError): + return [part.strip() for part in str(raw).split(",") if part.strip()] + return [str(item) for item in value] if isinstance(value, list) else [] + + @staticmethod + def _language_list(value) -> list[str]: + if isinstance(value, list): + return [str(item).strip() for item in value if str(item).strip()] + if not value: + return [] + return [part.strip() for part in str(value).split(",") if part.strip()] + + @staticmethod + def _text_sha256(text: str) -> str: + return hashlib.sha256(text.encode("utf-8")).hexdigest() + + @staticmethod + def _file_stem(filename: str, document_id: str) -> str: + stem = Path(filename or document_id).stem or document_id + safe = re.sub(r"[^\w.-]+", "_", stem).strip("._-") + return f"{safe or 'document'}__{document_id[:8]}" + + def export_run( + self, + *, + run: dict, + documents: list[dict], + pages_by_document: dict[str, list[dict]], + catalog_by_document: dict[str, dict], + document_ids: set[str] | None = None, + ) -> TextBundleExportResult: + tmp_parent = self.export_dir.parent + tmp_parent.mkdir(parents=True, exist_ok=True) + tmp_path = Path( + tempfile.mkdtemp(prefix=f"{self.export_dir.name}.", dir=tmp_parent) + ) + clean_dir = tmp_path / "clean" + raw_dir = tmp_path / "raw" + normalized_dir = tmp_path / "normalized" + clean_dir.mkdir() + raw_dir.mkdir() + normalized_dir.mkdir() + + page_rows: list[dict] = [] + document_rows: list[dict] = [] + + for doc in documents: + document_id = doc["document_id"] + if doc.get("status") != "completed": + continue + if document_ids is not None and document_id not in document_ids: + continue + + total_pages = int( + doc.get("total_pages") + or len(pages_by_document.get(document_id, [])) + or 0 + ) + raw_text = self._read_text(doc.get("artifact_raw_txt")) + clean_text = self._read_text(doc.get("artifact_clean_txt")) + normalized_text = self.normalizer.normalize_for_nlp(clean_text) + if not raw_text and not clean_text: + continue + + catalog = catalog_by_document.get(document_id, {}) + display_name = ( + catalog.get("display_title") + or catalog.get("title") + or doc.get("document_filename") + or "" + ) + stem = self._file_stem(display_name, document_id) + raw_rel = f"raw/{stem}.txt" + clean_rel = f"clean/{stem}.txt" + normalized_rel = f"normalized/{stem}.txt" + (tmp_path / raw_rel).write_text(raw_text, encoding="utf-8") + (tmp_path / clean_rel).write_text(clean_text, encoding="utf-8") + (tmp_path / normalized_rel).write_text( + normalized_text, encoding="utf-8" + ) + + raw_pages = self._split_pages(raw_text, total_pages) + clean_pages = self._split_pages(clean_text, total_pages) + normalized_pages = [ + self.normalizer.normalize_for_nlp(page) for page in clean_pages + ] + page_meta = { + row["page_num"]: row for row in pages_by_document.get(document_id, []) + } + language = self._language_list(catalog.get("language")) + document_quality_flags: set[str] = set() + + for page_num in range(1, total_pages + 1): + meta = page_meta.get(page_num, {}) + page_raw = raw_pages[page_num - 1] + page_clean = clean_pages[page_num - 1] + page_normalized = normalized_pages[page_num - 1] + quality_flags = self._json_list(meta.get("quality_flags")) + document_quality_flags.update(quality_flags) + page_rows.append( + { + "id": f"{document_id}_page_{page_num:04d}", + "run_id": run["id"], + "document_id": document_id, + "document_name": doc.get("document_filename"), + "group_path": catalog.get("group_path"), + "title": catalog.get("display_title") + or catalog.get("pdf_title"), + "author": catalog.get("author") or catalog.get("pdf_author"), + "work": catalog.get("work"), + "book": catalog.get("book"), + "document_date_label": catalog.get("document_date_label"), + "document_date_precision": catalog.get( + "document_date_precision" + ), + "language": language + or self._json_list(meta.get("detected_languages")), + "script": catalog.get("script") + or meta.get("primary_script"), + "page": page_num, + "raw_text": page_raw, + "clean_text": page_clean, + "normalized_text": page_normalized, + "raw_text_sha256": self._text_sha256(page_raw), + "clean_text_sha256": self._text_sha256(page_clean), + "normalized_text_sha256": self._text_sha256( + page_normalized + ), + "ocr_status": meta.get("status"), + "quality_flags": quality_flags, + "validation_issues": self._json_list( + meta.get("validation_issues") + ), + "extraction_mode": meta.get("extraction_mode"), + "extraction_attempt": meta.get("extraction_attempt"), + "source_file": doc.get("document_filename"), + "source_pdf_sha256": doc.get("file_sha256"), + "ocr_model": run.get("model_used") or settings.model_name, + "pipeline_version": run.get("pipeline_version") + or settings.pipeline_version, + } + ) + + document_rows.append( + { + "run_id": run["id"], + "document_id": document_id, + "document_name": doc.get("document_filename"), + "group_path": catalog.get("group_path"), + "title": catalog.get("display_title") + or catalog.get("pdf_title"), + "author": catalog.get("author") or catalog.get("pdf_author"), + "work": catalog.get("work"), + "book": catalog.get("book"), + "document_date_label": catalog.get("document_date_label"), + "document_date_precision": catalog.get("document_date_precision"), + "language": language, + "script": catalog.get("script"), + "page_count": total_pages, + "raw_file": raw_rel, + "clean_file": clean_rel, + "normalized_file": normalized_rel, + "raw_text_sha256": self._text_sha256(raw_text), + "clean_text_sha256": self._text_sha256(clean_text), + "normalized_text_sha256": self._text_sha256(normalized_text), + "quality_flags": sorted(document_quality_flags), + "source_pdf_sha256": doc.get("file_sha256"), + "ocr_model": run.get("model_used") or settings.model_name, + "pipeline_version": run.get("pipeline_version") + or settings.pipeline_version, + } + ) + + self._write_jsonl(tmp_path / "pages.jsonl", page_rows) + self._write_jsonl(tmp_path / "documents.jsonl", document_rows) + self._write_manifest(tmp_path, run, len(document_rows), len(page_rows)) + + if self.export_dir.exists(): + shutil.rmtree(self.export_dir) + tmp_path.replace(self.export_dir) + + bundle = self.export_dir.with_suffix(".zip") + if bundle.exists(): + bundle.unlink() + with zipfile.ZipFile(bundle, "w", compression=zipfile.ZIP_DEFLATED) as archive: + for path in sorted(self.export_dir.rglob("*")): + if path.is_file(): + archive.write(path, arcname=path.relative_to(self.export_dir)) + + return TextBundleExportResult( + export_dir=self.export_dir, + bundle=bundle, + documents_count=len(document_rows), + pages_count=len(page_rows), + ) + + @staticmethod + def _write_jsonl(path: Path, rows: list[dict]) -> None: + path.write_text( + "".join(json.dumps(row, ensure_ascii=False) + "\n" for row in rows), + encoding="utf-8", + ) + + @staticmethod + def _write_manifest( + export_dir: Path, run: dict, documents_count: int, pages_count: int + ) -> None: + payload = { + "export_type": "text_bundle", + "run_id": run["id"], + "created_by": PROJECT_METADATA, + "documents_count": documents_count, + "pages_count": pages_count, + "schema_version": 1, + "artifacts": { + "clean_text_dir": "clean/", + "raw_text_dir": "raw/", + "normalized_text_dir": "normalized/", + "pages_jsonl": "pages.jsonl", + "documents_jsonl": "documents.jsonl", + }, + "ocr_model": run.get("model_used") or settings.model_name, + "pipeline_version": run.get("pipeline_version") + or settings.pipeline_version, + } + (export_dir / "manifest.json").write_text( + json.dumps(payload, indent=2, ensure_ascii=False), + encoding="utf-8", + ) diff --git a/ocr_pipeline/services/text_cleaner.py b/ocr_pipeline/services/text_cleaner.py index f4056c1..65c1a38 100644 --- a/ocr_pipeline/services/text_cleaner.py +++ b/ocr_pipeline/services/text_cleaner.py @@ -1,3 +1,4 @@ +import html import re import unicodedata @@ -24,18 +25,34 @@ class TextCleaner: ] ARTIFACT_PATTERNS = [ - re.compile(r"<\|[a-z_]+\|>"), # Any remaining special tokens - re.compile(r"\x00"), # Null bytes + re.compile(r"<\|/?[a-z_]+\|>"), # Any remaining special tokens + re.compile(r"\x00"), # Null bytes ] - # <|ref|>text<|/ref|>[[x, y, w, h]] — model reference blocks with bounding boxes + # <|ref|>text<|/ref|><|det|>[[x, y, w, h]]<|/det|> — grounding boxes + # are useful for debugging, but should not leak into clean corpus text. + _REF_DET_BLOCK_RE = re.compile( + r"<\|ref\|>.*?<\|/ref\|>\s*<\|det\|>\s*\[\[.*?\]\]\s*<\|/det\|>\s*", + re.DOTALL, + ) + + # Older/simple reference block shape without explicit det tags. _REF_BLOCK_RE = re.compile( - r"<\|ref\|>(.*?)<\|/ref\|>\[\[[\d\s,]+\]\]", + r"<\|ref\|>(.*?)<\|/ref\|>\s*\[\[[\d\s,]+\]\]", re.DOTALL, ) # End-of-line soft hyphens: "word-\n" or "word- \n" followed by continuation _HYPHEN_RE = re.compile(r"(\w)- ?\n(\w)") + _TABLE_RE = re.compile(r"]*>.*?", re.IGNORECASE | re.DOTALL) + _ROW_RE = re.compile(r"]*>(.*?)", re.IGNORECASE | re.DOTALL) + _CELL_RE = re.compile(r"]*>(.*?)", re.IGNORECASE | re.DOTALL) + _BR_RE = re.compile(r"", re.IGNORECASE) + _PARA_END_RE = re.compile(r"", re.IGNORECASE) + _HTML_TAG_RE = re.compile( + r"]*>", + re.IGNORECASE, + ) def clean(self, text: str, strip_refs: bool = False) -> str: """Full cleaning pipeline.""" @@ -43,10 +60,10 @@ def clean(self, text: str, strip_refs: bool = False) -> str: return "" text = self._normalize_unicode(text) - if strip_refs: - text = self._strip_ref_blocks(text) + text = self._strip_ref_blocks(text) text = self._strip_model_tokens(text) text = self._strip_artifacts(text) + text = self._html_to_text(text) text = self._rejoin_hyphens(text) text = self._normalize_whitespace(text) text = self._fix_common_ocr_issues(text) @@ -65,7 +82,8 @@ def clean_fidelity(self, text: str, strip_refs: bool = False) -> str: return text.strip() def _strip_ref_blocks(self, text: str) -> str: - """Remove <|ref|>...<|/ref|>[[bbox]] blocks, keeping the inner text.""" + """Remove grounding boxes while preserving older inline ref text.""" + text = self._REF_DET_BLOCK_RE.sub("", text) return self._REF_BLOCK_RE.sub(r"\1", text) def _rejoin_hyphens(self, text: str) -> str: @@ -89,6 +107,30 @@ def _strip_artifacts(self, text: str) -> str: text = pattern.sub("", text) return text + def _html_to_text(self, text: str) -> str: + """Convert occasional model-emitted HTML into readable plain text.""" + text = self._TABLE_RE.sub(lambda match: self._table_to_text(match.group(0)), text) + text = self._BR_RE.sub("\n", text) + text = self._PARA_END_RE.sub("\n", text) + text = self._HTML_TAG_RE.sub("", text) + return html.unescape(text) + + def _table_to_text(self, table: str) -> str: + rows: list[str] = [] + + for row_match in self._ROW_RE.finditer(table): + cells: list[str] = [] + for cell_match in self._CELL_RE.finditer(row_match.group(1)): + cell = self._HTML_TAG_RE.sub("", cell_match.group(1)) + cell = html.unescape(cell) + cell = re.sub(r"\s+", " ", cell).strip() + if cell: + cells.append(cell) + if cells: + rows.append(" | ".join(cells)) + + return "\n".join(rows) + def _normalize_whitespace(self, text: str) -> str: # Replace multiple blank lines with a single blank line text = re.sub(r"\n{3,}", "\n\n", text) diff --git a/ocr_pipeline/services/text_normalizer.py b/ocr_pipeline/services/text_normalizer.py new file mode 100644 index 0000000..7e14f31 --- /dev/null +++ b/ocr_pipeline/services/text_normalizer.py @@ -0,0 +1,22 @@ +import re + + +class TextNormalizer: + """Conservative NLP-oriented normalization for exported clean text.""" + + _MARKUP_RE = re.compile( + r"]*>", + re.I, + ) + _LINE_HYPHEN_RE = re.compile( + r"(?iu)([^\W\d_]{2,})-\s*\n\s*([^\W\d_]{2,})" + ) + _INLINE_HYPHEN_RE = re.compile( + r"(?iu)([^\W\d_]{2,})-\s{1,3}([^\W\d_]{2,})" + ) + + def normalize_for_nlp(self, text: str) -> str: + normalized = self._MARKUP_RE.sub("", text) + normalized = self._LINE_HYPHEN_RE.sub(r"\1\2", normalized) + normalized = self._INLINE_HYPHEN_RE.sub(r"\1\2", normalized) + return re.sub(r"\s+", " ", normalized).strip() diff --git a/ocr_pipeline/static/css/style.css b/ocr_pipeline/static/css/style.css index 2912a34..132ed6b 100644 --- a/ocr_pipeline/static/css/style.css +++ b/ocr_pipeline/static/css/style.css @@ -68,7 +68,35 @@ a { color: var(--accent); } .title-row { display: flex; align-items: baseline; gap: 10px; } .title-row h1 { margin: 0; font-size: 1.6rem; line-height: 1; } -.version { font-size: 0.85rem; color: var(--muted); } + +.view-nav { + display: flex; + align-items: center; + gap: 6px; + padding: 4px; + border: 1px solid var(--border); + border-radius: 999px; + background: var(--surface-strong); +} + +.nav-tab { + min-height: 32px; + padding: 6px 12px; + border: 0; + border-radius: 999px; + background: transparent; + color: var(--muted); + font: inherit; + font-size: 0.84rem; + font-weight: 700; + cursor: pointer; +} + +.nav-tab:hover, +.nav-tab.active { + background: var(--accent-soft); + color: var(--accent); +} .topbar-meta { display: flex; align-items: center; gap: 16px; } @@ -113,6 +141,9 @@ a { color: var(--accent); } min-height: calc(100vh - 100px); } +.console-grid.document-mode { grid-template-columns: 280px minmax(0, 1fr); } +.console-grid.solo-mode { grid-template-columns: minmax(0, 1fr); } + .rail, .stage, .inspector { border: 1px solid var(--border); border-radius: var(--radius); @@ -254,13 +285,16 @@ input[type="checkbox"] { width: 15px; height: 15px; accent-color: var(--accent); .file-size { color: var(--muted); font-size: 0.85rem; white-space: nowrap; } .intake-options { display: grid; gap: 12px; grid-template-columns: 1fr; } -.field { display: flex; flex-direction: column; gap: 6px; } -.field span { font-size: 0.84rem; color: var(--muted); font-weight: 600; } +.field { display: flex; flex-direction: column; gap: 5px; min-width: 0; } +.field span { font-size: 0.76rem; color: var(--muted); font-weight: 700; } .field input[type="text"], .field input[type="password"] { - padding: 10px 14px; + min-width: 0; + height: 36px; + padding: 8px 11px; border: 1px solid var(--border-strong); - border-radius: var(--radius-sm); + border-radius: 8px; font: inherit; + font-size: 0.88rem; background: var(--surface-strong); } .field input:focus { outline: 2px solid var(--accent); outline-offset: 1px; border-color: transparent; } @@ -280,6 +314,128 @@ input[type="checkbox"] { width: 15px; height: 15px; accent-color: var(--accent); .intake-cta { display: flex; align-items: center; gap: 14px; margin-top: 6px; } +/* ---------------- document workbench ---------------- */ + +.document-workbench { display: flex; flex-direction: column; min-height: 100%; } + +.document-workbench-body { + display: grid; + grid-template-columns: minmax(0, 1fr) 390px; + gap: 14px; + padding: 16px 24px 24px; +} + +.document-library, +.document-editor { + border: 1px solid var(--border); + border-radius: var(--radius-sm); + background: rgba(255, 255, 255, 0.62); + overflow: hidden; +} + +.document-toolbar { + display: flex; + align-items: center; + gap: 12px; + min-height: 48px; + padding: 8px 12px; + border-bottom: 1px solid var(--border); +} +.document-toolbar.drag-over { background: var(--accent-soft); } +.toolbar-input, +.toolbar-select { + height: 30px; + min-width: 0; + border: 1px solid var(--border-strong); + border-radius: 999px; + background: var(--surface-strong); + color: var(--text); + font: inherit; + font-size: 0.82rem; +} +.toolbar-input { padding: 6px 11px; } +.toolbar-select { padding: 5px 10px; } +.toolbar-search { flex: 1; max-width: 260px; } +.toolbar-group { width: 150px; } + +.document-table { display: grid; } +.document-group { display: grid; } +.document-group-head { + display: flex; + align-items: center; + justify-content: space-between; + min-height: 28px; + padding: 7px 12px; + border-bottom: 1px solid var(--border); + background: rgba(31, 109, 85, 0.07); + color: var(--muted); + font-size: 0.72rem; + font-weight: 700; + letter-spacing: 0.06em; + text-transform: uppercase; +} +.document-row { + display: grid; + grid-template-columns: 24px minmax(220px, 1fr) 86px 84px 78px; + align-items: center; + gap: 10px; + min-height: 52px; + padding: 8px 12px; + border-bottom: 1px solid var(--border); + cursor: pointer; +} +.document-row:last-child { border-bottom: none; } +.document-row:hover { background: rgba(255, 255, 255, 0.72); } +.document-row.active { background: var(--accent-soft); } +.document-row-head { + min-height: 34px; + cursor: default; + background: rgba(115, 100, 82, 0.08); + color: var(--muted); + font-size: 0.72rem; + font-weight: 700; + letter-spacing: 0.08em; + text-transform: uppercase; +} +.document-title { min-width: 0; display: grid; gap: 2px; } +.document-title strong, +.document-title span { overflow: hidden; text-overflow: ellipsis; white-space: nowrap; } +.document-title span { color: var(--muted); font-size: 0.78rem; } + +.document-editor-head { + padding: 12px 14px; + border-bottom: 1px solid var(--border); +} +.document-editor-head .eyebrow { + margin: 0 0 4px; + text-transform: uppercase; + letter-spacing: 0.12em; + color: var(--muted); + font-size: 0.66rem; + font-weight: 700; +} +.document-editor-head h3 { + margin: 0; + font-size: 0.96rem; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} +.document-editor-fields { + display: grid; + grid-template-columns: repeat(2, minmax(0, 1fr)); + gap: 10px 12px; + padding: 12px 14px; +} +.document-editor-fields .field-wide { grid-column: 1 / -1; } +.field-row { display: contents; } +.document-editor-actions { + display: flex; + justify-content: flex-end; + padding: 10px 14px; + border-top: 1px solid var(--border); +} + /* ---------------- buttons ---------------- */ .btn { @@ -307,29 +463,74 @@ input[type="checkbox"] { width: 15px; height: 15px; accent-color: var(--accent); .run-detail { display: flex; flex-direction: column; } -.run-summary { - display: grid; - grid-template-columns: repeat(4, 1fr); +.run-terminal { + margin: 16px 24px; + padding: 14px 16px; + border-radius: 8px; + background: #111827; + color: #d1d5db; + font-family: var(--font-mono); +} + +.run-terminal-head, +.run-terminal-status { + display: flex; + align-items: center; +} + +.run-terminal-head { + justify-content: space-between; gap: 12px; - padding: 16px 24px; } -.summary-card { - padding: 12px 14px; - border: 1px solid var(--border); - border-radius: var(--radius-sm); - background: rgba(255, 255, 255, 0.6); + +.run-terminal-status { gap: 8px; min-width: 0; } +.run-terminal-label { color: #f9fafb; font-weight: 700; overflow: hidden; text-overflow: ellipsis; white-space: nowrap; } +.run-terminal code { color: #93c5fd; font: inherit; font-weight: 700; white-space: nowrap; } + +.run-spinner { + width: 10px; + height: 10px; + border: 2px solid rgba(209, 213, 219, 0.28); + border-top-color: #34d399; + border-radius: 50%; + animation: run-spin 0.8s linear infinite; } -.summary-label { font-size: 0.72rem; color: var(--muted); text-transform: uppercase; letter-spacing: 0.1em; } -.summary-card strong { display: block; margin-top: 4px; font-size: 1.05rem; font-weight: 700; } -.progress-track { +@keyframes run-spin { to { transform: rotate(360deg); } } + +.run-terminal-bar { height: 6px; - margin-top: 10px; + margin: 12px 0; border-radius: 999px; - background: rgba(31, 109, 85, 0.12); + background: #1f2937; overflow: hidden; } -.progress-fill { height: 100%; background: linear-gradient(90deg, #1f6d55, #2d8a6b); transition: width 0.25s ease; } + +.run-terminal-fill { + height: 100%; + border-radius: inherit; + background: #34d399; + transition: width 0.25s ease; +} + +.run-terminal-body { + display: grid; + gap: 5px; + font-size: 0.82rem; +} + +.run-terminal-body p { + display: flex; + gap: 10px; + margin: 0; + min-width: 0; +} + +.terminal-key { + width: 72px; + flex: 0 0 72px; + color: #9ca3af; +} .run-actions { display: flex; gap: 10px; padding: 0 24px 16px; } @@ -352,23 +553,115 @@ input[type="checkbox"] { width: 15px; height: 15px; accent-color: var(--accent); .doc-name { font-weight: 600; overflow: hidden; text-overflow: ellipsis; white-space: nowrap; } .doc-meta { display: flex; gap: 6px; align-items: center; margin-top: 4px; font-size: 0.82rem; color: var(--muted); } -.heatmap { +/* ---------------- profiles + benchmarks ---------------- */ + +.profile-console, +.benchmark-console, +.run-overview { display: flex; - flex-wrap: wrap; - gap: 2px; - margin-top: 10px; + flex-direction: column; + min-height: 100%; +} + +.profile-grid { + display: grid; + grid-template-columns: repeat(3, minmax(0, 1fr)); + gap: 14px; + padding: 18px 24px 24px; +} + +.profile-card, +.benchmark-panel { + border: 1px solid var(--border); + border-radius: var(--radius-sm); + background: rgba(255, 255, 255, 0.7); + overflow: hidden; +} + +.profile-card { padding: 14px 16px; } + +.profile-card-head { + display: flex; + justify-content: space-between; + align-items: flex-start; + gap: 12px; + margin-bottom: 12px; +} + +.profile-card .eyebrow { + margin: 0 0 5px; + text-transform: uppercase; + letter-spacing: 0.12em; + color: var(--muted); + font-size: 0.66rem; + font-weight: 700; +} + +.profile-card h3 { + margin: 0; + font: 0.94rem var(--font-mono); + overflow-wrap: anywhere; +} + +.profile-specs { + display: grid; + gap: 8px; + margin: 0; +} + +.profile-specs div { + display: grid; + grid-template-columns: 74px minmax(0, 1fr); + gap: 10px; + align-items: baseline; +} + +.profile-specs dt { + color: var(--muted); + font-size: 0.72rem; + font-weight: 700; + text-transform: uppercase; + letter-spacing: 0.06em; +} + +.profile-specs dd { + margin: 0; + min-width: 0; + font-size: 0.86rem; + overflow-wrap: anywhere; +} + +.benchmark-panel { + display: grid; + margin: 18px 24px 24px; +} + +.benchmark-row { + display: grid; + grid-template-columns: minmax(180px, 1.2fr) minmax(180px, 1fr) 90px 90px 120px; + align-items: center; + gap: 12px; + min-height: 48px; + padding: 9px 14px; + border-bottom: 1px solid var(--border); +} + +.benchmark-row:last-child { border-bottom: 0; } + +.benchmark-row-head { + min-height: 36px; + background: rgba(115, 100, 82, 0.08); + color: var(--muted); + font-size: 0.72rem; + font-weight: 700; + letter-spacing: 0.08em; + text-transform: uppercase; } -.heat-cell { - width: 12px; - height: 12px; - border-radius: 2px; - background: rgba(115, 100, 82, 0.18); + +.benchmark-row code { + font-family: var(--font-mono); + color: var(--accent); } -.heat-cell.page-pass { background: var(--success); } -.heat-cell.page-warn { background: var(--warn); } -.heat-cell.page-fail { background: var(--error); } -.heat-cell.page-empty { background: rgba(115, 100, 82, 0.5); } -.heat-cell.page-pending { background: rgba(115, 100, 82, 0.18); } /* ---------------- inspector ---------------- */ @@ -445,9 +738,51 @@ input[type="checkbox"] { width: 15px; height: 15px; accent-color: var(--accent); min-height: 200px; max-height: 380px; } +.inspector-text-head { + position: sticky; + top: 0; + z-index: 1; + display: flex; + justify-content: space-between; + gap: 12px; + padding: 8px 14px; + border-bottom: 1px solid var(--border); + background: #fdfaf3; + color: var(--muted); + font-size: 0.76rem; + font-weight: 700; + letter-spacing: 0.06em; + text-transform: uppercase; +} +.inspector-text-head code { + font-family: var(--font-mono); + color: var(--accent); +} +.page-quality { + display: grid; + gap: 6px; + padding: 10px 14px; + border-bottom: 1px solid var(--border); + color: var(--muted); + font-size: 0.75rem; +} +.page-quality div { display: flex; flex-wrap: wrap; align-items: center; gap: 6px; } +.page-quality span { + font-weight: 700; + text-transform: uppercase; + letter-spacing: 0.06em; +} +.page-quality code { + max-width: 100%; + padding: 2px 6px; + border-radius: 6px; + background: rgba(115, 100, 82, 0.11); + color: var(--text); + overflow-wrap: anywhere; +} .inspector-text pre { margin: 0; - padding: 16px 20px; + padding: 14px 16px; white-space: pre-wrap; word-break: break-word; font: 0.86rem/1.55 var(--font-mono); @@ -516,6 +851,8 @@ input[type="checkbox"] { width: 15px; height: 15px; accent-color: var(--accent); @media (max-width: 1280px) { .console-grid { grid-template-columns: 240px minmax(0, 1fr) 460px; } + .console-grid.document-mode { grid-template-columns: 240px minmax(0, 1fr); } + .profile-grid { grid-template-columns: repeat(2, minmax(0, 1fr)); } } @media (max-width: 1080px) { @@ -526,11 +863,20 @@ input[type="checkbox"] { width: 15px; height: 15px; accent-color: var(--accent); .rail { max-height: 320px; } .inspector { max-height: 720px; } .metric-strip { display: none; } + .document-workbench-body { grid-template-columns: 1fr; } + .profile-grid { grid-template-columns: 1fr; } + .benchmark-panel { overflow-x: auto; } + .benchmark-row { min-width: 760px; } } @media (max-width: 720px) { .topbar { flex-direction: column; align-items: stretch; gap: 10px; } - .run-summary { grid-template-columns: repeat(2, 1fr); } + .view-nav { justify-content: space-between; overflow-x: auto; } + .document-toolbar { flex-wrap: wrap; } + .toolbar-search, .toolbar-group { max-width: none; width: 100%; } + .document-row { grid-template-columns: 24px minmax(160px, 1fr) 72px 76px; } + .document-row > :last-child { display: none; } + .document-editor-fields { grid-template-columns: 1fr; } .toast-container { right: 10px; left: 10px; } } diff --git a/ocr_pipeline/static/index.html b/ocr_pipeline/static/index.html index ef99aaa..dade4c7 100644 --- a/ocr_pipeline/static/index.html +++ b/ocr_pipeline/static/index.html @@ -16,12 +16,17 @@
-

OCR Operations · by cdli.ai

+

cdli.ai

OpenCR

-
+
Active
@@ -46,10 +51,10 @@

OpenCR

-
+
-