chore: rimuovi cartelle step-0..step-4 ora obsolete

La logica è consolidata in conversione/pipeline.py.
This commit is contained in:
2026-04-17 16:04:59 +02:00
parent 368530bc25
commit 82f205faa2
5 changed files with 0 additions and 1164 deletions
-229
View File
@@ -1,229 +0,0 @@
#!/usr/bin/env python3
"""
Step 0 — Verifica idoneità PDF
Legge tutti i PDF in sources/ e salva un report per ognuno in step-0/.
Uso:
python step-0/check_pdf.py
Output:
step-0/<nome_pdf>_step0_report.txt
"""
import sys
import statistics
from datetime import datetime
from pathlib import Path
def check_pdf(pdf_path: str, save: bool = True) -> None:
try:
import pdfplumber
except ImportError:
print("Errore: pdfplumber non è installato.")
print(" pip install pdfplumber")
sys.exit(1)
path = Path(pdf_path)
if not path.exists():
print(f"Errore: file non trovato — {pdf_path}")
sys.exit(1)
if path.suffix.lower() != ".pdf":
print(f"Errore: il file non è un PDF — {pdf_path}")
sys.exit(1)
lines = [] # righe del report
results = [] # (etichetta, stato, messaggio)
def out(text=""):
lines.append(text)
print(text)
out(f"Step 0 — Verifica idoneità PDF")
out(f"File: {path.name}")
out(f"Data: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
out("=" * 50)
# ------------------------------------------------------------------ #
# Criterio 1 — Non protetto da password
# ------------------------------------------------------------------ #
try:
with pdfplumber.open(path) as pdf:
n_pages = len(pdf.pages)
results.append(("Non protetto da password", "PASS", f"{n_pages} pagine"))
except Exception as e:
msg = str(e).lower()
if "password" in msg or "encrypted" in msg or "decrypt" in msg:
results.append(("Non protetto da password", "FAIL",
"Il PDF è cifrato — non può essere elaborato"))
else:
results.append(("Non protetto da password", "FAIL",
f"Impossibile aprire il file: {e}"))
_render_results(results, out)
_maybe_save(lines, path, save)
return
# ------------------------------------------------------------------ #
# Lettura pagine — una sola passata
# ------------------------------------------------------------------ #
char_counts = []
line_lengths = []
all_text = ""
empty_pages = 0
with pdfplumber.open(path) as pdf:
for page in pdf.pages:
text = page.extract_text() or ""
all_text += text + "\n"
chars = len(text.strip())
char_counts.append(chars)
if chars == 0:
empty_pages += 1
for line in text.splitlines():
stripped = line.strip()
if stripped:
line_lengths.append(len(stripped))
total_pages = len(char_counts)
pages_with_text = sum(1 for c in char_counts if c > 50)
text_coverage = pages_with_text / total_pages if total_pages > 0 else 0
# ------------------------------------------------------------------ #
# Criterio 2 — Testo estraibile
# ------------------------------------------------------------------ #
if text_coverage >= 0.7:
results.append(("Testo estraibile", "PASS",
f"{pages_with_text}/{total_pages} pagine con testo ({text_coverage:.0%})"))
elif text_coverage >= 0.4:
results.append(("Testo estraibile", "WARN",
f"Solo {pages_with_text}/{total_pages} pagine con testo — revisione estesa necessaria"))
else:
results.append(("Testo estraibile", "FAIL",
f"Solo {pages_with_text}/{total_pages} pagine con testo — probabilmente scansionato"))
# ------------------------------------------------------------------ #
# Criterio 3 — Generato digitalmente (non scansionato)
# ------------------------------------------------------------------ #
pages_text_only = [c for c in char_counts if c > 0]
avg_chars = statistics.mean(pages_text_only) if pages_text_only else 0
if avg_chars >= 300:
results.append(("Generato digitalmente (non scansionato)", "PASS",
f"Media {avg_chars:.0f} char/pagina"))
elif avg_chars >= 100:
results.append(("Generato digitalmente (non scansionato)", "WARN",
f"Media bassa: {avg_chars:.0f} char/pagina — alcune pagine potrebbero essere immagini"))
else:
results.append(("Generato digitalmente (non scansionato)", "FAIL",
f"Media molto bassa: {avg_chars:.0f} char/pagina — il PDF sembra scansionato"))
# ------------------------------------------------------------------ #
# Criterio 4 — Pagine vuote
# ------------------------------------------------------------------ #
if empty_pages == 0:
results.append(("Pagine vuote", "PASS", "Nessuna pagina vuota"))
elif empty_pages <= total_pages * 0.05:
results.append(("Pagine vuote", "WARN",
f"{empty_pages} pagine vuote (≤ 5%) — probabilmente copertine o separatori"))
else:
results.append(("Pagine vuote", "WARN",
f"{empty_pages} pagine vuote ({empty_pages/total_pages:.0%}) — controllare"))
# ------------------------------------------------------------------ #
# Criterio desiderabile — Layout a colonne singola
# ------------------------------------------------------------------ #
if line_lengths:
median_len = statistics.median(line_lengths)
short_lines = sum(1 for l in line_lengths if l < median_len * 0.4)
short_ratio = short_lines / len(line_lengths)
if short_ratio < 0.15:
results.append(("Layout a colonne singola (desiderabile)", "PASS",
f"Righe corte: {short_ratio:.0%} — struttura lineare"))
elif short_ratio < 0.35:
results.append(("Layout a colonne singola (desiderabile)", "WARN",
f"Righe corte: {short_ratio:.0%} — possibile layout a colonne parziale"))
else:
results.append(("Layout a colonne singola (desiderabile)", "WARN",
f"Righe corte: {short_ratio:.0%} — probabile layout a colonne multiple"))
else:
results.append(("Layout a colonne singola (desiderabile)", "WARN",
"Impossibile analizzare (nessuna riga estratta)"))
# ------------------------------------------------------------------ #
# Criterio desiderabile — Struttura logica (titoli)
# ------------------------------------------------------------------ #
candidate_headings = [
line.strip() for line in all_text.splitlines()
if 3 <= len(line.strip()) <= 80
and line.strip()[0].isupper()
and not line.strip().endswith(".")
and not line.strip().endswith(",")
and len(line.strip().split()) <= 10
]
heading_density = len(candidate_headings) / total_pages if total_pages > 0 else 0
if heading_density >= 1.0:
results.append(("Struttura logica riconoscibile (desiderabile)", "PASS",
f"~{len(candidate_headings)} possibili titoli rilevati ({heading_density:.1f}/pagina)"))
elif heading_density >= 0.3:
results.append(("Struttura logica riconoscibile (desiderabile)", "WARN",
f"~{len(candidate_headings)} possibili titoli ({heading_density:.1f}/pagina) — struttura parziale"))
else:
results.append(("Struttura logica riconoscibile (desiderabile)", "WARN",
"Pochi titoli rilevati — testo narrativo o struttura non standard"))
_render_results(results, out)
_maybe_save(lines, path, save)
def _render_results(results: list, out) -> None:
icons = {"PASS": "", "WARN": "⚠️ ", "FAIL": ""}
out()
for label, status, message in results:
icon = icons.get(status, " ")
out(f" {icon} {label}")
out(f" {message}")
out()
fails = [r for r in results if r[1] == "FAIL"]
warns = [r for r in results if r[1] == "WARN"]
if fails:
out("ESITO: ❌ PDF NON IDONEO")
out(" Criteri obbligatori non soddisfatti — scegli un PDF diverso.")
elif warns:
out("ESITO: ⚠️ PDF ACCETTABILE CON CAUTELA")
out(" Procedi, ma aspettati più lavoro nella revisione manuale (step 4).")
else:
out("ESITO: ✅ PDF IDONEO")
out(" Tutti i criteri soddisfatti — procedi con lo step 1.")
out()
def _maybe_save(lines: list, pdf_path: Path, save: bool) -> None:
if not save:
return
script_dir = Path(__file__).parent
out_file = script_dir / f"{pdf_path.stem}_step0_report.txt"
out_file.write_text("\n".join(lines), encoding="utf-8")
print(f"Report salvato in: {out_file}")
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
sources_dir = project_root / "sources"
if not sources_dir.exists():
print(f"Errore: cartella sources/ non trovata in {project_root}")
sys.exit(1)
pdfs = sorted(sources_dir.glob("*.pdf"))
if not pdfs:
print(f"Errore: nessun PDF trovato in {sources_dir}")
sys.exit(1)
for pdf in pdfs:
check_pdf(str(pdf), save=True)
if len(pdfs) > 1:
print("-" * 50)
-199
View File
@@ -1,199 +0,0 @@
#!/usr/bin/env python3
"""
Step 1 — Ispezione automatica PDF
Analizza il PDF pagina per pagina e produce un report con score (0100)
e lista dei problemi per pagina. Serve per capire la qualità del documento
e mappare i problemi prima della revisione manuale (step 4).
Uso:
python step1/inspect.py
Output:
step1/<nome_pdf>_step1_report.txt
"""
import re
import sys
import statistics
from collections import Counter
from datetime import datetime
from pathlib import Path
# ── Penalità per il calcolo dello score ───────────────────────────────────
SYLLABIF_PENALTY = 0.3 # per occorrenza di sillabazione
COLUMN_PENALTY = 3.0 # per pagina con layout a colonne
UNICODE_PENALTY = 1.5 # per pagina con caratteri anomali
EMPTY_PENALTY = 1.0 # per pagina vuota
HEADER_FOOTER_PEN = 5.0 # fisso se intestazioni/piè ripetitivi rilevati
def inspect_pdf(pdf_path: str, save: bool = True) -> None:
try:
import pdfplumber
except ImportError:
print("Errore: pdfplumber non è installato.")
print(" pip install pdfplumber")
sys.exit(1)
path = Path(pdf_path)
if not path.exists():
print(f"Errore: file non trovato — {pdf_path}")
sys.exit(1)
lines = []
def out(text=""):
lines.append(text)
print(text)
out("Step 1 — Ispezione automatica PDF")
out(f"File: {path.name}")
out(f"Data: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
out("=" * 50)
# ── Lettura pagine ─────────────────────────────────────────────────────
with pdfplumber.open(path) as pdf:
n_pages = len(pdf.pages)
pages_text = [page.extract_text() or "" for page in pdf.pages]
# ── Analisi per pagina ─────────────────────────────────────────────────
issues = [] # (page_num, descrizione) — page_num=0 → problema globale
deductions = 0.0
first_lines = [] # prima riga significativa di ogni pagina (per header)
last_lines = [] # ultima riga significativa di ogni pagina (per footer)
for i, text in enumerate(pages_text):
page_num = i + 1
stripped = text.strip()
# 1. Pagina vuota
if len(stripped) < 50:
issues.append((page_num, "pagina vuota"))
deductions += EMPTY_PENALTY
continue
page_lines = text.splitlines()
nonempty = [l.strip() for l in page_lines if l.strip()]
# Raccogli prima/ultima riga per il controllo header/footer
if nonempty:
first_lines.append(nonempty[0])
last_lines.append(nonempty[-1])
# 2. Sillabazione a fine riga (es. "estra-" + a capo)
syllabif = sum(
1 for line in page_lines
if re.search(r'\b\w{2,}-$', line.rstrip())
)
if syllabif:
label = "occorrenza" if syllabif == 1 else "occorrenze"
issues.append((page_num, f"sillabazione rilevata ({syllabif} {label})"))
deductions += syllabif * SYLLABIF_PENALTY
# 3. Layout a colonne (righe molto corte e numerose)
if len(nonempty) >= 10:
median_len = statistics.median(len(l) for l in nonempty)
short_ratio = sum(1 for l in nonempty if len(l) < median_len * 0.4) / len(nonempty)
if short_ratio > 0.35:
issues.append((page_num, f"possibile layout a colonne ({short_ratio:.0%} righe corte)"))
deductions += COLUMN_PENALTY
# 4. Caratteri Unicode anomali
# (control chars esclusi \n \t \r, replacement char, PUA block)
anomalies = re.findall(
r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f\ufffd\ue000-\uf8ff]', text
)
if anomalies:
issues.append((page_num, f"caratteri Unicode anomali ({len(anomalies)} trovati)"))
deductions += UNICODE_PENALTY
# ── Intestazioni e piè di pagina ripetitivi ────────────────────────────
def _check_repetition(line_list: list, label: str) -> None:
nonlocal deductions
if not line_list:
return
threshold = max(3, len(line_list) * 0.25)
repeated = [
(txt, cnt) for txt, cnt in Counter(line_list).items()
if cnt >= threshold and len(txt) > 3
]
if repeated:
deductions += HEADER_FOOTER_PEN
for txt, cnt in repeated[:3]:
issues.append((0, f"{label} ripetitivo: \"{txt[:45]}\" ({cnt} volte)"))
_check_repetition(first_lines, "intestazione")
_check_repetition(last_lines, "piè di pagina")
# ── Score ──────────────────────────────────────────────────────────────
score = max(0, round(100 - deductions))
# ── Riepilogo ──────────────────────────────────────────────────────────
pages_with_issues = len({p for p, _ in issues if p > 0})
out()
out(f"Score: {score}/100")
out(f"Pagine totali: {n_pages}")
out(f"Pagine con problemi: {pages_with_issues}")
out()
if issues:
global_issues = [(p, d) for p, d in issues if p == 0]
page_issues = sorted([(p, d) for p, d in issues if p > 0])
for _, desc in global_issues:
out(f" ⚠️ {desc}")
for page_num, desc in page_issues:
out(f" Pagina {page_num:>4}: {desc}")
else:
out(" Nessun problema rilevato.")
out()
# ── Prossimi passi ─────────────────────────────────────────────────────
out("PROSSIMI PASSI:")
if score >= 70:
out(" → conversione con marker funzionerà bene")
elif score >= 40:
out(" → conversione possibile, attendi più errori nella revisione")
else:
out(" → qualità bassa — valuta una fonte PDF migliore")
attention_pages = sorted({p for p, _ in issues if p > 0})
if attention_pages:
sample = ", ".join(str(p) for p in attention_pages[:10])
if len(attention_pages) > 10:
sample += f" … e altre {len(attention_pages) - 10}"
out(f" → attenzione alle pagine {sample} nella revisione manuale")
out()
_maybe_save(lines, path, save)
def _maybe_save(lines: list, pdf_path: Path, save: bool) -> None:
if not save:
return
script_dir = Path(__file__).parent
out_file = script_dir / f"{pdf_path.stem}_step1_report.txt"
out_file.write_text("\n".join(lines), encoding="utf-8")
print(f"Report salvato in: {out_file}")
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
sources_dir = project_root / "sources"
if not sources_dir.exists():
print(f"Errore: cartella sources/ non trovata in {project_root}")
sys.exit(1)
pdfs = sorted(sources_dir.glob("*.pdf"))
if not pdfs:
print(f"Errore: nessun PDF trovato in {sources_dir}")
sys.exit(1)
for pdf in pdfs:
inspect_pdf(str(pdf), save=True)
if len(pdfs) > 1:
print("-" * 50)
-80
View File
@@ -1,80 +0,0 @@
#!/usr/bin/env python3
"""
Step 2 — Conversione PDF → Markdown grezzo
Usa pymupdf4llm (PyMuPDF puro C, zero modelli ML, ~30-50 MB RAM)
per convertire ogni PDF in sources/ e organizza l'output in:
step-2/<stem>/raw.md — MD grezzo, non modificare mai
step-2/<stem>/clean.md — copia di lavoro per lo step 4
Uso:
python step-2/convert_pdf.py # tutti i PDF in sources/
python step-2/convert_pdf.py --pdf sources/doc.pdf # un solo PDF
"""
import argparse
import shutil
import sys
from pathlib import Path
import pymupdf4llm
def convert_pdf(pdf_path: Path, project_root: Path) -> bool:
stem = pdf_path.stem
out_dir = project_root / "step-2" / stem
raw_md = out_dir / "raw.md"
clean_md = out_dir / "clean.md"
print(f"\nConversione: {pdf_path.name}")
print(f" Output: step-2/{stem}/")
if raw_md.exists():
print(f" ⚠️ raw.md già presente — skip")
print(f" (elimina {raw_md} per riconvertire)")
return True
out_dir.mkdir(parents=True, exist_ok=True)
print(f" Conversione in corso...")
md_text = pymupdf4llm.to_markdown(str(pdf_path))
raw_md.write_text(md_text, encoding="utf-8")
shutil.copy2(raw_md, clean_md)
size_kb = raw_md.stat().st_size // 1024
print(f" ✅ raw.md salvato ({size_kb} KB)")
print(f" ✅ clean.md creato (copia di lavoro per step 4)")
return True
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
parser = argparse.ArgumentParser(description="Step 2 — Conversione PDF → Markdown")
parser.add_argument("--pdf", help="Percorso di un singolo PDF da convertire")
args = parser.parse_args()
if args.pdf:
pdf_path = Path(args.pdf)
if not pdf_path.exists():
print(f"Errore: file non trovato — {args.pdf}")
sys.exit(1)
pdfs = [pdf_path]
else:
sources_dir = project_root / "sources"
if not sources_dir.exists():
print(f"Errore: cartella sources/ non trovata in {project_root}")
sys.exit(1)
pdfs = sorted(sources_dir.glob("*.pdf"))
if not pdfs:
print(f"Errore: nessun PDF trovato in {sources_dir}")
sys.exit(1)
results = [convert_pdf(p, project_root) for p in pdfs]
ok_count = sum(results)
total = len(results)
print(f"\n{'' if all(results) else '⚠️ '} {ok_count}/{total} PDF convertiti")
sys.exit(0 if all(results) else 1)
-223
View File
@@ -1,223 +0,0 @@
#!/usr/bin/env python3
"""
Step 3 — Rilevamento struttura Markdown
Analizza il Markdown grezzo prodotto dallo step 2 senza modificarlo.
Copia i file da step-2/<stem>/ e produce structure_profile.json che
guida la revisione manuale (step 4) e il chunker adattivo (step 5).
Output in step-3/<stem>/:
raw.md — copia da step-2 (non modificare mai)
clean.md — copia da step-2 (da revisionare nello step 4)
structure_profile.json — profilo strutturale
Uso:
python step-3/detect_structure.py # tutti i documenti in step-2/
python step-3/detect_structure.py --stem nietzsche # un solo documento
python step-3/detect_structure.py --force # riesegui anche se già presente
"""
import argparse
import json
import re
import shutil
import sys
from pathlib import Path
# ─── Language detection ───────────────────────────────────────────────────────
_IT_WORDS = frozenset([
"il", "la", "di", "e", "che", "non", "per", "un", "una", "si",
"con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le",
"gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era",
"erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così",
])
_EN_WORDS = frozenset([
"the", "of", "and", "to", "in", "is", "that", "it", "was", "for",
"on", "are", "as", "with", "his", "they", "at", "be", "this", "have",
"from", "or", "an", "but", "not", "by", "he", "she", "we", "you",
"which", "their", "been", "has", "would", "there", "when", "will",
])
def detect_language(text: str) -> str:
words = re.findall(r'\b[a-zA-Z]{2,}\b', text.lower())
sample = words[:2000]
it = sum(1 for w in sample if w in _IT_WORDS)
en = sum(1 for w in sample if w in _EN_WORDS)
if it == 0 and en == 0:
return "unknown"
return "it" if it >= en else "en"
# ─── Markdown parsing ─────────────────────────────────────────────────────────
def split_sections(text: str, header_level: int) -> list[str]:
"""
Split text on headers of the given level (1=h1, 2=h2, 3=h3).
Returns list of body texts for each matching section.
"""
prefix = "#" * header_level + " "
parts = re.split(rf'(?m)^{re.escape(prefix)}.+', text)
# parts[0] is preamble, rest are section bodies
return [p for p in parts[1:] if p.strip()]
def count_headers(text: str, level: int) -> int:
prefix = "#" * level + " "
return len(re.findall(rf'(?m)^{re.escape(prefix)}', text))
def count_paragraphs(text: str) -> int:
"""Count non-empty, non-header paragraph blocks."""
blocks = re.split(r'\n{2,}', text)
return sum(1 for b in blocks if b.strip() and not re.match(r'^#+\s', b.strip()))
# ─── Core analysis ────────────────────────────────────────────────────────────
def analyze(raw_md_path: Path) -> dict:
text = raw_md_path.read_text(encoding="utf-8")
n_h1 = count_headers(text, 1)
n_h2 = count_headers(text, 2)
n_h3 = count_headers(text, 3)
n_paragrafi = count_paragraphs(text)
# Determine structural level and primary boundary
if n_h3 >= 5:
livello = 3
boundary = "h3"
strategia = "h3_aware"
section_bodies = split_sections(text, 3)
elif n_h2 >= 3:
livello = 2
boundary = "h2"
strategia = "h2_paragraph_split"
section_bodies = split_sections(text, 2)
elif n_h1 + n_h2 + n_h3 >= 1:
livello = 1
boundary = "paragrafo"
strategia = "paragraph"
section_bodies = [b for b in re.split(r'\n{2,}', text) if b.strip()]
else:
if n_paragrafi >= 3:
livello = 1
boundary = "paragrafo"
strategia = "paragraph"
section_bodies = [b for b in re.split(r'\n{2,}', text) if b.strip()]
else:
livello = 0
boundary = "nessuno"
strategia = "sliding_window"
section_bodies = [text] if text.strip() else []
lengths = [len(b) for b in section_bodies if b.strip()]
lunghezza_media = int(sum(lengths) / len(lengths)) if lengths else 0
lingua = detect_language(text)
avvertenze = []
short = sum(1 for l in lengths if l < 200)
long_ = sum(1 for l in lengths if l > 800)
if short:
avvertenze.append(f"{short} sezioni sotto i 200 caratteri — verranno accorpate")
if long_:
avvertenze.append(f"{long_} sezioni sopra i 800 caratteri — verranno divise")
return {
"livello_struttura": livello,
"n_h1": n_h1,
"n_h2": n_h2,
"n_h3": n_h3,
"n_paragrafi": n_paragrafi,
"boundary_primario": boundary,
"lingua_rilevata": lingua,
"lunghezza_media_sezione": lunghezza_media,
"strategia_chunking": strategia,
"avvertenze": avvertenze,
}
# ─── Per-document processing ─────────────────────────────────────────────────
def process_stem(stem: str, project_root: Path, force: bool) -> bool:
src_dir = project_root / "step-2" / stem
out_dir = project_root / "step-3" / stem
raw_src = src_dir / "raw.md"
clean_src = src_dir / "clean.md"
profile_out = out_dir / "structure_profile.json"
print(f"\nDocumento: {stem}")
if not raw_src.exists():
print(f" ✗ raw.md non trovato in step-2/{stem}/ — skip")
return False
if profile_out.exists() and not force:
print(f" ⚠️ structure_profile.json già presente — skip")
print(f" (usa --force per rieseguire)")
return True
out_dir.mkdir(parents=True, exist_ok=True)
# Copy files from step-2
shutil.copy2(raw_src, out_dir / "raw.md")
if clean_src.exists():
shutil.copy2(clean_src, out_dir / "clean.md")
print(f" Copiati raw.md e clean.md da step-2/{stem}/")
# Analyze
print(f" Analisi struttura in corso...")
profile = analyze(out_dir / "raw.md")
profile_out.write_text(json.dumps(profile, ensure_ascii=False, indent=2), encoding="utf-8")
# Report
_LIVELLO_DESC = {
3: "struttura ricca (###)",
2: "struttura parziale (##)",
1: "solo paragrafi",
0: "testo piatto",
}
print(f" ✅ Livello {profile['livello_struttura']}{_LIVELLO_DESC[profile['livello_struttura']]}")
print(f" h1={profile['n_h1']} h2={profile['n_h2']} h3={profile['n_h3']} paragrafi={profile['n_paragrafi']}")
print(f" Boundary: {profile['boundary_primario']} | Strategia: {profile['strategia_chunking']}")
print(f" Lingua: {profile['lingua_rilevata']} | Lunghezza media sezione: {profile['lunghezza_media_sezione']} char")
for w in profile["avvertenze"]:
print(f" ⚠️ {w}")
print(f" ✅ structure_profile.json salvato")
return True
# ─── Entry point ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
parser = argparse.ArgumentParser(description="Step 3 — Rilevamento struttura Markdown")
parser.add_argument("--stem", help="Nome del documento (sottocartella di step-2/)")
parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente")
args = parser.parse_args()
if args.stem:
stems = [args.stem]
else:
step2_dir = project_root / "step-2"
if not step2_dir.exists():
print(f"Errore: cartella step-2/ non trovata in {project_root}")
sys.exit(1)
stems = sorted(p.name for p in step2_dir.iterdir() if p.is_dir())
if not stems:
print(f"Errore: nessun documento trovato in step-2/")
sys.exit(1)
results = [process_stem(s, project_root, args.force) for s in stems]
ok = sum(results)
total = len(results)
print(f"\n{'' if all(results) else '⚠️ '} {ok}/{total} documenti analizzati")
sys.exit(0 if all(results) else 1)
-433
View File
@@ -1,433 +0,0 @@
#!/usr/bin/env python3
"""
Step 4 — Revisione automatica del Markdown
Trasforma clean.md da step-3 rivelando la struttura latente del documento.
Le trasformazioni sono euristiche universali che funzionano su qualsiasi PDF:
- Normalizza whitespace multiplo (artefatto PDF)
- Riduce righe vuote multiple
- Rimuove marcatori **bold** nelle intestazioni esistenti
- Converte righe ALL-CAPS standalone → ## header (euristico, qualsiasi lingua)
- Converte sezioni numerate "N. testo" → ### N. (qualsiasi numerazione)
- Rimuove blocchi TOC (righe che iniziano con parole-chiave indice)
Per ogni documento viene ricalcolato il profilo strutturale: il livello può
salire (es. livello 1 → 3) se le strutture latenti vengono rilevate.
Output in step-4/<stem>/:
raw.md — copia da step-3 (non modificare mai)
clean.md — MD revisionato
structure_profile.json — profilo aggiornato dopo la revisione
Uso:
python step-4/revise.py # tutti i documenti in step-3/
python step-4/revise.py --stem nietzsche # un solo documento
python step-4/revise.py --force # riesegui anche se già presente
"""
import argparse
import json
import re
import shutil
import sys
from datetime import date
from pathlib import Path
# Riusa la funzione analyze() già scritta nello step 3
sys.path.insert(0, str(Path(__file__).parent.parent / "step-3"))
from detect_structure import analyze # noqa: E402
# ─── Costanti ─────────────────────────────────────────────────────────────────
# Parole-chiave che identificano blocchi TOC (da rimuovere)
_TOC_KEYWORDS = frozenset([
"indice", "index", "contents", "table of contents",
"sommario", "inhaltsverzeichnis", "inhalt",
])
# Preposizioni/articoli da non capitalizzare nel title-case
_STOP_IT_EN = frozenset([
# italiano
"di", "del", "della", "dei", "delle", "da", "in", "e", "il", "la",
"lo", "le", "gli", "un", "una", "per", "a", "al", "alla", "ai",
"alle", "con", "su", "sul", "sulla", "che", "o",
# inglese
"of", "the", "a", "an", "and", "or", "but", "in", "on", "at",
"to", "for", "with", "by", "from", "as",
])
# Ordinali italiani → romani (per titoli come "CAPITOLO PRIMO")
_ORDINALS_IT = {
"PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV",
"QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII",
"NONO": "IX", "DECIMO": "X",
}
# Ordinali inglesi → arabici (per "CHAPTER ONE")
_ORDINALS_EN = {
"ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5",
"SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10",
}
# ─── Utilità ──────────────────────────────────────────────────────────────────
def _sentence_case(s: str) -> str:
"""
Sentence-case: prima lettera maiuscola, resto minuscolo.
Corretto per l'italiano e accettabile per l'inglese accademico.
"""
if not s:
return s
lower = s.lower()
return lower[0].upper() + lower[1:]
def _is_allcaps_line(line: str) -> bool:
"""
True se la riga è una candidata per conversione a ## header.
Criterio: tutti i caratteri alfabetici sono maiuscoli, lunghezza >= 3.
"""
stripped = line.strip()
letters = [c for c in stripped if c.isalpha()]
return (
len(letters) >= 3
and all(c.isupper() for c in letters)
and not stripped.startswith("#")
)
def _allcaps_to_header(raw_line: str) -> str:
"""
Converte una riga ALL-CAPS in un ## header title-case.
Riconosce pattern specifici (CAPITOLO ORDINE, CHAPTER N) come bonus,
ma funziona in modalità generica su qualsiasi testo.
"""
text = raw_line.strip().rstrip('.').rstrip('?').strip()
# ── Pattern italiano: "CAPITOLO PRIMO. TITOLO DEL CAPITOLO"
_ORD_IT_PAT = "|".join(_ORDINALS_IT.keys())
m = re.match(rf'^CAPITOLO ({_ORD_IT_PAT})\. (.+)', text)
if m:
roman = _ORDINALS_IT[m.group(1)]
titolo = m.group(2).rstrip('.').rstrip('?').strip()
return f"## Capitolo {roman}{_sentence_case(titolo)}"
# ── Pattern inglese: "CHAPTER ONE. TITLE" o "CHAPTER 1. TITLE"
_ORD_EN_PAT = "|".join(_ORDINALS_EN.keys())
m = re.match(rf'^CHAPTER ({_ORD_EN_PAT}|\d+)\.? (.+)', text)
if m:
n = _ORDINALS_EN.get(m.group(1), m.group(1))
titolo = m.group(2).rstrip('.').rstrip('?').strip()
return f"## Chapter {n}{_sentence_case(titolo)}"
# ── Pattern generico con numerazione romana o arabica nel prefisso
m = re.match(r'^([IVXLCDM]+|[0-9]+)\. (.+)', text)
if m:
n = m.group(1)
titolo = m.group(2).rstrip('.').strip()
return f"## {n}. {_sentence_case(titolo)}"
# ── Caso generico: tutto maiuscolo senza pattern riconoscibile
return f"## {_sentence_case(text)}"
def _is_toc_line(line: str) -> bool:
"""True se la riga è l'intestazione di un blocco indice/TOC."""
first_word = line.strip().split('.')[0].strip().lower()
return first_word in _TOC_KEYWORDS
# ─── Trasformazioni ────────────────────────────────────────────────────────────
def apply_transforms(text: str) -> tuple[str, dict]:
"""
Applica tutte le trasformazioni strutturali al testo MD.
Restituisce (testo_modificato, statistiche).
"""
stats = {
"toc_rimosso": False,
"n_header_allcaps": 0,
"n_sezioni_numerate": 0,
"n_paragrafi_uniti": 0,
}
# ── 1. Rimuovi marcatori **bold** nelle intestazioni esistenti
# ## **Titolo** → ## Titolo
text = re.sub(
r'^(#{1,6})\s+\*\*(.+?)\*\*\s*$',
r'\1 \2',
text, flags=re.MULTILINE,
)
# ── 1b. Normalizza header esistenti con contenuto ALL-CAPS → sentence-case
# ## AL DI LA' DEL BENE E DEL MALE → ## Al di la' del bene e del male
def _norm_allcaps_header(m: re.Match) -> str:
hashes = m.group(1)
content = m.group(2).strip()
letters = [c for c in content if c.isalpha()]
if letters and all(c.isupper() for c in letters):
return f"{hashes} {_sentence_case(content)}"
return m.group(0)
text = re.sub(
r'^(#{1,6}) (.+)$',
_norm_allcaps_header,
text, flags=re.MULTILINE,
)
# ── 2. Rimuovi blocco TOC (riga indice + contenuto inline sulla stessa riga)
# "INDICE. Capitolo 1 Capitolo 2 ..." → rimossa
lines = text.split('\n')
new_lines = []
for line in lines:
if _is_toc_line(line):
stats["toc_rimosso"] = True
else:
new_lines.append(line)
text = '\n'.join(new_lines)
# ── 3. Converti righe ALL-CAPS standalone → ## header
# Una riga è "standalone" se è preceduta/seguita da riga vuota
# oppure si trova all'inizio/fine del documento.
blocks = text.split('\n\n')
new_blocks = []
for block in blocks:
stripped = block.strip()
# Blocco standalone = un'unica riga (nessun \n interno rilevante)
if '\n' not in stripped and _is_allcaps_line(stripped):
new_blocks.append(_allcaps_to_header(stripped))
stats["n_header_allcaps"] += 1
else:
# Controlla riga per riga per righe ALL-CAPS seguite da altri contenuti
sub_lines = block.split('\n')
converted = []
for ln in sub_lines:
if _is_allcaps_line(ln) and len(ln.strip()) > 3:
converted.append(_allcaps_to_header(ln))
stats["n_header_allcaps"] += 1
else:
converted.append(ln)
new_blocks.append('\n'.join(converted))
text = '\n\n'.join(new_blocks)
# ── 4. Converti sezioni numerate "N. testo" → "### N.\n\ntesto"
# Riconosce: "1. Testo", "42. Testo" (due o più spazi dopo il punto)
def _num_repl(m: re.Match) -> str:
num = m.group(1)
testo = m.group(2).strip()
stats["n_sezioni_numerate"] += 1
return f"### {num}.\n\n{testo}"
# Pattern standard: "1. testo" o "1. testo"
text = re.sub(
r'^(\d+)\.\s+(.+)$',
_num_repl,
text, flags=re.MULTILINE,
)
# Pattern con lettera-suffisso: "65 a. testo" o "65a. testo"
def _num_letter_repl(m: re.Match) -> str:
num = m.group(1) + m.group(2)
testo = m.group(3).strip()
stats["n_sezioni_numerate"] += 1
return f"### {num}.\n\n{testo}"
text = re.sub(
r'^(\d+)\s*([a-z])\.\s+(.+)$',
_num_letter_repl,
text, flags=re.MULTILINE,
)
# ── 5. Unisci paragrafi spezzati da salti pagina PDF
# Criterio: blocco A non finisce con punteggiatura di fine frase,
# blocco B non inizia con maiuscola "di sezione" né è un header.
# Unione sicura: mai attraverso confini ###/##.
_SENTENCE_END = set('.?!»)\'"')
blocks = text.split('\n\n')
merged = []
i = 0
while i < len(blocks):
b = blocks[i]
stripped = b.strip()
# Prova a unire con il successivo se la frase è spezzata
while (
i + 1 < len(blocks)
and stripped
and not stripped.startswith('#')
and stripped[-1] not in _SENTENCE_END
):
nxt = blocks[i + 1].strip()
# Non unire se il successivo è un header o è vuoto
if not nxt or nxt.startswith('#'):
break
# Non unire se il successivo inizia con una cifra seguita da punto
# (sarebbe l'inizio di un nuovo aforisma non ancora convertito)
if re.match(r'^\d+\.', nxt):
break
b = stripped + ' ' + nxt
stripped = b.strip()
stats["n_paragrafi_uniti"] += 1
i += 1
merged.append(b)
i += 1
text = '\n\n'.join(merged)
# ── 6. Normalizza whitespace multiplo interno alle righe
# "parola parola" → "parola parola" (inclusi gli header)
lines = text.split('\n')
normalized = []
for line in lines:
if not line.strip():
normalized.append(line)
else:
normalized.append(re.sub(r' +', ' ', line))
text = '\n'.join(normalized)
# ── 7. Riduci righe vuote multiple a doppie
text = re.sub(r'\n{3,}', '\n\n', text)
return text, stats
# ─── Aggiornamento revision log ────────────────────────────────────────────────
def update_revision_log(
log_path: Path,
stem: str,
profile_before: dict,
profile_after: dict,
t_stats: dict,
) -> None:
header_exists = log_path.exists() and log_path.stat().st_size > 0
avv = profile_after.get("avvertenze", [])
avv_str = "; ".join(avv) if avv else "nessuna"
entry = f"""
## {stem}{date.today().isoformat()}
**Trasformazioni automatiche:**
- Normalizzazione whitespace multiplo e righe vuote
- Blocco TOC rimosso: {'' if t_stats['toc_rimosso'] else 'no'}
- Righe ALL-CAPS → ## header: {t_stats['n_header_allcaps']}
- Sezioni numerate → ### header: {t_stats['n_sezioni_numerate']}
- Paragrafi uniti (salti pagina PDF): {t_stats['n_paragrafi_uniti']}
- Livello struttura: {profile_before.get('livello_struttura', '?')}{profile_after.get('livello_struttura', '?')}
**Avvertenze residue:** {avv_str}
**Revisioni manuali pendenti:**
- [ ] Verificare conversioni ALL-CAPS errate
- [ ] Controllare sezioni troppo corte o troppo lunghe
"""
if not header_exists:
log_path.write_text("# Revision log\n" + entry, encoding="utf-8")
else:
existing = log_path.read_text(encoding="utf-8")
log_path.write_text(existing + entry, encoding="utf-8")
# ─── Per-document processing ─────────────────────────────────────────────────
def process_stem(stem: str, project_root: Path, force: bool) -> bool:
src_dir = project_root / "step-3" / stem
out_dir = project_root / "step-4" / stem
raw_src = src_dir / "raw.md"
clean_src = src_dir / "clean.md"
profile_src = src_dir / "structure_profile.json"
clean_out = out_dir / "clean.md"
profile_out = out_dir / "structure_profile.json"
print(f"\nDocumento: {stem}")
if not clean_src.exists():
print(f" ✗ clean.md non trovato in step-3/{stem}/ — skip")
return False
if clean_out.exists() and not force:
print(f" ⚠️ clean.md già presente — skip")
print(f" (usa --force per rieseguire)")
return True
out_dir.mkdir(parents=True, exist_ok=True)
# Copia raw.md immutabile (riferimento)
if raw_src.exists():
shutil.copy2(raw_src, out_dir / "raw.md")
print(f" Copiato raw.md da step-3/{stem}/")
# Leggi profilo step-3 (per confronto nel report)
profile_before: dict = {}
if profile_src.exists():
profile_before = json.loads(profile_src.read_text(encoding="utf-8"))
# Applica trasformazioni
print(f" Applicazione trasformazioni strutturali...")
text = clean_src.read_text(encoding="utf-8")
text_revised, t_stats = apply_transforms(text)
# Salva clean.md revisionato
clean_out.write_text(text_revised, encoding="utf-8")
# Ricalcola profilo sul nuovo clean.md
profile_after = analyze(clean_out)
profile_out.write_text(
json.dumps(profile_after, ensure_ascii=False, indent=2),
encoding="utf-8",
)
# Report
lv_b = profile_before.get("livello_struttura", "?")
lv_a = profile_after["livello_struttura"]
_STRAT = {3: "h3_aware", 2: "h2_paragraph_split", 1: "paragraph", 0: "sliding_window"}
print(f" ✅ Livello struttura: {lv_b}{lv_a} ({_STRAT.get(lv_a, '?')})")
print(f" h2: {profile_before.get('n_h2','?')}{profile_after['n_h2']}")
print(f" h3: {profile_before.get('n_h3','?')}{profile_after['n_h3']}")
print(f" TOC rimosso: {'' if t_stats['toc_rimosso'] else 'no'}")
print(f" Righe ALL-CAPS → ##: {t_stats['n_header_allcaps']}")
print(f" Sezioni numerate → ###: {t_stats['n_sezioni_numerate']}")
print(f" Paragrafi uniti (salti pagina): {t_stats['n_paragrafi_uniti']}")
for w in profile_after["avvertenze"]:
print(f" ⚠️ {w}")
# Aggiorna revision log (direttamente in step-4/, non in sottocartella)
log_path = project_root / "step-4" / "revision_log.md"
update_revision_log(log_path, stem, profile_before, profile_after, t_stats)
print(f" ✅ step-4/revision_log.md aggiornato")
print(f" ✅ structure_profile.json salvato")
return True
# ─── Entry point ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
parser = argparse.ArgumentParser(description="Step 4 — Revisione automatica Markdown")
parser.add_argument("--stem", help="Nome del documento (sottocartella di step-3/)")
parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente")
args = parser.parse_args()
if args.stem:
stems = [args.stem]
else:
step3_dir = project_root / "step-3"
if not step3_dir.exists():
print(f"Errore: cartella step-3/ non trovata in {project_root}")
sys.exit(1)
stems = sorted(p.name for p in step3_dir.iterdir() if p.is_dir())
if not stems:
print(f"Errore: nessun documento trovato in step-3/")
sys.exit(1)
results = [process_stem(s, project_root, args.force) for s in stems]
ok = sum(results)
total = len(results)
print(f"\n{'' if all(results) else '⚠️ '} {ok}/{total} documenti revisionati")
sys.exit(0 if all(results) else 1)