step-3: add detect_structure.py (structure profile, no ML deps)

This commit is contained in:
2026-04-13 10:16:42 +02:00
parent 346e336f1a
commit ee25adc0a6
2 changed files with 226 additions and 0 deletions
+223
View File
@@ -0,0 +1,223 @@
#!/usr/bin/env python3
"""
Step 3 — Rilevamento struttura Markdown
Analizza il Markdown grezzo prodotto dallo step 2 senza modificarlo.
Copia i file da step-2/<stem>/ e produce structure_profile.json che
guida la revisione manuale (step 4) e il chunker adattivo (step 5).
Output in step-3/<stem>/:
raw.md — copia da step-2 (non modificare mai)
clean.md — copia da step-2 (da revisionare nello step 4)
structure_profile.json — profilo strutturale
Uso:
python step-3/detect_structure.py # tutti i documenti in step-2/
python step-3/detect_structure.py --stem nietzsche # un solo documento
python step-3/detect_structure.py --force # riesegui anche se già presente
"""
import argparse
import json
import re
import shutil
import sys
from pathlib import Path
# ─── Language detection ───────────────────────────────────────────────────────
# Frequent Italian function words used as evidence for Italian text.
_IT_WORDS = frozenset([
    "il", "la", "di", "e", "che", "non", "per", "un", "una", "si",
    "con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le",
    "gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era",
    "erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così",
])

# Frequent English function words used as evidence for English text.
_EN_WORDS = frozenset([
    "the", "of", "and", "to", "in", "is", "that", "it", "was", "for",
    "on", "are", "as", "with", "his", "they", "at", "be", "this", "have",
    "from", "or", "an", "but", "not", "by", "he", "she", "we", "you",
    "which", "their", "been", "has", "would", "there", "when", "will",
])

# A word is a run of Unicode letters (word characters minus digits and "_").
# An ASCII-only class with a two-letter minimum could never yield "così" or
# "e", leaving those _IT_WORDS entries dead and dropping accented words.
_WORD_RE = re.compile(r"\b[^\W\d_]+\b")


def detect_language(text: str) -> str:
    """Guess whether *text* is Italian or English from function-word counts.

    Only the first 2000 lower-cased words are inspected.

    Args:
        text: The document text to classify.

    Returns:
        ``"it"`` or ``"en"`` according to which word list scores more hits
        (ties go to ``"it"``), or ``"unknown"`` when neither list matches.
    """
    sample = _WORD_RE.findall(text.lower())[:2000]
    it = sum(1 for w in sample if w in _IT_WORDS)
    en = sum(1 for w in sample if w in _EN_WORDS)
    if it == 0 and en == 0:
        return "unknown"
    return "it" if it >= en else "en"
# ─── Markdown parsing ─────────────────────────────────────────────────────────
def split_sections(text: str, header_level: int) -> list[str]:
    """Return the bodies of the sections introduced by headers of one level.

    The text is cut at every line that starts with exactly ``header_level``
    ``#`` characters followed by a space and at least one more character.
    Whatever precedes the first such header is discarded, and bodies that
    contain only whitespace are dropped.

    Args:
        text: Markdown source.
        header_level: Header depth to split on (1 = h1, 2 = h2, 3 = h3).

    Returns:
        The section bodies, in document order, without their header lines.
    """
    marker = "#" * header_level + " "
    heading_line = re.compile(rf'^{re.escape(marker)}.+', re.MULTILINE)
    # split() always yields at least one item: the text before the first header.
    _preamble, *bodies = heading_line.split(text)
    return [body for body in bodies if body.strip()]
def count_headers(text: str, level: int) -> int:
    """Count the lines of *text* that open with a header marker of *level*.

    A line counts when it begins with exactly ``level`` ``#`` characters
    followed by a space; deeper headers do not match a shallower level.
    """
    marker = "#" * level + " "
    # Split on "\n" only, so line starts are the same positions a multiline
    # "^" anchor would match.
    return sum(1 for line in text.split("\n") if line.startswith(marker))
