feat: integra pipeline PDF→Markdown a 9 stadi e test suite

Porta da branch marker la riscrittura completa di conversione/_pipeline/
(9 stadi PyMuPDF) e la suite tests/ senza modificare il resto del progetto
RAG (ollama/, step-5/, step-6/, step-8/, rag.py, retrieve.py, config.py).

requirements.txt: aggiunge PyMuPDF>=1.24.0 e pytest>=8.0, mantiene chromadb,
rimuove opendataloader-pdf e pymupdf4llm.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-11 14:44:16 +02:00
parent 5215f53ad0
commit e1b5298b20
38 changed files with 3691 additions and 169 deletions
+7 -20
View File
@@ -26,26 +26,13 @@ __pycache__/
.DS_Store
Thumbs.db
# Report generati dagli script
step-0/*_step0_report.txt
step-1/*_step1_report.txt
# Output step-2 — MD grezzo generato da marker
step-2/*/
# Output step-3 — profilo struttura generato da detect_structure.py
step-3/*/
# Output step-4 — MD revisionato e log generati da revise.py
step-4/*/
step-4/revision_log.md
# Output step-5 — chunk generati da chunker.py
step-5/*/
# Output step-6 — report generati da verify_chunks.py
step-6/*/
# Output conversione/ — generati da conversione/pipeline.py
# Output conversione/ — generati dagli script
conversione/*/
!conversione/_pipeline/
!conversione/_pipeline/**
conversione/_pipeline/__pycache__/
# Output chunks/ — generati da chunks/chunker.py e chunks/verify_chunks.py
chunks/*/
+187 -19
View File
@@ -1,46 +1,214 @@
# CLAUDE.md — RAG from Scratch
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Missione
Ricostruire la struttura logica di PDF digitali e serializzarla in Markdown **stabile e valido per la vettorizzazione RAG**, senza LLM né OCR. Il Markdown è solo il formato di output finale — la pipeline deve operare su una rappresentazione intermedia strutturata.
**Non supportato:** PDF scansionati (immagini), PDF protetti da password.
---
## Regole invarianti
- **Lingua:** Rispondi sempre in italiano.
- **Venv:** Usa `.venv/bin/python` o `source .venv/bin/activate`. Mai `pip`/`python` di sistema.
- **`raw.md` immutabile:** La copia di lavoro è sempre `clean.md`.
- **`raw.md` immutabile:** Non modificare mai `raw.md`. La copia di lavoro è sempre `clean.md`.
- **Niente LLM nella pipeline:** tutta la logica deve essere rule-based e riproducibile.
---
## Pipeline
## Setup
```
PDF → conversione → chunking → verifica → vettorizzazione → retrieval
```bash
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
`--stem` = nome PDF senza estensione = nome collection ChromaDB.
Dipendenze principali:
Per i path degli script e degli output usa `git ls-files` o esplora la root: la struttura è in evoluzione verso un programma unico.
- **PyMuPDF** (`fitz`) — estrazione primaria con metadati font e coordinate
- **pdfplumber** — ricostruzione tabelle (opzionale, non per parsing generico)
---
## Configurazione
## Comandi
`config.py` è la fonte di verità: `EMBED_MODEL`, `OLLAMA_MODEL`, `TOP_K`, `TEMPERATURE`, `SYSTEM_PROMPT`.
```bash
# Converti un PDF (posizionalo prima in sources/<nome>.pdf)
.venv/bin/python conversione/ --stem <nome>
**Se cambi `EMBED_MODEL`:** riesegui ingest con `--force` — embedding incoerenti non producono errori ma risposte insensate.
# Forza riesecuzione (sovrascrive clean.md esistente)
.venv/bin/python conversione/ --stem <nome> --force
**Se cambi `MIN_CHARS` / `MAX_CHARS`:** cerca tutte le occorrenze nel repo e sincronizza.
# Tutti i PDF in sources/
.venv/bin/python conversione/
# Validazione batch
.venv/bin/python conversione/ validate
.venv/bin/python conversione/ validate <stem> --detail
# Rimuove l'output di uno stem
bash conversione/clear.sh <nome>
# Test suite
.venv/bin/python -m pytest tests/
```
`--stem` = nome file PDF senza estensione.
---
## Workflow consigliato
## Architettura
1. Converti il PDF con lo script di conversione
2. `/prepare-md conversione/<stem>/clean.md`
3. Chunking
4. Vettorizza con `--stem <stem>`
6. `python rag.py --stem <stem>`
### Principio fondamentale
La pipeline **non converte direttamente PDF → Markdown**.
```
PDF → Structured Document Tree → Markdown
```
Il Markdown è generato solo dall'albero documentale. Non dal testo grezzo.
### Modello dati intermedio
```python
class Block:
text: str
page: int
bbox: tuple
font_size: float
font_name: str
is_bold: bool
block_type: str # "header" | "paragraph" | "list" | "table" | "code"
class Section:
title: str
level: int # 1, 2, 3
content: list[Block]
children: list[Section]
```
Il Markdown si genera **solo** da `Section`. Mai da `Block` direttamente.
---
### I 9 stadi della pipeline
#### Stage 1 — Metadata Extraction
Usa `page.get_text("dict")` (o `"rawdict"`) di PyMuPDF. **Non usare estrazione plain text.**
Per ogni span estrai: testo, font size, font name, flags, bbox, numero di pagina.
Estrai anche: TOC del documento, bookmark, dimensioni pagina.
#### Stage 2 — Layout Analysis
Identifica i blocchi strutturali preservando l'ordine di lettura:
headers, paragrafi, liste, tabelle, code block, interruzioni di pagina.
#### Stage 3 — Font Analysis
Inferisce la gerarchia visiva **per documento** (non hardcoded):
- calcola il font size dominante del corpo
- raggruppa i font size in cluster
- identifica i candidati header per dimensione
#### Stage 4 — Header Detection
Segnali combinati (tutti richiesti):
- font size > corpo testo
- boldness / semibold
- numerazione (`^\d+(\.\d+)*\s+`)
- spaziatura verticale sopra/sotto
- lunghezza riga corta
#### Stage 5 — Hierarchy Inference
Priorità delle regole (in ordine):
1. **Numerazione**`1` → H1, `1.1` → H2, `1.1.1` → H3 (ha precedenza sul font size)
2. **TOC del PDF** — se presente, è autoritativo; allineare i header rilevati alla sua gerarchia
3. **Font size clustering** — fallback se né numerazione né TOC esistono
#### Stage 6 — Document Tree Reconstruction
Costruisce l'albero `Section` con relazioni parent-child, ordinamento e nesting. Ogni nodo contiene titolo, livello, contenuto e figli.
#### Stage 7 — Markdown Generation
Serializza l'albero in Markdown valido:
- Header: `#`/`##`/`###` senza salti di livello
- Liste: preserva nesting ordered/unordered
- Tabelle: GitHub-compatible; fallback testo strutturato
- Code block: fenced con language tag dove rilevabile
#### Stage 8 — Hierarchy Normalization
Ripara le inconsistenze strutturali:
- salti di livello invalidi (`# A``#### B` diventa `# A``## B`)
- header vuoti (rimuovi o mergia)
- header consecutivi duplicati (collassa)
- nesting rotto
#### Stage 9 — Structural Validation
Valida il Markdown finale:
- nessun salto di livello heading
- nessuna sezione vuota
- liste correttamente annidate
- tabelle con colonne consistenti
- ordine uguale al PDF sorgente
---
## Cosa rende un Markdown perfetto per la vettorizzazione
- **Struttura semantica:** ogni header è un confine naturale di chunk; ogni sezione è un'unità concettuale.
- **Gerarchia corretta:** h1/h2/h3 riflettono la struttura logica, non il layout tipografico.
- **Testo pulito:** nessun artefatto di encoding, footnote superscript, `<br>`, dot-leader, PUA.
- **Paragrafi interi:** nessuna frase troncata da salto pagina.
- **Output deterministico:** stessa pipeline su stesso PDF produce sempre lo stesso output.
---
## Linee guida per sviluppare la pipeline
- Ogni stage deve essere **indipendentemente testabile**.
- Le regex per header numbering e simili vanno compilate in `_constants.py`, mai inline.
- PyMuPDF è il parser primario. pdfplumber si usa solo per tabelle complesse.
- Ogni stage deve ricevere l'output del precedente come struttura tipizzata, non testo grezzo.
- Prima di aggiungere un nuovo segnale di detection (Stage 4), validarlo su almeno 3 PDF diversi.
### Categorie di test richieste
| Categoria | Input | Validazione attesa |
|-----------|-------|-------------------|
| Header reconstruction | PDF con H1/H2/H3 numerati | gerarchia corretta, no level skip |
| TOC alignment | PDF con bookmark/TOC | markdown allineato al TOC |
| Mixed font sizes | Font inconsistenti, bold nel corpo | body non classificato come header |
| Broken layout | Header multi-riga, spacing irregolare | header mergiati, markdown valido |
| Tables | Tabelle nel PDF | markdown table con colonne preservate |
| Lists | Liste ordered/unordered annidate | nesting corretto |
| Large documents | PDF tecnico voluminoso | output deterministico, memoria stabile |
| Invalid hierarchy repair | `# A` + `#### B` artificiale | riparazione automatica in `# A` + `## B` |
---
## Pipeline attuale
La pipeline in `conversione/_pipeline/` (basata su trasformazioni testo con `_apply.py`) è **deprecata** e deve essere sostituita dall'architettura a 9 stadi descritta sopra. Durante la migrazione:
- separare estrazione da ricostruzione
- introdurre strutture intermedie esplicite (`Block`, `Section`)
- rimuovere l'architettura parser-centrica
- ogni stage deve essere indipendente e testabile
---
## Skills custom
- `/prepare-md <path|stem>` — corregge `clean.md`: sillabazione, artefatti, header, paragrafi spezzati, gerarchia.
- `/step6-fix <stem>` — verifica chunk, dry-run e applicazione fix via `fix_chunks.py`.
- `/prepare-md <path|stem>` — corregge `clean.md` quando la pipeline non basta: sillabazione, artefatti residui, header malformati, gerarchia incoerente.
+31 -122
View File
@@ -1,62 +1,9 @@
# RAG from Scratch
# PDF → Markdown
Sistema RAG (Retrieval-Augmented Generation) costruito da zero, senza framework di alto livello.
Funziona su qualsiasi PDF digitale. Gira interamente in locale, senza GPU, senza cloud.
Converte PDF digitali in Markdown strutturato e pulito.
**Stack:** Python · Ollama · ChromaDB
**Compatibile con:** Linux · macOS · Windows (WSL2) · CPU only · ~8 GB RAM
---
## Pipeline
```
PDF → conversione → chunking → verifica → vettorizzazione → retrieval
```
| Fase | Rischio | Motivo |
|---|---|---|
| Conversione | 🟡 Medio | Automatica, ma il PDF deve essere digitale e non protetto |
| Revisione Markdown | 🔴 Alto | La qualità del MD determina la qualità del RAG |
| Chunking | 🟡 Medio | Adattivo, dipende dalla qualità del MD |
| Vettorizzazione | 🟢 Basso | Meccanica, lenta ma affidabile |
| Retrieval | 🟡 Medio | Dipende dai parametri in `config.py` |
---
## Struttura del progetto
```
rag/
├── sources/ # PDF originali — non modificare
├── conversione/ # PDF → Markdown strutturato
│ ├── pipeline.py
│ ├── validate.py
│ └── <stem>/
│ ├── raw.md # grezzo — non modificare
│ ├── clean.md # copia di lavoro
│ └── report.json
├── step-5/ # Chunking
│ ├── chunker.py
│ └── <stem>/chunks.json
├── step-6/ # Verifica e fix chunk
│ ├── verify_chunks.py
│ ├── fix_chunks.py
│ └── <stem>/
│ ├── chunks.json
│ └── report.json
├── step-8/ # Vettorizzazione
│ └── ingest.py
├── ollama/ # Setup ambiente
│ ├── check_env.py
│ └── test_ollama.py
├── chroma_db/ # Vector store (generato)
├── config.py # Configurazione pipeline ← modifica qui
├── rag.py # Interrogazione RAG interattiva
└── retrieve.py # Retrieval puro (senza LLM)
```
`--stem` = nome del PDF senza estensione = nome della collection ChromaDB.
**Stack:** Python · opendataloader-pdf (XY-Cut++) · Java 11+
**Compatibile con:** Linux · macOS · Windows (WSL2)
---
@@ -68,92 +15,54 @@ source .venv/bin/activate
pip install -r requirements.txt
```
**Java 11+** richiesto per la conversione (`opendataloader-pdf`):
**Java 11+** richiesto:
```bash
sudo apt install default-jdk # Ubuntu/Debian/WSL
java -version # verifica
java -version
```
Vedi [`ollama/README.md`](ollama/README.md) per l'installazione di Ollama e il download dei modelli.
---
## Workflow
### 1. Converti il PDF
## Utilizzo
```bash
# Singolo PDF
python conversione/pipeline.py --stem <nome>
# Tutti i PDF in sources/
python conversione/pipeline.py
# Forza riesecuzione
python conversione/pipeline.py --stem <nome> --force
```
Produce `conversione/<stem>/clean.md`. Vedi [`conversione/README.md`](conversione/README.md).
### 2. Rivedi il Markdown
```
/prepare-md conversione/<stem>/clean.md
```
Passaggio più importante: la qualità del RAG dipende da questo.
### 3. Chunking
```bash
python step-5/chunker.py --stem <nome>
```
### 4. Verifica e fix chunk
```bash
python step-6/verify_chunks.py --stem <nome>
python step-6/fix_chunks.py --stem <nome> # se ci sono 🔴
python step-6/verify_chunks.py --stem <nome> # ri-verifica
```
Non procedere alla vettorizzazione se ci sono 🔴.
### 5. Vettorizza
```bash
python step-8/ingest.py --stem <nome>
```
Vedi [`step-8/README.md`](step-8/README.md). Usa `--force` se hai cambiato `EMBED_MODEL` o i chunk.
### 6. Interroga
```bash
python rag.py --stem <nome> # risposta LLM
python retrieve.py --stem <nome> # retrieval puro (debug)
```
`--stem` = nome file PDF senza estensione.
Esempio: `sources/analisi1.pdf``--stem analisi1`
---
## Configurazione (`config.py`)
## Output
| Parametro | Default | Descrizione |
|---|---|---|
| `EMBED_MODEL` | `"nomic-embed-text"` | Modello embedding — deve corrispondere tra ingest e retrieval |
| `OLLAMA_MODEL` | `"qwen3.5:0.8b"` | Modello LLM |
| `OLLAMA_URL` | `"http://localhost:11434"` | Endpoint Ollama |
| `TOP_K` | `6` | Chunk recuperati per query |
| `TEMPERATURE` | `0.0` | Deterministico a `0.0` |
| `NO_THINK` | `True` | Disabilita chain-of-thought (Qwen3/Qwen3.5) |
| `SYSTEM_PROMPT` | *(vedi file)* | Istruzioni di comportamento per il LLM |
Per ogni stem in `conversione/<stem>/`:
> Se cambi `EMBED_MODEL`, riesegui `step-8/ingest.py --stem <nome> --force`.
| File | Descrizione |
|------|-------------|
| `raw.md` | Markdown grezzo — **non modificare** |
| `clean.md` | Markdown pulito — copia di lavoro |
| `structure_profile.json` | Struttura rilevata e metriche |
| `report.json` | Statistiche complete della conversione |
---
## Principi
## Validazione batch
**Atomico** — ogni fase fa una cosa sola; se si rompe qualcosa sai esattamente dove.
```bash
python conversione/validate.py
```
**Verificabile** — ogni fase ha un criterio di completamento oggettivo prima di procedere.
Stampa una tabella di stato su tutti gli stem convertiti.
**Reversibile** — puoi tornare indietro senza perdere il lavoro delle altre fasi.
---
**Adattivo** — nessuna assunzione sulla struttura del documento; si adatta automaticamente.
**Locale** — nessuna API esterna, nessun dato trasmesso fuori dalla macchina.
Vedi [`conversione/README.md`](conversione/README.md) per dettagli sulla pipeline e i tipi di documento supportati.
+109
View File
@@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""
Pipeline PDF → clean Markdown per vettorizzazione RAG.
Uso:
# Converti
python conversione/ --stem <nome>
python conversione/ --stem <nome> --force
python conversione/ # tutti i PDF in sources/
# Valida
python conversione/ validate
python conversione/ validate <stem> [<stem> ...] --detail
Prerequisiti:
pip install opendataloader-pdf pdfplumber
Java 11+ sul PATH (https://adoptium.net/)
"""
import argparse
import sys
from pathlib import Path
# Rende _pipeline importabile da conversione/
sys.path.insert(0, str(Path(__file__).parent))
from _pipeline import run, validate
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="conversione",
description="PDF → clean Markdown strutturato, pronto per chunking RAG",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"Esempi:\n"
" python conversione/ --stem manuale\n"
" python conversione/ --stem manuale --force\n"
" python conversione/ validate\n"
" python conversione/ validate manuale --detail"
),
)
# ── Subcommand: validate ──────────────────────────────────────────────
sub = parser.add_subparsers(dest="cmd", metavar="comando")
val = sub.add_parser(
"validate",
help="valida i report.json prodotti dalla conversione",
description="Legge i report.json e assegna un voto 0-100 (A/B/C/D/F).",
)
val.add_argument(
"stems",
nargs="*",
metavar="STEM",
help="stem da validare. Ometti per tutti.",
)
val.add_argument(
"--detail", "-d",
action="store_true",
help="mostra il dettaglio delle penalità per ogni documento",
)
# ── Opzioni convert (modalità default) ───────────────────────────────
parser.add_argument(
"--stem",
metavar="NOME",
help="nome del PDF in sources/ (senza estensione). Ometti per tutti.",
)
parser.add_argument(
"--force",
action="store_true",
help="riesegui anche se clean.md è già presente",
)
return parser
def main() -> None:
parser = _build_parser()
args = parser.parse_args()
root = Path(__file__).parent.parent
# ── Validate ─────────────────────────────────────────────────────────
if args.cmd == "validate":
validate(args.stems, root, detail=args.detail)
return
# ── Convert (default) ────────────────────────────────────────────────
if args.stem:
stems = [args.stem]
else:
sources_dir = root / "sources"
if not sources_dir.exists():
print("Errore: cartella sources/ non trovata.")
sys.exit(1)
stems = sorted(p.stem for p in sources_dir.glob("*.pdf"))
if not stems:
print("Errore: nessun PDF trovato in sources/.")
sys.exit(1)
results = [run(s, root, args.force) for s in stems]
ok = sum(results)
total = len(results)
print(f"\n{'' if all(results) else '⚠️ '} {ok}/{total} documenti convertiti")
sys.exit(0 if all(results) else 1)
if __name__ == "__main__":
main()
+30
View File
@@ -0,0 +1,30 @@
from .extract import validate_pdf, extract_metadata
from .structure import analyze
from .report import build_report
from .runner import run
from .validator import validate
from .models import Block, Section, FontProfile
from .stage1_metadata import extract_raw_data
from .stage2_layout import analyze_layout
from .stage3_font import build_font_profile
from .stage4_headers import classify_blocks
from .stage5_hierarchy import infer_hierarchy
from .stage6_tree import build_tree
from .stage7_markdown import serialize_tree
from .stage8_normalize import normalize_hierarchy
from .stage9_validate import validate_markdown, ValidationResult
__all__ = [
"validate_pdf", "extract_metadata",
"analyze", "build_report", "run", "validate",
"Block", "Section", "FontProfile",
"extract_raw_data",
"analyze_layout",
"build_font_profile",
"classify_blocks",
"infer_hierarchy",
"build_tree",
"serialize_tree",
"normalize_hierarchy",
"validate_markdown", "ValidationResult",
]
+169
View File
@@ -0,0 +1,169 @@
"""
Costanti di modulo condivise tra i moduli di trasformazione.
Tutte le regex compilate e le mappe statiche vivono qui.
"""
import re
# ─── Keyword sets ─────────────────────────────────────────────────────────────
_TOC_KEYWORDS = frozenset([
"indice", "index", "contents", "table of contents",
"sommario", "inhaltsverzeichnis", "inhalt",
"indice generale", "indice analitico", "indice dei contenuti",
"elenco dei capitoli", "argomenti", "table des matières",
"tabla de contenidos", "содержание",
])
_ORDINALS_IT = {
"PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV",
"QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII",
"NONO": "IX", "DECIMO": "X",
}
_ORDINALS_EN = {
"ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5",
"SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10",
}
# ─── PUA Symbol font map ──────────────────────────────────────────────────────
_SYMBOL_PUA_MAP: dict[str, str] = {
"": " ",
"": "(",
"": ")",
"": "+",
"": "",
"": ".",
"": "/",
"": "0", "": "1", "": "2", "": "3", "": "4",
"": "5", "": "6", "": "7", "": "8", "": "9",
"": ":", "": ";", "": "<", "": "=", "": ">",
"": "",
"": "Α", "": "Β", "": "Χ", "": "Δ", "": "Ε",
"": "Φ", "": "Γ", "": "Η", "": "Ι", "": "ϑ",
"": "Κ", "": "Λ", "": "Μ", "": "Ν", "": "Ο",
"": "Π", "": "Θ", "": "Ρ", "": "Σ", "": "Τ",
"": "Υ", "": "ς", "": "Ω", "": "Ξ", "": "Ψ",
"": "Ζ",
"": "[",
"": "",
"": "]",
"": "",
"": "α", "": "β", "": "χ", "": "δ", "": "ε",
"": "φ", "": "γ", "": "η", "": "ι", "": "ϕ",
"": "κ", "": "λ", "": "μ", "": "ν", "": "ο",
"": "π", "": "θ", "": "ρ", "": "σ", "": "τ",
"": "υ", "": "ϖ", "": "ω", "": "ξ", "": "ψ",
"": "ζ",
"": "{",
"": "|",
"": "}",
"": "~",
"": "±",
"": "",
"": "",
"": "",
"": "",
"": "",
"": "×",
"": "÷",
"": "×",
"": "",
"": "",
"": "",
"": "",
"": "*",
"": ",",
"": "",
"": "",
"": "",
"": "",
"": "÷",
"": "",
"": "",
"": "",
"": "",
"": "",
"": "",
# TeX Computer Modern bracket/delimiter pieces (U+F8EBF8FE) → stringa vuota
"": "", # TeX large paren left
"": "", # TeX large paren extension
"": "", # TeX large paren right
"": "", # TeX large paren right ext
"": "", # TeX large bracket left
"": "", # TeX large bracket ext
"": "", # TeX brace top-left
"": "", # TeX brace mid
"": "", # TeX brace mid-right
"": "", # TeX brace extension
"": "", # TeX brace right
"": "", # TeX bracket right large
"": "", # TeX bracket right ext
"": "", # TeX bracket right close
"": "", # TeX integral large
"": "", # TeX integral extension
"": "", # TeX integral top
"": "", # TeX radical top
"": "", # TeX radical extension
"": "", # TeX arrowhead
}
_SYMBOL_PUA_RE = re.compile(
"[" + "".join(re.escape(k) for k in _SYMBOL_PUA_MAP) + "]"
)
# ─── Regex compilate condivise ────────────────────────────────────────────────
_SUPERSCRIPT_RE = re.compile(r'[¹²³⁰⁴-⁹]+')
_FOOTNOTE_BODY_RE = re.compile(
r'^([¹²³⁰⁴-⁹]+\s+|\[\d{1,3}\]\s+)'
)
_NUMBERED_HDR_RE = re.compile(
r"^(#{1,6})\s+(\d+(?:\.\d+)*)\.\s+(.+)$",
re.MULTILINE,
)
_BIB_MARKERS_RE = re.compile(
r'\b(pp?\.|vol\.|n\.\s*\d|ed\.|edn\.|ISBN|DOI|arXiv)\b'
r'|\b(19|20)\d{2}\b'
r'|\b(ibid\.?|ibidem|op\.\s*cit\.?|cit\.|cfr\.|ivi[,;\s])\b',
re.IGNORECASE,
)
# Pattern autore accademico: iniziale maiuscola + cognome TUTTO-MAIUSCOLO (es. "A. SMITH")
_FOOTNOTE_AUTHOR_RE = re.compile(r'(?<![A-Z])[A-Z]\.\s+[A-Z]{3,}')
_WATERMARK_RE = re.compile(
r"^(BOZZA|DRAFT|CONFIDENTIAL|RISERVATO|PROVVISORIO|SAMPLE|SPECIMEN"
r"|DO NOT DISTRIBUTE|NON DISTRIBUIRE|COPY|COPIA)\s*$",
re.IGNORECASE | re.MULTILINE,
)
_TABSEP_RE = re.compile(r"(?m)^\|\s*\|\s*$|^\|---\|?\s*$")
_DOTLEADER_RE = re.compile(r"^[^\n]*(?:(?:\. ){3,}|\.{4,})[^\n]*$", re.MULTILINE)
_FM_RE = re.compile(
r"https?://|www\.|@[A-Za-z]|\bUniversit[àa]\b|\bDipartimento\b|"
r"\bCopyright\b|\bLicenza\b|\bEdizione\b|"
r"protetto da|tutti i diritti",
re.IGNORECASE,
)
_VERSE_NUM_RE = re.compile(
r"([.!?\xbb'\"" + "" + r"]\s+)(\d+)(\s+)(?=[A-Z\xc0-\xd9a-z\xe0-\xf9\xab“”‟])"
)
# Math header demotion
_MATH_SYMBOLS_RE = re.compile(
r"[=+∈∀∃≤≥∞∑∫∂→↔⊂⊃∩∪αβγδεζηθικλμνξοπρστυφχψωΑΒΓΔΕΖΗΘΙΚΛΜΝΞΟΠΡΣΤΥΦΧΨΩ]"
)
_EXERCISE_TRIGGER_RE = re.compile(
r"\b(Si dimostri|Si calcoli|Si provi|Si trovi|Trovare|Find|Prove|Show that"
r"|Compute|Calculate|Dimostrare|Verificare)\b",
re.IGNORECASE,
)
_MATH_HDR_RE = re.compile(r"^(#{2,3})\s+(.+)$")
_NUMBERED_PREFIX_RE = re.compile(r"^(\d+(?:\.\d+)*[.)])\s+(.+)$", re.DOTALL)
# Orphan TOC: voce di indice senza dot-leader (es. "3. Funzioni 174")
_TOC_ITEM_RE = re.compile(
r"^\d+(\.\d+)*\.?\s+[A-Za-zÀ-ú\'\(][^\n]{2,70}$"
)
_TOC_HDR_WITH_PAGE_RE = re.compile(
r"^#{1,3}\s+\d+\.?\s+.{3,60}\s+\d{1,4}$"
)
# Artefatti PDF: page markers e separatori
_PAGE_MARKER_RE = re.compile(r"(?m)^<!-- page: \d+ -->\s*$")
_STANDALONE_NUM_RE = re.compile(r"(?m)^(?:- )?\d{1,3}$")
_UNDERSCORE_SEP_RE = re.compile(r"(?m)^_{4,}\s*$")
+153
View File
@@ -0,0 +1,153 @@
"""Funzioni helper pure condivise tra i moduli di trasformazione."""
import re
from ._constants import _ORDINALS_IT, _ORDINALS_EN
def _sentence_case(s: str) -> str:
if not s:
return s
lower = s.lower()
return lower[0].upper() + lower[1:]
def _is_allcaps_line(line: str) -> bool:
stripped = line.strip()
letters = [c for c in stripped if c.isalpha()]
return (
len(letters) >= 3
and all(c.isupper() for c in letters)
and not stripped.startswith("#")
and not stripped.startswith("|")
)
def _allcaps_to_header(raw_line: str) -> str:
text = re.sub(r"^[-*+]\s+", "", raw_line.strip())
text = text.rstrip(".").rstrip("?").strip()
_ORD_IT_PAT = "|".join(_ORDINALS_IT.keys())
m = re.match(rf"^CAPITOLO ({_ORD_IT_PAT})\. (.+)", text)
if m:
roman = _ORDINALS_IT[m.group(1)]
titolo = m.group(2).rstrip(".").rstrip("?").strip()
return f"## Capitolo {roman}{_sentence_case(titolo)}"
_ORD_EN_PAT = "|".join(_ORDINALS_EN.keys())
m = re.match(rf"^CHAPTER ({_ORD_EN_PAT}|\d+)\.? (.+)", text)
if m:
n = _ORDINALS_EN.get(m.group(1), m.group(1))
titolo = m.group(2).rstrip(".").rstrip("?").strip()
return f"## Chapter {n}{_sentence_case(titolo)}"
m = re.match(r"^([IVXLCDM]+|[0-9]+)\. (.+)", text)
if m:
return f"## {m.group(1)}. {_sentence_case(m.group(2).rstrip('.').strip())}"
return f"## {_sentence_case(text)}"
def _extract_math_environments(text: str) -> tuple[str, int]:
_ENVS = (
r"Definizione|Definition|Teorema|Theorem|Lemma|"
r"Proposizione|Proposition|Corollario|Corollary|"
r"Osservazione|Remark|Nota|Note|Esempio|Example"
)
count = 0
blocks = text.split("\n\n")
result = []
for block in blocks:
stripped = block.strip()
if not stripped or stripped.startswith("#"):
result.append(block)
continue
m = re.match(
rf"^({_ENVS})\s+((?:\d+\.?){{1,4}})\s*(.*)",
stripped,
re.DOTALL,
)
if not m:
result.append(block)
continue
env = m.group(1)
num = m.group(2).rstrip(".")
rest = m.group(3).strip()
title_m = re.match(r"^(\([^)]{2,60}\))\s+(.*)", rest, re.DOTALL)
if title_m:
header = f"### {env} {num} {title_m.group(1)}"
body = title_m.group(2).strip()
else:
header = f"### {env} {num}."
body = rest
result.append(f"{header}\n\n{body}" if body else header)
count += 1
return "\n\n".join(result), count
def _merge_title_headers(text: str) -> tuple[str, int]:
count = 0
blocks = re.split(r"\n{2,}", text)
result = []
i = 0
while i < len(blocks):
block = blocks[i]
stripped = block.strip()
if (
re.match(r"^#{2,3} \d+\.\s*$", stripped)
and i + 1 < len(blocks)
):
nxt = blocks[i + 1].strip()
if (
nxt
and "\n" not in nxt
and len(nxt) <= 80
and not nxt.startswith("#")
and not re.match(r"^\d+[\.\)]\s", nxt)
):
result.append(stripped.rstrip() + " " + nxt)
count += 1
i += 2
continue
result.append(block)
i += 1
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(result)), count
def _extract_article_headers(text: str) -> tuple[str, int]:
count = 0
def _repl(m: re.Match) -> str:
nonlocal count
num = m.group(1)
rest = m.group(2).strip()
title_m = re.match(
r"^([A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda].{1,74}?)\.\s+"
r"([A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda\(\d].{4,})",
rest,
)
if title_m:
count += 1
return (
f"### Art. {num}. {title_m.group(1)}.\n\n"
f"{title_m.group(2).strip()}"
)
if rest:
count += 1
return f"### Art. {num}.\n\n{rest}"
count += 1
return f"### Art. {num}."
text = re.sub(
r"^-\s+Art\.\s+([\d]+[a-z\-]*)\.\s*(.*)",
_repl,
text,
flags=re.MULTILINE,
)
return text, count
+82
View File
@@ -0,0 +1,82 @@
"""Validazione PDF e estrazione metadati tramite fitz."""
import re
from pathlib import Path
def validate_pdf(pdf_path: Path) -> tuple[bool, str]:
"""Verifica esistenza, leggibilità e presenza di testo digitale estraibile."""
if not pdf_path.exists():
return False, f"File non trovato: {pdf_path}"
if pdf_path.suffix.lower() != ".pdf":
return False, f"Non è un PDF: {pdf_path.name}"
size = pdf_path.stat().st_size
if size == 0:
return False, "File vuoto"
if size < 1024:
return False, f"File troppo piccolo ({size} byte) — probabilmente corrotto"
try:
import pdfplumber
with pdfplumber.open(pdf_path) as pdf:
n_pages = len(pdf.pages)
if n_pages == 0:
return False, "PDF senza pagine"
sample = min(5, n_pages)
pages_with_text = sum(
1 for i in range(sample)
if len((pdf.pages[i].extract_text() or "").strip()) > 50
)
if pages_with_text == 0:
extended = min(15, n_pages)
if extended > sample:
ext_with_text = sum(
1 for i in range(sample, extended)
if len((pdf.pages[i].extract_text() or "").strip()) > 50
)
if ext_with_text > 0:
return True, (
f"{n_pages} pagine — prime {sample} vuote, "
f"testo trovato in pagine successive "
f"(possibile copertina immagine)"
)
return False, (
f"Nessun testo nelle prime {extended} pagine "
f"— probabilmente scansionato (OCR non supportato)"
)
return True, f"{n_pages} pagine, testo digitale confermato"
except MemoryError:
return False, "Memoria esaurita durante l'apertura del PDF"
except Exception as e:
msg = str(e).lower()
if "password" in msg or "encrypted" in msg:
return False, "PDF protetto da password"
return False, f"Impossibile aprire: {e}"
def extract_metadata(pdf_path: Path) -> dict:
"""
Estrae title, author, year e page count dal PDF tramite fitz.
Restituisce un dict con chiavi sempre presenti (stringa vuota se assenti).
"""
try:
import fitz
doc = fitz.open(str(pdf_path))
meta = doc.metadata
pages = len(doc)
doc.close()
year = ""
creation = meta.get("creationDate", "")
m = re.match(r"D:(\d{4})", creation)
if m:
year = m.group(1)
return {
"source": pdf_path.name,
"title": (meta.get("title") or "").strip(),
"author": (meta.get("author") or "").strip(),
"year": year,
"pages": pages,
}
except Exception:
return {"source": pdf_path.name, "title": "", "author": "", "year": "", "pages": 0}
+44
View File
@@ -0,0 +1,44 @@
"""Strutture dati intermedie della pipeline: Block, Section, FontProfile."""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class Block:
text: str
page: int
bbox: tuple[float, float, float, float] # x0, y0, x1, y1
font_size: float
font_name: str
is_bold: bool
block_type: str = "paragraph" # paragraph|header_candidate|list_item|table|ignore
space_before: float = 0.0
level: int = 0 # assegnato da stage5 (0 = non header)
origin_spans: list[dict] = field(default_factory=list, repr=False)
@property
def x0(self) -> float: return self.bbox[0]
@property
def y0(self) -> float: return self.bbox[1]
@property
def x1(self) -> float: return self.bbox[2]
@property
def y1(self) -> float: return self.bbox[3]
@dataclass
class Section:
title: str
level: int # 1, 2, 3
content: list[Block] = field(default_factory=list)
children: list[Section] = field(default_factory=list)
page_start: int = 0
source_block: Block | None = field(default=None, repr=False)
@dataclass
class FontProfile:
body_size: float
cluster_map: dict[float, int] # font_size arrotondato → livello (1/2/3)
header_sizes: list[float] # taglie candidate header, ordinate desc
+135
View File
@@ -0,0 +1,135 @@
import json
import re
from datetime import datetime
from pathlib import Path
from .structure import _parse_sections_with_body
from ._constants import _MATH_SYMBOLS_RE, _EXERCISE_TRIGGER_RE, _MATH_HDR_RE
def build_report(
stem: str,
out_dir: Path,
clean_text: str,
t_stats: dict,
profile: dict,
reduction: float,
) -> Path:
text_lines = clean_text.split("\n")
sections = _parse_sections_with_body(clean_text, 3)
lengths = [len(body) for _, body in sections]
def _pct(data: list[int], p: float) -> int:
if not data:
return 0
s = sorted(data)
return s[max(0, min(len(s) - 1, int(len(s) * p)))]
distribution = {
"min": min(lengths) if lengths else 0,
"p25": _pct(lengths, 0.25),
"mediana": _pct(lengths, 0.50),
"p75": _pct(lengths, 0.75),
"max": max(lengths) if lengths else 0,
}
bare_hdrs = [
{"header": hdr, "corpo_inizio": body[:120].replace("\n", " ")}
for hdr, body in sections
if re.match(r"^### \d+\.\s*$", hdr) and len(body.strip()) < 30
]
short_secs = [
{"header": hdr, "chars": length, "testo": body[:80].replace("\n", " ")}
for (hdr, body), length in zip(sections, lengths)
if 0 < length < 150
]
long_secs = [
{"header": hdr, "chars": length}
for (hdr, _), length in zip(sections, lengths)
if length > 1500
]
def _scan(pattern: str, max_n: int = 10) -> list[dict]:
hits = []
for i, line in enumerate(text_lines):
if re.search(pattern, line) and not re.match(r"^#+ ", line):
hits.append({"riga": i + 1, "testo": line.strip()[:120]})
if len(hits) >= max_n:
break
return hits
def _scan_formula_headers(max_n: int = 10) -> list[dict]:
hits = []
for i, line in enumerate(text_lines):
m = _MATH_HDR_RE.match(line)
if not m:
continue
body = m.group(2)
if len(body) <= 100:
continue
has_math = len(_MATH_SYMBOLS_RE.findall(body)) >= 3
has_ex = bool(_EXERCISE_TRIGGER_RE.search(body))
if has_math or has_ex:
hits.append({"riga": i + 1, "testo": line.strip()[:120]})
if len(hits) >= max_n:
break
return hits
residui = {
"backtick": _scan(r"`"),
"dotleader": _scan(r"(?:\. ){5,}"),
"url": _scan(r"^(https?://|www\.)\S+"),
"immagini": _scan(r"!\[[^\]]*\]\([^)]*\)"),
"br_inline": _scan(r"<br>"),
"simboli_encoding": _scan(r'(?<=[0-9A-Za-z])[!"](?=[0-9A-Za-z])'),
"formule_inline": _scan(r"\[\d+\.\d+\]"),
"footnote_markers": _scan(r'[¹²³⁰⁴-⁹]'),
"pua_markers": _scan(r'[-]'),
"formula_headers": _scan_formula_headers(),
}
report = {
"stem": stem,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"),
"transforms": {
**t_stats,
"riduzione_pct": round(reduction),
},
"structure": profile,
"distribution": distribution,
"anomalie": {
"bare_headers": len(bare_hdrs),
"short_sections": len(short_secs),
"long_sections": len(long_secs),
"bare_headers_list": bare_hdrs,
"short_sections_list": short_secs,
"long_sections_list": long_secs,
},
"residui": {
"backtick": len(residui["backtick"]),
"dotleader": len(residui["dotleader"]),
"url": len(residui["url"]),
"immagini": len(residui["immagini"]),
"br_inline": len(residui["br_inline"]),
"simboli_encoding": len(residui["simboli_encoding"]),
"formule_inline": len(residui["formule_inline"]),
"footnote_markers": len(residui["footnote_markers"]),
"pua_markers": len(residui["pua_markers"]),
"backtick_esempi": residui["backtick"],
"dotleader_esempi": residui["dotleader"],
"url_esempi": residui["url"],
"immagini_esempi": residui["immagini"],
"br_inline_esempi": residui["br_inline"],
"simboli_encoding_esempi": residui["simboli_encoding"],
"formule_inline_esempi": residui["formule_inline"],
"footnote_markers_esempi": residui["footnote_markers"],
"pua_markers_esempi": residui["pua_markers"],
"formula_headers": len(residui["formula_headers"]),
"formula_headers_esempi": residui["formula_headers"],
},
}
report_path = out_dir / "report.json"
report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
return report_path
+220
View File
@@ -0,0 +1,220 @@
"""Orchestrazione della pipeline PDF → Markdown a 9 stadi."""
import json
import sys
import threading
import time
from pathlib import Path
from .extract import validate_pdf, extract_metadata
from .stage1_metadata import extract_raw_data_with_pdfplumber_fallback as extract_raw_data
from .stage2_layout import analyze_layout
from .stage3_font import build_font_profile
from .stage4_headers import classify_blocks
from .stage5_hierarchy import infer_hierarchy
from .stage6_tree import build_tree
from .stage7_markdown import serialize_tree
from .stage8_normalize import normalize_hierarchy
from .stage9_validate import validate_markdown
from .structure import analyze
from .report import build_report
from .validator import _score, _grade
_LIVELLO_DESC = {3: "ricca (h3)", 2: "parziale (h2)", 1: "paragrafi", 0: "testo piatto"}
_SPIN_FRAMES = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
def _build_frontmatter(meta: dict) -> str:
lines = ["---", f"source: {meta['source']}"]
if meta.get("title"):
lines.append(f'title: "{meta["title"]}"')
if meta.get("author"):
lines.append(f'author: "{meta["author"]}"')
if meta.get("year"):
lines.append(f"year: {meta['year']}")
if meta.get("pages"):
lines.append(f"pages: {meta['pages']}")
lines += ["---", ""]
return "\n".join(lines) + "\n"
class _Spinner:
def __init__(self, prefix: str):
self._prefix = prefix
self._stop = threading.Event()
self._thread = threading.Thread(target=self._run, daemon=True)
self._t0 = 0.0
def __enter__(self):
self._t0 = time.perf_counter()
self._thread.start()
return self
def __exit__(self, *_):
self._stop.set()
self._thread.join()
sys.stdout.write("\r" + " " * 72 + "\r")
sys.stdout.flush()
def _run(self):
i = 0
while not self._stop.wait(0.1):
elapsed = time.perf_counter() - self._t0
frame = _SPIN_FRAMES[i % len(_SPIN_FRAMES)]
sys.stdout.write(f"\r {frame} {self._prefix} {elapsed:.0f}s")
sys.stdout.flush()
i += 1
def run(stem: str, project_root: Path, force: bool) -> bool:
pdf_path = project_root / "sources" / f"{stem}.pdf"
out_dir = project_root / "conversione" / stem
raw_out = out_dir / "raw.md"
clean_out = out_dir / "clean.md"
print(f"\n{'' * 52}")
print(f" {stem}")
print(f"{'' * 52}")
if clean_out.exists() and not force:
print(f" ⚠️ conversione/{stem}/clean.md già presente — skip")
print(f" (usa --force per rieseguire)")
return True
# ── [1] Validazione PDF ───────────────────────────────────────────────────
print(" [1/9] Validazione PDF...")
pdf_mb = pdf_path.stat().st_size / (1024 * 1024) if pdf_path.exists() else 0
print(f" File: {pdf_path.name} ({pdf_mb:.1f} MB)")
ok, msg = validate_pdf(pdf_path)
if not ok:
print(f"{msg}")
return False
print(f"{msg}")
meta = extract_metadata(pdf_path)
meta["source"] = pdf_path.name
if meta.get("title"):
print(f" Titolo: {meta['title']}")
if meta.get("author"):
print(f" Autore: {meta['author']}")
# ── [2] Stage 1: estrazione span ──────────────────────────────────────────
print(" [2/9] Stage 1: Estrazione span PyMuPDF...")
with _Spinner("Lettura PDF con PyMuPDF..."):
try:
raw_blocks, doc_meta = extract_raw_data(pdf_path)
except Exception as e:
print(f" ✗ Estrazione fallita: {e}")
return False
print(f"{len(raw_blocks)} span estratti da {doc_meta['page_count']} pagine")
toc_entries = len(doc_meta.get("toc", []))
if toc_entries:
print(f" TOC: {toc_entries} voci")
# ── [3] Stage 2: layout ───────────────────────────────────────────────────
print(" [3/9] Stage 2: Analisi layout e reading order...")
with _Spinner("Analisi layout..."):
blocks = analyze_layout(raw_blocks, doc_meta)
print(f"{len(blocks)} blocchi dopo layout analysis")
# ── [4] Stage 3: font analysis ────────────────────────────────────────────
print(" [4/9] Stage 3: Font analysis...")
profile = build_font_profile(blocks)
print(f" ✅ Body size: {profile.body_size}pt "
f"Header sizes: {profile.header_sizes}")
# ── [5] Stage 4: header detection ─────────────────────────────────────────
print(" [5/9] Stage 4: Header detection...")
blocks = classify_blocks(blocks, profile)
n_candidates = sum(1 for b in blocks if b.block_type == "header_candidate")
print(f"{n_candidates} header candidate rilevati")
# ── [6] Stage 5: hierarchy inference ─────────────────────────────────────
print(" [6/9] Stage 5: Hierarchy inference...")
blocks = infer_hierarchy(blocks, profile, doc_meta.get("toc", []))
from collections import Counter
level_dist = Counter(b.level for b in blocks if b.block_type == "header_candidate")
print(f" ✅ H1={level_dist.get(1,0)} H2={level_dist.get(2,0)} H3={level_dist.get(3,0)}")
# ── [7] Stage 6: document tree ────────────────────────────────────────────
print(" [7/9] Stage 6: Document tree reconstruction...")
tree = build_tree(blocks)
print(f"{len(tree)} sezioni radice")
# ── [8] Stage 7: markdown generation ─────────────────────────────────────
print(" [8/9] Stage 7: Markdown generation...")
with _Spinner("Serializzazione albero..."):
raw_md = serialize_tree(tree, meta, pdf_path=pdf_path)
size_kb = len(raw_md.encode()) // 1024
n_lines = raw_md.count("\n")
print(f" ✅ raw.md: {size_kb} KB, {n_lines} righe")
# Scrittura raw.md (IMMUTABILE)
try:
out_dir.mkdir(parents=True, exist_ok=True)
if not raw_out.exists() or force:
raw_out.write_text(raw_md, encoding="utf-8")
except PermissionError as e:
print(f" ✗ Permesso negato durante la scrittura: {e}")
return False
# ── [9] Stage 8+9: normalizzazione + validazione ──────────────────────────
print(" [9/9] Stage 8-9: Normalize + validate...")
clean_md, norm_stats = normalize_hierarchy(raw_md)
validation = validate_markdown(clean_md, meta.get("pages", 0))
if norm_stats["n_level_jumps_repaired"]:
print(f" Salti livello riparati: {norm_stats['n_level_jumps_repaired']}")
if norm_stats["n_empty_headers_removed"]:
print(f" Header vuoti rimossi: {norm_stats['n_empty_headers_removed']}")
if norm_stats["n_duplicate_headers_removed"]:
print(f" Header duplicati rimossi: {norm_stats['n_duplicate_headers_removed']}")
for w in validation.warnings:
print(f" ⚠️ {w}")
for e in validation.errors:
print(f"{e}")
# Aggiungi frontmatter a clean.md
frontmatter = _build_frontmatter(meta)
full_clean = frontmatter + clean_md
try:
clean_out.write_text(full_clean, encoding="utf-8")
except PermissionError as e:
print(f" ✗ Permesso negato durante la scrittura di clean.md: {e}")
return False
print(f" ✅ clean.md scritto")
# ── Analisi struttura + report + score ────────────────────────────────────
profile_struct = analyze(clean_out)
(out_dir / "structure_profile.json").write_text(
json.dumps(profile_struct, ensure_ascii=False, indent=2), encoding="utf-8"
)
print(f" Struttura: livello {profile_struct['livello_struttura']}"
f"{_LIVELLO_DESC[profile_struct['livello_struttura']]}")
print(f" h1={profile_struct['n_h1']} h2={profile_struct['n_h2']} "
f"h3={profile_struct['n_h3']} paragrafi={profile_struct['n_paragrafi']}")
print(f" Strategia chunking: {profile_struct['strategia_chunking']}")
print(f" Lingua rilevata: {profile_struct['lingua_rilevata']}")
for w in profile_struct["avvertenze"]:
print(f" ⚠️ {w}")
t_stats = {
**norm_stats,
"validation": validation.to_dict(),
}
reduction = 100.0 * (1 - len(clean_md) / len(raw_md)) if raw_md else 0.0
report_path = build_report(stem, out_dir, full_clean, t_stats, profile_struct, reduction)
report_data = json.loads(report_path.read_text(encoding="utf-8"))
score, _ = _score(report_data)
print(f"\n Output → conversione/{stem}/")
print(f" raw.md (immutabile) clean.md report.json")
print(f" Punteggio qualità: {score}/100 {_grade(score)}")
return True
+260
View File
@@ -0,0 +1,260 @@
"""Stage 1: estrazione raw span da PDF con PyMuPDF + metadati documento."""
from pathlib import Path
import fitz # PyMuPDF
from .models import Block
_BOLD_FONT_KEYWORDS = ("bold", "heavy", "black", "demi", "semibold")
# Mappa PUA (U+F000U+F0FF) → Unicode per font Symbol e font math LaTeX.
# Le chiavi sono caratteri nel range PUA come estratti da PyMuPDF.
_SYMBOL_PUA_MAP: dict[str, str] = {
'': ' ', '': '!', '': '', '': '#',
'': '', '': '%', '': '&', '': '',
'': '(', '': ')', '': '', '': '+',
'': ',', '': '', '': '.', '': '/',
'': '0', '': '1', '': '2', '': '3',
'': '4', '': '5', '': '6', '': '7',
'': '8', '': '9', '': ':', '': ';',
'': '<', '': '=', '': '>', '': '?',
'': '', '': 'Α', '': 'Β', '': 'Χ',
'': 'Δ', '': 'Ε', '': 'Φ', '': 'Γ',
'': 'Η', '': 'Ι', '': 'ϑ', '': 'Κ',
'': 'Λ', '': 'Μ', '': 'Ν', '': 'Ο',
'': 'Π', '': 'Θ', '': 'Ρ', '': 'Σ',
'': 'Τ', '': 'Υ', '': 'ς', '': 'Ω',
'': 'Ξ', '': 'Ψ', '': 'Ζ', '': '[',
'': '', '': ']', '': '', '': '_',
'': 'α', '': 'β', '': 'χ', '': 'δ',
'': 'ε', '': 'φ', '': 'γ', '': 'η',
'': 'ι', '': 'ϕ', '': 'κ', '': 'λ',
'': 'μ', '': 'ν', '': 'ο', '': 'π',
'': 'θ', '': 'ρ', '': 'σ', '': 'τ',
'': 'υ', '': 'ϖ', '': 'ω', '': 'ξ',
'': 'ψ', '': 'ζ', '': '{', '': '|',
'': '}', '': '',
'': 'ϒ', '': '', '': '', '': '',
'': '', '': 'ƒ', '': '', '': '',
'': '', '': '', '': '', '': '',
'': '', '': '', '': '',
'': '°', '': '±', '': '', '': '',
'': '×', '': '', '': '', '': '',
'': '÷', '': '', '': '', '': '',
'': '', '': '|', '': '',
'': '', '': '', '': '', '': '',
'': '', '': '', '': '', '': '',
'': '', '': '', '': '', '': '',
'': '', '': '', '': '', '': '',
'': '', '': '', '': '', '': '©',
'': '', '': '', '': '', '': '',
'': '¬', '': '', '': '',
'': '', '': '', '': '',
'': '', '': '', '': '', '': '',
}
# Font che tipicamente contengono caratteri PUA math (LaTeX e Symbol)
_MATH_FONT_KEYWORDS = ("symbol", "cmmi", "cmsy", "msam", "msbm", "eurm", "cmex", "math")
def _clean_pua(text: str) -> str:
"""
Applica la mappatura PUA→Unicode a TUTTI i testi estratti.
Converte i caratteri nel range U+F000U+F0FF usando _SYMBOL_PUA_MAP;
i caratteri PUA non mappati vengono rimossi (sostituiti con stringa vuota).
"""
result = []
for ch in text:
cp = ord(ch)
if 0xF000 <= cp <= 0xF0FF:
mapped = _SYMBOL_PUA_MAP.get(ch)
if mapped is not None:
result.append(mapped)
# carattere PUA non mappato → scarta (artefatto illeggibile)
else:
result.append(ch)
return ''.join(result)
def _is_bold_span(span: dict) -> bool:
if span["flags"] & 16:
return True
return any(kw in span["font"].lower() for kw in _BOLD_FONT_KEYWORDS)
def _extract_page_blocks(page: fitz.Page, page_num: int) -> list[Block]:
page_dict = page.get_text("dict")
blocks: list[Block] = []
prev_y1 = 0.0
for raw_block in page_dict["blocks"]:
if raw_block.get("type") != 0: # ignora blocchi immagine
continue
for line in raw_block.get("lines", []):
spans = line.get("spans", [])
if not spans:
continue
# Aggrega span della stessa riga con stesso font+size in un Block
groups: list[list[dict]] = []
current: list[dict] = []
for sp in spans:
if not current:
current.append(sp)
elif (
round(sp["size"], 1) == round(current[0]["size"], 1)
and sp["font"] == current[0]["font"]
):
current.append(sp)
else:
groups.append(current)
current = [sp]
if current:
groups.append(current)
for group in groups:
text = _clean_pua("".join(s["text"] for s in group).strip())
if not text:
continue
first = group[0]
bbox = (
min(s["bbox"][0] for s in group),
min(s["bbox"][1] for s in group),
max(s["bbox"][2] for s in group),
max(s["bbox"][3] for s in group),
)
y0 = bbox[1]
space_before = max(0.0, y0 - prev_y1)
is_bold = _is_bold_span(first)
font_size = round(first["size"], 2)
# Superscript (flags & 1) → ignore provvisorio
block_type = "ignore" if (first["flags"] & 1) else "paragraph"
block = Block(
text=text,
page=page_num,
bbox=bbox,
font_size=font_size,
font_name=first["font"],
is_bold=is_bold,
block_type=block_type,
space_before=space_before,
origin_spans=group,
)
blocks.append(block)
prev_y1 = bbox[3]
return blocks
def extract_raw_data(pdf_path: Path) -> tuple[list[Block], dict]:
"""
Apre il PDF con PyMuPDF ed estrae tutti i Block + metadati documento.
Ritorna:
blocks — lista di Block ordinati per pagina (poi per y0/x0 in stage2)
doc_meta — dict con: toc, page_count, page_dimensions, title, author, year
"""
doc = fitz.open(str(pdf_path))
toc = doc.get_toc() # [(level, title, page), ...]
page_count = len(doc)
page_dimensions = [(p.rect.width, p.rect.height) for p in doc]
raw_meta = doc.metadata or {}
import re
year = ""
creation = raw_meta.get("creationDate", "")
m = re.match(r"D:(\d{4})", creation)
if m:
year = m.group(1)
doc_meta = {
"toc": toc,
"page_count": page_count,
"page_dimensions": page_dimensions,
"title": (raw_meta.get("title") or "").strip(),
"author": (raw_meta.get("author") or "").strip(),
"year": year,
}
all_blocks: list[Block] = []
for page_num, page in enumerate(doc, start=1):
page_blocks = _extract_page_blocks(page, page_num)
all_blocks.extend(page_blocks)
doc.close()
return all_blocks, doc_meta
def extract_raw_data_with_pdfplumber_fallback(pdf_path: Path) -> tuple[list[Block], dict]:
"""
Estrae i Block con PyMuPDF; per le pagine dove il testo è < 100 caratteri
(ma la pagina non è blank), usa pdfplumber come fallback e aggiunge un
Block "paragraph" sintetico con il testo alternativo.
La funzione `extract_raw_data` originale rimane invariata.
"""
all_blocks, doc_meta = extract_raw_data(pdf_path)
# Raggruppa i blocchi per pagina per misurare quante parole ci sono
from collections import defaultdict
blocks_by_page: dict[int, list[Block]] = defaultdict(list)
for b in all_blocks:
blocks_by_page[b.page].append(b)
page_count = doc_meta["page_count"]
sparse_pages = []
for page_num in range(1, page_count + 1):
page_blocks = blocks_by_page.get(page_num, [])
total_chars = sum(len(b.text) for b in page_blocks if b.block_type != "ignore")
if total_chars < 100:
sparse_pages.append(page_num)
if not sparse_pages:
return all_blocks, doc_meta
try:
import pdfplumber
except ImportError:
return all_blocks, doc_meta
try:
with pdfplumber.open(str(pdf_path)) as pdf:
for page_num in sparse_pages:
page_idx = page_num - 1
if page_idx >= len(pdf.pages):
continue
page = pdf.pages[page_idx]
text = page.extract_text() or ""
text = text.strip()
if not text or len(text) < 20:
continue # pagina davvero vuota
# Costruisci un Block sintetico per il testo fallback
w = page.width or 612
h = page.height or 792
fallback_block = Block(
text=_clean_pua(text),
page=page_num,
bbox=(0.0, 0.0, float(w), float(h)),
font_size=10.0,
font_name="pdfplumber-fallback",
is_bold=False,
block_type="paragraph",
space_before=0.0,
origin_spans=[],
)
all_blocks.append(fallback_block)
except Exception:
pass # se pdfplumber fallisce, usa i block di PyMuPDF già presenti
# Riordina per pagina (i fallback sono stati appesi in coda)
all_blocks.sort(key=lambda b: (b.page, b.bbox[1], b.bbox[0]))
return all_blocks, doc_meta
+184
View File
@@ -0,0 +1,184 @@
"""Stage 2: analisi layout — reading order, multi-colonna, merge header multi-riga."""
from collections import Counter
from .models import Block
_RECURRING_MIN_OCCURRENCES = 3
_RECURRING_MAX_LEN = 100
_RECURRING_PAGE_RATIO = 0.05 # soglia minima: ≥5% delle pagine del documento
def _mark_recurring_lines(blocks: list[Block]) -> list[Block]:
"""
Segna come 'ignore' i blocchi con testo breve che compaiono molte volte
nel documento tipicamente header/footer di pagina ripetuti.
La soglia scala con la lunghezza del documento: max(3, page_count * 5%)
per evitare di marcare come ricorrenti titoli di sezione che appaiono
poche volte in documenti lunghi con struttura a parti (es. I/II/III).
"""
if not blocks:
return blocks
page_count = max(b.page for b in blocks)
threshold = max(_RECURRING_MIN_OCCURRENCES, int(page_count * _RECURRING_PAGE_RATIO))
counts = Counter(
b.text.strip()
for b in blocks
if 3 < len(b.text.strip()) < _RECURRING_MAX_LEN
)
recurring = {t for t, n in counts.items() if n >= threshold}
if not recurring:
return blocks
for b in blocks:
if b.text.strip() in recurring:
b.block_type = "ignore"
return blocks
_COLUMN_GAP_RATIO = 0.15 # gap orizzontale minimo per rilevare colonne (% page_width)
_COLUMN_THRESHOLD = 0.40 # % blocchi per lato per dichiarare layout multi-colonna
_MULTILINE_X_TOL = 5.0 # tolleranza px per allineamento x0 di righe consecutive (testo a sx)
_MULTILINE_CX_TOL = 20.0 # tolleranza px per allineamento centro di righe centrate
def _detect_columns(blocks: list[Block], page_width: float) -> int:
"""Ritorna 1 (singola colonna) o 2 (doppia colonna)."""
if not blocks or page_width <= 0:
return 1
mid = page_width * 0.5
left = sum(1 for b in blocks if b.x0 < mid)
right = sum(1 for b in blocks if b.x0 >= mid)
total = left + right
if total == 0:
return 1
if (left / total >= _COLUMN_THRESHOLD) and (right / total >= _COLUMN_THRESHOLD):
return 2
return 1
def _reorder_two_columns(blocks: list[Block], page_width: float) -> list[Block]:
"""Riordina blocchi in layout a due colonne: prima col. sinistra, poi destra."""
mid = page_width * 0.5
left = sorted([b for b in blocks if b.x0 < mid], key=lambda b: b.y0)
right = sorted([b for b in blocks if b.x0 >= mid], key=lambda b: b.y0)
return left + right
def _merge_multiline_headers(blocks: list[Block]) -> list[Block]:
"""
Unifica coppie di block consecutivi che formano un header multi-riga:
stesso font_size, stesso x0 (±5px), gap verticale < 1.5×font_size.
"""
if not blocks:
return blocks
result: list[Block] = []
i = 0
while i < len(blocks):
cur = blocks[i]
if i + 1 < len(blocks):
nxt = blocks[i + 1]
same_size = round(cur.font_size, 1) == round(nxt.font_size, 1)
same_page = cur.page == nxt.page
same_x = abs(cur.x0 - nxt.x0) <= _MULTILINE_X_TOL
# Titoli centrati: larghezze diverse → x0 diversi; verifica il centro invece
cur_cx = (cur.x0 + cur.x1) / 2
nxt_cx = (nxt.x0 + nxt.x1) / 2
same_cx = abs(cur_cx - nxt_cx) <= _MULTILINE_CX_TOL
aligned = same_x or same_cx
gap = nxt.y0 - cur.y1
# gap >= -3pt: le bbox di righe consecutive possono sovrapporsi leggermente
# per font a tight-leading; -3pt esclude cross-column merge (gap ≈ -800pt)
small_gap = -3 <= gap < 1.5 * cur.font_size
both_short = len(cur.text) < 120 and len(nxt.text) < 120
# Non fondere blocco corpo testuale con titolo: il testo di corpo termina
# con ! o ? e contiene minuscole (fine frase), mentre il titolo è ALLCAPS/breve.
cur_stripped = cur.text.strip()
body_sentence_end = (
cur_stripped.endswith(("!", "?"))
and any(c.islower() for c in cur_stripped)
)
if same_size and same_page and aligned and small_gap and both_short and not body_sentence_end:
merged = Block(
text=cur.text + " " + nxt.text,
page=cur.page,
bbox=(cur.x0, cur.y0, max(cur.x1, nxt.x1), nxt.y1),
font_size=cur.font_size,
font_name=cur.font_name,
is_bold=cur.is_bold or nxt.is_bold,
block_type=cur.block_type,
space_before=cur.space_before,
origin_spans=cur.origin_spans + nxt.origin_spans,
)
result.append(merged)
i += 2
continue
result.append(cur)
i += 1
return result
def _recompute_space_before(blocks: list[Block]) -> list[Block]:
"""Ricalcola space_before dopo eventuali riordinamenti.
Salto di pagina: usa b.y0 come stima del gap dalla cima della nuova pagina
(minimo 50pt) in modo che il primo blocco di ogni pagina ottenga il space_signal
anche quando si trova subito dopo un page break (coordinate y azzerano tra pagine).
"""
for i, b in enumerate(blocks):
if i == 0:
b.space_before = 0.0
elif b.page != blocks[i - 1].page:
b.space_before = max(b.y0, 50.0)
else:
b.space_before = max(0.0, b.y0 - blocks[i - 1].y1)
return blocks
def analyze_layout(raw_blocks: list[Block], doc_meta: dict) -> list[Block]:
"""
Organizza i Block estratti in Stage 1 in reading order corretto.
1. Raggruppa per pagina.
2. Rileva layout multi-colonna riordina.
3. Ordina ogni pagina per (y0, x0).
4. Merge header multi-riga.
5. Ricalcola space_before.
"""
if not raw_blocks:
return []
page_dimensions = doc_meta.get("page_dimensions", [])
# Raggruppa per pagina
pages: dict[int, list[Block]] = {}
for b in raw_blocks:
pages.setdefault(b.page, []).append(b)
ordered: list[Block] = []
for page_num in sorted(pages):
page_blocks = pages[page_num]
page_idx = page_num - 1
page_width = page_dimensions[page_idx][0] if page_idx < len(page_dimensions) else 595.0
# Ordina per (y0, x0) prima della rilevazione colonne
page_blocks.sort(key=lambda b: (b.y0, b.x0))
n_cols = _detect_columns(page_blocks, page_width)
if n_cols == 2:
page_blocks = _reorder_two_columns(page_blocks, page_width)
ordered.extend(page_blocks)
# Merge header multi-riga
ordered = _merge_multiline_headers(ordered)
# Ricalcola space_before
ordered = _recompute_space_before(ordered)
# Segna come ignore i blocchi ricorrenti (header/footer di capitolo)
ordered = _mark_recurring_lines(ordered)
return ordered
+53
View File
@@ -0,0 +1,53 @@
"""Stage 3: analisi font — rileva body size e cluster header per documento."""
from collections import Counter
from .models import Block, FontProfile
def build_font_profile(blocks: list[Block]) -> FontProfile:
"""
Determina body_size (mode dei font size) e costruisce cluster_map
per i livelli header (1=H1, 2=H2, 3=H3), inferiti dinamicamente.
"""
sizes = [
round(b.font_size, 1)
for b in blocks
if b.block_type != "ignore"
]
if not sizes:
return FontProfile(body_size=11.0, cluster_map={}, header_sizes=[])
counter = Counter(sizes)
total = sum(counter.values())
# Body size = font size più frequente
body_size = counter.most_common(1)[0][0]
# Candidati header: size > body + 1pt, frequenza < 30% del totale
raw_candidates = sorted(
{
s for s, c in counter.items()
if s > body_size + 1.0 and c / total < 0.30
},
reverse=True,
)
# Collassa cluster entro ±0.5pt
collapsed: list[float] = []
for s in raw_candidates:
if collapsed and abs(s - collapsed[-1]) <= 0.5:
continue # appartiene al cluster precedente (già più grande)
collapsed.append(s)
header_sizes = collapsed[:3] # max 3 livelli
# cluster_map: size arrotondato → livello (1=grande, 2=medio, 3=piccolo)
cluster_map: dict[float, int] = {}
for i, s in enumerate(header_sizes, start=1):
cluster_map[s] = i
return FontProfile(
body_size=body_size,
cluster_map=cluster_map,
header_sizes=header_sizes,
)
+162
View File
@@ -0,0 +1,162 @@
"""Stage 4: classificazione blocchi — rileva header candidate con segnali combinati."""
import re
from .models import Block, FontProfile
# Numerazione gerarchica con separatore esplicito: "1.", "1.2", "1.2.3" + MAIUSCOLA.
# Non usa \s come separatore per evitare "1 La divisione..." (note a pie' di pagina).
_NUMBERED_SECTION_RE = re.compile(r"^\d+(\.\d+)*[.)]\s*[A-ZÀ-Ÿ]")
_ARTICLE_RE = re.compile(r"^Art(?:icolo|\.)\s+\d+", re.IGNORECASE)
# "CAPITOLO QUARTO." / "CHAPTER FOUR" / "CANTO XII" — keyword strutturale ALLCAPS + ordinale/numero/romano.
# Solo maiuscolo: cattura sezioni dove il font è identico al corpo (PDF letterari/accademici)
# ma lascia intatti i riferimenti in sentence-case nel corpo del testo.
_CHAPTER_WORD_RE = re.compile(
r"^(?:CAPITOLO|CHAPTER|CANTO)\s+(?:[A-ZÀ-Ÿ][A-ZÀ-Ÿ]+|\d+|[IVXLCDM]+)\b"
)
# "Capitolo 1: TITOLO" / "Chapter 3 — ..." in sentence-case + bold.
# Cattura capitoli di PDF tecnici/didattici con body-size identico agli header.
_CHAPTER_WORD_BOLD_RE = re.compile(
r"^(?:Capitolo|Chapter)\s+\d+\b", re.IGNORECASE
)
_PURE_NUMBERS_RE = re.compile(r"^[\d\s\-\./,]+$") # solo numeri/punteggiatura, nessuna lettera
# Simbolo di sezione § seguito da numero o romano: "§ 1", "§ I.", "§ 12"
_SECTION_SYMBOL_RE = re.compile(r"\s*[\dIVXivx]")
# Dot-leader: tipici di TOC e liste figure (". . . . .")
_DOT_LEADER_RE = re.compile(r"(?:\.[ ]){3,}")
# Riferimento di pagina TOC: ", p. 42" (voce indice) — in qualsiasi posizione nel testo
# oppure multipli riferimenti pagina (liste TOC con più voci)
_TOC_PAGE_REF_RE = re.compile(r",?\s+p\.\s+\d+")
# Numerale romano minuscolo standalone: page number preliminari (i, ii, vii, xii…)
_ROMAN_PAGE_RE = re.compile(r"^x{0,3}(?:ix|iv|v?i{0,3})$")
_SHORT_LINE_THRESHOLD = 80 # caratteri
_HEADER_SCORE_THRESHOLD = 3 # punteggio minimo per diventare header_candidate
def _score_block(block: Block, body_size: float) -> int:
score = 0
text = block.text.strip()
# size_signal: font_size significativamente più grande del corpo
if block.font_size >= body_size + 1.5:
score += 2
# bold_signal: bold E font_size almeno pari al corpo.
# Usa round() per evitare falsi positivi da rumore floating point del PDF
# (es. 11.52 vs body_size 11.5 → stesso cluster, non un vero header).
if block.is_bold and round(block.font_size, 1) > round(body_size, 1):
score += 1
# number_signal: numerazione gerarchica SOLO se font > corpo + 0.5pt.
# Evita che paragrafi numerati a font-corpo (es. "1. Lo spazio non è…")
# vengano promossi ad header per il solo fatto di iniziare con un numero.
if _NUMBERED_SECTION_RE.match(text) and block.font_size > body_size + 0.5:
score += 2
# section_symbol_signal: simbolo § (tipico di trattati filosofici/giuridici).
# Threshold body-2.5pt: cattura § a font ridotto (varianti editoriali del PDF)
# ma esclude annotazioni marginali a 8.2pt (§9, §10 come running notes).
if _SECTION_SYMBOL_RE.match(text) and block.font_size >= body_size - 2.5:
score += 2
# allcaps_signal: testo interamente maiuscolo con font ≥ corpo → titolo di parte/capitolo.
# Threshold abbassata a >= body_size: cattura sezioni ALLCAPS nei PDF letterari
# dove il font del titolo è identico al corpo.
# Escluso se bold: bold+ALLCAPS a body_size indica enfasi nel testo (intestazioni di cella,
# etichette), non un titolo di sezione strutturale.
alpha = re.sub(r"[^a-zA-ZÀ-ÿ]", "", text)
if (alpha and alpha == alpha.upper() and len(alpha) > 3
and block.font_size >= body_size and not block.is_bold):
score += 1
# length_signal: riga breve (i titoli sono concisi)
if len(text) < _SHORT_LINE_THRESHOLD:
score += 1
# space_signal: spazio verticale prima del blocco > 1.5× dimensione font
if block.space_before > 1.5 * block.font_size:
score += 1
return score
def classify_blocks(blocks: list[Block], profile: FontProfile) -> list[Block]:
"""
Assegna block_type ad ogni Block in base a segnali combinati.
Guardie aggiuntive che impediscono la promozione a header_candidate:
- testo puramente numerico (numeri di pagina, intervalli TOC)
- testo che inizia con `|` (footer/intestazioni di capitolo stile tabella)
- testo troppo corto (< 2 caratteri)
"""
body_size = profile.body_size
for block in blocks:
# Non toccare classificazioni precedenti protette
if block.block_type in ("table", "ignore"):
continue
text = block.text.strip()
if not text or len(text) < 2:
block.block_type = "ignore"
continue
# Guard legale: articoli di codice → sempre header candidate
if _ARTICLE_RE.match(text):
block.block_type = "header_candidate"
continue
# Guard letterario ALLCAPS: keyword strutturale + ordinale/numero/romano → sempre header candidate.
if _CHAPTER_WORD_RE.match(text) and len(text) < _SHORT_LINE_THRESHOLD:
block.block_type = "header_candidate"
continue
# Guard letterario bold: "Capitolo 1: TITOLO" bold anche al body-size → header candidate.
if block.is_bold and _CHAPTER_WORD_BOLD_RE.match(text) and len(text) < _SHORT_LINE_THRESHOLD:
block.block_type = "header_candidate"
continue
# Guard: testo puramente numerico → numero di pagina standalone, da ignorare
if _PURE_NUMBERS_RE.match(text):
block.block_type = "ignore"
continue
# Guard: numerale romano minuscolo standalone → page number preliminare (vii, xii…)
if _ROMAN_PAGE_RE.match(text) and len(text) >= 2:
block.block_type = "ignore"
continue
# Guard: dot-leader → riga TOC o lista figure, non testo del documento
if _DOT_LEADER_RE.search(text):
block.block_type = "ignore"
continue
# Guard: testo che inizia con pipe → footer/intestazione di capitolo o frammento tabella
if text.startswith("|"):
block.block_type = "ignore"
continue
# Guard: voce di indice con riferimento pagina → "§ 9. Titolo, p. 90."
if _TOC_PAGE_REF_RE.search(text):
block.block_type = "ignore"
continue
score = _score_block(block, body_size)
if score >= _HEADER_SCORE_THRESHOLD:
# Guard: header candidate deve iniziare con lettera maiuscola (dopo eventuali numeri/simboli).
# Filtra frammenti LaTeX come "1 segue", "1 allora", "2) prodotto" che hanno
# font grande ma non sono titoli di sezione.
stripped_nums = re.sub(r"^[§\d\s\.\)\(\-]+", "", text)
if stripped_nums and stripped_nums[0].islower():
block.block_type = "paragraph"
else:
block.block_type = "header_candidate"
else:
# Rilevamento liste: riga che inizia con bullet o numero seguito da punto
stripped = text.lstrip()
if stripped.startswith(("- ", "* ", "", "· ")) or re.match(r"^\d+\.\s", stripped):
block.block_type = "list_item"
else:
block.block_type = "paragraph"
return blocks
+147
View File
@@ -0,0 +1,147 @@
"""Stage 5: inferenza gerarchia — assegna livello (1-3) agli header candidate."""
import re
import unicodedata
from .models import Block, FontProfile
_NUMBERED_RE = re.compile(r"^(\d+(?:\.\d+)*)[.)\s]\s*[A-ZÀ-Ÿ]")
_MIN_NUMBERED_FOR_RULE1 = 3 # soglia per attivare Regola 1
# "Capitolo 3 Titolo" / "Chapter 5 Titolo": sezioni numerate con la parola
# "Capitolo/Chapter" + numero intero (in senso-maiuscolo, tipicamente bold body-size).
# Se ≥3 blocchi corrispondono, vengono promossi a livello 2 come sezioni primarie.
_CHAPTER_NUM_BOLD_RE = re.compile(r"^(?:Capitolo|Chapter)\s+\d+\b", re.IGNORECASE)
_MIN_CHAPTER_NUM_FOR_PROMOTION = 3
def _normalize_title(text: str) -> str:
"""Normalizza un titolo per il confronto fuzzy con il TOC."""
text = unicodedata.normalize("NFKC", text)
text = text.lower().strip()
text = re.sub(r"[^\w\s]", " ", text)
text = re.sub(r"\s+", " ", text)
return text.strip()
def _fuzzy_match(title: str, toc_map: dict[str, int], threshold: float = 0.75) -> int:
"""
Cerca il livello TOC per un titolo con confronto fuzzy.
Ritorna il livello trovato, o 0 se nessun match.
"""
norm = _normalize_title(title)
if not norm:
return 0
# Match esatto
if norm in toc_map:
return toc_map[norm]
# Match parziale: confronta le prime parole (fino a 8)
norm_words = norm.split()[:8]
norm_prefix = " ".join(norm_words)
best_score = 0.0
best_level = 0
for toc_norm, level in toc_map.items():
toc_words = toc_norm.split()[:8]
toc_prefix = " ".join(toc_words)
# Calcola sovrapposizione su caratteri del prefisso più corto
shorter = min(len(norm_prefix), len(toc_prefix))
if shorter == 0:
continue
matches = sum(
1 for a, b in zip(norm_prefix, toc_prefix) if a == b
)
score = matches / shorter
if score > best_score:
best_score = score
best_level = level
return best_level if best_score >= threshold else 0
def _level_from_numbering(text: str) -> int:
"""Inferisce il livello dall'numerazione gerarchica: "1." → 1, "1.2" → 2, ecc."""
m = _NUMBERED_RE.match(text.strip())
if not m:
return 0
dots = m.group(1).count(".")
return min(dots + 1, 3)
def _level_from_font(font_size: float, cluster_map: dict[float, int]) -> int:
"""Cerca il livello più vicino nel cluster_map in base alla font_size."""
if not cluster_map:
return 2 # fallback: tutti H2
rounded = round(font_size, 1)
if rounded in cluster_map:
return cluster_map[rounded]
# Cerca il cluster più vicino
best = min(cluster_map.keys(), key=lambda s: abs(s - rounded))
return cluster_map[best]
def infer_hierarchy(
blocks: list[Block],
profile: FontProfile,
toc: list,
) -> list[Block]:
"""
Assegna block.level ad ogni header_candidate secondo la priorità:
Regola 1: numerazione gerarchica (3 candidati numerati)
Regola 2: allineamento TOC (se TOC non vuoto)
Regola 3: font size clustering (fallback)
"""
candidates = [b for b in blocks if b.block_type == "header_candidate"]
if not candidates:
return blocks
# ── Regola 1: numerazione ──────────────────────────────────────────────────
numbered = [b for b in candidates if _NUMBERED_RE.match(b.text.strip())]
use_numbering = len(numbered) >= _MIN_NUMBERED_FOR_RULE1
# ── Regola 2: costruisci mappa TOC ────────────────────────────────────────
toc_map: dict[str, int] = {}
for entry in toc:
if len(entry) >= 3:
level, title, _ = entry[0], entry[1], entry[2]
norm = _normalize_title(str(title))
if norm:
toc_map[norm] = min(int(level), 3)
use_toc = bool(toc_map)
# ── Assegna livelli ───────────────────────────────────────────────────────
for block in candidates:
text = block.text.strip()
level = 0
if use_numbering and _NUMBERED_RE.match(text):
level = _level_from_numbering(text)
elif use_numbering:
# Documento numerato ma questo candidato non ha numero →
# usa font size come hint secondario, poi fallback a 2
level = _level_from_font(block.font_size, profile.cluster_map) or 2
elif use_toc:
level = _fuzzy_match(text, toc_map)
if level == 0:
level = _level_from_font(block.font_size, profile.cluster_map) or 2
else:
level = _level_from_font(block.font_size, profile.cluster_map) or 2
block.level = max(1, min(level, 3))
# ── Post-correzione: "Capitolo/Chapter N" bold → sezioni primarie (L2) ────
# Quando il documento usa "Capitolo N" bold a body-size (senza font distinto
# per i titoli), il font clustering assegna L3 perché la dimensione è sotto
# tutti i cluster. Con ≥3 capitoli numerati, li promuoviamo a L2.
if not use_toc and not use_numbering:
chapter_bold = [
b for b in candidates
if b.is_bold and _CHAPTER_NUM_BOLD_RE.match(b.text.strip()) and b.level > 2
]
if len(chapter_bold) >= _MIN_CHAPTER_NUM_FOR_PROMOTION:
for b in chapter_bold:
b.level = 2
return blocks
+54
View File
@@ -0,0 +1,54 @@
"""Stage 6: ricostruzione albero documentale — Section con parent-child stack-based."""
from .models import Block, Section
def build_tree(blocks: list[Block]) -> list[Section]:
"""
Costruisce l'albero di Section dalla lista ordinata di Block.
Algoritmo stack-based:
- header_candidate nuova Section; pop stack finché livello >= nuovo livello.
- Altri block aggiunti al content della Section in cima allo stack.
- Testo prima del primo header sezione implicita (title="", level=0).
"""
roots: list[Section] = []
stack: list[Section] = [] # sezioni aperte, ordinate per livello crescente
def _current() -> Section | None:
return stack[-1] if stack else None
def _push(section: Section) -> None:
"""Inserisce la nuova sezione nell'albero rispettando la gerarchia."""
# Pop sezioni con livello >= al nuovo (nuovo header chiude i predecessori allo stesso livello)
while stack and stack[-1].level >= section.level:
stack.pop()
if stack:
stack[-1].children.append(section)
else:
roots.append(section)
stack.append(section)
for block in blocks:
if block.block_type == "header_candidate" and block.level > 0:
new_section = Section(
title=block.text.strip(),
level=block.level,
page_start=block.page,
source_block=block,
)
_push(new_section)
elif block.block_type == "ignore":
continue
else:
cur = _current()
if cur is None:
# Testo prima del primo header → sezione implicita
implicit = Section(title="", level=0, page_start=block.page)
roots.append(implicit)
stack.append(implicit)
cur = implicit
cur.content.append(block)
return roots
+224
View File
@@ -0,0 +1,224 @@
"""Stage 7: serializzazione del document tree in Markdown valido."""
import re
from pathlib import Path
from .models import Block, Section
# Pulisce artefatti finali nei titoli: " | 30", " |", " | "
# (pipe con eventuale numero di pagina — tipici footer di capitolo nei PDF)
_TITLE_TRAIL_RE = re.compile(r"\s*\|\s*\d*\s*$")
# Sezioni preliminari da omettere interamente dall'output Markdown
# (TOC, lista figure, lista tabelle — non sono contenuto RAG-utile)
_SKIP_SECTION_TITLES = {
"indice", "indice generale", "indice analitico",
"table of contents", "contents",
"elenco delle figure", "lista delle figure", "list of figures",
"elenco delle tabelle", "lista delle tabelle", "list of tables",
"sommario",
}
_LIST_RE = re.compile(r"^(?:[-*•·]\s|\d+\.\s)")
def _split_long_title(title: str) -> tuple[str, str]:
"""
Divide un titolo multi-frase in (titolo_breve, corpo_extra).
Cerca il primo confine di frase ('. ' seguito da maiuscola) dopo il
carattere 15, per non spezzare abbreviazioni brevi all'inizio del titolo.
Ritorna (title, '') se non c'è divisione sensata o il titolo è corto.
"""
if len(title) <= 120:
return title, ''
for i in range(15, len(title) - 2):
if title[i] == '.' and title[i + 1] == ' ' and title[i + 2].isupper():
return title[:i + 1].strip(), title[i + 2:].strip()
return title, ''
def _serialize_block(block: Block, pdf_path: Path | None = None) -> str:
"""Serializza un singolo Block in testo Markdown."""
if block.block_type == "ignore":
return ""
text = block.text.strip()
if not text:
return ""
if block.block_type == "table":
return _serialize_table(block, pdf_path)
if block.block_type == "list_item":
return text # già formattato con bullet/numero
return text # paragraph
def _serialize_table(block: Block, pdf_path: Path | None = None) -> str:
"""
Tenta di estrarre la tabella con pdfplumber; fallback a testo raw.
"""
if pdf_path is not None and block.origin_spans:
try:
import pdfplumber
with pdfplumber.open(str(pdf_path)) as pdf:
page_idx = block.page - 1
if 0 <= page_idx < len(pdf.pages):
page = pdf.pages[page_idx]
x0, y0, x1, y1 = block.bbox
cropped = page.crop((x0 - 2, y0 - 2, x1 + 2, y1 + 2))
table = cropped.extract_table()
if table:
return _table_to_markdown(table)
except Exception:
pass
# Fallback: testo grezzo
return block.text.strip()
def _table_to_markdown(table: list[list[str | None]]) -> str:
"""Converte una tabella pdfplumber in Markdown GFM."""
if not table:
return ""
def _cell(c: str | None) -> str:
return (c or "").replace("\n", " ").strip()
rows = [[_cell(c) for c in row] for row in table]
# Normalizza larghezza colonne
n_cols = max(len(r) for r in rows)
rows = [r + [""] * (n_cols - len(r)) for r in rows]
header = rows[0]
sep = ["---"] * n_cols
body = rows[1:]
lines = [
"| " + " | ".join(header) + " |",
"| " + " | ".join(sep) + " |",
]
for row in body:
lines.append("| " + " | ".join(row) + " |")
return "\n".join(lines)
def _is_para_break(block: Block) -> bool:
"""
Restituisce True se il block inizia un nuovo paragrafo logico.
Soglia: gap verticale > 1× font_size ( una riga intera di margine).
All'interno di un paragrafo il gap è ≈ 0-4pt; tra paragrafi è ≥ font_size.
"""
return block.space_before > block.font_size
def _serialize_section(section: Section, pdf_path: Path | None = None) -> list[str]:
"""Traversal DFS in-order: header → content → children."""
# Salta sezioni preliminari non utili per RAG (TOC, lista figure, ecc.)
# I FIGLI vengono comunque serializzati: se la TOC è genitore errato dei capitoli
# reali (gerarchia piatta nel PDF), i capitoli appaiono ugualmente nel Markdown.
if section.title.strip().lower() in _SKIP_SECTION_TITLES:
parts: list[str] = []
for child in section.children:
parts.extend(_serialize_section(child, pdf_path))
return parts
parts: list[str] = []
# Header (livello 0 = sezione implicita pre-primo-header → no #)
extra_body: str = ''
if section.level > 0:
title = _TITLE_TRAIL_RE.sub("", section.title).strip()
if not title:
pass # titolo vuoto: nessun header, ma il contenuto viene comunque emesso
else:
title, extra_body = _split_long_title(title)
hashes = "#" * section.level
parts.append(f"{hashes} {title}")
parts.append("")
# Content: accumula righe di paragrafo consecutive in un unico blocco di testo
pending: list[str] = [] # pezzi del paragrafo corrente
if extra_body:
pending.append(extra_body)
def _flush() -> None:
if not pending:
return
# Unisci i pezzi riparando la sillabazione inter-riga:
# "de-" + "stino" → "destino" (trattino finale + inizio minuscolo)
joined = pending[0]
for part in pending[1:]:
if joined.endswith("-") and part and part[0].islower():
joined = joined[:-1] + part
else:
joined = joined + " " + part
parts.append(joined)
parts.append("")
pending.clear()
for block in section.content:
text = _serialize_block(block, pdf_path)
if not text:
continue
if block.block_type == "list_item":
_flush()
parts.append(text)
elif block.block_type == "table":
_flush()
parts.append(text)
parts.append("")
else:
# Blocco paragrafo: unisci con il precedente oppure inizia nuovo paragrafo
if pending and _is_para_break(block):
_flush()
pending.append(text)
_flush()
# Figli
for child in section.children:
parts.extend(_serialize_section(child, pdf_path))
return parts
def serialize_tree(
roots: list[Section],
meta: dict,
pdf_path: Path | None = None,
include_frontmatter: bool = False,
) -> str:
"""
Serializza la lista di Section radice in un documento Markdown.
include_frontmatter: se True, inserisce blocco YAML con metadati.
Nota: il frontmatter viene aggiunto dal runner, non qui, per mantenere
raw.md privo di metadata soggetti a variazione.
"""
parts: list[str] = []
if include_frontmatter and meta:
fm_lines = ["---", f"source: {meta.get('source', '')}"]
if meta.get("title"):
fm_lines.append(f'title: "{meta["title"]}"')
if meta.get("author"):
fm_lines.append(f'author: "{meta["author"]}"')
if meta.get("year"):
fm_lines.append(f"year: {meta['year']}")
if meta.get("pages"):
fm_lines.append(f"pages: {meta['pages']}")
fm_lines += ["---", ""]
parts.extend(fm_lines)
for root in roots:
root_parts = _serialize_section(root, pdf_path)
parts.extend(root_parts)
# Normalizza righe vuote consecutive (max 2)
text = "\n".join(parts)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip() + "\n"
+337
View File
@@ -0,0 +1,337 @@
"""Stage 8: normalizzazione gerarchia Markdown — ripara salti livello, header vuoti, duplicati."""
import re
import unicodedata
_HEADER_RE = re.compile(r"^(#{1,6})\s+(.+)$")
# Conversione encoding LaTeX accenti italiani estratti da PDF TeX-compilati
# backtick + vocale → accento grave; ´ + vocale → accento acuto
_GRAVE = {'a': 'à', 'e': 'è', 'i': 'ì', 'o': 'ò', 'u': 'ù', 'ı': 'ì',
'A': 'À', 'E': 'È', 'I': 'Ì', 'O': 'Ò', 'U': 'Ù'}
_ACUTE = {'a': 'á', 'e': 'é', 'i': 'í', 'o': 'ó', 'u': 'ú',
'A': 'Á', 'E': 'É', 'I': 'Í', 'O': 'Ó', 'U': 'Ú'}
def _fix_latex_accents(text: str) -> str:
"""Converte encoding LaTeX degli accenti: \`e→è, ´e→é, ecc."""
text = re.sub(r'`([aeiouAEIOUı])', lambda m: _GRAVE.get(m.group(1), m.group(0)), text)
text = re.sub(r'´([aeiouAEIOU])', lambda m: _ACUTE.get(m.group(1), m.group(0)), text)
# Encoding font: "1'" → "l'" (glifo 'l' letto come cifra '1' prima di apostrofo)
text = re.sub(r"\b1'([a-zA-ZÀ-ÿ])", r"l'\1", text)
return text
# Sillabazione TeX/PDF: "evi- tare" → "evitare" (trattino-spazio tra due frammenti)
_HYPHEN_SPACE_RE = re.compile(r'([a-zà-ÿ])- ([a-zà-ÿ])')
# Bold markup dentro header: ## **Titolo** → ## Titolo
_HEADER_BOLD_RE = re.compile(r'^(#{1,6})\s+\*\*(.+?)\*\*\s*$', re.MULTILINE)
# Pattern header numerato senza punto: "### 5 Titolo" → "### 5. Titolo"
_HDR_NUM_NO_DOT_RE = re.compile(r'^(#{1,6})\s+(\d{1,3})\s+(.+)$')
# Figura/Tabella come header (caption di layout finito nei blocchi strutturali)
_FIGURE_CAPTION_RE = re.compile(
r'^(Figura|Figure|Fig\.|Tabella|Table|Tab\.)\s+\d', re.IGNORECASE
)
# Numerale romano usato come marcatore di sezione: I, II, IV, VII, XXIII, ecc.
_ROMAN_NUMERAL_RE = re.compile(r'^[IVXLCDM]+\.?$', re.IGNORECASE)
def _sentence_case(s: str) -> str:
if not s:
return s
low = s.lower()
return low[0].upper() + low[1:]
def _is_garbage_header(content: str) -> bool:
"""Rileva header privi di significato strutturale."""
stripped = content.strip()
# Simbolo § — marcatore di sezione valido anche se solo numerico/romano
if stripped.startswith("§"):
return False
if stripped.startswith("..."):
return True
# Testo che termina con parentesi aperta → testo troncato, non un titolo valido
if stripped.endswith("("):
return True
# Testo con caratteri PUA (Symbol/Wingdings font): formula o simbolo matematico
if re.search(r'[-]', stripped):
return True
# Testo che inizia con [ → notazione matematica/vettoriale
if stripped.startswith("["):
return True
# Header troppo breve (≤4 caratteri non-spazio) → formula, variabile o simbolo isolato
if len(stripped.replace(" ", "")) <= 4 and not _ROMAN_NUMERAL_RE.match(stripped):
return True
# Nessuna sequenza di ≥2 lettere → pura punteggiatura/numero
if not re.search(r'[A-Za-zÀ-ÿ]{2,}', stripped):
return True
# Header di 1-4 lettere (es. "(a)", "x") — ma non numerali romani di sezione
if re.fullmatch(r'\(?\s*[A-Za-z]{1,4}\s*\)?', stripped):
if not _ROMAN_NUMERAL_RE.match(stripped.strip("(). ")):
return True
# Equazione breve come header: "x = y", "f(x) ≤"
if re.match(r'^[A-Za-zÀ-ÿ_]{1,3}\s*[=<>≤≥]', stripped):
return True
# Caption di figura o tabella estratta come header
if _FIGURE_CAPTION_RE.match(stripped):
return True
# Header che inizia con lettera minuscola e testo lungo: frammento corpo
first_alpha = next((c for c in content if c.isalpha()), None)
if first_alpha and first_alpha.islower() and len(content) > 40:
return True
return False
def _header_level(line: str) -> int:
m = _HEADER_RE.match(line)
return len(m.group(1)) if m else 0
def _norm_title(text: str) -> str:
text = unicodedata.normalize("NFKC", text).lower().strip()
return re.sub(r"\s+", " ", text)
def normalize_hierarchy(text: str) -> tuple[str, dict]:
"""
Ripara il Markdown prodotto da Stage 7 in più passate:
Pass 0 Accenti LaTeX (encoding PDF TeX-compilati)
Pass 0.5 Sillabazione "word- word" (artefatto TeX/PDF)
Pass 1 Bold dentro header: ## **T** → ## T
Pass 1.5 Header spazzatura rimossi PRIMA del repair (caption figure, equazioni, simboli)
Questo evita che simboli chimici/matematici H1/H2 alterino il repair dei salti.
Pass 2 Salti di livello: # A → #### B diventa # A → ## B
Pass 3 Duplicati consecutivi: header identici adiacenti collassati
Pass 4 Header vuoti senza contenuto sezioni figlio rimossi
Pass 5 Running-header prefisso del successivo (es. "§ 4" prima di "§ 4. Titolo")
Pass 6 ALLCAPS sentence case (4 lettere tutte maiuscole)
Pass 7 Demote # → ## se il documento ha ≥5 header H1
Pass 8 Clamp H4+ H3; normalizza "### 5 Titolo" "### 5. Titolo"
Ritorna (testo_riparato, stats_dict).
"""
lines = text.split("\n")
stats = {
"n_level_jumps_repaired": 0,
"n_empty_headers_removed": 0,
"n_duplicate_headers_removed": 0,
"n_hyphenations_repaired": 0,
"n_bold_in_headers_removed": 0,
"n_allcaps_headers_normalized": 0,
"n_h1_demoted": 0,
"n_garbage_headers_removed": 0,
"n_headers_clamped": 0,
}
# ── Pass 0: correggi encoding accenti italiani LaTeX ──────────────────────
lines = [_fix_latex_accents(l) for l in lines]
# ── Pass 0.5: ripara sillabazione "word- word" nei paragrafi ──────────────
repaired_lines: list[str] = []
for line in lines:
if not _HEADER_RE.match(line):
new_line, n = _HYPHEN_SPACE_RE.subn(r'\1\2', line)
stats["n_hyphenations_repaired"] += n
repaired_lines.append(new_line)
else:
repaired_lines.append(line)
lines = repaired_lines
# ── Pass 1: rimuovi bold markup dentro header ─────────────────────────────
no_bold: list[str] = []
for line in lines:
new_line, n = _HEADER_BOLD_RE.subn(r'\1 \2', line)
stats["n_bold_in_headers_removed"] += n
no_bold.append(new_line)
lines = no_bold
# ── Pass 1.5: rimuovi header spazzatura PRIMA del repair ──────────────────
# I simboli chimici/matematici estratti a font grande (H1/H2) alterano il
# repair dei salti di livello se rimossi solo dopo. Rimuovendoli prima, i
# capitoli reali ricevono il livello corretto senza distorsioni.
no_garbage_pre: list[str] = []
for line in lines:
m = _HEADER_RE.match(line)
if m and _is_garbage_header(m.group(2)):
stats["n_garbage_headers_removed"] += 1
continue
no_garbage_pre.append(line)
lines = no_garbage_pre
# ── Pass 2: ripara salti di livello ───────────────────────────────────────
repaired: list[str] = []
last_level = 0
for line in lines:
m = _HEADER_RE.match(line)
if m:
hashes, title = m.group(1), m.group(2)
level = len(hashes)
if last_level > 0 and level > last_level + 1:
new_level = last_level + 1
line = "#" * new_level + " " + title
stats["n_level_jumps_repaired"] += 1
level = new_level
last_level = level
repaired.append(line)
# ── Pass 3: rimuovi duplicati consecutivi ─────────────────────────────────
no_dup: list[str] = []
last_header_norm: str | None = None
for line in repaired:
m = _HEADER_RE.match(line)
if m:
norm = _norm_title(m.group(2))
if norm == last_header_norm:
stats["n_duplicate_headers_removed"] += 1
continue
last_header_norm = norm
else:
if line.strip():
last_header_norm = None # reset su contenuto reale
no_dup.append(line)
# ── Pass 4: rimuovi header vuoti (nessun contenuto E nessuna sezione figlia) ──
no_empty: list[str] = []
i = 0
while i < len(no_dup):
line = no_dup[i]
m = _HEADER_RE.match(line)
if m:
cur_level = len(m.group(1))
j = i + 1
has_content = False
next_level: int | None = None
while j < len(no_dup):
ahead = no_dup[j]
m2 = _HEADER_RE.match(ahead)
if m2:
next_level = len(m2.group(1))
break
if ahead.strip():
has_content = True
break
j += 1
is_empty = not has_content and j < len(no_dup)
is_container = next_level is not None and next_level > cur_level
if is_empty and not is_container:
stats["n_empty_headers_removed"] += 1
i += 1
continue
no_empty.append(line)
i += 1
# ── Pass 5: rimuovi running-header prefisso del successivo ────────────────
# Es. "§ 4" immediatamente seguito (≤3 righe di contenuto) da "§ 4. Titolo reale".
no_prefix: list[str] = []
i = 0
while i < len(no_empty):
line = no_empty[i]
m = _HEADER_RE.match(line)
if m:
cur_norm = _norm_title(m.group(2))
if cur_norm:
j = i + 1
non_blank = 0
next_header_norm: str | None = None
while j < len(no_empty) and non_blank <= 3:
ahead = no_empty[j]
m2 = _HEADER_RE.match(ahead)
if m2:
next_header_norm = _norm_title(m2.group(2))
break
if ahead.strip():
non_blank += 1
j += 1
if (
next_header_norm is not None
and len(cur_norm) < len(next_header_norm)
and next_header_norm.startswith(cur_norm)
):
stats["n_duplicate_headers_removed"] += 1
i += 1
continue
no_prefix.append(line)
i += 1
lines = no_prefix
# ── Pass 6: ALLCAPS → sentence case ───────────────────────────────────────
# Solo header con ≥4 lettere tutte maiuscole; preserva prefissi numerici/simbolici.
normalized: list[str] = []
for line in lines:
m = _HEADER_RE.match(line)
if m:
hashes, content = m.group(1), m.group(2).strip()
letters = [c for c in content if c.isalpha()]
if len(letters) >= 4 and all(c.isupper() for c in letters):
# Preserva prefisso numerico/simbolico (§, numeri, punteggiatura)
prefix_m = re.match(r'^([§\d\s\.\)\(\-]+\s+)', content)
if prefix_m:
prefix = prefix_m.group(1)
rest = content[len(prefix):]
if rest:
line = f"{hashes} {prefix}{_sentence_case(rest)}"
else:
line = f"{hashes} {_sentence_case(content)}"
stats["n_allcaps_headers_normalized"] += 1
normalized.append(line)
lines = normalized
# ── Pass 7: demote # → ## se il documento ha ≥5 header H1 ───────────────
# Documenti con H1 come sezione principale (non come titolo unico) producono
# una gerarchia piatta ## → ### senza livello intermedio.
# Quando si abbassa di un livello, il cascade è totale: H1→H2, H2→H3, H3→H3
# (clamp: non si scende sotto H3). Questo preserva la gerarchia relativa.
h1_count = sum(1 for l in lines if re.match(r'^# [A-Za-zÀ-ÿ§\d]', l))
if h1_count >= 5:
demoted: list[str] = []
for line in lines:
m = _HEADER_RE.match(line)
if m:
level = len(m.group(1))
if level == 1:
line = f"## {m.group(2)}"
stats["n_h1_demoted"] += 1
elif level == 2:
line = f"### {m.group(2)}"
stats["n_h1_demoted"] += 1
# level 3 resta a 3 (clamp)
demoted.append(line)
lines = demoted
clamped: list[str] = []
for line in lines:
m = _HEADER_RE.match(line)
if m:
level = len(m.group(1))
content = m.group(2)
if level > 3:
line = f"### {content}"
stats["n_headers_clamped"] += 1
else:
# "### 5 Titolo" → "### 5. Titolo" (numerazione senza punto separatore)
nm = _HDR_NUM_NO_DOT_RE.match(line)
if nm and len(nm.group(1)) == 3:
line = f"{nm.group(1)} {nm.group(2)}. {nm.group(3)}"
clamped.append(line)
lines = clamped
result = "\n".join(lines)
result = re.sub(r"\n{3,}", "\n\n", result)
return result, stats
+97
View File
@@ -0,0 +1,97 @@
"""Stage 9: validazione strutturale del Markdown finale."""
import re
from dataclasses import dataclass, field
_HEADER_RE = re.compile(r"^(#{1,6})\s+(.+)$")
_TABLE_ROW_RE = re.compile(r"^\|.+\|$")
@dataclass
class ValidationResult:
is_valid: bool
errors: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
def to_dict(self) -> dict:
return {
"valid": self.is_valid,
"errors": self.errors,
"warnings": self.warnings,
}
def validate_markdown(text: str, page_count: int = 0) -> ValidationResult:
"""
Valida l'integrità strutturale del Markdown.
Check 1: no salti di livello heading
Check 2: no sezioni vuote eccessive
Check 3: tabelle con colonne inconsistenti
Check 4: ordine heading ragionevole
"""
lines = text.split("\n")
errors: list[str] = []
warnings: list[str] = []
# ── Check 1: salti di livello ─────────────────────────────────────────────
last_level = 0
level_jumps = 0
for i, line in enumerate(lines, 1):
m = _HEADER_RE.match(line)
if m:
level = len(m.group(1))
if last_level > 0 and level > last_level + 1:
level_jumps += 1
last_level = level
if level_jumps > 0:
errors.append(f"Salti di livello heading non riparati: {level_jumps}")
# ── Check 2: sezioni vuote ────────────────────────────────────────────────
header_indices = [i for i, l in enumerate(lines) if _HEADER_RE.match(l)]
total_sections = len(header_indices)
empty_sections = 0
for idx in range(len(header_indices)):
start = header_indices[idx] + 1
end = header_indices[idx + 1] if idx + 1 < len(header_indices) else len(lines)
content_lines = [l for l in lines[start:end] if l.strip() and not _HEADER_RE.match(l)]
if not content_lines:
empty_sections += 1
if total_sections > 0:
empty_ratio = empty_sections / total_sections
if empty_ratio > 0.30:
errors.append(
f"Troppe sezioni vuote: {empty_sections}/{total_sections} "
f"({empty_ratio:.0%})"
)
elif empty_ratio > 0.10:
warnings.append(
f"Sezioni vuote: {empty_sections}/{total_sections} ({empty_ratio:.0%})"
)
# ── Check 3: colonne tabelle inconsistenti ────────────────────────────────
in_table = False
table_cols: int | None = None
inconsistent_tables = 0
for line in lines:
if _TABLE_ROW_RE.match(line.strip()):
cols = line.count("|") - 1
if not in_table:
in_table = True
table_cols = cols
elif table_cols is not None and cols != table_cols:
inconsistent_tables += 1
table_cols = None # non segnalare ulteriori righe della stessa tabella
else:
in_table = False
table_cols = None
if inconsistent_tables > 0:
warnings.append(f"Tabelle con colonne inconsistenti: {inconsistent_tables}")
# ── Check 4: struttura minima ─────────────────────────────────────────────
if total_sections == 0:
warnings.append("Nessun header rilevato — documento non strutturato")
is_valid = len(errors) == 0
return ValidationResult(is_valid=is_valid, errors=errors, warnings=warnings)
+141
View File
@@ -0,0 +1,141 @@
import re
from pathlib import Path
# ─── Rilevamento lingua ───────────────────────────────────────────────────────
_IT_WORDS = frozenset([
"il", "la", "di", "e", "che", "non", "per", "un", "una", "si",
"con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le",
"gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era",
"erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così",
])
_EN_WORDS = frozenset([
"the", "of", "and", "to", "in", "is", "that", "it", "was", "for",
"on", "are", "as", "with", "his", "they", "at", "be", "this", "have",
"from", "or", "an", "but", "not", "by", "he", "she", "we", "you",
"which", "their", "been", "has", "would", "there", "when", "will",
])
_FR_WORDS = frozenset([
"le", "les", "de", "du", "des", "et", "un", "une", "est", "que",
"pour", "dans", "sur", "avec", "qui", "par", "pas", "plus", "au",
"ce", "se", "ou", "mais", "comme", "aussi",
])
_DE_WORDS = frozenset([
"der", "die", "das", "und", "in", "von", "zu", "den", "mit", "ist",
"auf", "eine", "als", "dem", "des", "sich", "nicht", "auch", "werden",
"bei", "nach", "oder", "wenn", "wird", "war",
])
_ES_WORDS = frozenset([
"el", "los", "las", "de", "en", "un", "una", "es", "que", "por",
"con", "del", "para", "como", "pero", "sus", "son", "los", "hay",
"todo", "esta", "este", "ser", "más", "ya",
])
def _detect_language(text: str) -> str:
words = re.findall(r"\b[a-zA-Z]{2,}\b", text.lower())
sample = words[:2000]
scores = {
"it": sum(1 for w in sample if w in _IT_WORDS),
"en": sum(1 for w in sample if w in _EN_WORDS),
"fr": sum(1 for w in sample if w in _FR_WORDS),
"de": sum(1 for w in sample if w in _DE_WORDS),
"es": sum(1 for w in sample if w in _ES_WORDS),
}
best = max(scores, key=scores.get)
return best if scores[best] > 0 else "unknown"
# ─── Analisi struttura ────────────────────────────────────────────────────────
def _count_headers(text: str, level: int) -> int:
prefix = "#" * level + " "
return len(re.findall(rf"(?m)^{re.escape(prefix)}", text))
def _count_paragraphs(text: str) -> int:
blocks = re.split(r"\n{2,}", text)
return sum(1 for b in blocks if b.strip() and not re.match(r"^#+\s", b.strip()))
def _split_sections(text: str, level: int) -> list[str]:
prefix = "#" * level + " "
parts = re.split(rf"(?m)^{re.escape(prefix)}.+", text)
return [p for p in parts[1:] if p.strip()]
def _parse_sections_with_body(text: str, level: int = 3) -> list[tuple[str, str]]:
"""Restituisce lista di (header_line, body_text) per tutti gli header al livello dato."""
prefix = "#" * level + " "
lines = text.split("\n")
sections: list[tuple[str, str]] = []
cur_hdr: str | None = None
cur_body: list[str] = []
for line in lines:
if line.startswith(prefix):
if cur_hdr is not None:
sections.append((cur_hdr, "\n".join(cur_body).strip()))
cur_hdr = line
cur_body = []
elif cur_hdr is not None:
cur_body.append(line)
if cur_hdr is not None:
sections.append((cur_hdr, "\n".join(cur_body).strip()))
return sections
def analyze(md_path: Path) -> dict:
text = md_path.read_text(encoding="utf-8")
n_h1 = _count_headers(text, 1)
n_h2 = _count_headers(text, 2)
n_h3 = _count_headers(text, 3)
n_paragrafi = _count_paragraphs(text)
if n_h3 >= 5:
livello, boundary, strategia = 3, "h3", "h3_aware"
section_bodies = _split_sections(text, 3)
# Se h3 sono enormi e h2 più brevi, h2 è il boundary corretto
if n_h2 >= 3:
h2_bodies = _split_sections(text, 2)
avg_h3 = sum(len(b) for b in section_bodies) / len(section_bodies) if section_bodies else 0
avg_h2 = sum(len(b) for b in h2_bodies) / len(h2_bodies) if h2_bodies else 0
if avg_h3 > 5000 and avg_h2 < avg_h3 * 0.7:
livello, boundary, strategia = 2, "h2", "h2_paragraph_split"
section_bodies = h2_bodies
elif n_h2 >= 3:
livello, boundary, strategia = 2, "h2", "h2_paragraph_split"
section_bodies = _split_sections(text, 2)
elif n_h1 + n_h2 + n_h3 >= 1:
livello, boundary, strategia = 1, "paragrafo", "paragraph"
section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()]
elif n_paragrafi >= 3:
livello, boundary, strategia = 1, "paragrafo", "paragraph"
section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()]
else:
livello, boundary, strategia = 0, "nessuno", "sliding_window"
section_bodies = [text] if text.strip() else []
lengths = [len(b) for b in section_bodies if b.strip()]
lunghezza_media = int(sum(lengths) / len(lengths)) if lengths else 0
lingua = _detect_language(text)
avvertenze = []
short = sum(1 for l in lengths if l < 200)
long_ = sum(1 for l in lengths if l > 800)
if short:
avvertenze.append(f"{short} sezioni sotto i 200 caratteri — verranno accorpate")
if long_:
avvertenze.append(f"{long_} sezioni sopra i 800 caratteri — verranno divise")
return {
"livello_struttura": livello,
"n_h1": n_h1,
"n_h2": n_h2,
"n_h3": n_h3,
"n_paragrafi": n_paragrafi,
"boundary_primario": boundary,
"lingua_rilevata": lingua,
"lunghezza_media_sezione": lunghezza_media,
"strategia_chunking": strategia,
"avvertenze": avvertenze,
}
+152
View File
@@ -0,0 +1,152 @@
import json
import sys
from pathlib import Path
_GRADES = [(90, "A"), (75, "B"), (60, "C"), (40, "D"), (0, "F")]
def _score(r: dict) -> tuple[int, list[str]]:
"""
Voto 0-100 sulla qualità del clean.md per vettorizzazione.
Penalità struttura:
livello 0 (assente) 40
livello 1 (piatto) 15
Penalità residui (degradano il retrieval):
backtick 2/cad (max 20)
dot-leader 5/cad (max 10)
URL/watermark 5/cad (max 15)
immagini 5/cad (max 10)
<br> inline 2/cad (max 15)
simboli encoding 1/cad (max 10)
formule inline [N.M] 1/cad (max 8)
footnote residui 1/cad (max 8)
caratteri PUA 2/cad (max 20)
Penalità anomalie:
bare headers 3/cad (max 15)
"""
score = 100
detail = []
structure = r.get("structure", {})
anomalie = r.get("anomalie", {})
residui = r.get("residui", {})
livello = structure.get("livello_struttura", 0)
if livello == 0:
score -= 40
detail.append("struttura assente 40")
elif livello == 1:
score -= 15
detail.append("struttura piatta 15")
def _pen(key: str, per_item: int, cap: int, label: str) -> None:
n = residui.get(key, 0)
if n:
p = min(cap, n * per_item)
nonlocal score
score -= p
detail.append(f"{label} ×{n} {p}")
_pen("backtick", 2, 20, "backtick")
_pen("dotleader", 5, 10, "dot-leader")
_pen("url", 5, 15, "url")
_pen("immagini", 5, 10, "immagini")
_pen("br_inline", 2, 15, "<br> inline")
_pen("simboli_encoding", 1, 10, "simboli encoding")
_pen("formule_inline", 1, 8, "formule inline")
_pen("footnote_markers", 1, 8, "footnote residui")
_pen("pua_markers", 2, 20, "caratteri PUA font Symbol")
_pen("formula_headers", 3, 15, "formula/esercizio come header")
n_bare = anomalie.get("bare_headers", 0)
if n_bare:
p = min(15, n_bare * 3)
score -= p
detail.append(f"bare headers ×{n_bare} {p}")
return max(0, score), detail
def _grade(score: int) -> str:
return next(g for threshold, g in _GRADES if score >= threshold)
def validate(stems: list[str], project_root: Path, detail: bool = False) -> None:
conv_dir = project_root / "conversione"
paths = (
[conv_dir / s / "report.json" for s in stems]
if stems
else sorted(conv_dir.glob("*/report.json"))
)
if not paths:
print("Nessun report.json trovato in conversione/*/")
sys.exit(0)
rows = [
json.loads(p.read_text(encoding="utf-8")) if p.exists()
else {"stem": p.parent.name, "_missing": True}
for p in paths
]
col = max(len(r.get("stem", "stem")) for r in rows) + 2
header = (
f"{'stem':<{col}}"
f"{'h2':>4}{'h3':>5} "
f"{'strategia':<18}"
f"{'bare':>5}{'corte':>6}{'lunghe':>7}"
f"{'btk':>5}{'br':>4}{'enc':>4}{'url':>4}{'fhdr':>5}"
f"{'med':>6}"
f" {'voto':>4} grade"
)
sep = "" * len(header)
print(f"\n{header}\n{sep}")
scores = []
for r in rows:
if r.get("_missing"):
print(f"{r['stem']:<{col}} (report.json non trovato)")
continue
st = r.get("structure", {})
an = r.get("anomalie", {})
res = r.get("residui", {})
dist = r.get("distribution", {})
s, pen = _score(r)
scores.append(s)
print(
f"{r['stem']:<{col}}"
f"{st.get('n_h2', 0):>4}"
f"{st.get('n_h3', 0):>5} "
f"{st.get('strategia_chunking','?'):<18}"
f"{an.get('bare_headers', 0):>5}"
f"{an.get('short_sections', 0):>6}"
f"{an.get('long_sections', 0):>7}"
f"{res.get('backtick', 0):>5}"
f"{res.get('br_inline', 0):>4}"
f"{res.get('simboli_encoding', 0):>4}"
f"{res.get('url', 0):>4}"
f"{res.get('formula_headers', 0):>5}"
f"{dist.get('mediana', 0):>6}"
f" {s:>4} {_grade(s)}"
)
if detail and pen:
for p in pen:
print(f" {'':>{col}}{p}")
print(sep)
if scores:
media = sum(scores) / len(scores)
print(
f"Documenti: {len(scores)} "
f"Media: {media:.0f}/100 {_grade(int(media))} "
f"(A≥90 B≥75 C≥60 D≥40 F<40)"
)
print(
"\nColonne: bare=header vuoti corte=sez<150ch lunghe=sez>1500ch "
"btk=backtick br=<br>inline enc=simboli encoding fhdr=formula-header med=mediana chars\n"
)
+24 -6
View File
@@ -4,10 +4,30 @@ set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR"
mapfile -t dirs < <(find . -maxdepth 1 -mindepth 1 -type d | sort)
STEM="${1:-}"
if [[ -n "$STEM" ]]; then
# ── Modalità singolo stem ─────────────────────────────────────────────
target="./$STEM"
if [[ ! -d "$target" ]]; then
echo "Errore: cartella '$STEM' non trovata in conversione/."
exit 1
fi
rm -rf "$target"
echo "Rimossa: conversione/$STEM/"
exit 0
fi
# ── Modalità batch: tutti gli output (escluse cartelle infrastruttura) ────
mapfile -t dirs < <(
find . -maxdepth 1 -mindepth 1 -type d \
! -name '_*' \
! -name '__*' \
| sort
)
if [[ ${#dirs[@]} -eq 0 ]]; then
echo "Nessuna cartella da cancellare."
echo "Nessuna cartella di output da cancellare."
exit 0
fi
@@ -16,10 +36,8 @@ for d in "${dirs[@]}"; do
echo " $d"
done
if [[ "${1:-}" != "-f" ]]; then
read -r -p "Confermi? [s/N] " answer
[[ "$answer" =~ ^[sS]$ ]] || { echo "Annullato."; exit 0; }
fi
read -r -p "Confermi? [s/N] " answer
[[ "$answer" =~ ^[sS]$ ]] || { echo "Annullato."; exit 0; }
for d in "${dirs[@]}"; do
rm -rf "$d"
+2 -2
View File
@@ -1,4 +1,4 @@
pdfplumber==0.11.9
pymupdf4llm
opendataloader-pdf
PyMuPDF>=1.24.0
chromadb
pytest>=8.0
View File
+96
View File
@@ -0,0 +1,96 @@
"""Fixture condivise per l'intera test suite."""
import pytest
from conversione._pipeline.models import Block, Section
@pytest.fixture
def make_block():
"""Factory per Block di test con valori di default ragionevoli."""
def _make(
text="testo di prova",
page=1,
font_size=12.0,
font_name="Helvetica",
is_bold=False,
block_type="paragraph",
space_before=5.0,
bbox=(50.0, 100.0, 400.0, 114.0),
level=0,
):
return Block(
text=text,
page=page,
bbox=bbox,
font_size=font_size,
font_name=font_name,
is_bold=is_bold,
block_type=block_type,
space_before=space_before,
level=level,
)
return _make
@pytest.fixture
def mock_fitz_page():
"""Dizionario che simula l'output di page.get_text('dict') per una pagina."""
return {
"width": 595.0,
"height": 842.0,
"blocks": [
{
"type": 0,
"bbox": (50, 50, 450, 70),
"lines": [{
"bbox": (50, 50, 450, 70),
"spans": [{
"text": "1. Capitolo Primo",
"font": "Helvetica-Bold",
"size": 18.0,
"flags": 16,
"bbox": (50, 50, 450, 70),
"origin": (50, 68),
"color": 0,
}],
}],
},
{
"type": 0,
"bbox": (50, 90, 500, 104),
"lines": [{
"bbox": (50, 90, 500, 104),
"spans": [{
"text": "Testo del primo paragrafo del capitolo.",
"font": "Helvetica",
"size": 12.0,
"flags": 0,
"bbox": (50, 90, 500, 104),
"origin": (50, 102),
"color": 0,
}],
}],
},
],
}
@pytest.fixture
def simple_hierarchy_blocks(make_block):
"""Lista di Block con gerarchia semplice H1→H2→H3 numerata."""
return [
make_block("1. Introduzione", font_size=18, is_bold=True, space_before=20.0),
make_block("Testo del paragrafo di introduzione.", font_size=12),
make_block("1.1 Contesto", font_size=15, is_bold=True, space_before=15.0),
make_block("Testo della sezione di contesto.", font_size=12),
make_block("1.1.1 Dettaglio", font_size=13, is_bold=True, space_before=10.0),
make_block("Testo del dettaglio specifico.", font_size=12),
make_block("2. Conclusioni", font_size=18, is_bold=True, space_before=20.0),
make_block("Testo conclusivo.", font_size=12),
]
@pytest.fixture
def sources_dir():
from pathlib import Path
d = Path(__file__).parent.parent / "sources"
return d if d.exists() else None
View File
+68
View File
@@ -0,0 +1,68 @@
"""Test end-to-end: pipeline completa su PDF reali da sources/."""
import json
import shutil
import pytest
from pathlib import Path
from conversione._pipeline import run
PROJECT_ROOT = Path(__file__).parent.parent.parent
def _sources_available(stem: str) -> bool:
return (PROJECT_ROOT / "sources" / f"{stem}.pdf").exists()
@pytest.mark.skipif(not _sources_available("bitcoin"), reason="sources/bitcoin.pdf non disponibile")
def test_bitcoin_produces_clean_md(tmp_path, monkeypatch):
"""Pipeline completa su bitcoin.pdf — verifica output strutturato."""
# Usa tmp_path come output per non inquinare il repo
out_dir = tmp_path / "conversione" / "bitcoin"
out_dir.mkdir(parents=True)
sources_dir = tmp_path / "sources"
sources_dir.mkdir()
shutil.copy(PROJECT_ROOT / "sources" / "bitcoin.pdf", sources_dir / "bitcoin.pdf")
ok = run("bitcoin", tmp_path, force=True)
assert ok, "La pipeline deve completare senza errori"
clean_md = out_dir / "clean.md"
assert clean_md.exists(), "clean.md deve essere creato"
text = clean_md.read_text(encoding="utf-8")
assert len(text) > 1000, "clean.md deve avere contenuto significativo"
assert "#" in text, "clean.md deve avere almeno un header"
report = json.loads((out_dir / "report.json").read_text(encoding="utf-8"))
assert report["structure"]["livello_struttura"] >= 1, "Struttura deve avere almeno livello 1"
@pytest.mark.skipif(not _sources_available("bitcoin"), reason="sources/bitcoin.pdf non disponibile")
def test_determinism(tmp_path):
"""Due run consecutive sullo stesso PDF producono output identico."""
sources_dir = tmp_path / "sources"
sources_dir.mkdir()
shutil.copy(PROJECT_ROOT / "sources" / "bitcoin.pdf", sources_dir / "bitcoin.pdf")
run("bitcoin", tmp_path, force=True)
first = (tmp_path / "conversione" / "bitcoin" / "clean.md").read_text()
run("bitcoin", tmp_path, force=True)
second = (tmp_path / "conversione" / "bitcoin" / "clean.md").read_text()
assert first == second, "Output deve essere deterministico tra due run"
@pytest.mark.skipif(not _sources_available("codice_civile"), reason="sources/codice_civile.pdf non disponibile")
def test_codice_civile_has_articles(tmp_path):
"""Il Codice Civile deve produrre header con 'Art.'."""
sources_dir = tmp_path / "sources"
sources_dir.mkdir()
shutil.copy(PROJECT_ROOT / "sources" / "codice_civile.pdf", sources_dir / "codice_civile.pdf")
ok = run("codice_civile", tmp_path, force=True)
assert ok
text = (tmp_path / "conversione" / "codice_civile" / "clean.md").read_text()
assert "Art." in text, "clean.md del codice civile deve contenere articoli"
+40
View File
@@ -0,0 +1,40 @@
"""Test categoria 8: riparazione automatica gerarchia rotta (todo.md Cat.8)."""
from conversione._pipeline.stage8_normalize import normalize_hierarchy
def test_cat8_invalid_hierarchy_auto_repaired():
"""
Categoria 8 dal todo.md:
Input: # A \\n\\n#### B
Atteso: # A \\n\\n## B (salto riparato a max +1)
"""
md_input = "# A\n\n#### B\n\nContenuto di B.\n"
result, stats = normalize_hierarchy(md_input)
assert "## B" in result, "#### deve diventare ## (salto +1 dal padre #)"
assert "#### B" not in result, "Il livello originale non deve restare"
assert stats["n_level_jumps_repaired"] >= 1
def test_multiple_jumps_all_repaired():
"""Catena di salti: # → #### → ######."""
md_input = "# Root\n\n#### Middle\n\nTesto\n\n###### Deep\n\nTesto\n"
result, stats = normalize_hierarchy(md_input)
lines = [l for l in result.split("\n") if l.startswith("#")]
levels = [len(l) - len(l.lstrip("#")) for l in lines]
# Verifica che non ci siano salti > 1
for i in range(1, len(levels)):
assert levels[i] <= levels[i - 1] + 1, \
f"Salto non riparato: {levels[i-1]}{levels[i]}"
def test_valid_hierarchy_not_touched():
"""Gerarchia valida non deve essere modificata."""
md_valid = "# H1\n\nTesto\n\n## H2\n\nTesto\n\n### H3\n\nTesto\n"
result, stats = normalize_hierarchy(md_valid)
assert stats["n_level_jumps_repaired"] == 0
assert "# H1" in result
assert "## H2" in result
assert "### H3" in result
View File
+47
View File
@@ -0,0 +1,47 @@
"""Test dataclass Block, Section, FontProfile."""
from conversione._pipeline.models import Block, Section, FontProfile
def test_block_creation():
b = Block(
text="Titolo", page=1,
bbox=(0, 0, 100, 14),
font_size=16.0, font_name="Arial-Bold",
is_bold=True,
)
assert b.text == "Titolo"
assert b.is_bold
assert b.block_type == "paragraph"
assert b.level == 0
assert b.x0 == 0.0
assert b.y1 == 14.0
def test_block_properties():
b = Block("x", 1, (10.0, 20.0, 110.0, 34.0), 12.0, "Helvetica", False)
assert b.x0 == 10.0
assert b.y0 == 20.0
assert b.x1 == 110.0
assert b.y1 == 34.0
def test_section_defaults():
s = Section(title="Intro", level=1)
assert s.content == []
assert s.children == []
assert s.page_start == 0
def test_section_nesting():
parent = Section("Parent", level=1)
child = Section("Child", level=2)
parent.children.append(child)
assert len(parent.children) == 1
assert parent.children[0].title == "Child"
def test_font_profile():
fp = FontProfile(body_size=11.0, cluster_map={18.0: 1, 15.0: 2}, header_sizes=[18.0, 15.0])
assert fp.body_size == 11.0
assert fp.cluster_map[18.0] == 1
assert len(fp.header_sizes) == 2
+44
View File
@@ -0,0 +1,44 @@
"""Test Stage 3: font analysis."""
from conversione._pipeline.models import Block
from conversione._pipeline.stage3_font import build_font_profile
def _make_block(font_size, n=1):
return [
Block(f"testo {i}", 1, (0, i*14.0, 100, (i+1)*14.0), font_size, "Helvetica", False)
for i in range(n)
]
def test_body_size_is_most_frequent():
blocks = _make_block(12.0, 20) + _make_block(18.0, 2) + _make_block(15.0, 3)
profile = build_font_profile(blocks)
assert profile.body_size == 12.0
def test_header_sizes_above_body():
blocks = _make_block(12.0, 20) + _make_block(18.0, 2) + _make_block(15.0, 3)
profile = build_font_profile(blocks)
assert all(s > profile.body_size for s in profile.header_sizes)
def test_cluster_map_levels():
blocks = _make_block(12.0, 20) + _make_block(24.0, 2) + _make_block(18.0, 3) + _make_block(14.0, 4)
profile = build_font_profile(blocks)
# Taglia più grande deve avere livello 1
if profile.header_sizes:
assert profile.cluster_map[profile.header_sizes[0]] == 1
def test_empty_blocks():
profile = build_font_profile([])
assert profile.body_size == 11.0
assert profile.header_sizes == []
def test_single_font_size():
blocks = _make_block(11.0, 50)
profile = build_font_profile(blocks)
assert profile.body_size == 11.0
assert profile.header_sizes == []
assert profile.cluster_map == {}
+52
View File
@@ -0,0 +1,52 @@
"""Test Stage 4: header detection — segnali combinati."""
import pytest
from conversione._pipeline.models import Block, FontProfile
from conversione._pipeline.stage4_headers import classify_blocks
def _profile(body=12.0):
return FontProfile(body_size=body, cluster_map={18.0: 1, 15.0: 2}, header_sizes=[18.0, 15.0])
def _block(text, font_size=12.0, is_bold=False, space_before=5.0, block_type="paragraph"):
return Block(text, 1, (50, 100, 400, 114), font_size, "Helvetica", is_bold,
block_type=block_type, space_before=space_before)
def test_numbered_large_bold_short_becomes_header():
# Tutti i segnali positivi
b = _block("1. Introduzione", font_size=18, is_bold=True, space_before=30.0)
classify_blocks([b], _profile())
assert b.block_type == "header_candidate"
def test_body_text_stays_paragraph():
b = _block("Questo è un lungo paragrafo di testo normale che non deve diventare un header.", font_size=12)
classify_blocks([b], _profile())
assert b.block_type == "paragraph"
def test_bold_body_text_not_header():
# Bold ma stesso size del corpo e testo lungo → NON header (bold_signal richiede size > body+0.5)
b = _block("Testo importante in grassetto nel corpo del documento.", font_size=12, is_bold=True)
classify_blocks([b], _profile())
assert b.block_type == "paragraph"
def test_article_forced_header():
# "Art. N" → sempre header candidate
b = _block("Art. 1423. Nullità del contratto.", font_size=12)
classify_blocks([b], _profile())
assert b.block_type == "header_candidate"
def test_table_preserved():
b = _block("Colonna A | Colonna B", font_size=12, block_type="table")
classify_blocks([b], _profile())
assert b.block_type == "table"
def test_list_item_detection():
b = _block("- primo elemento della lista", font_size=12)
classify_blocks([b], _profile())
assert b.block_type == "list_item"
+95
View File
@@ -0,0 +1,95 @@
"""Test Stage 5: hierarchy inference — numerazione, TOC, font fallback."""
from conversione._pipeline.models import Block, FontProfile
from conversione._pipeline.stage5_hierarchy import infer_hierarchy, _level_from_numbering
def _profile():
return FontProfile(body_size=12.0, cluster_map={18.0: 1, 15.0: 2, 13.0: 3}, header_sizes=[18.0, 15.0, 13.0])
def _hblock(text, font_size=18.0, is_bold=True):
b = Block(text, 1, (50, 100, 400, 114), font_size, "Helvetica-Bold", is_bold)
b.block_type = "header_candidate"
return b
def _pblock(text):
b = Block(text, 1, (50, 120, 400, 134), 12.0, "Helvetica", False)
b.block_type = "paragraph"
return b
# ── Test _level_from_numbering ────────────────────────────────────────────────
def test_numbering_level1():
assert _level_from_numbering("1. Titolo") == 1
def test_numbering_level2():
assert _level_from_numbering("1.2 Sottotitolo") == 2
def test_numbering_level3():
assert _level_from_numbering("1.2.3 Dettaglio") == 3
def test_numbering_deep_capped_at_3():
assert _level_from_numbering("1.2.3.4 Troppo profondo") == 3
def test_numbering_no_match():
assert _level_from_numbering("Testo senza numero") == 0
# ── Test infer_hierarchy con numerazione ─────────────────────────────────────
def test_numbered_sections_get_correct_levels():
blocks = [
_hblock("1. Introduzione", font_size=18),
_pblock("Testo."),
_hblock("1.1 Contesto", font_size=15),
_pblock("Testo."),
_hblock("1.1.1 Dettaglio", font_size=13),
_pblock("Testo."),
_hblock("2. Conclusioni", font_size=18),
]
result = infer_hierarchy(blocks, _profile(), toc=[])
headers = [b for b in result if b.block_type == "header_candidate"]
assert headers[0].level == 1 # "1."
assert headers[1].level == 2 # "1.1"
assert headers[2].level == 3 # "1.1.1"
assert headers[3].level == 1 # "2."
# ── Test infer_hierarchy con TOC ─────────────────────────────────────────────
def test_toc_alignment():
toc = [[1, "Introduzione", 1], [2, "Contesto storico", 3], [1, "Conclusioni", 10]]
blocks = [
_hblock("Introduzione", font_size=14),
_hblock("Contesto storico", font_size=13),
_hblock("Conclusioni", font_size=14),
]
result = infer_hierarchy(blocks, _profile(), toc=toc)
headers = [b for b in result if b.block_type == "header_candidate"]
assert headers[0].level == 1
assert headers[1].level == 2
assert headers[2].level == 1
# ── Test infer_hierarchy con font fallback ────────────────────────────────────
def test_font_fallback_no_numbering_no_toc():
blocks = [
_hblock("Capitolo Grande", font_size=18),
_pblock("Testo."),
_hblock("Sezione Media", font_size=15),
_pblock("Testo."),
]
result = infer_hierarchy(blocks, _profile(), toc=[])
headers = [b for b in result if b.block_type == "header_candidate"]
assert headers[0].level == 1 # 18pt → cluster level 1
assert headers[1].level == 2 # 15pt → cluster level 2
def test_empty_cluster_map_defaults_to_2():
profile_empty = FontProfile(body_size=12.0, cluster_map={}, header_sizes=[])
blocks = [_hblock("Titolo qualsiasi", font_size=18)]
result = infer_hierarchy(blocks, profile_empty, toc=[])
assert result[0].level == 2
+98
View File
@@ -0,0 +1,98 @@
"""Test Stage 6: document tree reconstruction."""
import pytest
from conversione._pipeline.models import Block, Section
from conversione._pipeline.stage6_tree import build_tree
def _hblock(text, level, page=1):
b = Block(text, page, (50, 100, 400, 114), 16.0, "Helvetica-Bold", True)
b.block_type = "header_candidate"
b.level = level
return b
def _pblock(text, page=1):
b = Block(text, page, (50, 120, 400, 134), 12.0, "Helvetica", False)
b.block_type = "paragraph"
return b
def test_simple_hierarchy():
blocks = [
_hblock("H1", 1),
_pblock("p1"),
_hblock("H2", 2),
_pblock("p2"),
]
roots = build_tree(blocks)
assert len(roots) == 1
h1 = roots[0]
assert h1.title == "H1"
assert h1.level == 1
assert len(h1.content) == 1
assert h1.content[0].text == "p1"
assert len(h1.children) == 1
h2 = h1.children[0]
assert h2.title == "H2"
assert len(h2.content) == 1
def test_two_siblings():
blocks = [
_hblock("Cap 1", 1),
_pblock("testo 1"),
_hblock("Cap 2", 1),
_pblock("testo 2"),
]
roots = build_tree(blocks)
assert len(roots) == 2
assert roots[0].title == "Cap 1"
assert roots[1].title == "Cap 2"
def test_pre_header_text_gets_implicit_section():
blocks = [
_pblock("Testo introduttivo prima del primo header."),
_hblock("Primo header", 1),
]
roots = build_tree(blocks)
# La sezione implicita (level=0) è la radice; contiene il testo pre-header
# e il primo header diventa suo figlio.
assert len(roots) == 1
implicit = roots[0]
assert implicit.title == ""
assert implicit.level == 0
assert len(implicit.content) == 1
assert len(implicit.children) == 1
assert implicit.children[0].title == "Primo header"
def test_deep_nesting():
blocks = [
_hblock("H1", 1),
_hblock("H2", 2),
_hblock("H3", 3),
_pblock("testo profondo"),
]
roots = build_tree(blocks)
assert len(roots) == 1
h1 = roots[0]
assert len(h1.children) == 1
h2 = h1.children[0]
assert len(h2.children) == 1
h3 = h2.children[0]
assert len(h3.content) == 1
def test_ignore_blocks_skipped():
b_ignore = Block("superscript", 1, (0,0,10,10), 8.0, "Helvetica", False, block_type="ignore")
blocks = [
_hblock("Titolo", 1),
b_ignore,
_pblock("paragrafo"),
]
roots = build_tree(blocks)
h1 = roots[0]
# Il blocco ignore non deve essere nel content
assert all(b.block_type != "ignore" for b in h1.content)
assert len(h1.content) == 1
+62
View File
@@ -0,0 +1,62 @@
"""Test Stage 7: serializzazione Markdown."""
from conversione._pipeline.models import Block, Section
from conversione._pipeline.stage7_markdown import serialize_tree, _table_to_markdown
def _section(title, level, texts=None, children=None):
blocks = []
for t in (texts or []):
b = Block(t, 1, (0,0,100,14), 12.0, "Helvetica", False, block_type="paragraph")
blocks.append(b)
s = Section(title=title, level=level, content=blocks, children=children or [])
return s
def test_h1_header():
roots = [_section("Introduzione", 1, ["Testo."])]
md = serialize_tree(roots, {})
assert "# Introduzione" in md
assert "Testo." in md
def test_h2_nested():
child = _section("Sezione 1.1", 2, ["Contenuto della sezione."])
root = _section("Capitolo 1", 1, [], [child])
md = serialize_tree([root], {})
assert "# Capitolo 1" in md
assert "## Sezione 1.1" in md
assert "Contenuto della sezione." in md
def test_implicit_section_no_hash():
# Sezione implicita level=0 → nessun # header
s = Section(title="", level=0)
b = Block("Testo iniziale.", 1, (0,0,100,14), 12.0, "Helvetica", False)
s.content.append(b)
md = serialize_tree([s], {})
assert not md.startswith("#")
assert "Testo iniziale." in md
def test_ignore_blocks_not_serialized():
s = Section("Titolo", 1)
b_ignore = Block("superscript", 1, (0,0,10,10), 8.0, "Helvetica", False, block_type="ignore")
b_para = Block("Paragrafo valido.", 1, (0,0,100,14), 12.0, "Helvetica", False, block_type="paragraph")
s.content.extend([b_ignore, b_para])
md = serialize_tree([s], {})
assert "superscript" not in md
assert "Paragrafo valido." in md
def test_table_to_markdown():
table = [["Nome", "Età"], ["Alice", "30"], ["Bob", "25"]]
md = _table_to_markdown(table)
assert "| Nome | Età |" in md
assert "| --- | --- |" in md
assert "| Alice | 30 |" in md
def test_no_excessive_blank_lines():
roots = [_section("A", 1, ["p1", "p2", "p3"])]
md = serialize_tree(roots, {})
assert "\n\n\n" not in md
+49
View File
@@ -0,0 +1,49 @@
"""Test Stage 8: normalizzazione gerarchia Markdown."""
from conversione._pipeline.stage8_normalize import normalize_hierarchy
def test_level_jump_repaired():
md = "# A\n\n#### B\n\nTesto\n"
result, stats = normalize_hierarchy(md)
assert "## B" in result
assert "#### B" not in result
assert stats["n_level_jumps_repaired"] == 1
def test_valid_hierarchy_unchanged():
md = "# A\n\n## B\n\nTesto\n\n### C\n\nTesto\n"
result, stats = normalize_hierarchy(md)
assert "# A" in result
assert "## B" in result
assert "### C" in result
assert stats["n_level_jumps_repaired"] == 0
def test_empty_header_removed():
md = "# Titolo\n\n## Vuoto\n\n## Con contenuto\n\nTesto.\n"
result, stats = normalize_hierarchy(md)
assert "## Vuoto" not in result
assert "## Con contenuto" in result
assert stats["n_empty_headers_removed"] == 1
def test_duplicate_consecutive_header_collapsed():
md = "# Titolo\n\n# Titolo\n\nTesto.\n"
result, stats = normalize_hierarchy(md)
assert result.count("# Titolo") == 1
assert stats["n_duplicate_headers_removed"] == 1
def test_multiple_jumps():
md = "# A\n\n### B\n\nTesto B\n\n##### C\n\nTesto C\n"
result, stats = normalize_hierarchy(md)
assert stats["n_level_jumps_repaired"] == 2
assert "## B" in result
assert "### C" in result
def test_no_false_positives():
md = "# A\n\nTesto.\n\n## B\n\nTesto.\n"
result, stats = normalize_hierarchy(md)
assert stats["n_level_jumps_repaired"] == 0
assert stats["n_empty_headers_removed"] == 0
+36
View File
@@ -0,0 +1,36 @@
"""Test Stage 9: validazione strutturale Markdown."""
from conversione._pipeline.stage9_validate import validate_markdown
def test_valid_document():
md = "# Titolo\n\nTesto.\n\n## Sezione\n\nContenuto.\n"
result = validate_markdown(md)
assert result.is_valid
assert not result.errors
def test_level_jump_detected():
md = "# A\n\n### B\n\nTesto.\n"
result = validate_markdown(md)
assert not result.is_valid
assert any("salto" in e.lower() or "livello" in e.lower() for e in result.errors)
def test_no_headers_warning():
md = "Testo senza nessun header.\n\nAltro paragrafo.\n"
result = validate_markdown(md)
assert any("header" in w.lower() or "strutturato" in w.lower() for w in result.warnings)
def test_inconsistent_table_warning():
md = "# Titolo\n\nTesto.\n\n| A | B |\n|---|---|\n| 1 | 2 | 3 |\n"
result = validate_markdown(md)
assert any("tabelle" in w.lower() or "colonne" in w.lower() for w in result.warnings)
def test_to_dict():
md = "# A\n\nTesto.\n"
d = validate_markdown(md).to_dict()
assert "valid" in d
assert "errors" in d
assert "warnings" in d