This commit is contained in:
2026-04-30 15:26:52 +02:00
parent e41fcae248
commit ab4036591f
34 changed files with 1726 additions and 974 deletions
+1
View File
@@ -30,6 +30,7 @@ Thumbs.db
# Output conversione/ — generati da conversione/pipeline.py
conversione/*/
!conversione/_pipeline/
!conversione/_pipeline/transforms
!conversione/_pipeline/**
# Output chunks/ — generati da chunks/chunker.py e chunks/verify_chunks.py
+1
View File
@@ -71,6 +71,7 @@ def run(stem: str, project_root: Path, force: bool) -> bool:
print(f" Ambienti matematici: {t['n_ambienti_matematici']}")
print(f" Titoli header uniti: {t['n_titoli_uniti']}")
print(f" TOC rimosso: {'' if t['toc_rimosso'] else 'no'}")
print(f" TOC orfani rimossi: {t['n_toc_orfani_rimossi']}")
print(f" Versi poesia riprist.: {t['n_versi_ripristinati']}")
print(f" Header verso demotati: {t['n_header_verso_demotati']}")
print(f" ALL-CAPS → ##: {t['n_header_allcaps']}")
-974
View File
@@ -1,974 +0,0 @@
import re
from collections import Counter
from functools import partial
# ─── Costanti ────────────────────────────────────────────────────────────────
_TOC_KEYWORDS = frozenset([
"indice", "index", "contents", "table of contents",
"sommario", "inhaltsverzeichnis", "inhalt",
"indice generale", "indice analitico", "indice dei contenuti",
"elenco dei capitoli", "argomenti", "table des matières",
"tabla de contenidos", "содержание",
])
_ORDINALS_IT = {
"PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV",
"QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII",
"NONO": "IX", "DECIMO": "X",
}
_ORDINALS_EN = {
"ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5",
"SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10",
}
# Mapping PUA Unicode (U+F020-U+F0FF) → simboli Unicode standard.
# Font Symbol di Windows codifica lettere greche e operatori matematici
# nel range Private Use Area invece dei codepoint Unicode standard.
_SYMBOL_PUA_MAP: dict[str, str] = {
"": " ",
"": "(",
"": ")",
"": "+",
"": "", # minus
"": ".",
"": "/",
"": "0", "": "1", "": "2", "": "3", "": "4",
"": "5", "": "6", "": "7", "": "8", "": "9",
"": ":", "": ";", "": "<", "": "=", "": ">",
"": "", # congruent
"": "Α", # Alpha
"": "Β", # Beta
"": "Χ", # Chi
"": "Δ", # Delta
"": "Ε", # Epsilon
"": "Φ", # Phi
"": "Γ", # Gamma
"": "Η", # Eta
"": "Ι", # Iota
"": "ϑ", # theta variant
"": "Κ", # Kappa
"": "Λ", # Lambda
"": "Μ", # Mu
"": "Ν", # Nu
"": "Ο", # Omicron
"": "Π", # Pi
"": "Θ", # Theta
"": "Ρ", # Rho
"": "Σ", # Sigma
"": "Τ", # Tau
"": "Υ", # Upsilon
"": "ς", # sigma final
"": "Ω", # Omega
"": "Ξ", # Xi
"": "Ψ", # Psi
"": "Ζ", # Zeta
"": "[",
"": "", # therefore
"": "]",
"": "", # perpendicular
"": "α", # alpha
"": "β", # beta
"": "χ", # chi
"": "δ", # delta
"": "ε", # epsilon
"": "φ", # phi
"": "γ", # gamma
"": "η", # eta
"": "ι", # iota
"": "ϕ", # phi variant
"": "κ", # kappa
"": "λ", # lambda
"": "μ", # mu
"": "ν", # nu
"": "ο", # omicron
"": "π", # pi
"": "θ", # theta
"": "ρ", # rho
"": "σ", # sigma
"": "τ", # tau
"": "υ", # upsilon
"": "ϖ", # pi symbol
"": "ω", # omega
"": "ξ", # xi
"": "ψ", # psi
"": "ζ", # zeta
"": "{",
"": "|",
"": "}",
"": "~",
"": "±", # plus-minus
"": "", # bullet
"": "", # square root
"": "", # less or equal
"": "", # greater or equal
"": "", # proportional
"": "×", # multiplication
"": "÷", # division
"": "×", # alternate multiply
"": "", # not equal
"": "", # not equal alternate
"": "", # greater or equal alternate
"": "", # prime
"": "*",
"": ",",
"": "", # less or equal (Symbol 0xA3)
"": "", # bullet (Wingdings 0xA7)
"": "", # bullet variant
"": "", # right arrow (Symbol 0xAE)
"": "÷", # division / range separator
"": "", # Wingdings decorative icon (rimosso)
"": "", # right arrow variant
"": "", # bracket extension piece (non ricostruibile)
"": "",
"": "",
"": "",
"": "",
"": "", # TeX large paren left U+F8EB
"": "", # TeX large paren extension U+F8EC
"": "", # TeX large paren right U+F8ED
"": "", # TeX large paren right ext U+F8EE
"": "", # TeX large bracket left U+F8EF
"": "", # TeX large bracket ext U+F8F0
"": "", # TeX brace top-left U+F8F1
"": "", # TeX brace mid U+F8F2
"": "", # TeX brace mid-right U+F8F3
"": "", # TeX brace extension U+F8F4
"": "", # TeX brace right U+F8F5
"": "", # TeX bracket right large U+F8F6
"": "", # TeX bracket right ext U+F8F7
"": "", # TeX bracket right close U+F8F8
"": "", # TeX integral large U+F8F9
"": "", # TeX integral extension U+F8FA
"": "", # TeX integral top U+F8FB
"": "", # TeX radical top U+F8FC
"": "", # TeX radical extension U+F8FD
"": "", # TeX arrowhead U+F8FE
}
_SYMBOL_PUA_RE = re.compile(
"[" + "".join(re.escape(k) for k in _SYMBOL_PUA_MAP) + "]"
)
_SUPERSCRIPT_RE = re.compile(r'[¹²³⁰⁴-⁹]+')
_FOOTNOTE_BODY_RE = re.compile(
r'^([¹²³⁰⁴-⁹]+\s+|\[\d{1,3}\]\s+)'
)
_NUMBERED_HDR_RE = re.compile(
r"^(#{1,6})\s+(\d+(?:\.\d+)*)\.\s+(.+)$",
re.MULTILINE,
)
_BIB_MARKERS_RE = re.compile(
r'\b(pp?\.|vol\.|n\.\s*\d|ed\.|edn\.|ISBN|DOI|arXiv)\b'
r'|\b(19|20)\d{2}\b',
re.IGNORECASE,
)
_WATERMARK_RE = re.compile(
r"^(BOZZA|DRAFT|CONFIDENTIAL|RISERVATO|PROVVISORIO|SAMPLE|SPECIMEN"
r"|DO NOT DISTRIBUTE|NON DISTRIBUIRE|COPY|COPIA)\s*$",
re.IGNORECASE | re.MULTILINE,
)
_MATH_SYMBOLS_RE = re.compile(
r"[=+∈∀∃≤≥∞∑∫∂→↔⊂⊃∩∪αβγδεζηθικλμνξοπρστυφχψωΑΒΓΔΕΖΗΘΙΚΛΜΝΞΟΠΡΣΤΥΦΧΨΩ]"
)
_EXERCISE_TRIGGER_RE = re.compile(
r"\b(Si dimostri|Si calcoli|Si provi|Si trovi|Trovare|Find|Prove|Show that"
r"|Compute|Calculate|Dimostrare|Verificare)\b",
re.IGNORECASE,
)
_MATH_HDR_RE = re.compile(r"^(#{2,3})\s+(.+)$")
_NUMBERED_PREFIX_RE = re.compile(r"^(\d+(?:\.\d+)*[.)])\s+(.+)$", re.DOTALL)
# Erano compilati dentro le funzioni a ogni chiamata — ora costanti di modulo
_TABSEP_RE = re.compile(r"(?m)^\|\s*\|\s*$|^\|---\|?\s*$")
_FM_RE = re.compile(
r"https?://|www\.|@[A-Za-z]|\bUniversit[àa]\b|\bDipartimento\b|"
r"\bCopyright\b|\bLicenza\b|\bEdizione\b|"
r"protetto da|tutti i diritti",
re.IGNORECASE,
)
_VERSE_NUM_RE = re.compile(
r'([.!?\xbb\'\"]\s+)(\d+)(\s+)(?=[A-Z\xc0-\xd9a-z\xe0-\xf9\xab"“‟])'
)
# ─── Helper puri ─────────────────────────────────────────────────────────────
def _sentence_case(s: str) -> str:
if not s:
return s
lower = s.lower()
return lower[0].upper() + lower[1:]
def _is_allcaps_line(line: str) -> bool:
stripped = line.strip()
letters = [c for c in stripped if c.isalpha()]
return (
len(letters) >= 3
and all(c.isupper() for c in letters)
and not stripped.startswith("#")
and not stripped.startswith("|")
)
def _allcaps_to_header(raw_line: str) -> str:
    """Turn an ALL-CAPS line into a level-2 Markdown header in sentence case.

    A leading list bullet and trailing "." / "?" are stripped first.  Three
    shapes get dedicated handling, anything else becomes
    ``## <sentence-cased text>``:

    * ``CAPITOLO <ORDINALE>. TITOLO``   -> ``## Capitolo <roman>: Titolo``
    * ``CHAPTER <WORD|digits>[.] TITLE`` -> ``## Chapter <n>: Title``
    * ``<ROMAN|digits>. TITLE``          -> ``## <num>. Title``

    Args:
        raw_line: the candidate line, possibly with surrounding whitespace.

    Returns:
        The Markdown header line (no trailing newline).
    """
    text = re.sub(r"^[-*+]\s+", "", raw_line.strip())
    text = text.rstrip(".").rstrip("?").strip()

    ordinals_it = "|".join(_ORDINALS_IT)
    m = re.match(rf"^CAPITOLO ({ordinals_it})\. (.+)", text)
    if m:
        roman = _ORDINALS_IT[m.group(1)]
        titolo = m.group(2).rstrip(".").rstrip("?").strip()
        # The number and the title need an explicit separator, otherwise the
        # output reads "Capitolo ITitolo".  ": " matches the
        # "## Capitolo <n>: <title>" form used for inline chapter markers.
        return f"## Capitolo {roman}: {_sentence_case(titolo)}"

    ordinals_en = "|".join(_ORDINALS_EN)
    m = re.match(rf"^CHAPTER ({ordinals_en}|\d+)\.? (.+)", text)
    if m:
        # Spelled-out ordinals are mapped to digits; plain digits pass through.
        n = _ORDINALS_EN.get(m.group(1), m.group(1))
        titolo = m.group(2).rstrip(".").rstrip("?").strip()
        return f"## Chapter {n}: {_sentence_case(titolo)}"

    m = re.match(r"^([IVXLCDM]+|[0-9]+)\. (.+)", text)
    if m:
        return f"## {m.group(1)}. {_sentence_case(m.group(2).rstrip('.').strip())}"

    return f"## {_sentence_case(text)}"
def _extract_math_environments(text: str) -> tuple[str, int]:
_ENVS = (
r"Definizione|Definition|Teorema|Theorem|Lemma|"
r"Proposizione|Proposition|Corollario|Corollary|"
r"Osservazione|Remark|Nota|Note|Esempio|Example"
)
count = 0
blocks = text.split("\n\n")
result = []
for block in blocks:
stripped = block.strip()
if not stripped or stripped.startswith("#"):
result.append(block)
continue
m = re.match(
rf"^({_ENVS})\s+((?:\d+\.?){{1,4}})\s*(.*)",
stripped,
re.DOTALL,
)
if not m:
result.append(block)
continue
env = m.group(1)
num = m.group(2).rstrip(".")
rest = m.group(3).strip()
title_m = re.match(r"^(\([^)]{2,60}\))\s+(.*)", rest, re.DOTALL)
if title_m:
header = f"### {env} {num} {title_m.group(1)}"
body = title_m.group(2).strip()
else:
header = f"### {env} {num}."
body = rest
result.append(f"{header}\n\n{body}" if body else header)
count += 1
return "\n\n".join(result), count
def _merge_title_headers(text: str) -> tuple[str, int]:
count = 0
blocks = re.split(r"\n{2,}", text)
result = []
i = 0
while i < len(blocks):
block = blocks[i]
stripped = block.strip()
if (
re.match(r"^#{2,3} \d+\.\s*$", stripped)
and i + 1 < len(blocks)
):
nxt = blocks[i + 1].strip()
if (
nxt
and "\n" not in nxt
and len(nxt) <= 80
and not nxt.startswith("#")
and not re.match(r"^\d+[\.\)]\s", nxt)
):
result.append(stripped.rstrip() + " " + nxt)
count += 1
i += 2
continue
result.append(block)
i += 1
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(result)), count
def _extract_article_headers(text: str) -> tuple[str, int]:
count = 0
def _repl(m: re.Match) -> str:
nonlocal count
num = m.group(1)
rest = m.group(2).strip()
title_m = re.match(
r"^([A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda].{1,74}?)\.\s+"
r"([A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda\(\d].{4,})",
rest,
)
if title_m:
count += 1
return (
f"### Art. {num}. {title_m.group(1)}.\n\n"
f"{title_m.group(2).strip()}"
)
if rest:
count += 1
return f"### Art. {num}.\n\n{rest}"
count += 1
return f"### Art. {num}."
text = re.sub(
r"^-\s+Art\.\s+([\d]+[a-z\-]*)\.\s*(.*)",
_repl,
text,
flags=re.MULTILINE,
)
return text, count
# ─── Trasformazioni atomiche ──────────────────────────────────────────────────
def _t_fix_symbol_font(text: str) -> tuple[str, int]:
    """Replace every character matched by ``_SYMBOL_PUA_RE`` via ``_SYMBOL_PUA_MAP``.

    Returns:
        ``(new_text, number_of_characters_replaced)``.
    """
    return _SYMBOL_PUA_RE.subn(lambda hit: _SYMBOL_PUA_MAP[hit.group(0)], text)
def _t_remove_images(text: str) -> tuple[str, int]:
n = len(re.findall(r"!\[[^\]]*\]\([^)]*\)", text))
text = re.sub(r"!\[[^\]]*\]\([^)]*\)\s*", "", text)
return text, n
def _t_remove_footnotes(text: str) -> tuple[str, int]:
    """Drop footnote body lines and strip superscript markers from the rest.

    A line is dropped when its stripped form is shorter than 300 characters
    and matches ``_FOOTNOTE_BODY_RE``.  Every other line has the characters
    matched by ``_SUPERSCRIPT_RE`` removed.

    Returns:
        ``(new_text, count)`` where *count* is dropped lines plus lines that
        had at least one marker removed.
    """
    kept = []
    hits = 0
    for row in text.split("\n"):
        core = row.strip()
        if core and len(core) < 300 and _FOOTNOTE_BODY_RE.match(core):
            hits += 1
            continue
        unmarked = _SUPERSCRIPT_RE.sub("", row)
        if unmarked != row:
            hits += 1
        kept.append(unmarked)
    return "\n".join(kept), hits
def _t_fix_br(text: str) -> tuple[str, int]:
n = len(re.findall(r"<br>", text, re.IGNORECASE))
text = re.sub(r"<br>\s*", " ", text, flags=re.IGNORECASE)
return text, n
def _t_fix_tabsep(text: str) -> tuple[str, int]:
    """Blank out every span matched by ``_TABSEP_RE``.

    Returns:
        ``(new_text, number_of_spans_removed)``.
    """
    return _TABSEP_RE.subn("", text)
def _t_fix_accents(text: str) -> tuple[str, int]:
_ACCENT_MAP = {
"e": "\xe8", "E": "\xc8", "a": "\xe0", "A": "\xc0",
"u": "\xf9", "U": "\xd9", "i": "\xec", "I": "\xcc",
"o": "\xf2", "O": "\xd2",
}
n_bt_before = text.count("`")
text = re.sub(r"`([eEaAuUiIoO])", lambda m: _ACCENT_MAP[m.group(1)], text)
text = re.sub(r"([eEaAuUiIoO])`", lambda m: _ACCENT_MAP[m.group(1)], text)
n_accenti = n_bt_before - text.count("`")
n_bt_orfani = text.count("`")
if n_bt_orfani:
text = re.sub(r"`", "", text)
n_accenti += n_bt_orfani
return text, n_accenti
def _t_fix_multiplication(text: str) -> tuple[str, int]:
n = len(re.findall(r'(?<=[0-9])"(?=[0-9(])', text))
text = re.sub(r'(?<=[0-9])"(?=[0-9(])', '×', text)
return text, n
def _t_fix_micro(text: str) -> tuple[str, int]:
_SI_UNITS_RE = r'[mAsgVWFHTKNJClΩ]'
n = len(re.findall(rf'\d\s*!(?={_SI_UNITS_RE})', text))
text = re.sub(rf'(\d)\s*!({_SI_UNITS_RE})', r'\1 µ\2', text)
return text, n
def _t_remove_formula_labels(text: str) -> tuple[str, int]:
n = len(re.findall(r"\[\d+\.\d+\]", text))
text = re.sub(r"\s*\[\d+\.\d+\]\s*", " ", text)
return text, n
def _t_remove_dotleaders(text: str) -> tuple[str, int]:
_DOTLEADER_RE = r"^[^\n]*(?:(?:\. ){3,}|\.{4,})[^\n]*$"
n = len(re.findall(_DOTLEADER_RE, text, re.MULTILINE))
text = re.sub(_DOTLEADER_RE, "", text, flags=re.MULTILINE)
text = re.sub(
r"(?m)^(i{1,3}|iv|vi{0,3}|ix|xi{0,2}|x)$",
"",
text,
flags=re.IGNORECASE,
)
return text, n
def _t_fix_header_concat(text: str) -> tuple[str, int]:
count = 0
def _fix(m: re.Match) -> str:
nonlocal count
hashes = m.group(1)
full = m.group(2).strip()
if len(full) < 60:
return m.group(0)
skip = min(10, len(full) // 3)
split = re.search(r"(?<=[a-z\xe0\xe8\xe9\xec\xed\xf2\xf3\xf9\xfa\xe4])(?=[A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda])", full[skip:])
if split:
pos = skip + split.start()
title = full[:pos].strip()
body = full[pos:].strip()
if len(title) >= 5 and len(body) >= 15:
count += 1
return f"{hashes} {title}\n\n{body}"
return m.group(0)
text = re.sub(r"^(#{2,6})\s+(.{40,})$", _fix, text, flags=re.MULTILINE)
return text, count
def _t_extract_capitolo(text: str) -> tuple[str, int]:
    """Promote inline ``Capitolo N TITOLO`` markers to ``## Capitolo N: Titolo``.

    The title must be 6–81 characters drawn from upper-case letters,
    whitespace and a few punctuation marks, and must be followed by a
    ``- <digit>`` run, a newline, or the end of the text.  The new header is
    surrounded by blank lines.

    Returns:
        ``(new_text, 0)`` — this pass does not report a count.
    """
    chapter_re = re.compile(
        r"\bCapitolo\s+(\d+)\s*[:\s]\s*([A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda\'L]"
        r"[A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda\s\'\.,\(\)]{5,80}?)"
        r"(?=\s*[-]\s*\d|\s*\n|\s*$)"
    )

    def _to_header(hit: re.Match) -> str:
        # Trailing dashes/spaces belong to the page-number separator.
        title = hit.group(2).strip().rstrip("- ").strip()
        return f"\n\n## Capitolo {hit.group(1)}: {_sentence_case(title)}\n\n"

    return chapter_re.sub(_to_header, text), 0
def _t_normalize_numbered_headings(text: str) -> tuple[str, int]:
    """Make the ``#`` level of numbered headers follow their numbering depth.

    Depth is the number of dot-separated components in the header number
    ("2.3" has depth 2).  The shallowest depth found keeps the smallest
    ``#`` level used at that depth; each extra depth step adds one ``#``,
    capped at 6.  Nothing changes when there are no numbered headers or
    when they all share the same depth.

    Returns:
        ``(new_text, number_of_headers_whose_level_changed)``.
    """
    found = list(_NUMBERED_HDR_RE.finditer(text))
    if not found:
        return text, 0

    # (numbering depth, current '#' level) for every numbered header.
    profile = [(hit.group(2).count(".") + 1, len(hit.group(1))) for hit in found]
    shallowest = min(depth for depth, _ in profile)
    deepest = max(depth for depth, _ in profile)
    if shallowest == deepest:
        return text, 0

    anchor = min(level for depth, level in profile if depth == shallowest)
    changed = 0

    def _relevel(hit: re.Match) -> str:
        nonlocal changed
        marks, number, title = hit.group(1), hit.group(2), hit.group(3)
        depth = number.count(".") + 1
        target = min(anchor + (depth - shallowest), 6)
        if target == len(marks):
            return hit.group(0)
        changed += 1
        return f"{'#' * target} {number}. {title}"

    rewritten = _NUMBERED_HDR_RE.sub(_relevel, text)
    return rewritten, changed
def _t_normalize_header_levels(text: str) -> tuple[str, int]:
text = re.sub(r"^#{3,6}\s*$", "", text, flags=re.MULTILINE)
text = re.sub(
r"^(#{3,6})\s+(\d{1,3})\s+(.+)$",
lambda m: f"### {m.group(2)}. {m.group(3)}",
text,
flags=re.MULTILINE,
)
text = re.sub(r"^#{4,6}\s+(.+)$", r"### \1", text, flags=re.MULTILINE)
return text, 0
def _t_extract_articles(text: str) -> tuple[str, int]:
    """Pipeline adapter: delegate to ``_extract_article_headers`` unchanged."""
    return _extract_article_headers(text)
def _t_remove_header_bold(text: str) -> tuple[str, int]:
text = re.sub(
r"^(#{1,6})\s+\*\*(.+?)\*\*\s*$",
r"\1 \2",
text, flags=re.MULTILINE,
)
return text, 0
def _t_normalize_allcaps_headers(text: str) -> tuple[str, int]:
    """Convert headers written entirely in capitals to sentence case.

    A header is rewritten only when it contains at least one letter and all
    of its letters are upper-case; the ``#`` prefix is preserved.

    Returns:
        ``(new_text, 0)`` — this pass does not report a count.
    """
    def _soften(hit: re.Match) -> str:
        title = hit.group(2).strip()
        alpha = [ch for ch in title if ch.isalpha()]
        if not alpha or any(not ch.isupper() for ch in alpha):
            return hit.group(0)
        return f"{hit.group(1)} {_sentence_case(title)}"

    return re.sub(r"^(#{1,6}) (.+)$", _soften, text, flags=re.MULTILINE), 0
def _t_remove_toc(text: str) -> tuple[str, int]:
    """Remove table-of-contents sections introduced by a known keyword line.

    A line whose text (minus leading ``#`` and anything after the first
    ``.``) is in ``_TOC_KEYWORDS`` opens a TOC section and is dropped.  While
    inside the section, blank lines, bullets starting with a digit, and
    bullets ending in a 1–3 digit page number are dropped; the first line of
    any other kind closes the section and is kept.

    Returns:
        ``(new_text, 1)`` if at least one keyword line was found, else
        ``(new_text, 0)``.
    """
    blank = re.compile(r"^\s*$")
    numbered_entry = re.compile(r"^\s*[-*+]\s+\d")
    paged_entry = re.compile(r"^\s*[-*+]\s+.{2,70}\s+\d{1,3}\s*$")

    kept = []
    inside_toc = False
    found = False
    for row in text.split("\n"):
        label = re.sub(r"^#+\s*", "", row.strip()).split(".")[0].strip().lower()
        if label in _TOC_KEYWORDS:
            found = inside_toc = True
            continue
        if inside_toc:
            if blank.match(row) or numbered_entry.match(row) or paged_entry.match(row):
                continue
            # Any other line (of any length) ends the TOC and is preserved.
            inside_toc = False
        kept.append(row)
    return "\n".join(kept), int(found)
def _t_allcaps_to_headers(text: str) -> tuple[str, int]:
    """Promote ALL-CAPS lines to Markdown headers.

    A single-line block that passes ``_is_allcaps_line`` is converted as a
    whole.  In any other block each line is checked individually and must
    additionally be longer than 3 characters once stripped.

    Returns:
        ``(new_text, number_of_lines_promoted)``.
    """
    promoted = 0

    def _promote_lines(chunk: str) -> str:
        nonlocal promoted
        rows = []
        for row in chunk.split("\n"):
            if _is_allcaps_line(row) and len(row.strip()) > 3:
                rows.append(_allcaps_to_header(row))
                promoted += 1
            else:
                rows.append(row)
        return "\n".join(rows)

    rebuilt = []
    for chunk in text.split("\n\n"):
        core = chunk.strip()
        if "\n" not in core and _is_allcaps_line(core):
            rebuilt.append(_allcaps_to_header(core))
            promoted += 1
        else:
            rebuilt.append(_promote_lines(chunk))
    return "\n\n".join(rebuilt), promoted
def _t_numbered_sections(text: str, has_exercises: bool = False) -> tuple[str, int]:
    """Turn numbered paragraphs and numbered list items into ``###`` sections.

    Passes, in order:
      1. ``N. text`` lines -> ``### N.`` + body, unless the text is a long
         sentence ending in "." or matches ``_BIB_MARKERS_RE``.
      2. ``Na. text`` lines (number + lower-case letter) -> ``### Na.`` + body.
      3. ``- N. text`` bullets -> ``### N.`` + body (skipped when
         ``has_exercises`` is true).
      4. ``- N Title text`` bullets -> ``### N. Title`` (+ body when a
         lower-case/upper-case word boundary allows a split).

    NOTE(review): the reviewed view had lost its indentation, so it is not
    possible to tell whether pass 4 belongs under the ``has_exercises`` guard
    together with pass 3.  It is laid out here as unconditional — confirm
    against the original file before applying.

    Returns:
        ``(new_text, number_of_sections_created)``.
    """
    count = 0

    def _num_repl(m: re.Match) -> str:
        nonlocal count
        content = m.group(2).strip()
        # A long line ending in a full stop is treated as a sentence, not a title.
        if content.endswith(".") and len(content) > 40:
            return m.group(0)
        # Bibliography-like entries are left untouched.
        if _BIB_MARKERS_RE.search(content):
            return m.group(0)
        count += 1
        return f"### {m.group(1)}.\n\n{content}"

    text = re.sub(r"^(\d+)\.\s+(.+)$", _num_repl, text, flags=re.MULTILINE)

    def _num_letter_repl(m: re.Match) -> str:
        nonlocal count
        count += 1
        return f"### {m.group(1)}{m.group(2)}.\n\n{m.group(3).strip()}"

    text = re.sub(r"^(\d+)\s*([a-z])\.\s+(.+)$", _num_letter_repl, text, flags=re.MULTILINE)

    if not has_exercises:
        def _aphorism_repl(m: re.Match) -> str:
            nonlocal count
            content = m.group(2).strip()
            if _BIB_MARKERS_RE.search(content):
                return m.group(0)
            count += 1
            return f"\n\n### {m.group(1)}.\n\n{content}"

        text = re.sub(
            r"^-\s+(\d{1,3})\.\s+(.{10,})$",
            _aphorism_repl,
            text,
            flags=re.MULTILINE,
        )

    def _list_section_repl(m: re.Match) -> str:
        nonlocal count
        num = m.group(1)
        content = m.group(2).strip()
        if _BIB_MARKERS_RE.search(content):
            return m.group(0)
        count += 1
        # First whitespace gap between a lower-case and an upper-case letter
        # is taken as the title/body boundary.
        split = re.search(r"(?<=[a-z\xe0\xe8\xe9\xec\xed\xf2\xf3\xf9\xfa])\s+(?=[A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda])", content)
        if split and split.start() >= 3:
            title = content[: split.start()].strip()
            body = content[split.end():].strip()
            if len(body) >= 20:
                return f"\n\n### {num}. {title}\n\n{body}"
        return f"\n\n### {num}. {content}"

    text = re.sub(
        r"^-\s+(\d{1,3})\s+([A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda\'L].{10,})$",
        _list_section_repl,
        text,
        flags=re.MULTILINE,
    )
    return text, count
def _t_extract_math(text: str) -> tuple[str, int]:
    """Pipeline adapter: delegate to ``_extract_math_environments`` unchanged."""
    return _extract_math_environments(text)
def _t_merge_paragraphs(text: str) -> tuple[str, int]:
_SENTENCE_END = set(".?!\xbb)\"'")
blocks = text.split("\n\n")
merged = []
count = 0
i = 0
while i < len(blocks):
b = blocks[i]
stripped = b.strip()
while (
i + 1 < len(blocks)
and stripped
and not stripped.startswith("#")
and not stripped.startswith("|")
and stripped[-1] not in _SENTENCE_END
):
nxt = blocks[i + 1].strip()
if (
not nxt
or nxt.startswith("#")
or nxt.startswith("|")
or re.match(r"^\d+\.", nxt)
or re.match(r"^[-*+]\s", nxt)
):
break
b = stripped + " " + nxt
stripped = b.strip()
count += 1
i += 1
merged.append(b)
i += 1
text = "\n\n".join(merged)
text = re.sub(r"(?m)^\|---\|\s*", "", text)
return text, count
def _t_normalize_whitespace(text: str) -> tuple[str, int]:
lines = text.split("\n")
text = "\n".join(
re.sub(r" +", " ", line) if line.strip() else line
for line in lines
)
return text, 0
def _t_collapse_blank_lines(text: str) -> tuple[str, int]:
return re.sub(r"\n{3,}", "\n\n", text), 0
def _t_demote_verse_headers(text: str) -> tuple[str, int]:
count = 0
def _demote(m: re.Match) -> str:
nonlocal count
hashes, content = m.group(1), m.group(2).strip()
if not re.search(r"\s\d{1,4}\s*$", content):
return m.group(0)
inner = re.sub(r"\s\d{1,4}\s*$", "", content)
if not re.search(r'[,;:.!?\xbb"\'][\ ]+[A-Za-z\xc0-\xff\xab"“]', inner):
return m.group(0)
count += 1
clean = re.sub(r"\s\d{1,4}\s*$", "", content)
return clean
text = re.sub(r"^(#{1,6})\s+(.{20,})$", _demote, text, flags=re.MULTILINE)
return text, count
def _t_restore_poetry_lines(text: str) -> tuple[str, int]:
    """Restore line breaks in verse that was flattened around inline line numbers.

    For each non-header block, the inline numbers found by ``_VERSE_NUM_RE``
    are inspected.  The block is treated as verse when there are at least two
    of them, their successive differences take at most two distinct values,
    and the first difference is between 1 and 5.  Each number is then
    replaced by a newline, or by a blank line when the number is a multiple
    of three times the first difference.

    Returns:
        ``(new_text, number_of_inline_numbers_replaced)``.
    """
    restored = 0
    out = []
    for chunk in text.split("\n\n"):
        core = chunk.strip()
        if not core or core.startswith("#"):
            out.append(chunk)
            continue

        hits = list(_VERSE_NUM_RE.finditer(core))
        if len(hits) < 2:
            out.append(chunk)
            continue

        numbers = [int(hit.group(2)) for hit in hits]
        gaps = [later - earlier for earlier, later in zip(numbers, numbers[1:])]
        if not gaps or len(set(gaps)) > 2 or not (1 <= gaps[0] <= 5):
            out.append(chunk)
            continue

        stride = gaps[0]

        def _break_line(hit: re.Match, stride: int = stride) -> str:
            marker = int(hit.group(2))
            joiner = "\n\n" if marker % (stride * 3) == 0 else "\n"
            return hit.group(1).rstrip() + joiner

        rebuilt = _VERSE_NUM_RE.sub(_break_line, core)
        if rebuilt != core:
            restored += len(hits)
        out.append(rebuilt)

    return "\n\n".join(out), restored
def _t_remove_urls(text: str) -> tuple[str, int]:
return re.sub(r"(?m)^(https?://|www\.)\S+\s*$", "", text), 0
def _t_remove_empty_headers(text: str) -> tuple[str, int]:
blocks = re.split(r"\n{2,}", text)
cleaned = []
for i, block in enumerate(blocks):
stripped = block.strip()
if re.match(r"^#{1,6} ", stripped) and "\n" not in stripped:
next_stripped = blocks[i + 1].strip() if i + 1 < len(blocks) else ""
next_is_long_hdr = (
re.match(r"^#{1,6} ", next_stripped) and len(next_stripped) > 80
)
if not next_stripped or (
re.match(r"^#{1,6} ", next_stripped) and not next_is_long_hdr
):
continue
cleaned.append(block)
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), 0
def _t_merge_title_headers(text: str) -> tuple[str, int]:
    """Pipeline adapter: delegate to ``_merge_title_headers`` unchanged."""
    return _merge_title_headers(text)
def _t_remove_garbage_headers(text: str) -> tuple[str, int]:
def _is_garbage(content: str) -> bool:
if content.lstrip().startswith("..."):
return True
if not re.search(r"[A-Za-z\xc0-\xffΑ-ω]{2,}", content):
return True
if re.fullmatch(r"\(?\s*[A-Za-z]{1,4}\s*\)?", content.strip()):
return True
if len(content) > 60 and re.search(r"[!%#]\w|\w[!%#]|\b\w+-\s*\w", content):
return True
first_alpha = next((c for c in content if c.isalpha()), None)
if first_alpha and first_alpha.islower() and len(content) > 40:
return True
if re.match(r"^[A-Za-zΑ-ω_]{1,3}\s*[=<>≤≥]", content.strip()):
return True
if re.match(r"^(Figura|Figure|Fig\.|Tabella|Table|Tab\.)\s+\d", content.strip(), re.IGNORECASE):
return True
return False
count = 0
lines = text.split("\n")
new_lines = []
for line in lines:
m = re.match(r"^#{1,6} (.+)$", line)
if m and _is_garbage(m.group(1)):
count += 1
continue
new_lines.append(line)
text = "\n".join(new_lines)
text = re.sub(r"\n{3,}", "\n\n", text)
return text, count
def _t_remove_frontmatter(text: str) -> tuple[str, int]:
    """Drop front-matter ``###`` headers found near the start of the document.

    Only the first ``max(5, min(15, 20% of blocks))`` blocks are inspected,
    and only unnumbered level-3 headers.  Such a header is dropped when it
    matches ``_FM_RE`` itself, or when the following block is shorter than
    250 characters and matches ``_FM_RE``.

    NOTE(review): only the header block is removed; the following body block
    is kept — confirm this is intended.

    Returns:
        ``(new_text, number_of_headers_removed)``; runs of three or more
        newlines in the result are collapsed to two.
    """
    chunks = re.split(r"\n{2,}", text)
    window = max(5, min(15, int(len(chunks) * 0.20)))

    kept = []
    dropped = 0
    for pos, chunk in enumerate(chunks):
        core = chunk.strip()
        unnumbered_h3 = re.match(r"^### ", core) and not re.match(r"^### \d", core)
        if pos >= window or not unnumbered_h3:
            kept.append(chunk)
            continue

        follower = chunks[pos + 1].strip() if pos + 1 < len(chunks) else ""
        body_is_fm = len(follower) < 250 and _FM_RE.search(follower)
        if body_is_fm or _FM_RE.search(core):
            dropped += 1
            continue
        kept.append(chunk)

    return re.sub(r"\n{3,}", "\n\n", "\n\n".join(kept)), dropped
def _t_remove_watermarks(text: str) -> tuple[str, int]:
    """Delete every line matched (from its start) by ``_WATERMARK_RE``.

    Returns:
        ``(new_text, number_of_lines_removed)``.
    """
    rows = text.split("\n")
    kept = [row for row in rows if not _WATERMARK_RE.match(row)]
    return "\n".join(kept), len(rows) - len(kept)
def _t_fix_math_symbols(text: str) -> tuple[str, int]:
lines = text.split("\n")
result, count = [], 0
for line in lines:
if line.strip() and re.match(r"^[\s■-◿☐-☒•▪▫◆◇●○•]+$", line):
count += 1
else:
result.append(line)
return "\n".join(result), count
def _t_remove_recurring_lines(text: str) -> tuple[str, int]:
lines = text.split("\n")
short_lines = [
ln.strip() for ln in lines
if 3 < len(ln.strip()) < 80
and not ln.strip().startswith("#")
and not ln.strip().startswith("|")
]
freq = Counter(short_lines)
recurring = {ln for ln, c in freq.items() if c >= 5}
if not recurring:
return text, 0
result, count = [], 0
for line in lines:
if line.strip() in recurring:
count += 1
else:
result.append(line)
return "\n".join(result), count
def _t_math_header_demotion(text: str) -> tuple[str, int]:
    """Demote over-long headers that are really formulas or exercise statements.

    A line matched by ``_MATH_HDR_RE`` is demoted when its content is longer
    than 100 characters and either contains at least three matches of
    ``_MATH_SYMBOLS_RE`` or matches ``_EXERCISE_TRIGGER_RE``.  The ``#``
    prefix is dropped; a leading number matched by ``_NUMBERED_PREFIX_RE``
    is kept in bold.

    Returns:
        ``(new_text, number_of_headers_demoted)``.
    """
    out = []
    demoted = 0
    for row in text.split("\n"):
        hit = _MATH_HDR_RE.match(row)
        payload = hit.group(2) if hit else ""
        looks_like_formula = len(payload) > 100 and (
            len(_MATH_SYMBOLS_RE.findall(payload)) >= 3
            or bool(_EXERCISE_TRIGGER_RE.search(payload))
        )
        if not looks_like_formula:
            out.append(row)
            continue

        numbered = _NUMBERED_PREFIX_RE.match(payload)
        if numbered:
            out.append(f"**{numbered.group(1)}** {numbered.group(2)}")
        else:
            out.append(payload)
        demoted += 1
    return "\n".join(out), demoted
# ─── Orchestratore ───────────────────────────────────────────────────────────
def apply_transforms(text: str) -> tuple[str, dict]:
    """
    Apply the structural transformations to raw Markdown.

    Returns (modified_text, statistics).

    The order is semantic: encoding → header structure → structure
    building → text → finishing.  Each entry pairs a statistics key (or
    ``None`` when the pass reports nothing) with a callable that takes the
    text and returns ``(new_text, count)``.
    """
    # Detected once on the raw input and passed to the numbered-sections pass.
    _has_ex = bool(re.search(r"\b(Esercizi|Exercises|Problems|Homework)\b", text, re.IGNORECASE))

    _transforms: list[tuple[str | None, object]] = [
        ("n_simboli_pua_corretti", _t_fix_symbol_font),
        ("n_immagini_rimosse", _t_remove_images),
        ("n_br_rimossi", _t_fix_br),
        ("n_tabsep_rimossi", _t_fix_tabsep),
        ("n_note_rimosse", _t_remove_footnotes),
        ("n_accenti_corretti", _t_fix_accents),
        ("n_moltiplicazioni_corrette", _t_fix_multiplication),
        ("n_micro_corretti", _t_fix_micro),
        ("n_simboli_math_rimossi", _t_fix_math_symbols),
        ("n_formule_rimossi", _t_remove_formula_labels),
        ("n_dotleader_rimossi", _t_remove_dotleaders),
        ("n_righe_ricorrenti_rimosse", _t_remove_recurring_lines),
        ("n_header_concat_fixati", _t_fix_header_concat),
        (None, _t_extract_capitolo),
        ("n_header_numerati_normalizzati", _t_normalize_numbered_headings),
        (None, _t_normalize_header_levels),
        ("n_articoli_estratti", _t_extract_articles),
        (None, _t_remove_header_bold),
        (None, _t_normalize_allcaps_headers),
        ("toc_rimosso", _t_remove_toc),
        ("n_header_allcaps", _t_allcaps_to_headers),
        ("n_sezioni_numerate", partial(_t_numbered_sections, has_exercises=_has_ex)),
        ("n_ambienti_matematici", _t_extract_math),
        ("n_paragrafi_uniti", _t_merge_paragraphs),
        (None, _t_normalize_whitespace),
        (None, _t_collapse_blank_lines),
        ("n_versi_ripristinati", _t_restore_poetry_lines),
        ("n_header_verso_demotati", _t_demote_verse_headers),
        (None, _t_remove_urls),
        (None, _t_remove_empty_headers),
        ("n_titoli_uniti", _t_merge_title_headers),
        # Strip a trailing "| <page number>" from header lines.
        (None, lambda t: (re.sub(r"(?m)^(#{1,6}.+?)\s*\|\s*\d{1,3}\s*$", r"\1", t), 0)),
        ("n_garbage_headers_rimossi", _t_remove_garbage_headers),
        ("n_formula_headers_demotati", _t_math_header_demotion),
        ("n_frontmatter_rimossi", _t_remove_frontmatter),
        ("n_watermark_rimossi", _t_remove_watermarks),
    ]

    stats: dict = {}
    for stat_key, fn in _transforms:
        text, n = fn(text)
        if stat_key:
            # Accumulate so a key used by more than one pass would sum up.
            stats[stat_key] = stats.get(stat_key, 0) + n
    # The TOC pass reports 0/1; expose it to callers as a boolean.
    stats["toc_rimosso"] = bool(stats.get("toc_rimosso", 0))
    return text, stats
@@ -0,0 +1,4 @@
"""Package transforms: pipeline di pulizia strutturale per Markdown RAG."""
from ._apply import apply_transforms
__all__ = ["apply_transforms"]
@@ -0,0 +1,96 @@
"""Orchestratore: applica le trasformazioni in ordine semantico."""
import re
from functools import partial
from ._encoding import (
_t_fix_symbol_font, _t_fix_accents,
_t_fix_multiplication, _t_fix_micro,
)
from ._artifacts import (
_t_remove_images, _t_fix_br, _t_fix_tabsep, _t_remove_footnotes,
_t_remove_formula_labels, _t_remove_dotleaders, _t_remove_recurring_lines,
_t_fix_math_symbols, _t_remove_watermarks, _t_remove_urls,
)
from ._headers import (
_t_fix_header_concat, _t_extract_capitolo,
_t_normalize_numbered_headings, _t_normalize_header_levels,
_t_remove_header_bold, _t_normalize_allcaps_headers,
)
from ._structure import (
_t_remove_toc, _t_remove_orphan_toc, _t_allcaps_to_headers,
_t_numbered_sections, _t_extract_math, _t_extract_articles,
)
from ._text import (
_t_merge_paragraphs, _t_normalize_whitespace, _t_collapse_blank_lines,
_t_restore_poetry_lines, _t_demote_verse_headers,
)
from ._finish import (
_t_remove_empty_headers, _t_merge_title_headers,
_t_remove_garbage_headers, _t_math_header_demotion,
_t_remove_frontmatter,
)
def apply_transforms(text: str) -> tuple[str, dict]:
    """
    Apply the structural transformations to raw Markdown.

    Returns (modified_text, statistics).

    The order is semantic: encoding → artifacts → header structure →
    structure building → text → finishing.  Each entry pairs a statistics
    key (or ``None`` when the pass reports nothing) with a callable that
    takes the text and returns ``(new_text, count)``.
    """
    # Detected once on the raw input and passed to the numbered-sections pass.
    _has_ex = bool(re.search(r"\b(Esercizi|Exercises|Problems|Homework)\b", text, re.IGNORECASE))

    _transforms: list[tuple[str | None, object]] = [
        # 1. Encoding
        ("n_simboli_pua_corretti", _t_fix_symbol_font),
        ("n_accenti_corretti", _t_fix_accents),
        ("n_moltiplicazioni_corrette", _t_fix_multiplication),
        ("n_micro_corretti", _t_fix_micro),
        # 2. Artifact cleanup
        ("n_immagini_rimosse", _t_remove_images),
        ("n_br_rimossi", _t_fix_br),
        ("n_tabsep_rimossi", _t_fix_tabsep),
        ("n_note_rimosse", _t_remove_footnotes),
        ("n_simboli_math_rimossi", _t_fix_math_symbols),
        ("n_formule_rimossi", _t_remove_formula_labels),
        ("n_dotleader_rimossi", _t_remove_dotleaders),
        ("n_righe_ricorrenti_rimosse", _t_remove_recurring_lines),
        # 3. Header structure
        ("n_header_concat_fixati", _t_fix_header_concat),
        (None, _t_extract_capitolo),
        ("n_header_numerati_normalizzati", _t_normalize_numbered_headings),
        (None, _t_normalize_header_levels),
        (None, _t_remove_header_bold),
        (None, _t_normalize_allcaps_headers),
        # 4. Structure building
        ("toc_rimosso", _t_remove_toc),
        ("n_toc_orfani_rimossi", _t_remove_orphan_toc),
        ("n_header_allcaps", _t_allcaps_to_headers),
        ("n_sezioni_numerate", partial(_t_numbered_sections, has_exercises=_has_ex)),
        ("n_ambienti_matematici", _t_extract_math),
        ("n_articoli_estratti", _t_extract_articles),
        # 5. Text
        ("n_paragrafi_uniti", _t_merge_paragraphs),
        (None, _t_normalize_whitespace),
        (None, _t_collapse_blank_lines),
        ("n_versi_ripristinati", _t_restore_poetry_lines),
        ("n_header_verso_demotati", _t_demote_verse_headers),
        (None, _t_remove_urls),
        # 6. Finishing
        (None, _t_remove_empty_headers),
        ("n_titoli_uniti", _t_merge_title_headers),
        # Strip a trailing "| <page number>" from header lines.
        (None, lambda t: (re.sub(r"(?m)^(#{1,6}.+?)\s*\|\s*\d{1,3}\s*$", r"\1", t), 0)),
        ("n_garbage_headers_rimossi", _t_remove_garbage_headers),
        ("n_formula_headers_demotati", _t_math_header_demotion),
        ("n_frontmatter_rimossi", _t_remove_frontmatter),
        ("n_watermark_rimossi", _t_remove_watermarks),
    ]

    stats: dict = {}
    for stat_key, fn in _transforms:
        text, n = fn(text)
        if stat_key:
            # Accumulate so a key used by more than one pass would sum up.
            stats[stat_key] = stats.get(stat_key, 0) + n
    # The TOC pass reports 0/1; expose it to callers as a boolean.
    stats["toc_rimosso"] = bool(stats.get("toc_rimosso", 0))
    return text, stats
@@ -0,0 +1,106 @@
"""Rimozione artefatti: immagini, BR, footnote, URL, righe ricorrenti, watermark."""
import re
from collections import Counter
from ._constants import (
_WATERMARK_RE, _TABSEP_RE, _SUPERSCRIPT_RE, _FOOTNOTE_BODY_RE,
)
def _t_remove_images(text: str) -> tuple[str, int]:
n = len(re.findall(r"!\[[^\]]*\]\([^)]*\)", text))
text = re.sub(r"!\[[^\]]*\]\([^)]*\)\s*", "", text)
return text, n
def _t_fix_br(text: str) -> tuple[str, int]:
n = len(re.findall(r"<br>", text, re.IGNORECASE))
text = re.sub(r"<br>\s*", " ", text, flags=re.IGNORECASE)
return text, n
def _t_fix_tabsep(text: str) -> tuple[str, int]:
    """Blank out every span matched by ``_TABSEP_RE``.

    Returns:
        ``(new_text, number_of_spans_removed)``.
    """
    return _TABSEP_RE.subn("", text)
def _t_remove_footnotes(text: str) -> tuple[str, int]:
    """Drop footnote body lines and strip superscript markers from the rest.

    A line is dropped when its stripped form is shorter than 300 characters
    and matches ``_FOOTNOTE_BODY_RE``.  Every other line has the characters
    matched by ``_SUPERSCRIPT_RE`` removed.

    Returns:
        ``(new_text, count)`` where *count* is dropped lines plus lines that
        had at least one marker removed.
    """
    kept = []
    hits = 0
    for row in text.split("\n"):
        core = row.strip()
        if core and len(core) < 300 and _FOOTNOTE_BODY_RE.match(core):
            hits += 1
            continue
        unmarked = _SUPERSCRIPT_RE.sub("", row)
        if unmarked != row:
            hits += 1
        kept.append(unmarked)
    return "\n".join(kept), hits
def _t_remove_formula_labels(text: str) -> tuple[str, int]:
n = len(re.findall(r"\[\d+\.\d+\]", text))
text = re.sub(r"\s*\[\d+\.\d+\]\s*", " ", text)
return text, n
def _t_remove_dotleaders(text: str) -> tuple[str, int]:
_DOTLEADER_RE = r"^[^\n]*(?:(?:\. ){3,}|\.{4,})[^\n]*$"
n = len(re.findall(_DOTLEADER_RE, text, re.MULTILINE))
text = re.sub(_DOTLEADER_RE, "", text, flags=re.MULTILINE)
text = re.sub(
r"(?m)^(i{1,3}|iv|vi{0,3}|ix|xi{0,2}|x)$",
"",
text,
flags=re.IGNORECASE,
)
return text, n
def _t_remove_recurring_lines(text: str) -> tuple[str, int]:
lines = text.split("\n")
short_lines = [
ln.strip() for ln in lines
if 3 < len(ln.strip()) < 80
and not ln.strip().startswith("#")
and not ln.strip().startswith("|")
]
freq = Counter(short_lines)
recurring = {ln for ln, c in freq.items() if c >= 5}
if not recurring:
return text, 0
result, count = [], 0
for line in lines:
if line.strip() in recurring:
count += 1
else:
result.append(line)
return "\n".join(result), count
def _t_fix_math_symbols(text: str) -> tuple[str, int]:
lines = text.split("\n")
result, count = [], 0
for line in lines:
if line.strip() and re.match(r"^[\s■-◿☐-☒•▪▫◆◇●○•]+$", line):
count += 1
else:
result.append(line)
return "\n".join(result), count
def _t_remove_watermarks(text: str) -> tuple[str, int]:
    """Drop every line matched by ``_WATERMARK_RE``.

    Returns:
        The filtered text and the number of lines removed.
    """
    all_lines = text.split("\n")
    survivors = [ln for ln in all_lines if not _WATERMARK_RE.match(ln)]
    return "\n".join(survivors), len(all_lines) - len(survivors)
def _t_remove_urls(text: str) -> tuple[str, int]:
return re.sub(r"(?m)^(https?://|www\.)\S+\s*$", "", text), 0
@@ -0,0 +1,161 @@
"""
Costanti di modulo condivise tra i moduli di trasformazione.
Tutte le regex compilate e le mappe statiche vivono qui.
"""
import re
# ─── Keyword sets ─────────────────────────────────────────────────────────────
# Lower-case headings that mark the start of a table of contents.
# _t_remove_toc compares the lower-cased text before the first "." of each
# line against this set.
_TOC_KEYWORDS = frozenset([
    "indice", "index", "contents", "table of contents",
    "sommario", "inhaltsverzeichnis", "inhalt",
    "indice generale", "indice analitico", "indice dei contenuti",
    "elenco dei capitoli", "argomenti", "table des matières",
    "tabla de contenidos", "содержание",
])
# Italian upper-case ordinals -> roman numerals ("CAPITOLO PRIMO" -> "I").
_ORDINALS_IT = {
    "PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV",
    "QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII",
    "NONO": "IX", "DECIMO": "X",
}
# English upper-case number words -> arabic digits ("CHAPTER ONE" -> "1").
_ORDINALS_EN = {
    "ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5",
    "SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10",
}
# ─── PUA Symbol font map ──────────────────────────────────────────────────────
_SYMBOL_PUA_MAP: dict[str, str] = {
"": " ",
"": "(",
"": ")",
"": "+",
"": "",
"": ".",
"": "/",
"": "0", "": "1", "": "2", "": "3", "": "4",
"": "5", "": "6", "": "7", "": "8", "": "9",
"": ":", "": ";", "": "<", "": "=", "": ">",
"": "",
"": "Α", "": "Β", "": "Χ", "": "Δ", "": "Ε",
"": "Φ", "": "Γ", "": "Η", "": "Ι", "": "ϑ",
"": "Κ", "": "Λ", "": "Μ", "": "Ν", "": "Ο",
"": "Π", "": "Θ", "": "Ρ", "": "Σ", "": "Τ",
"": "Υ", "": "ς", "": "Ω", "": "Ξ", "": "Ψ",
"": "Ζ",
"": "[",
"": "",
"": "]",
"": "",
"": "α", "": "β", "": "χ", "": "δ", "": "ε",
"": "φ", "": "γ", "": "η", "": "ι", "": "ϕ",
"": "κ", "": "λ", "": "μ", "": "ν", "": "ο",
"": "π", "": "θ", "": "ρ", "": "σ", "": "τ",
"": "υ", "": "ϖ", "": "ω", "": "ξ", "": "ψ",
"": "ζ",
"": "{",
"": "|",
"": "}",
"": "~",
"": "±",
"": "",
"": "",
"": "",
"": "",
"": "",
"": "×",
"": "÷",
"": "×",
"": "",
"": "",
"": "",
"": "",
"": "*",
"": ",",
"": "",
"": "",
"": "",
"": "",
"": "÷",
"": "",
"": "",
"": "",
"": "",
"": "",
"": "",
# TeX Computer Modern bracket/delimiter pieces (U+F8EBF8FE) → stringa vuota
"": "", # TeX large paren left
"": "", # TeX large paren extension
"": "", # TeX large paren right
"": "", # TeX large paren right ext
"": "", # TeX large bracket left
"": "", # TeX large bracket ext
"": "", # TeX brace top-left
"": "", # TeX brace mid
"": "", # TeX brace mid-right
"": "", # TeX brace extension
"": "", # TeX brace right
"": "", # TeX bracket right large
"": "", # TeX bracket right ext
"": "", # TeX bracket right close
"": "", # TeX integral large
"": "", # TeX integral extension
"": "", # TeX integral top
"": "", # TeX radical top
"": "", # TeX radical extension
"": "", # TeX arrowhead
}
_SYMBOL_PUA_RE = re.compile(
"[" + "".join(re.escape(k) for k in _SYMBOL_PUA_MAP) + "]"
)
# ─── Regex compilate condivise ────────────────────────────────────────────────
# One or more Unicode superscript digits (footnote call markers).
_SUPERSCRIPT_RE = re.compile(r'[¹²³⁰⁴-⁹]+')
# Start of a footnote body: superscript digits or "[n]" (1-3 digits),
# followed by whitespace.
_FOOTNOTE_BODY_RE = re.compile(
    r'^([¹²³⁰⁴-⁹]+\s+|\[\d{1,3}\]\s+)'
)
# Markdown header of the form "## 1.2. Title":
# group 1 = hashes, group 2 = dotted number, group 3 = title.
_NUMBERED_HDR_RE = re.compile(
    r"^(#{1,6})\s+(\d+(?:\.\d+)*)\.\s+(.+)$",
    re.MULTILINE,
)
# Bibliography hints (page/volume/edition abbreviations, identifiers, or a
# 19xx/20xx year); used to avoid promoting reference entries to headers.
_BIB_MARKERS_RE = re.compile(
    r'\b(pp?\.|vol\.|n\.\s*\d|ed\.|edn\.|ISBN|DOI|arXiv)\b'
    r'|\b(19|20)\d{2}\b',
    re.IGNORECASE,
)
# A line consisting only of a watermark word/phrase (case-insensitive).
_WATERMARK_RE = re.compile(
    r"^(BOZZA|DRAFT|CONFIDENTIAL|RISERVATO|PROVVISORIO|SAMPLE|SPECIMEN"
    r"|DO NOT DISTRIBUTE|NON DISTRIBUIRE|COPY|COPIA)\s*$",
    re.IGNORECASE | re.MULTILINE,
)
# Degenerate table rows: "| |" or a lone "|---" / "|---|" separator line.
_TABSEP_RE = re.compile(r"(?m)^\|\s*\|\s*$|^\|---\|?\s*$")
# Front-matter hints: URLs, e-mail-like "@x", institution/copyright/licence
# wording.
_FM_RE = re.compile(
    r"https?://|www\.|@[A-Za-z]|\bUniversit[àa]\b|\bDipartimento\b|"
    r"\bCopyright\b|\bLicenza\b|\bEdizione\b|"
    r"protetto da|tutti i diritti",
    re.IGNORECASE,
)
# Inline verse number: group 1 = closing punctuation + whitespace,
# group 2 = the number, group 3 = whitespace; must be followed by a letter
# or an opening quote.
# NOTE(review): the empty string literal concatenated in the middle looks
# like a character that was lost (possibly a typographic quote) — confirm
# against the original file.
_VERSE_NUM_RE = re.compile(
    r"([.!?\xbb'\"" + "" + r"]\s+)(\d+)(\s+)(?=[A-Z\xc0-\xd9a-z\xe0-\xf9\xab“”‟])"
)
# Math header demotion
# Single math operator/relation symbol or Greek letter.
_MATH_SYMBOLS_RE = re.compile(
    r"[=+∈∀∃≤≥∞∑∫∂→↔⊂⊃∩∪αβγδεζηθικλμνξοπρστυφχψωΑΒΓΔΕΖΗΘΙΚΛΜΝΞΟΠΡΣΤΥΦΧΨΩ]"
)
# Imperative verbs (Italian/English) that typically open an exercise text.
_EXERCISE_TRIGGER_RE = re.compile(
    r"\b(Si dimostri|Si calcoli|Si provi|Si trovi|Trovare|Find|Prove|Show that"
    r"|Compute|Calculate|Dimostrare|Verificare)\b",
    re.IGNORECASE,
)
# Level-2/3 header: group 1 = hashes, group 2 = header text.
_MATH_HDR_RE = re.compile(r"^(#{2,3})\s+(.+)$")
# Leading "N." / "N.M)" numbering: group 1 = number with its terminator,
# group 2 = remaining text.
_NUMBERED_PREFIX_RE = re.compile(r"^(\d+(?:\.\d+)*[.)])\s+(.+)$", re.DOTALL)
# Orphan TOC: table-of-contents entry without dot leaders
# (e.g. "3. Funzioni 174")
_TOC_ITEM_RE = re.compile(
    r"^\d+(\.\d+)*\.?\s+[A-Za-zÀ-ú\'\(][^\n]{2,70}$"
)
# Level 1-3 header "N. Title PAGE" ending in a 1-4 digit page number.
_TOC_HDR_WITH_PAGE_RE = re.compile(
    r"^#{1,3}\s+\d+\.?\s+.{3,60}\s+\d{1,4}$"
)
@@ -0,0 +1,45 @@
"""Trasformazioni di encoding: PUA font Symbol, accenti LaTeX, simboli SI."""
import re
from ._constants import _SYMBOL_PUA_MAP, _SYMBOL_PUA_RE
def _t_fix_symbol_font(text: str) -> tuple[str, int]:
    """Replace each character matched by ``_SYMBOL_PUA_RE`` via ``_SYMBOL_PUA_MAP``.

    Returns:
        The rewritten text and the number of characters replaced.
    """
    fixed, n_replaced = _SYMBOL_PUA_RE.subn(
        lambda match: _SYMBOL_PUA_MAP[match.group(0)], text
    )
    return fixed, n_replaced
def _t_fix_accents(text: str) -> tuple[str, int]:
_ACCENT_MAP = {
"e": "\xe8", "E": "\xc8", "a": "\xe0", "A": "\xc0",
"u": "\xf9", "U": "\xd9", "i": "\xec", "I": "\xcc",
"o": "\xf2", "O": "\xd2",
}
n_bt_before = text.count("`")
text = re.sub(r"`([eEaAuUiIoO])", lambda m: _ACCENT_MAP[m.group(1)], text)
text = re.sub(r"([eEaAuUiIoO])`", lambda m: _ACCENT_MAP[m.group(1)], text)
n_accenti = n_bt_before - text.count("`")
n_bt_orfani = text.count("`")
if n_bt_orfani:
text = re.sub(r"`", "", text)
n_accenti += n_bt_orfani
return text, n_accenti
def _t_fix_multiplication(text: str) -> tuple[str, int]:
n = len(re.findall(r'(?<=[0-9])"(?=[0-9(])', text))
text = re.sub(r'(?<=[0-9])"(?=[0-9(])', '×', text)
return text, n
def _t_fix_micro(text: str) -> tuple[str, int]:
_SI_UNITS_RE = r'[mAsgVWFHTKNJClΩ]'
n = len(re.findall(rf'\d\s*!(?={_SI_UNITS_RE})', text))
text = re.sub(rf'(\d)\s*!({_SI_UNITS_RE})', r'\1 µ\2', text)
return text, n
+116
View File
@@ -0,0 +1,116 @@
"""Trasformazioni di rifinitura: header vuoti, garbage, demozione formula-header, frontmatter."""
import re
from ._constants import (
_FM_RE, _MATH_HDR_RE, _MATH_SYMBOLS_RE,
_EXERCISE_TRIGGER_RE, _NUMBERED_PREFIX_RE,
)
from ._helpers import _merge_title_headers
def _t_remove_empty_headers(text: str) -> tuple[str, int]:
blocks = re.split(r"\n{2,}", text)
cleaned = []
for i, block in enumerate(blocks):
stripped = block.strip()
if re.match(r"^#{1,6} ", stripped) and "\n" not in stripped:
next_stripped = blocks[i + 1].strip() if i + 1 < len(blocks) else ""
next_is_long_hdr = (
re.match(r"^#{1,6} ", next_stripped) and len(next_stripped) > 80
)
if not next_stripped or (
re.match(r"^#{1,6} ", next_stripped) and not next_is_long_hdr
):
continue
cleaned.append(block)
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), 0
def _t_merge_title_headers(text: str) -> tuple[str, int]:
    """Thin wrapper: delegate to ``_helpers._merge_title_headers`` and return its result."""
    return _merge_title_headers(text)
def _t_remove_garbage_headers(text: str) -> tuple[str, int]:
def _is_garbage(content: str) -> bool:
if content.lstrip().startswith("..."):
return True
if not re.search(r"[A-Za-z\xc0-\xffΑ-ω]{2,}", content):
return True
if re.fullmatch(r"\(?\s*[A-Za-z]{1,4}\s*\)?", content.strip()):
return True
if len(content) > 60 and re.search(r"[!%#]\w|\w[!%#]|\b\w+-\s*\w", content):
return True
first_alpha = next((c for c in content if c.isalpha()), None)
if first_alpha and first_alpha.islower() and len(content) > 40:
return True
if re.match(r"^[A-Za-zΑ-ω_]{1,3}\s*[=<>≤≥]", content.strip()):
return True
if re.match(
r"^(Figura|Figure|Fig\.|Tabella|Table|Tab\.)\s+\d",
content.strip(), re.IGNORECASE,
):
return True
return False
count = 0
lines = text.split("\n")
new_lines = []
for line in lines:
m = re.match(r"^#{1,6} (.+)$", line)
if m and _is_garbage(m.group(1)):
count += 1
continue
new_lines.append(line)
text = "\n".join(new_lines)
text = re.sub(r"\n{3,}", "\n\n", text)
return text, count
def _t_math_header_demotion(text: str) -> tuple[str, int]:
    """Demote long ``##``/``###`` headers that are really exercise or formula text.

    A header is demoted when its text exceeds 100 characters and either
    contains at least three ``_MATH_SYMBOLS_RE`` matches or matches
    ``_EXERCISE_TRIGGER_RE``.  The hashes are dropped; a leading number
    (per ``_NUMBERED_PREFIX_RE``) is kept in bold.

    Returns:
        The rewritten text and the number of headers demoted.
    """
    out = []
    demoted = 0
    for line in text.split("\n"):
        hdr = _MATH_HDR_RE.match(line)
        body = hdr.group(2) if hdr else ""
        is_statement = (
            hdr is not None
            and len(body) > 100
            and (
                len(_MATH_SYMBOLS_RE.findall(body)) >= 3
                or bool(_EXERCISE_TRIGGER_RE.search(body))
            )
        )
        if not is_statement:
            out.append(line)
            continue
        numbered = _NUMBERED_PREFIX_RE.match(body)
        out.append(
            f"**{numbered.group(1)}** {numbered.group(2)}" if numbered else body
        )
        demoted += 1
    return "\n".join(out), demoted
def _t_remove_frontmatter(text: str) -> tuple[str, int]:
    """Drop front-matter ``###`` headers near the start of the document.

    Only the first ``cutoff`` blocks are inspected (20% of the blocks,
    clamped to the range 5..15).  A non-numbered ``###`` header block is
    dropped when it matches ``_FM_RE`` itself, or when the block after it is
    shorter than 250 characters and matches ``_FM_RE``.  Only the header
    block is removed; the block after it is kept.

    Returns:
        The text with runs of 3+ newlines collapsed, and the number of
        header blocks removed.
    """
    blocks = re.split(r"\n{2,}", text)
    n_blocks = len(blocks)
    cutoff = max(5, min(15, int(n_blocks * 0.20)))
    kept = []
    dropped = 0
    for idx, block in enumerate(blocks):
        head = block.strip()
        is_candidate = (
            idx < cutoff
            and re.match(r"^### ", head)
            and not re.match(r"^### \d", head)
        )
        if is_candidate:
            follower = blocks[idx + 1].strip() if idx + 1 < n_blocks else ""
            follower_is_fm = len(follower) < 250 and _FM_RE.search(follower)
            if follower_is_fm or _FM_RE.search(head):
                dropped += 1
                continue
        kept.append(block)
    return re.sub(r"\n{3,}", "\n\n", "\n\n".join(kept)), dropped
@@ -0,0 +1,110 @@
"""Trasformazioni sulla struttura degli header: normalizzazione livelli, concat, bold."""
import re
from ._constants import _NUMBERED_HDR_RE
from ._helpers import _sentence_case
def _t_fix_header_concat(text: str) -> tuple[str, int]:
count = 0
def _fix(m: re.Match) -> str:
nonlocal count
hashes = m.group(1)
full = m.group(2).strip()
if len(full) < 60:
return m.group(0)
skip = min(10, len(full) // 3)
split = re.search(
r"(?<=[a-z\xe0\xe8\xe9\xec\xed\xf2\xf3\xf9\xfa\xe4])"
r"(?=[A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda])",
full[skip:],
)
if split:
pos = skip + split.start()
title = full[:pos].strip()
body = full[pos:].strip()
if len(title) >= 5 and len(body) >= 15:
count += 1
return f"{hashes} {title}\n\n{body}"
return m.group(0)
text = re.sub(r"^(#{2,6})\s+(.{40,})$", _fix, text, flags=re.MULTILINE)
return text, count
def _t_extract_capitolo(text: str) -> tuple[str, int]:
    """Promote inline ``Capitolo N TITLE`` occurrences to ``## Capitolo N: Title``.

    The title must be 6-81 characters of upper-case letters, whitespace and
    basic punctuation, and must be followed by a dash plus digit, a newline,
    or the end of the text.  The title is converted with ``_sentence_case``
    and the new header is isolated by blank lines.  The count component of
    the result is always 0.
    """
    def _repl(m: re.Match) -> str:
        num = m.group(1)
        # NOTE(review): rstrip("- ") here and the single-character class [-]
        # in the look-ahead below look like they may have lost an en/em dash
        # during extraction — confirm against the original file.
        titolo = _sentence_case(m.group(2).strip().rstrip("- ").strip())
        return f"\n\n## Capitolo {num}: {titolo}\n\n"

    text = re.sub(
        r"\bCapitolo\s+(\d+)\s*[:\s]\s*([A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda\'L]"
        r"[A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda\s\'\.,\(\)]{5,80}?)"
        r"(?=\s*[-]\s*\d|\s*\n|\s*$)",
        _repl,
        text,
    )
    return text, 0
def _t_normalize_numbered_headings(text: str) -> tuple[str, int]:
    """Align the ``#`` level of numbered headers with their numbering depth.

    Depth is the number of dotted components (``1.2.3`` → 3).  The shallowest
    depth keeps the smallest header level observed at that depth; each extra
    depth step adds one ``#``, capped at six.  Nothing changes when all
    numbered headers share the same depth.

    Returns:
        The rewritten text and the number of headers whose level changed.
    """
    found = list(_NUMBERED_HDR_RE.finditer(text))
    if not found:
        return text, 0

    def _depth(number: str) -> int:
        return number.count(".") + 1

    depth_and_level = [(_depth(m.group(2)), len(m.group(1))) for m in found]
    shallowest = min(depth for depth, _ in depth_and_level)
    deepest = max(depth for depth, _ in depth_and_level)
    if shallowest == deepest:
        return text, 0

    anchor_level = min(
        level for depth, level in depth_and_level if depth == shallowest
    )
    changed = 0

    def _relevel(m: re.Match) -> str:
        nonlocal changed
        marks, number, title = m.groups()
        target = min(anchor_level + (_depth(number) - shallowest), 6)
        if target == len(marks):
            return m.group(0)
        changed += 1
        return f"{'#' * target} {number}. {title}"

    return _NUMBERED_HDR_RE.sub(_relevel, text), changed
def _t_normalize_header_levels(text: str) -> tuple[str, int]:
text = re.sub(r"^#{3,6}\s*$", "", text, flags=re.MULTILINE)
text = re.sub(
r"^(#{3,6})\s+(\d{1,3})\s+(.+)$",
lambda m: f"### {m.group(2)}. {m.group(3)}",
text,
flags=re.MULTILINE,
)
text = re.sub(r"^#{4,6}\s+(.+)$", r"### \1", text, flags=re.MULTILINE)
return text, 0
def _t_remove_header_bold(text: str) -> tuple[str, int]:
text = re.sub(
r"^(#{1,6})\s+\*\*(.+?)\*\*\s*$",
r"\1 \2",
text, flags=re.MULTILINE,
)
return text, 0
def _t_normalize_allcaps_headers(text: str) -> tuple[str, int]:
    """Convert headers written entirely in capitals to sentence case.

    A header qualifies when it contains at least one letter and every letter
    is upper-case.  The count component of the result is always 0.
    """
    def _recase(match: re.Match) -> str:
        marks = match.group(1)
        title = match.group(2).strip()
        alpha = [ch for ch in title if ch.isalpha()]
        if not alpha or not all(ch.isupper() for ch in alpha):
            return match.group(0)
        return f"{marks} {_sentence_case(title)}"

    return re.sub(r"^(#{1,6}) (.+)$", _recase, text, flags=re.MULTILINE), 0
@@ -0,0 +1,153 @@
"""Funzioni helper pure condivise tra i moduli di trasformazione."""
import re
from ._constants import _ORDINALS_IT, _ORDINALS_EN
def _sentence_case(s: str) -> str:
if not s:
return s
lower = s.lower()
return lower[0].upper() + lower[1:]
def _is_allcaps_line(line: str) -> bool:
stripped = line.strip()
letters = [c for c in stripped if c.isalpha()]
return (
len(letters) >= 3
and all(c.isupper() for c in letters)
and not stripped.startswith("#")
and not stripped.startswith("|")
)
def _allcaps_to_header(raw_line: str) -> str:
    """Turn an all-capitals line into a level-2 Markdown header.

    A leading list bullet and trailing ``.``/``?`` are removed first.  Three
    shapes get special handling, tried in order: ``CAPITOLO <ordinal>. …``
    (ordinal mapped through ``_ORDINALS_IT``), ``CHAPTER <word|digits> …``
    (word mapped through ``_ORDINALS_EN``) and ``<roman|digits>. …``.
    Anything else becomes ``## <sentence-cased text>``.
    """
    # Drop a leading "-", "*" or "+" list bullet.
    text = re.sub(r"^[-*+]\s+", "", raw_line.strip())
    text = text.rstrip(".").rstrip("?").strip()
    _ORD_IT_PAT = "|".join(_ORDINALS_IT.keys())
    m = re.match(rf"^CAPITOLO ({_ORD_IT_PAT})\. (.+)", text)
    if m:
        roman = _ORDINALS_IT[m.group(1)]
        titolo = m.group(2).rstrip(".").rstrip("?").strip()
        # NOTE(review): numeral and title are joined with no separator here
        # (and in the CHAPTER branch below); a dash may have been lost in
        # extraction — confirm against the original file.
        return f"## Capitolo {roman}{_sentence_case(titolo)}"
    _ORD_EN_PAT = "|".join(_ORDINALS_EN.keys())
    m = re.match(rf"^CHAPTER ({_ORD_EN_PAT}|\d+)\.? (.+)", text)
    if m:
        # Digits are not in the map and pass through unchanged.
        n = _ORDINALS_EN.get(m.group(1), m.group(1))
        titolo = m.group(2).rstrip(".").rstrip("?").strip()
        return f"## Chapter {n}{_sentence_case(titolo)}"
    m = re.match(r"^([IVXLCDM]+|[0-9]+)\. (.+)", text)
    if m:
        return f"## {m.group(1)}. {_sentence_case(m.group(2).rstrip('.').strip())}"
    return f"## {_sentence_case(text)}"
def _extract_math_environments(text: str) -> tuple[str, int]:
_ENVS = (
r"Definizione|Definition|Teorema|Theorem|Lemma|"
r"Proposizione|Proposition|Corollario|Corollary|"
r"Osservazione|Remark|Nota|Note|Esempio|Example"
)
count = 0
blocks = text.split("\n\n")
result = []
for block in blocks:
stripped = block.strip()
if not stripped or stripped.startswith("#"):
result.append(block)
continue
m = re.match(
rf"^({_ENVS})\s+((?:\d+\.?){{1,4}})\s*(.*)",
stripped,
re.DOTALL,
)
if not m:
result.append(block)
continue
env = m.group(1)
num = m.group(2).rstrip(".")
rest = m.group(3).strip()
title_m = re.match(r"^(\([^)]{2,60}\))\s+(.*)", rest, re.DOTALL)
if title_m:
header = f"### {env} {num} {title_m.group(1)}"
body = title_m.group(2).strip()
else:
header = f"### {env} {num}."
body = rest
result.append(f"{header}\n\n{body}" if body else header)
count += 1
return "\n\n".join(result), count
def _merge_title_headers(text: str) -> tuple[str, int]:
count = 0
blocks = re.split(r"\n{2,}", text)
result = []
i = 0
while i < len(blocks):
block = blocks[i]
stripped = block.strip()
if (
re.match(r"^#{2,3} \d+\.\s*$", stripped)
and i + 1 < len(blocks)
):
nxt = blocks[i + 1].strip()
if (
nxt
and "\n" not in nxt
and len(nxt) <= 80
and not nxt.startswith("#")
and not re.match(r"^\d+[\.\)]\s", nxt)
):
result.append(stripped.rstrip() + " " + nxt)
count += 1
i += 2
continue
result.append(block)
i += 1
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(result)), count
def _extract_article_headers(text: str) -> tuple[str, int]:
count = 0
def _repl(m: re.Match) -> str:
nonlocal count
num = m.group(1)
rest = m.group(2).strip()
title_m = re.match(
r"^([A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda].{1,74}?)\.\s+"
r"([A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda\(\d].{4,})",
rest,
)
if title_m:
count += 1
return (
f"### Art. {num}. {title_m.group(1)}.\n\n"
f"{title_m.group(2).strip()}"
)
if rest:
count += 1
return f"### Art. {num}.\n\n{rest}"
count += 1
return f"### Art. {num}."
text = re.sub(
r"^-\s+Art\.\s+([\d]+[a-z\-]*)\.\s*(.*)",
_repl,
text,
flags=re.MULTILINE,
)
return text, count
@@ -0,0 +1,184 @@
"""Costruzione struttura: TOC, ALLCAPS→##, sezioni numerate, ambienti matematici, articoli."""
import re
from ._constants import (
_TOC_KEYWORDS, _BIB_MARKERS_RE,
_TOC_ITEM_RE, _TOC_HDR_WITH_PAGE_RE,
)
from ._helpers import (
_is_allcaps_line, _allcaps_to_header,
_extract_math_environments, _extract_article_headers,
)
def _t_remove_toc(text: str) -> tuple[str, int]:
    """Remove table-of-contents headings and the entries right below them.

    A line opens a TOC when its text (hashes stripped, cut at the first
    ``.``, lower-cased) is in ``_TOC_KEYWORDS``.  While inside a TOC, blank
    lines and bullet items that start with a digit or end with a 1-3 digit
    page number are dropped; the first other line closes the TOC and is kept.

    Returns:
        The filtered text and 1 if at least one TOC heading was found,
        otherwise 0.
    """
    def _is_toc_entry(line: str) -> bool:
        return bool(
            re.match(r"^\s*$", line)
            or re.match(r"^\s*[-*+]\s+\d", line)
            or re.match(r"^\s*[-*+]\s+.{2,70}\s+\d{1,3}\s*$", line)
        )

    kept = []
    inside_toc = False
    found = False
    for line in text.split("\n"):
        heading_text = re.sub(r"^#+\s*", "", line.strip())
        if heading_text.split(".")[0].strip().lower() in _TOC_KEYWORDS:
            found = True
            inside_toc = True
            continue
        if inside_toc:
            if _is_toc_entry(line):
                continue
            # Any non-entry line ends the TOC and is kept.
            inside_toc = False
        kept.append(line)
    return "\n".join(kept), 1 if found else 0
def _t_remove_orphan_toc(text: str) -> tuple[str, int]:
    """
    Remove table-of-contents entries without dot leaders that slip past
    _t_remove_toc.

    Detects: (a) runs of 3+ consecutive blocks matching the TOC item pattern
    within the first 25% of the document's blocks (clamped to 10..40 blocks);
    (b) headers of the form "### N. Title PAGE" whose body is a list of
    numbered entries.

    Returns the text with the detected blocks removed and the number of
    blocks dropped.
    """
    blocks = re.split(r"\n{2,}", text)
    total = len(blocks)
    # Only the leading part of the document is scanned.
    cutoff = max(10, min(40, int(total * 0.25)))
    to_drop = set()
    i = 0
    while i < cutoff and i < total:
        b = blocks[i].strip()
        # (a) Run of 3+ consecutive TOC blocks
        if _TOC_ITEM_RE.match(b):
            j = i
            # Extend the run, bounded by the cutoff and by 60 blocks from i.
            while j < min(cutoff, i + 60) and j < len(blocks) and _TOC_ITEM_RE.match(blocks[j].strip()):
                j += 1
            if j - i >= 3:
                for k in range(i, j):
                    to_drop.add(k)
                # Also drop the preceding header if it carries a page number
                if i > 0 and _TOC_HDR_WITH_PAGE_RE.match(blocks[i - 1].strip()):
                    to_drop.add(i - 1)
                i = j
                continue
        # (b) Header "### N. Title PAGE" whose body is a list of numbered entries
        if _TOC_HDR_WITH_PAGE_RE.match(b):
            body = blocks[i + 1].strip() if i + 1 < len(blocks) else ""
            # The body contains 2+ occurrences of "N. Title"
            toc_hits = re.findall(r"\d+\.?\s+[A-Za-zÀ-ú]", body)
            if len(toc_hits) >= 2 and len(body) < 300:
                to_drop.add(i)
                if i + 1 < total:
                    to_drop.add(i + 1)
                i += 2
                continue
        i += 1
    if not to_drop:
        return text, 0
    kept = [b for idx, b in enumerate(blocks) if idx not in to_drop]
    return re.sub(r"\n{3,}", "\n\n", "\n\n".join(kept)), len(to_drop)
def _t_allcaps_to_headers(text: str) -> tuple[str, int]:
    """Convert all-capitals lines into Markdown headers.

    A single-line block that ``_is_allcaps_line`` accepts is replaced by the
    result of ``_allcaps_to_header``.  In any other block each line is tested
    individually and converted when it is all-caps and its stripped length
    exceeds 3.

    Returns:
        The rewritten text and the number of lines converted.
    """
    converted_total = 0
    out = []
    for block in text.split("\n\n"):
        bare = block.strip()
        if "\n" not in bare and _is_allcaps_line(bare):
            out.append(_allcaps_to_header(bare))
            converted_total += 1
            continue
        rewritten = []
        for ln in block.split("\n"):
            if _is_allcaps_line(ln) and len(ln.strip()) > 3:
                rewritten.append(_allcaps_to_header(ln))
                converted_total += 1
            else:
                rewritten.append(ln)
        out.append("\n".join(rewritten))
    return "\n\n".join(out), converted_total
def _t_numbered_sections(text: str, has_exercises: bool = False) -> tuple[str, int]:
    """Promote numbered paragraphs and list items to ``###`` headers.

    Substitutions run in this order: ``N. text`` lines, ``N a. text`` lines,
    and — only when *has_exercises* is false — ``- N. text`` and
    ``- N Text`` list items.  Content that matches ``_BIB_MARKERS_RE`` is
    left untouched, except in the ``N a.`` pass, which has no such check.

    Returns:
        The rewritten text and the number of headers created.
    """
    count = 0

    def _num_repl(m: re.Match) -> str:
        nonlocal count
        content = m.group(2).strip()
        # A long line ending in a full stop reads as a sentence, not a title.
        if content.endswith(".") and len(content) > 40:
            return m.group(0)
        if _BIB_MARKERS_RE.search(content):
            return m.group(0)
        count += 1
        return f"### {m.group(1)}.\n\n{content}"

    text = re.sub(r"^(\d+)\.\s+(.+)$", _num_repl, text, flags=re.MULTILINE)

    def _num_letter_repl(m: re.Match) -> str:
        nonlocal count
        count += 1
        return f"### {m.group(1)}{m.group(2)}.\n\n{m.group(3).strip()}"

    text = re.sub(r"^(\d+)\s*([a-z])\.\s+(.+)$", _num_letter_repl, text, flags=re.MULTILINE)
    if not has_exercises:
        def _aphorism_repl(m: re.Match) -> str:
            nonlocal count
            content = m.group(2).strip()
            if _BIB_MARKERS_RE.search(content):
                return m.group(0)
            count += 1
            return f"\n\n### {m.group(1)}.\n\n{content}"

        text = re.sub(
            r"^-\s+(\d{1,3})\.\s+(.{10,})$",
            _aphorism_repl,
            text,
            flags=re.MULTILINE,
        )

        def _list_section_repl(m: re.Match) -> str:
            nonlocal count
            num = m.group(1)
            content = m.group(2).strip()
            if _BIB_MARKERS_RE.search(content):
                return m.group(0)
            count += 1
            # First lower-case -> upper-case word boundary: candidate
            # title/body split.
            split = re.search(
                r"(?<=[a-z\xe0\xe8\xe9\xec\xed\xf2\xf3\xf9\xfa])\s+"
                r"(?=[A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda])",
                content,
            )
            if split and split.start() >= 3:
                title = content[: split.start()].strip()
                body = content[split.end():].strip()
                # Split only if the remainder is long enough to be a body.
                if len(body) >= 20:
                    return f"\n\n### {num}. {title}\n\n{body}"
            return f"\n\n### {num}. {content}"

        text = re.sub(
            r"^-\s+(\d{1,3})\s+([A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda\'L].{10,})$",
            _list_section_repl,
            text,
            flags=re.MULTILINE,
        )
    return text, count
def _t_extract_math(text: str) -> tuple[str, int]:
    """Thin wrapper: delegate to ``_extract_math_environments`` and return its result."""
    return _extract_math_environments(text)
def _t_extract_articles(text: str) -> tuple[str, int]:
    """Thin wrapper: delegate to ``_extract_article_headers`` and return its result."""
    return _extract_article_headers(text)
+109
View File
@@ -0,0 +1,109 @@
"""Trasformazioni sul testo: merge paragrafi, whitespace, poesia, versi."""
import re
from ._constants import _VERSE_NUM_RE
def _t_merge_paragraphs(text: str) -> tuple[str, int]:
_SENTENCE_END = set(".?!\xbb)\"'")
blocks = text.split("\n\n")
merged = []
count = 0
i = 0
while i < len(blocks):
b = blocks[i]
stripped = b.strip()
while (
i + 1 < len(blocks)
and stripped
and not stripped.startswith("#")
and not stripped.startswith("|")
and stripped[-1] not in _SENTENCE_END
):
nxt = blocks[i + 1].strip()
if (
not nxt
or nxt.startswith("#")
or nxt.startswith("|")
or re.match(r"^\d+\.", nxt)
or re.match(r"^[-*+]\s", nxt)
):
break
b = stripped + " " + nxt
stripped = b.strip()
count += 1
i += 1
merged.append(b)
i += 1
text = "\n\n".join(merged)
text = re.sub(r"(?m)^\|---\|\s*", "", text)
return text, count
def _t_normalize_whitespace(text: str) -> tuple[str, int]:
lines = text.split("\n")
text = "\n".join(
re.sub(r" +", " ", line) if line.strip() else line
for line in lines
)
return text, 0
def _t_collapse_blank_lines(text: str) -> tuple[str, int]:
return re.sub(r"\n{3,}", "\n\n", text), 0
def _t_restore_poetry_lines(text: str) -> tuple[str, int]:
    """Restore verse line breaks in paragraphs that carry inline verse numbers.

    A block qualifies when ``_VERSE_NUM_RE`` matches at least twice, the gaps
    between consecutive numbers take at most two distinct values, and the
    first gap is between 1 and 5.  Each matched number is removed and
    replaced by a line break; a number divisible by ``step * 3`` produces a
    blank line instead.

    Returns:
        The rewritten text and the number of verse numbers replaced.
    """
    count = 0
    blocks = text.split("\n\n")
    result = []
    for block in blocks:
        stripped = block.strip()
        # Headers and empty blocks are never treated as verse.
        if not stripped or stripped.startswith("#"):
            result.append(block)
            continue
        matches = list(_VERSE_NUM_RE.finditer(stripped))
        if len(matches) < 2:
            result.append(block)
            continue
        nums = [int(m.group(2)) for m in matches]
        diffs = [nums[i + 1] - nums[i] for i in range(len(nums) - 1)]
        # Require a (nearly) regular numbering with a small first step.
        if not diffs or len(set(diffs)) > 2 or not (1 <= diffs[0] <= 5):
            result.append(block)
            continue
        step = diffs[0]

        def _replace_verse_num(m: re.Match) -> str:
            n = int(m.group(2))
            # Every third numbered position gets a blank line.
            sep = "\n\n" if n % (step * 3) == 0 else "\n"
            return m.group(1).rstrip() + sep

        new_block = _VERSE_NUM_RE.sub(_replace_verse_num, stripped)
        if new_block != stripped:
            count += len(matches)
        result.append(new_block)
    return "\n\n".join(result), count
def _t_demote_verse_headers(text: str) -> tuple[str, int]:
count = 0
def _demote(m: re.Match) -> str:
nonlocal count
hashes, content = m.group(1), m.group(2).strip()
if not re.search(r"\s\d{1,4}\s*$", content):
return m.group(0)
inner = re.sub(r"\s\d{1,4}\s*$", "", content)
if not re.search(r'[,;:.!?\xbb"\'][\ ]+[A-Za-z\xc0-\xff\xab""]', inner):
return m.group(0)
count += 1
clean = re.sub(r"\s\d{1,4}\s*$", "", content)
return clean
text = re.sub(r"^(#{1,6})\s+(.{20,})$", _demote, text, flags=re.MULTILINE)
return text, count
@@ -0,0 +1,560 @@
# Pipeline ottimizzazione PDF→Markdown — Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Eliminare la necessità di revisione manuale del `clean.md` ottimizzando i parametri di opendataloader-pdf e aggiungendo trasformazioni mirate per tutti i tipi di PDF.
**Architecture:** Quattro file modificati: `converter.py` (parametri adattivi + rilevamento PDF taggato), `transforms.py` (PUA bracket TeX + demozione header-formula), `report.py` (nuova metrica residua), `validator.py` (nuova penalità). Nessun cambio all'API pubblica di `_pipeline`.
**Tech Stack:** Python 3.12, opendataloader-pdf (Java), PyMuPDF (fitz), regex
---
## File modificati
| File | Tipo | Responsabilità |
|------|------|----------------|
| `conversione/_pipeline/converter.py` | Modify | `_is_tagged_pdf()` + nuovi parametri convert |
| `conversione/_pipeline/transforms.py` | Modify | PUA bracket TeX + `_t_math_header_demotion` |
| `conversione/_pipeline/report.py` | Modify | `formula_headers_residui` nella sezione residui |
| `conversione/_pipeline/validator.py` | Modify | Penalità formula headers |
---
## Task 1: Converter adattivo — `_is_tagged_pdf()` + nuovi parametri
**Files:**
- Modify: `conversione/_pipeline/converter.py`
- [ ] **Step 1: Leggi il file attuale**
```bash
cat conversione/_pipeline/converter.py
```
- [ ] **Step 2: Sostituisci interamente il contenuto**
Il nuovo `converter.py` aggiunge `_is_tagged_pdf()` (usa fitz per controllare `StructTreeRoot` nel catalog del PDF) e passa i nuovi parametri a `opendataloader_pdf.convert()`:
- `table_method="cluster"` — sempre attivo, migliora tabelle senza bordi
- `content_safety_off=["tiny", "hidden-ocg"]` — evita filtraggio di footnote e layer OCG
- `use_struct_tree=tagged` — attivo solo se PDF è taggato
```python
from pathlib import Path
def _is_tagged_pdf(pdf_path: Path) -> bool:
try:
import fitz
doc = fitz.open(str(pdf_path))
tagged = "StructTreeRoot" in doc.pdf_catalog()
doc.close()
return tagged
except Exception:
return False
def convert_pdf(pdf_path: Path, out_dir: Path) -> Path:
"""
Converte il PDF in Markdown tramite opendataloader-pdf.
Scrive il file nella out_dir e restituisce il percorso.
Parametri scelti per output RAG-ottimale:
- keep_line_breaks=False → testo fluente, no hard-wrap PDF
- reading_order="xycut" → corregge ordine multi-colonna (XY-Cut++)
- sanitize=False → preserva il testo originale
- image_output="off" → nessuna immagine estratta né referenziata
- table_method="cluster" → rileva tabelle senza bordi visibili
- content_safety_off → evita filtraggio di footnote e layer OCG
- use_struct_tree → attivo se PDF è taggato (Word/InDesign)
"""
import opendataloader_pdf
out_dir.mkdir(parents=True, exist_ok=True)
tagged = _is_tagged_pdf(pdf_path)
opendataloader_pdf.convert(
input_path=str(pdf_path),
output_dir=str(out_dir),
format="markdown",
keep_line_breaks=False,
reading_order="xycut",
sanitize=False,
image_output="off",
table_method="cluster",
content_safety_off=["tiny", "hidden-ocg"],
use_struct_tree=tagged,
quiet=True,
)
md_file = out_dir / f"{pdf_path.stem}.md"
if not md_file.exists():
candidates = list(out_dir.glob("*.md"))
if not candidates:
raise RuntimeError(f"Nessun file .md prodotto in {out_dir}")
md_file = candidates[0]
content = md_file.read_text(encoding="utf-8", errors="replace").strip()
if len(content) < 100:
raise RuntimeError(
f"opendataloader ha prodotto un file .md quasi vuoto ({len(content)} char) "
f"— il PDF potrebbe essere corrotto o non supportato"
)
return md_file
```
- [ ] **Step 3: Verifica sintattica**
```bash
.venv/bin/python -c "from conversione._pipeline.converter import convert_pdf, _is_tagged_pdf; print('OK')"
```
Atteso: `OK`
- [ ] **Step 4: Commit**
```bash
git add conversione/_pipeline/converter.py
git commit -m "feat(converter): parametri adattivi — use_struct_tree, cluster tables, content-safety"
```
---
## Task 2: Aggiunta PUA bracket TeX (U+F8EB–U+F8FE)
**Files:**
- Modify: `conversione/_pipeline/transforms.py` (sezione `_SYMBOL_PUA_MAP`, righe ~28–127)
Questi codepoint sono pezzi di parentesi/bracket grandi del font Computer Modern (TeX), non ricostruibili come singolo simbolo → mappati a `""`.
- [ ] **Step 1: Aggiungi le entries mancanti alla fine di `_SYMBOL_PUA_MAP`**
Individua la riga `"": "", # bracket extension piece (non ricostruibile)` (circa riga 122) e aggiungi **dopo** l'ultima entry esistente della mappa (prima della `}`):
```python
"": "", # TeX large paren left
"": "", # TeX large paren extension
"": "", # TeX large paren right
"": "", # TeX large paren right extension
"": "", # TeX large bracket left
"": "", # TeX large bracket extension
"": "", # TeX brace top-left
"": "", # TeX brace mid
"": "", # TeX brace mid-right
"": "", # TeX brace extension
"": "", # TeX brace right
"": "", # TeX bracket right large
"": "", # TeX bracket right extension
"": "", # TeX bracket right close
"": "", # TeX integral large
"": "", # TeX integral extension
"": "", # TeX integral top
"": "", # TeX radical top
"": "", # TeX radical extension
"": "", # TeX arrowhead
```
- [ ] **Step 2: Verifica che _SYMBOL_PUA_RE si aggiorni automaticamente**
```bash
.venv/bin/python -c "
from conversione._pipeline.transforms import _SYMBOL_PUA_MAP, _SYMBOL_PUA_RE
pua_chars = ['', '', '', '']
for c in pua_chars:
assert c in _SYMBOL_PUA_MAP, f'Manca {repr(c)}'
assert _SYMBOL_PUA_RE.search(c), f'Regex non cattura {repr(c)}'
print(f'OK — {len(_SYMBOL_PUA_MAP)} PUA chars mappati')
"
```
Atteso: `OK — N PUA chars mappati` (N > 90)
- [ ] **Step 3: Verifica sostituzione su testo di esempio**
```bash
.venv/bin/python -c "
from conversione._pipeline.transforms import apply_transforms
testo = 'Sia x = f(n) e n la parentesi grande.'
pulito, stats = apply_transforms(testo)
assert '' not in pulito
assert '' not in pulito
print('Testo pulito:', repr(pulito))
print('PUA corretti:', stats['n_simboli_pua_corretti'])
"
```
Atteso: nessun PUA nel testo pulito, `n_simboli_pua_corretti` > 0.
- [ ] **Step 4: Commit**
```bash
git add conversione/_pipeline/transforms.py
git commit -m "feat(transforms): aggiungi PUA bracket TeX U+F8EB-F8FE alla mappa simboli"
```
---
## Task 3: Nuova trasformazione `_t_math_header_demotion`
**Files:**
- Modify: `conversione/_pipeline/transforms.py`
Demota a testo semplice gli header `##`/`###` che sono enunciati di esercizi o formule lunghe (non titoli di sezione reali).
**Criteri di demozione** (almeno uno tra math e exercise deve valere):
- Livello `##` o `###`
- Lunghezza testo (senza `#`) > 100 caratteri
- `math`: ≥ 3 simboli matematici nell'header (da set: `=`, `+`, `∈`, `∀`, `∃`, `≤`, `≥`, `∞`, `∑`, `∫`, `∂`, `→`, `↔`, `⊂`, `⊃`, `∩`, `∪`, lettere greche Unicode U+03B1–U+03C9 e U+0391–U+03A9)
- `exercise`: matcha pattern traccia (`\b(Si dimostri|Si calcoli|Si provi|Si trovi|Trovare|Find|Prove|Show that|Compute|Calculate|Dimostrare|Verificare)\b`)
**Output**: rimuove `#+ `. Se la riga inizia con `N. ` (numero + punto), converte in `**N.** resto`. Altrimenti testo plain.
- [ ] **Step 1: Aggiungi costante regex a livello di modulo** (dopo le costanti esistenti, prima di `_SYMBOL_PUA_MAP`)
Trova la riga `_VERSE_NUM_RE = re.compile(` (circa riga 160) e aggiungi **dopo**:
```python
_MATH_SYMBOLS_RE = re.compile(
r"[=+∈∀∃≤≥∞∑∫∂→↔⊂⊃∩∪αβγδεζηθικλμνξοπρστυφχψωΑΒΓΔΕΖΗΘΙΚΛΜΝΞΟΠΡΣΤΥΦΧΨΩ]"
)
_EXERCISE_TRIGGER_RE = re.compile(
r"\b(Si dimostri|Si calcoli|Si provi|Si trovi|Trovare|Find|Prove|Show that"
r"|Compute|Calculate|Dimostrare|Verificare)\b",
re.IGNORECASE,
)
_MATH_HDR_RE = re.compile(r"^(#{2,3})\s+(.+)$")
_NUMBERED_PREFIX_RE = re.compile(r"^(\d+(?:\.\d+)*[.)])\s+(.+)$", re.DOTALL)
```
- [ ] **Step 2: Aggiungi la funzione `_t_math_header_demotion`** (prima dell'orchestratore `apply_transforms`)
Trova la riga `# ─── Orchestratore` e aggiungi **prima**:
```python
def _t_math_header_demotion(text: str) -> tuple[str, int]:
lines = text.split("\n")
result, count = [], 0
for line in lines:
m = _MATH_HDR_RE.match(line)
if not m:
result.append(line)
continue
body = m.group(2)
if len(body) <= 100:
result.append(line)
continue
has_math = len(_MATH_SYMBOLS_RE.findall(body)) >= 3
has_exercise = bool(_EXERCISE_TRIGGER_RE.search(body))
if not (has_math or has_exercise):
result.append(line)
continue
nm = _NUMBERED_PREFIX_RE.match(body)
if nm:
result.append(f"**{nm.group(1)}** {nm.group(2)}")
else:
result.append(body)
count += 1
return "\n".join(result), count
```
- [ ] **Step 3: Registra la trasformazione in `_transforms`**
Nell'orchestratore `apply_transforms`, trova la riga:
```python
("n_garbage_headers_rimossi", _t_remove_garbage_headers),
```
e aggiungi **dopo**:
```python
("n_formula_headers_demotati", _t_math_header_demotion),
```
- [ ] **Step 4: Aggiungi la stat key al print in `runner.py`**
Trova in `conversione/_pipeline/runner.py` il blocco di print delle statistiche (dopo `apply_transforms`) e aggiungi:
```python
print(f" Formula-hdr demotati: {t['n_formula_headers_demotati']}")
```
- [ ] **Step 5: Verifica su caso sintetico**
```bash
.venv/bin/python -c "
from conversione._pipeline.transforms import apply_transforms
# Caso 1: header esercizio lungo → deve essere demotato
testo = '### 3. Si dimostri la formula per le equazioni di secondo grado ax^2 + bx + c = 0 e si analizzi il segno del discriminante b^2 - 4ac per tutti i valori reali.'
pulito, stats = apply_transforms(testo)
assert '###' not in pulito, f'Header non demotato: {pulito!r}'
print('Caso 1 OK:', pulito[:80])
# Caso 2: header titolo corto → NON deve essere demotato
testo2 = '### Teorema di Cauchy'
pulito2, _ = apply_transforms(testo2)
assert '###' in pulito2, f'Header legittimo demotato: {pulito2!r}'
print('Caso 2 OK:', pulito2)
# Caso 3: header con molti simboli math + lungo → demotato
testo3 = '### Sia f: R→R tale che ∀x∈R si abbia f(x) = ∑_{n=0}^{∞} aₙxⁿ con ∫f dx = g(x) + C per ogni x∈[a,b] e per ogni n∈N fissato.'
pulito3, stats3 = apply_transforms(testo3)
print('Caso 3:', '###' not in pulito3, stats3.get('n_formula_headers_demotati'))
print('Stats:', stats.get('n_formula_headers_demotati'))
"
```
Atteso: Caso 1 e 3 demotati, Caso 2 intatto.
- [ ] **Step 6: Commit**
```bash
git add conversione/_pipeline/transforms.py conversione/_pipeline/runner.py
git commit -m "feat(transforms): aggiungi _t_math_header_demotion per header esercizi e formule"
```
---
## Task 4: `report.py` — metrica `formula_headers_residui`
**Files:**
- Modify: `conversione/_pipeline/report.py`
- [ ] **Step 1: Aggiungi funzione di scan formula-header e integrala nel report**
Nella funzione `build_report()`, dopo la definizione di `_scan()` (circa riga 53), aggiungi:
```python
def _scan_formula_headers(max_n: int = 10) -> list[dict]:
    """Collect ``##``/``###`` headers that still look like exercises or formulas.

    A header is reported when its text is longer than 100 characters and it
    contains at least three math symbols or one exercise trigger phrase.
    Scanning stops as soon as ``max_n`` hits have been collected.

    Reads ``text_lines`` from the enclosing scope (not defined in this
    function; presumably the lines of the cleaned Markdown — confirm
    against ``build_report``).

    Args:
        max_n: Number of hits after which the scan stops.

    Returns:
        A list of ``{"riga": <1-based line number>, "testo": <stripped line,
        truncated to 120 characters>}`` dictionaries.
    """
    header_re = re.compile(r"^(#{2,3})\s+(.+)$")
    symbol_re = re.compile(
        r"[=+∈∀∃≤≥∞∑∫∂→↔⊂⊃∩∪αβγδεζηθικλμνξοπρστυφχψωΑΒΓΔΕΖΗΘΙΚΛΜΝΞΟΠΡΣΤΥΦΧΨΩ]"
    )
    trigger_re = re.compile(
        r"\b(Si dimostri|Si calcoli|Si provi|Si trovi|Trovare|Find|Prove|Show that"
        r"|Compute|Calculate|Dimostrare|Verificare)\b",
        re.IGNORECASE,
    )

    found: list[dict] = []
    for lineno, raw in enumerate(text_lines, start=1):
        header = header_re.match(raw)
        if header is None:
            continue
        content = header.group(2)
        if len(content) <= 100:
            continue
        # Neither math-heavy nor phrased like an exercise: a legitimate header.
        if len(symbol_re.findall(content)) < 3 and trigger_re.search(content) is None:
            continue
        found.append({"riga": lineno, "testo": raw.strip()[:120]})
        if len(found) >= max_n:
            break
    return found
```
- [ ] **Step 2: Aggiungi la metrica ai `residui`**
Trova nel dict `residui` la riga:
```python
"pua_markers": _scan(r'[-]'),
```
e aggiungi **dopo**:
```python
"formula_headers": _scan_formula_headers(),
```
Poi nel dict principale `report["residui"]`, trova la riga:
```python
"pua_markers_esempi": residui["pua_markers"],
```
e aggiungi **dopo**:
```python
"formula_headers": len(residui["formula_headers"]),
"formula_headers_esempi": residui["formula_headers"],
```
- [ ] **Step 3: Verifica**
```bash
.venv/bin/python -c "
import json
from pathlib import Path
from conversione._pipeline.report import build_report
from conversione._pipeline.transforms import apply_transforms
testo = open('conversione/analisi1/raw.md').read()
clean, t = apply_transforms(testo)
from conversione._pipeline.structure import analyze
tmp = Path('/tmp/test_report')
tmp.mkdir(exist_ok=True)
(tmp / 'clean.md').write_text(clean)
profile = analyze(tmp / 'clean.md')
rp = build_report('test', tmp, clean, t, profile, 5.0)
r = json.loads(rp.read_text())
print('formula_headers residui:', r['residui']['formula_headers'])
print('formula_headers esempi:', len(r['residui']['formula_headers_esempi']))
"
```
Atteso: count numerico (può essere 0 se la demozione ha funzionato bene), nessun errore.
- [ ] **Step 4: Commit**
```bash
git add conversione/_pipeline/report.py
git commit -m "feat(report): aggiungi metrica formula_headers_residui"
```
---
## Task 5: `validator.py` — penalità formula headers
**Files:**
- Modify: `conversione/_pipeline/validator.py`
- [ ] **Step 1: Aggiungi la penalità in `_score()`**
Trova in `_score()` la riga:
```python
_pen("pua_markers", 2, 20, "caratteri PUA font Symbol")
```
e aggiungi **dopo**:
```python
_pen("formula_headers", 3, 15, "formula/esercizio come header")
```
- [ ] **Step 2: Aggiungi colonna `fhdr` nell'output tabellare di `validate()`**
Trova in `validate()` la riga che costruisce `header`:
```python
header = (
f"{'stem':<{col}}"
f"{'h2':>4}{'h3':>5} "
f"{'strategia':<18}"
f"{'bare':>5}{'corte':>6}{'lunghe':>7}"
f"{'btk':>5}{'br':>4}{'enc':>4}{'url':>4}"
f"{'med':>6}"
f" {'voto':>4} grade"
)
```
Sostituiscila con:
```python
header = (
f"{'stem':<{col}}"
f"{'h2':>4}{'h3':>5} "
f"{'strategia':<18}"
f"{'bare':>5}{'corte':>6}{'lunghe':>7}"
f"{'btk':>5}{'br':>4}{'enc':>4}{'url':>4}{'fhdr':>5}"
f"{'med':>6}"
f" {'voto':>4} grade"
)
```
Trova il `print(...)` dentro il loop `for r in rows:` e aggiungi `fhdr`:
```python
print(
f"{r['stem']:<{col}}"
f"{st.get('n_h2', 0):>4}"
f"{st.get('n_h3', 0):>5} "
f"{st.get('strategia_chunking','?'):<18}"
f"{an.get('bare_headers', 0):>5}"
f"{an.get('short_sections', 0):>6}"
f"{an.get('long_sections', 0):>7}"
f"{res.get('backtick', 0):>5}"
f"{res.get('br_inline', 0):>4}"
f"{res.get('simboli_encoding', 0):>4}"
f"{res.get('url', 0):>4}"
f"{res.get('formula_headers', 0):>5}"
f"{dist.get('mediana', 0):>6}"
f" {s:>4} {_grade(s)}"
)
```
Aggiorna anche la riga finale `print("\nColonne: ...")`:
```python
print(
"\nColonne: bare=header vuoti corte=sez<150ch lunghe=sez>1500ch "
"btk=backtick br=<br>inline enc=simboli encoding fhdr=formula-header med=mediana chars\n"
)
```
- [ ] **Step 3: Verifica**
```bash
.venv/bin/python -c "
from conversione._pipeline.validator import _score
r = {'structure': {'livello_struttura': 3}, 'anomalie': {}, 'residui': {'formula_headers': 5}}
score, detail = _score(r)
print(score, detail)
assert any('formula' in d for d in detail), 'Penalità formula non applicata'
print('OK')
"
```
Atteso: penalità `formula/esercizio come header ×5 15` nel detail.
- [ ] **Step 4: Commit**
```bash
git add conversione/_pipeline/validator.py
git commit -m "feat(validator): aggiungi penalità formula_headers, colonna fhdr nel report"
```
---
## Task 6: Test di integrazione su analisi1
- [ ] **Step 1: Riesegui la pipeline su analisi1**
```bash
.venv/bin/python conversione/ --stem analisi1 --force 2>&1
```
Atteso: completamento senza errori, print `Formula-hdr demotati: N` visibile.
- [ ] **Step 2: Valida e confronta con il report precedente**
```bash
.venv/bin/python conversione/ validate analisi1 --detail
```
Confronta con il vecchio voto del `report.json` originale. Il voto deve essere ≥ al precedente.
- [ ] **Step 3: Verifica riduzione PUA bracket**
```bash
python3 -c "
import json
r = json.load(open('conversione/analisi1/report.json'))
pua = r['residui']['pua_markers']
fhdr = r['residui'].get('formula_headers', 'N/A')
print(f'PUA residui: {pua} (era 10+ prima)')
print(f'Formula headers residui: {fhdr}')
"
```
Atteso: `pua_markers` ridotto rispetto al run precedente (era 10 nel report originale).
- [ ] **Step 4: Commit finale se tutto OK**
```bash
git add conversione/analisi1/
git commit -m "chore: rigenera output analisi1 con pipeline ottimizzata"
```
@@ -0,0 +1,80 @@
# Pipeline ottimizzazione — Design Spec
*2026-04-30*
## Obiettivo
Eliminare la necessità di revisione manuale del `clean.md` per tutti i tipi di PDF (accademici/matematici, giuridici, tecnici) ottimizzando i parametri di opendataloader-pdf e aggiungendo trasformazioni mirate.
## Scope
Nessun hybrid backend. Solo Java + trasformazioni Python.
---
## 1. `converter.py` — Parametri adattivi
### 1.1 Rilevamento PDF taggato
Funzione `_is_tagged_pdf(pdf_path) -> bool` usando PyMuPDF (`fitz`):
```python
doc = fitz.open(str(pdf_path))
tagged = "StructTreeRoot" in doc.pdf_catalog()
doc.close()
```
### 1.2 Nuovi parametri fissi (tutti i PDF)
- `table_method="cluster"` — tabelle senza bordi visibili
- `content_safety_off=["tiny", "hidden-ocg"]` — evita filtraggio di footnote e layer OCG
### 1.3 Parametro condizionale
- `use_struct_tree=tagged` — attivo solo se il PDF è taggato
Una sola conversione Java, zero overhead per PDF non taggati.
---
## 2. `transforms.py` — Due aggiunte
### 2.1 PUA bracket TeX (U+F8EB–U+F8FE)
Aggiunge al `_SYMBOL_PUA_MAP` i glifoni bracket di Computer Modern font che appaiono come PUA:
`U+F8EB, U+F8EC, U+F8ED, U+F8EE, U+F8EF, U+F8F0, U+F8F1, U+F8F2, U+F8F3, U+F8F4, U+F8F5, U+F8F6, U+F8F7, U+F8F8, U+F8F9, U+F8FA, U+F8FB, U+F8FC, U+F8FD, U+F8FE`
→ tutti mappati a `""` (pezzi di parentesi non ricostruibili come singolo glifo)
Il `_SYMBOL_PUA_RE` si aggiorna automaticamente essendo costruito dalla mappa.
### 2.2 Nuova trasformazione `_t_math_header_demotion`
Demota a testo semplice gli header `##`/`###` che sono in realtà enunciati di esercizi o formule lunghe.
**Criteri di demozione** (tutti devono valere):
- Livello `##` o `###`
- Lunghezza testo > 100 caratteri
- Almeno uno tra:
- ≥ 3 simboli matematici (`=`, `+`, `∈`, `∀`, `∃`, `≤`, `≥`, `∞`, lettere greche Unicode, `lim`, `sup`, `inf`, `∑`, `∫`)
- Matcha pattern traccia esercizio: `(Si dimostri|Si calcoli|Si provi|Si trovi|Trovare|Find|Prove|Show|Compute|Calculate)\b`
**Output**: rimuove `#+ ` iniziale. Se numerata (`N. testo`), converte in `**N.** testo`. Altrimenti testo plain.
**Posizione in `_transforms`**: gruppo "Rifinitura", dopo `_t_garbage_headers`.
**Stat key**: `n_formula_headers_demotati`
---
## 3. `report.py` — Nuova metrica residua
`build_report()` aggiunge contatore `formula_headers_residui`:
- Conta header `##`/`###` nel `clean.md` finale che superano ancora i criteri math (sopra)
- Mostra fino a 3 esempi in `formula_headers_esempi`
---
## 4. `validator.py` — Nuova penalità
| Problema | Penalità | Cap |
|----------|----------|-----|
| Formula/esercizio come header residuo | 3/cad | 15 |
---
## File modificati
1. `conversione/_pipeline/converter.py` — `_is_tagged_pdf()` + nuovi parametri
2. `conversione/_pipeline/transforms.py` — PUA map + `_t_math_header_demotion`
3. `conversione/_pipeline/report.py` — `formula_headers_residui`
4. `conversione/_pipeline/validator.py` — nuova penalità