def count_paragraphs(text: str) -> int:
    """Count blank-line-separated blocks that are neither empty nor headers.

    A block is treated as a header when, once stripped, it starts with one
    or more ``#`` characters followed by whitespace.
    """
    total = 0
    for chunk in re.split(r'\n{2,}', text):
        stripped = chunk.strip()
        if not stripped:
            continue
        if re.match(r'^#+\s', stripped):
            continue
        total += 1
    return total
# ─── Core analysis ────────────────────────────────────────────────────────────
def analyze(raw_md_path: Path) -> dict:
    """Build the structure profile of a Markdown file.

    The file is read as UTF-8; its h1/h2/h3 headers and paragraphs are
    counted and one of four chunking strategies is selected:

    * at least 5 h3 headers: level 3, split on h3;
    * otherwise at least 3 h2 headers: level 2, split on h2;
    * otherwise any header at all, or at least 3 paragraphs: level 1,
      split on blank lines;
    * otherwise: level 0, the whole text as a single section.

    Args:
        raw_md_path: Path of the Markdown file to inspect.

    Returns:
        A dict holding the header and paragraph counts, the chosen level,
        boundary and strategy, the detected language, the average section
        length in characters, and warnings about sections shorter than 200
        or longer than 800 characters.
    """
    text = raw_md_path.read_text(encoding="utf-8")

    n_h1, n_h2, n_h3 = (count_headers(text, depth) for depth in (1, 2, 3))
    n_paragrafi = count_paragraphs(text)

    # Pick the structural level and the primary boundary.
    if n_h3 >= 5:
        livello, boundary, strategia = 3, "h3", "h3_aware"
        section_bodies = split_sections(text, 3)
    elif n_h2 >= 3:
        livello, boundary, strategia = 2, "h2", "h2_paragraph_split"
        section_bodies = split_sections(text, 2)
    elif n_h1 + n_h2 + n_h3 >= 1 or n_paragrafi >= 3:
        # Too few headers to split on them: fall back to paragraph blocks.
        livello, boundary, strategia = 1, "paragrafo", "paragraph"
        section_bodies = [blk for blk in re.split(r'\n{2,}', text) if blk.strip()]
    else:
        livello, boundary, strategia = 0, "nessuno", "sliding_window"
        section_bodies = [text] if text.strip() else []

    lengths = [len(body) for body in section_bodies if body.strip()]
    if lengths:
        lunghezza_media = int(sum(lengths) / len(lengths))
    else:
        lunghezza_media = 0

    lingua = detect_language(text)

    n_short = sum(1 for size in lengths if size < 200)
    n_long = sum(1 for size in lengths if size > 800)
    avvertenze = []
    if n_short:
        avvertenze.append(f"{n_short} sezioni sotto i 200 caratteri — verranno accorpate")
    if n_long:
        avvertenze.append(f"{n_long} sezioni sopra i 800 caratteri — verranno divise")

    return {
        "livello_struttura": livello,
        "n_h1": n_h1,
        "n_h2": n_h2,
        "n_h3": n_h3,
        "n_paragrafi": n_paragrafi,
        "boundary_primario": boundary,
        "lingua_rilevata": lingua,
        "lunghezza_media_sezione": lunghezza_media,
        "strategia_chunking": strategia,
        "avvertenze": avvertenze,
    }
# ─── Per-document processing ─────────────────────────────────────────────────
def process_stem(stem: str, project_root: Path, force: bool) -> bool:
    """Copy one document from step-2 to step-3 and write its structure profile.

    ``raw.md`` (and ``clean.md`` when present) are copied from
    ``step-2/<stem>/`` into ``step-3/<stem>/``. The copied ``raw.md`` is then
    analyzed and the result saved as ``structure_profile.json``. Progress and
    a short report are printed to stdout.

    Args:
        stem: Document name, i.e. the sub-directory of ``step-2/``.
        project_root: Directory containing ``step-2/`` and ``step-3/``.
        force: Re-run even when ``structure_profile.json`` already exists.

    Returns:
        ``False`` when ``step-2/<stem>/raw.md`` is missing. ``True`` when the
        profile was written, or was already present and ``force`` is false.
    """
    src_dir = project_root / "step-2" / stem
    out_dir = project_root / "step-3" / stem
    raw_src = src_dir / "raw.md"
    clean_src = src_dir / "clean.md"
    profile_out = out_dir / "structure_profile.json"

    print(f"\nDocumento: {stem}")

    if not raw_src.exists():
        print(f" ✗ raw.md non trovato in step-2/{stem}/ — skip")
        return False

    if profile_out.exists() and not force:
        print(" ⚠️ structure_profile.json già presente — skip")
        print(" (usa --force per rieseguire)")
        return True

    out_dir.mkdir(parents=True, exist_ok=True)

    # Copy files from step-2; clean.md is optional.
    shutil.copy2(raw_src, out_dir / "raw.md")
    if clean_src.exists():
        shutil.copy2(clean_src, out_dir / "clean.md")
        print(f" Copiati raw.md e clean.md da step-2/{stem}/")
    else:
        # Do not claim clean.md was copied when it does not exist.
        print(f" Copiato raw.md da step-2/{stem}/ (clean.md assente)")

    # Analyze the copy so the profile describes the file stored in step-3.
    print(" Analisi struttura in corso...")
    profile = analyze(out_dir / "raw.md")
    profile_out.write_text(json.dumps(profile, ensure_ascii=False, indent=2), encoding="utf-8")

    # Report
    _LIVELLO_DESC = {
        3: "struttura ricca (###)",
        2: "struttura parziale (##)",
        1: "solo paragrafi",
        0: "testo piatto",
    }
    livello = profile["livello_struttura"]
    # Separate the number from its description (was "Livello 3struttura ...").
    print(f" ✅ Livello {livello} — {_LIVELLO_DESC[livello]}")
    print(f" h1={profile['n_h1']} h2={profile['n_h2']} h3={profile['n_h3']} paragrafi={profile['n_paragrafi']}")
    print(f" Boundary: {profile['boundary_primario']} | Strategia: {profile['strategia_chunking']}")
    print(f" Lingua: {profile['lingua_rilevata']} | Lunghezza media sezione: {profile['lunghezza_media_sezione']} char")
    for w in profile["avvertenze"]:
        print(f" ⚠️ {w}")
    print(" ✅ structure_profile.json salvato")
    return True
# ─── Entry point ─────────────────────────────────────────────────────────────
def _main() -> int:
    """Run step 3 from the command line and return the process exit code.

    With ``--stem`` only that document is processed; otherwise every
    sub-directory of ``step-2/`` is processed in sorted order. The exit code
    is 0 when every document succeeded, and 1 otherwise, including when
    ``step-2/`` is missing or contains no sub-directories.
    """
    root = Path(__file__).parent.parent

    cli = argparse.ArgumentParser(description="Step 3 — Rilevamento struttura Markdown")
    cli.add_argument("--stem", help="Nome del documento (sottocartella di step-2/)")
    cli.add_argument("--force", action="store_true", help="Riesegui anche se già presente")
    options = cli.parse_args()

    if options.stem:
        targets = [options.stem]
    else:
        source_dir = root / "step-2"
        if not source_dir.exists():
            print(f"Errore: cartella step-2/ non trovata in {root}")
            return 1
        targets = sorted(entry.name for entry in source_dir.iterdir() if entry.is_dir())
        if not targets:
            print("Errore: nessun documento trovato in step-2/")
            return 1

    outcomes = [process_stem(name, root, options.force) for name in targets]
    everything_ok = all(outcomes)
    print(f"\n{'' if everything_ok else '⚠️ '} {sum(outcomes)}/{len(outcomes)} documenti analizzati")
    return 0 if everything_ok else 1


if __name__ == "__main__":
    sys.exit(_main())