Files
rag-from-scratch/conversione/_pipeline/transforms.py
T
davide faa8acae84 feat(pipeline): ottimizzazione completa PDF→Markdown senza revisione manuale
- converter: parametri adattivi (use_struct_tree per PDF taggati, table_method=cluster, content_safety_off)
- transforms: +20 PUA bracket TeX U+F8EB-F8FE (290 simboli corretti su analisi1)
- transforms: _t_math_header_demotion — demota header ##/### che sono enunciati esercizi o formule
- report: metrica formula_headers_residui con esempi
- validator: penalità formula_headers (−3/cad, cap −15), colonna fhdr nel report tabellare

Risultato su analisi1: voto 92/A, PUA residui 0, formula-hdr residui 0

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 14:58:15 +02:00

975 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import re
from collections import Counter
from functools import partial
# ─── Costanti ────────────────────────────────────────────────────────────────
_TOC_KEYWORDS = frozenset([
"indice", "index", "contents", "table of contents",
"sommario", "inhaltsverzeichnis", "inhalt",
"indice generale", "indice analitico", "indice dei contenuti",
"elenco dei capitoli", "argomenti", "table des matières",
"tabla de contenidos", "содержание",
])
_ORDINALS_IT = {
"PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV",
"QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII",
"NONO": "IX", "DECIMO": "X",
}
_ORDINALS_EN = {
"ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5",
"SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10",
}
# Mapping PUA Unicode (U+F020-U+F0FF) → simboli Unicode standard.
# Font Symbol di Windows codifica lettere greche e operatori matematici
# nel range Private Use Area invece dei codepoint Unicode standard.
_SYMBOL_PUA_MAP: dict[str, str] = {
"": " ",
"": "(",
"": ")",
"": "+",
"": "", # minus
"": ".",
"": "/",
"": "0", "": "1", "": "2", "": "3", "": "4",
"": "5", "": "6", "": "7", "": "8", "": "9",
"": ":", "": ";", "": "<", "": "=", "": ">",
"": "", # congruent
"": "Α", # Alpha
"": "Β", # Beta
"": "Χ", # Chi
"": "Δ", # Delta
"": "Ε", # Epsilon
"": "Φ", # Phi
"": "Γ", # Gamma
"": "Η", # Eta
"": "Ι", # Iota
"": "ϑ", # theta variant
"": "Κ", # Kappa
"": "Λ", # Lambda
"": "Μ", # Mu
"": "Ν", # Nu
"": "Ο", # Omicron
"": "Π", # Pi
"": "Θ", # Theta
"": "Ρ", # Rho
"": "Σ", # Sigma
"": "Τ", # Tau
"": "Υ", # Upsilon
"": "ς", # sigma final
"": "Ω", # Omega
"": "Ξ", # Xi
"": "Ψ", # Psi
"": "Ζ", # Zeta
"": "[",
"": "", # therefore
"": "]",
"": "", # perpendicular
"": "α", # alpha
"": "β", # beta
"": "χ", # chi
"": "δ", # delta
"": "ε", # epsilon
"": "φ", # phi
"": "γ", # gamma
"": "η", # eta
"": "ι", # iota
"": "ϕ", # phi variant
"": "κ", # kappa
"": "λ", # lambda
"": "μ", # mu
"": "ν", # nu
"": "ο", # omicron
"": "π", # pi
"": "θ", # theta
"": "ρ", # rho
"": "σ", # sigma
"": "τ", # tau
"": "υ", # upsilon
"": "ϖ", # pi symbol
"": "ω", # omega
"": "ξ", # xi
"": "ψ", # psi
"": "ζ", # zeta
"": "{",
"": "|",
"": "}",
"": "~",
"": "±", # plus-minus
"": "", # bullet
"": "", # square root
"": "", # less or equal
"": "", # greater or equal
"": "", # proportional
"": "×", # multiplication
"": "÷", # division
"": "×", # alternate multiply
"": "", # not equal
"": "", # not equal alternate
"": "", # greater or equal alternate
"": "", # prime
"": "*",
"": ",",
"": "", # less or equal (Symbol 0xA3)
"": "", # bullet (Wingdings 0xA7)
"": "", # bullet variant
"": "", # right arrow (Symbol 0xAE)
"": "÷", # division / range separator
"": "", # Wingdings decorative icon (rimosso)
"": "", # right arrow variant
"": "", # bracket extension piece (non ricostruibile)
"": "",
"": "",
"": "",
"": "",
"": "", # TeX large paren left U+F8EB
"": "", # TeX large paren extension U+F8EC
"": "", # TeX large paren right U+F8ED
"": "", # TeX large paren right ext U+F8EE
"": "", # TeX large bracket left U+F8EF
"": "", # TeX large bracket ext U+F8F0
"": "", # TeX brace top-left U+F8F1
"": "", # TeX brace mid U+F8F2
"": "", # TeX brace mid-right U+F8F3
"": "", # TeX brace extension U+F8F4
"": "", # TeX brace right U+F8F5
"": "", # TeX bracket right large U+F8F6
"": "", # TeX bracket right ext U+F8F7
"": "", # TeX bracket right close U+F8F8
"": "", # TeX integral large U+F8F9
"": "", # TeX integral extension U+F8FA
"": "", # TeX integral top U+F8FB
"": "", # TeX radical top U+F8FC
"": "", # TeX radical extension U+F8FD
"": "", # TeX arrowhead U+F8FE
}
_SYMBOL_PUA_RE = re.compile(
"[" + "".join(re.escape(k) for k in _SYMBOL_PUA_MAP) + "]"
)
_SUPERSCRIPT_RE = re.compile(r'[¹²³⁰⁴-⁹]+')
_FOOTNOTE_BODY_RE = re.compile(
r'^([¹²³⁰⁴-⁹]+\s+|\[\d{1,3}\]\s+)'
)
_NUMBERED_HDR_RE = re.compile(
r"^(#{1,6})\s+(\d+(?:\.\d+)*)\.\s+(.+)$",
re.MULTILINE,
)
_BIB_MARKERS_RE = re.compile(
r'\b(pp?\.|vol\.|n\.\s*\d|ed\.|edn\.|ISBN|DOI|arXiv)\b'
r'|\b(19|20)\d{2}\b',
re.IGNORECASE,
)
_WATERMARK_RE = re.compile(
r"^(BOZZA|DRAFT|CONFIDENTIAL|RISERVATO|PROVVISORIO|SAMPLE|SPECIMEN"
r"|DO NOT DISTRIBUTE|NON DISTRIBUIRE|COPY|COPIA)\s*$",
re.IGNORECASE | re.MULTILINE,
)
_MATH_SYMBOLS_RE = re.compile(
r"[=+∈∀∃≤≥∞∑∫∂→↔⊂⊃∩∪αβγδεζηθικλμνξοπρστυφχψωΑΒΓΔΕΖΗΘΙΚΛΜΝΞΟΠΡΣΤΥΦΧΨΩ]"
)
_EXERCISE_TRIGGER_RE = re.compile(
r"\b(Si dimostri|Si calcoli|Si provi|Si trovi|Trovare|Find|Prove|Show that"
r"|Compute|Calculate|Dimostrare|Verificare)\b",
re.IGNORECASE,
)
_MATH_HDR_RE = re.compile(r"^(#{2,3})\s+(.+)$")
_NUMBERED_PREFIX_RE = re.compile(r"^(\d+(?:\.\d+)*[.)])\s+(.+)$", re.DOTALL)
# Erano compilati dentro le funzioni a ogni chiamata — ora costanti di modulo
_TABSEP_RE = re.compile(r"(?m)^\|\s*\|\s*$|^\|---\|?\s*$")
_FM_RE = re.compile(
r"https?://|www\.|@[A-Za-z]|\bUniversit[àa]\b|\bDipartimento\b|"
r"\bCopyright\b|\bLicenza\b|\bEdizione\b|"
r"protetto da|tutti i diritti",
re.IGNORECASE,
)
_VERSE_NUM_RE = re.compile(
r'([.!?\xbb\'\"]\s+)(\d+)(\s+)(?=[A-Z\xc0-\xd9a-z\xe0-\xf9\xab"“‟])'
)
# ─── Helper puri ─────────────────────────────────────────────────────────────
def _sentence_case(s: str) -> str:
if not s:
return s
lower = s.lower()
return lower[0].upper() + lower[1:]
def _is_allcaps_line(line: str) -> bool:
stripped = line.strip()
letters = [c for c in stripped if c.isalpha()]
return (
len(letters) >= 3
and all(c.isupper() for c in letters)
and not stripped.startswith("#")
and not stripped.startswith("|")
)
def _allcaps_to_header(raw_line: str) -> str:
text = re.sub(r"^[-*+]\s+", "", raw_line.strip())
text = text.rstrip(".").rstrip("?").strip()
_ORD_IT_PAT = "|".join(_ORDINALS_IT.keys())
m = re.match(rf"^CAPITOLO ({_ORD_IT_PAT})\. (.+)", text)
if m:
roman = _ORDINALS_IT[m.group(1)]
titolo = m.group(2).rstrip(".").rstrip("?").strip()
return f"## Capitolo {roman}{_sentence_case(titolo)}"
_ORD_EN_PAT = "|".join(_ORDINALS_EN.keys())
m = re.match(rf"^CHAPTER ({_ORD_EN_PAT}|\d+)\.? (.+)", text)
if m:
n = _ORDINALS_EN.get(m.group(1), m.group(1))
titolo = m.group(2).rstrip(".").rstrip("?").strip()
return f"## Chapter {n}{_sentence_case(titolo)}"
m = re.match(r"^([IVXLCDM]+|[0-9]+)\. (.+)", text)
if m:
return f"## {m.group(1)}. {_sentence_case(m.group(2).rstrip('.').strip())}"
return f"## {_sentence_case(text)}"
def _extract_math_environments(text: str) -> tuple[str, int]:
_ENVS = (
r"Definizione|Definition|Teorema|Theorem|Lemma|"
r"Proposizione|Proposition|Corollario|Corollary|"
r"Osservazione|Remark|Nota|Note|Esempio|Example"
)
count = 0
blocks = text.split("\n\n")
result = []
for block in blocks:
stripped = block.strip()
if not stripped or stripped.startswith("#"):
result.append(block)
continue
m = re.match(
rf"^({_ENVS})\s+((?:\d+\.?){{1,4}})\s*(.*)",
stripped,
re.DOTALL,
)
if not m:
result.append(block)
continue
env = m.group(1)
num = m.group(2).rstrip(".")
rest = m.group(3).strip()
title_m = re.match(r"^(\([^)]{2,60}\))\s+(.*)", rest, re.DOTALL)
if title_m:
header = f"### {env} {num} {title_m.group(1)}"
body = title_m.group(2).strip()
else:
header = f"### {env} {num}."
body = rest
result.append(f"{header}\n\n{body}" if body else header)
count += 1
return "\n\n".join(result), count
def _merge_title_headers(text: str) -> tuple[str, int]:
count = 0
blocks = re.split(r"\n{2,}", text)
result = []
i = 0
while i < len(blocks):
block = blocks[i]
stripped = block.strip()
if (
re.match(r"^#{2,3} \d+\.\s*$", stripped)
and i + 1 < len(blocks)
):
nxt = blocks[i + 1].strip()
if (
nxt
and "\n" not in nxt
and len(nxt) <= 80
and not nxt.startswith("#")
and not re.match(r"^\d+[\.\)]\s", nxt)
):
result.append(stripped.rstrip() + " " + nxt)
count += 1
i += 2
continue
result.append(block)
i += 1
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(result)), count
def _extract_article_headers(text: str) -> tuple[str, int]:
count = 0
def _repl(m: re.Match) -> str:
nonlocal count
num = m.group(1)
rest = m.group(2).strip()
title_m = re.match(
r"^([A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda].{1,74}?)\.\s+"
r"([A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda\(\d].{4,})",
rest,
)
if title_m:
count += 1
return (
f"### Art. {num}. {title_m.group(1)}.\n\n"
f"{title_m.group(2).strip()}"
)
if rest:
count += 1
return f"### Art. {num}.\n\n{rest}"
count += 1
return f"### Art. {num}."
text = re.sub(
r"^-\s+Art\.\s+([\d]+[a-z\-]*)\.\s*(.*)",
_repl,
text,
flags=re.MULTILINE,
)
return text, count
# ─── Trasformazioni atomiche ──────────────────────────────────────────────────
def _t_fix_symbol_font(text: str) -> tuple[str, int]:
count = [0]
def _repl(m: re.Match) -> str:
count[0] += 1
return _SYMBOL_PUA_MAP[m.group(0)]
result = _SYMBOL_PUA_RE.sub(_repl, text)
return result, count[0]
def _t_remove_images(text: str) -> tuple[str, int]:
n = len(re.findall(r"!\[[^\]]*\]\([^)]*\)", text))
text = re.sub(r"!\[[^\]]*\]\([^)]*\)\s*", "", text)
return text, n
def _t_remove_footnotes(text: str) -> tuple[str, int]:
lines = text.split("\n")
result, count = [], 0
for line in lines:
stripped = line.strip()
if stripped and _FOOTNOTE_BODY_RE.match(stripped) and len(stripped) < 300:
count += 1
continue
cleaned = _SUPERSCRIPT_RE.sub("", line)
if cleaned != line:
count += 1
result.append(cleaned)
return "\n".join(result), count
def _t_fix_br(text: str) -> tuple[str, int]:
n = len(re.findall(r"<br>", text, re.IGNORECASE))
text = re.sub(r"<br>\s*", " ", text, flags=re.IGNORECASE)
return text, n
def _t_fix_tabsep(text: str) -> tuple[str, int]:
n = len(_TABSEP_RE.findall(text))
text = _TABSEP_RE.sub("", text)
return text, n
def _t_fix_accents(text: str) -> tuple[str, int]:
_ACCENT_MAP = {
"e": "\xe8", "E": "\xc8", "a": "\xe0", "A": "\xc0",
"u": "\xf9", "U": "\xd9", "i": "\xec", "I": "\xcc",
"o": "\xf2", "O": "\xd2",
}
n_bt_before = text.count("`")
text = re.sub(r"`([eEaAuUiIoO])", lambda m: _ACCENT_MAP[m.group(1)], text)
text = re.sub(r"([eEaAuUiIoO])`", lambda m: _ACCENT_MAP[m.group(1)], text)
n_accenti = n_bt_before - text.count("`")
n_bt_orfani = text.count("`")
if n_bt_orfani:
text = re.sub(r"`", "", text)
n_accenti += n_bt_orfani
return text, n_accenti
def _t_fix_multiplication(text: str) -> tuple[str, int]:
n = len(re.findall(r'(?<=[0-9])"(?=[0-9(])', text))
text = re.sub(r'(?<=[0-9])"(?=[0-9(])', '×', text)
return text, n
def _t_fix_micro(text: str) -> tuple[str, int]:
_SI_UNITS_RE = r'[mAsgVWFHTKNJClΩ]'
n = len(re.findall(rf'\d\s*!(?={_SI_UNITS_RE})', text))
text = re.sub(rf'(\d)\s*!({_SI_UNITS_RE})', r'\1 µ\2', text)
return text, n
def _t_remove_formula_labels(text: str) -> tuple[str, int]:
n = len(re.findall(r"\[\d+\.\d+\]", text))
text = re.sub(r"\s*\[\d+\.\d+\]\s*", " ", text)
return text, n
def _t_remove_dotleaders(text: str) -> tuple[str, int]:
_DOTLEADER_RE = r"^[^\n]*(?:(?:\. ){3,}|\.{4,})[^\n]*$"
n = len(re.findall(_DOTLEADER_RE, text, re.MULTILINE))
text = re.sub(_DOTLEADER_RE, "", text, flags=re.MULTILINE)
text = re.sub(
r"(?m)^(i{1,3}|iv|vi{0,3}|ix|xi{0,2}|x)$",
"",
text,
flags=re.IGNORECASE,
)
return text, n
def _t_fix_header_concat(text: str) -> tuple[str, int]:
count = 0
def _fix(m: re.Match) -> str:
nonlocal count
hashes = m.group(1)
full = m.group(2).strip()
if len(full) < 60:
return m.group(0)
skip = min(10, len(full) // 3)
split = re.search(r"(?<=[a-z\xe0\xe8\xe9\xec\xed\xf2\xf3\xf9\xfa\xe4])(?=[A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda])", full[skip:])
if split:
pos = skip + split.start()
title = full[:pos].strip()
body = full[pos:].strip()
if len(title) >= 5 and len(body) >= 15:
count += 1
return f"{hashes} {title}\n\n{body}"
return m.group(0)
text = re.sub(r"^(#{2,6})\s+(.{40,})$", _fix, text, flags=re.MULTILINE)
return text, count
def _t_extract_capitolo(text: str) -> tuple[str, int]:
def _repl(m: re.Match) -> str:
num = m.group(1)
titolo = _sentence_case(m.group(2).strip().rstrip("- ").strip())
return f"\n\n## Capitolo {num}: {titolo}\n\n"
text = re.sub(
r"\bCapitolo\s+(\d+)\s*[:\s]\s*([A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda\'L]"
r"[A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda\s\'\.,\(\)]{5,80}?)"
r"(?=\s*[-]\s*\d|\s*\n|\s*$)",
_repl,
text,
)
return text, 0
def _t_normalize_numbered_headings(text: str) -> tuple[str, int]:
all_matches = list(_NUMBERED_HDR_RE.finditer(text))
if not all_matches:
return text, 0
pairs = [(m.group(2).count(".") + 1, len(m.group(1))) for m in all_matches]
depths = [d for d, _ in pairs]
min_depth = min(depths)
max_depth = max(depths)
if max_depth == min_depth:
return text, 0
base_level = min(lv for d, lv in pairs if d == min_depth)
count = 0
def _repl(m: re.Match) -> str:
nonlocal count
hashes, num, title = m.group(1), m.group(2), m.group(3)
depth = num.count(".") + 1
new_level = min(base_level + (depth - min_depth), 6)
if new_level == len(hashes):
return m.group(0)
count += 1
return f"{'#' * new_level} {num}. {title}"
return _NUMBERED_HDR_RE.sub(_repl, text), count
def _t_normalize_header_levels(text: str) -> tuple[str, int]:
text = re.sub(r"^#{3,6}\s*$", "", text, flags=re.MULTILINE)
text = re.sub(
r"^(#{3,6})\s+(\d{1,3})\s+(.+)$",
lambda m: f"### {m.group(2)}. {m.group(3)}",
text,
flags=re.MULTILINE,
)
text = re.sub(r"^#{4,6}\s+(.+)$", r"### \1", text, flags=re.MULTILINE)
return text, 0
def _t_extract_articles(text: str) -> tuple[str, int]:
return _extract_article_headers(text)
def _t_remove_header_bold(text: str) -> tuple[str, int]:
text = re.sub(
r"^(#{1,6})\s+\*\*(.+?)\*\*\s*$",
r"\1 \2",
text, flags=re.MULTILINE,
)
return text, 0
def _t_normalize_allcaps_headers(text: str) -> tuple[str, int]:
def _norm(m: re.Match) -> str:
hashes, content = m.group(1), m.group(2).strip()
letters = [c for c in content if c.isalpha()]
if letters and all(c.isupper() for c in letters):
return f"{hashes} {_sentence_case(content)}"
return m.group(0)
text = re.sub(r"^(#{1,6}) (.+)$", _norm, text, flags=re.MULTILINE)
return text, 0
def _t_remove_toc(text: str) -> tuple[str, int]:
lines = text.split("\n")
new_lines = []
_in_toc = False
removed = False
for line in lines:
bare = re.sub(r"^#+\s*", "", line.strip())
first_word = bare.split(".")[0].strip().lower()
if first_word in _TOC_KEYWORDS:
removed = True
_in_toc = True
continue
if _in_toc:
if re.match(r"^\s*$", line) or re.match(r"^\s*[-*+]\s+\d", line):
continue
if re.match(r"^\s*[-*+]\s+.{2,70}\s+\d{1,3}\s*$", line):
continue
if len(line.strip()) > 200:
_in_toc = False
new_lines.append(line)
continue
_in_toc = False
new_lines.append(line)
return "\n".join(new_lines), 1 if removed else 0
def _t_allcaps_to_headers(text: str) -> tuple[str, int]:
count = 0
blocks = text.split("\n\n")
new_blocks = []
for block in blocks:
stripped = block.strip()
if "\n" not in stripped and _is_allcaps_line(stripped):
new_blocks.append(_allcaps_to_header(stripped))
count += 1
else:
sub_lines = block.split("\n")
converted = []
for ln in sub_lines:
if _is_allcaps_line(ln) and len(ln.strip()) > 3:
converted.append(_allcaps_to_header(ln))
count += 1
else:
converted.append(ln)
new_blocks.append("\n".join(converted))
return "\n\n".join(new_blocks), count
def _t_numbered_sections(text: str, has_exercises: bool = False) -> tuple[str, int]:
count = 0
def _num_repl(m: re.Match) -> str:
nonlocal count
content = m.group(2).strip()
if content.endswith(".") and len(content) > 40:
return m.group(0)
if _BIB_MARKERS_RE.search(content):
return m.group(0)
count += 1
return f"### {m.group(1)}.\n\n{content}"
text = re.sub(r"^(\d+)\.\s+(.+)$", _num_repl, text, flags=re.MULTILINE)
def _num_letter_repl(m: re.Match) -> str:
nonlocal count
count += 1
return f"### {m.group(1)}{m.group(2)}.\n\n{m.group(3).strip()}"
text = re.sub(r"^(\d+)\s*([a-z])\.\s+(.+)$", _num_letter_repl, text, flags=re.MULTILINE)
if not has_exercises:
def _aphorism_repl(m: re.Match) -> str:
nonlocal count
content = m.group(2).strip()
if _BIB_MARKERS_RE.search(content):
return m.group(0)
count += 1
return f"\n\n### {m.group(1)}.\n\n{content}"
text = re.sub(
r"^-\s+(\d{1,3})\.\s+(.{10,})$",
_aphorism_repl,
text,
flags=re.MULTILINE,
)
def _list_section_repl(m: re.Match) -> str:
nonlocal count
num = m.group(1)
content = m.group(2).strip()
if _BIB_MARKERS_RE.search(content):
return m.group(0)
count += 1
split = re.search(r"(?<=[a-z\xe0\xe8\xe9\xec\xed\xf2\xf3\xf9\xfa])\s+(?=[A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda])", content)
if split and split.start() >= 3:
title = content[: split.start()].strip()
body = content[split.end():].strip()
if len(body) >= 20:
return f"\n\n### {num}. {title}\n\n{body}"
return f"\n\n### {num}. {content}"
text = re.sub(
r"^-\s+(\d{1,3})\s+([A-Z\xc0\xc8\xc9\xcc\xcd\xd2\xd3\xd9\xda\'L].{10,})$",
_list_section_repl,
text,
flags=re.MULTILINE,
)
return text, count
def _t_extract_math(text: str) -> tuple[str, int]:
return _extract_math_environments(text)
def _t_merge_paragraphs(text: str) -> tuple[str, int]:
_SENTENCE_END = set(".?!\xbb)\"'")
blocks = text.split("\n\n")
merged = []
count = 0
i = 0
while i < len(blocks):
b = blocks[i]
stripped = b.strip()
while (
i + 1 < len(blocks)
and stripped
and not stripped.startswith("#")
and not stripped.startswith("|")
and stripped[-1] not in _SENTENCE_END
):
nxt = blocks[i + 1].strip()
if (
not nxt
or nxt.startswith("#")
or nxt.startswith("|")
or re.match(r"^\d+\.", nxt)
or re.match(r"^[-*+]\s", nxt)
):
break
b = stripped + " " + nxt
stripped = b.strip()
count += 1
i += 1
merged.append(b)
i += 1
text = "\n\n".join(merged)
text = re.sub(r"(?m)^\|---\|\s*", "", text)
return text, count
def _t_normalize_whitespace(text: str) -> tuple[str, int]:
lines = text.split("\n")
text = "\n".join(
re.sub(r" +", " ", line) if line.strip() else line
for line in lines
)
return text, 0
def _t_collapse_blank_lines(text: str) -> tuple[str, int]:
return re.sub(r"\n{3,}", "\n\n", text), 0
def _t_demote_verse_headers(text: str) -> tuple[str, int]:
count = 0
def _demote(m: re.Match) -> str:
nonlocal count
hashes, content = m.group(1), m.group(2).strip()
if not re.search(r"\s\d{1,4}\s*$", content):
return m.group(0)
inner = re.sub(r"\s\d{1,4}\s*$", "", content)
if not re.search(r'[,;:.!?\xbb"\'][\ ]+[A-Za-z\xc0-\xff\xab"“]', inner):
return m.group(0)
count += 1
clean = re.sub(r"\s\d{1,4}\s*$", "", content)
return clean
text = re.sub(r"^(#{1,6})\s+(.{20,})$", _demote, text, flags=re.MULTILINE)
return text, count
def _t_restore_poetry_lines(text: str) -> tuple[str, int]:
count = 0
blocks = text.split("\n\n")
result = []
for block in blocks:
stripped = block.strip()
if not stripped or stripped.startswith("#"):
result.append(block)
continue
matches = list(_VERSE_NUM_RE.finditer(stripped))
if len(matches) < 2:
result.append(block)
continue
nums = [int(m.group(2)) for m in matches]
diffs = [nums[i + 1] - nums[i] for i in range(len(nums) - 1)]
if not diffs or len(set(diffs)) > 2 or not (1 <= diffs[0] <= 5):
result.append(block)
continue
step = diffs[0]
def _replace_verse_num(m: re.Match) -> str:
n = int(m.group(2))
sep = "\n\n" if n % (step * 3) == 0 else "\n"
return m.group(1).rstrip() + sep
new_block = _VERSE_NUM_RE.sub(_replace_verse_num, stripped)
if new_block != stripped:
count += len(matches)
result.append(new_block)
return "\n\n".join(result), count
def _t_remove_urls(text: str) -> tuple[str, int]:
return re.sub(r"(?m)^(https?://|www\.)\S+\s*$", "", text), 0
def _t_remove_empty_headers(text: str) -> tuple[str, int]:
blocks = re.split(r"\n{2,}", text)
cleaned = []
for i, block in enumerate(blocks):
stripped = block.strip()
if re.match(r"^#{1,6} ", stripped) and "\n" not in stripped:
next_stripped = blocks[i + 1].strip() if i + 1 < len(blocks) else ""
next_is_long_hdr = (
re.match(r"^#{1,6} ", next_stripped) and len(next_stripped) > 80
)
if not next_stripped or (
re.match(r"^#{1,6} ", next_stripped) and not next_is_long_hdr
):
continue
cleaned.append(block)
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), 0
def _t_merge_title_headers(text: str) -> tuple[str, int]:
return _merge_title_headers(text)
def _t_remove_garbage_headers(text: str) -> tuple[str, int]:
def _is_garbage(content: str) -> bool:
if content.lstrip().startswith("..."):
return True
if not re.search(r"[A-Za-z\xc0-\xffΑ-ω]{2,}", content):
return True
if re.fullmatch(r"\(?\s*[A-Za-z]{1,4}\s*\)?", content.strip()):
return True
if len(content) > 60 and re.search(r"[!%#]\w|\w[!%#]|\b\w+-\s*\w", content):
return True
first_alpha = next((c for c in content if c.isalpha()), None)
if first_alpha and first_alpha.islower() and len(content) > 40:
return True
if re.match(r"^[A-Za-zΑ-ω_]{1,3}\s*[=<>≤≥]", content.strip()):
return True
if re.match(r"^(Figura|Figure|Fig\.|Tabella|Table|Tab\.)\s+\d", content.strip(), re.IGNORECASE):
return True
return False
count = 0
lines = text.split("\n")
new_lines = []
for line in lines:
m = re.match(r"^#{1,6} (.+)$", line)
if m and _is_garbage(m.group(1)):
count += 1
continue
new_lines.append(line)
text = "\n".join(new_lines)
text = re.sub(r"\n{3,}", "\n\n", text)
return text, count
def _t_remove_frontmatter(text: str) -> tuple[str, int]:
blocks = re.split(r"\n{2,}", text)
cleaned = []
count = 0
total = len(blocks)
cutoff = max(5, min(15, int(total * 0.20)))
for i, block in enumerate(blocks):
stripped = block.strip()
if i >= cutoff:
cleaned.append(block)
continue
if not re.match(r"^### ", stripped) or re.match(r"^### \d", stripped):
cleaned.append(block)
continue
body = blocks[i + 1].strip() if i + 1 < len(blocks) else ""
is_fm_body = len(body) < 250 and _FM_RE.search(body)
is_fm_hdr = _FM_RE.search(stripped)
if is_fm_body or is_fm_hdr:
count += 1
continue
cleaned.append(block)
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), count
def _t_remove_watermarks(text: str) -> tuple[str, int]:
lines = text.split("\n")
result, count = [], 0
for line in lines:
if _WATERMARK_RE.match(line):
count += 1
else:
result.append(line)
return "\n".join(result), count
def _t_fix_math_symbols(text: str) -> tuple[str, int]:
lines = text.split("\n")
result, count = [], 0
for line in lines:
if line.strip() and re.match(r"^[\s■-◿☐-☒•▪▫◆◇●○•]+$", line):
count += 1
else:
result.append(line)
return "\n".join(result), count
def _t_remove_recurring_lines(text: str) -> tuple[str, int]:
lines = text.split("\n")
short_lines = [
ln.strip() for ln in lines
if 3 < len(ln.strip()) < 80
and not ln.strip().startswith("#")
and not ln.strip().startswith("|")
]
freq = Counter(short_lines)
recurring = {ln for ln, c in freq.items() if c >= 5}
if not recurring:
return text, 0
result, count = [], 0
for line in lines:
if line.strip() in recurring:
count += 1
else:
result.append(line)
return "\n".join(result), count
def _t_math_header_demotion(text: str) -> tuple[str, int]:
lines = text.split("\n")
result, count = [], 0
for line in lines:
m = _MATH_HDR_RE.match(line)
if not m:
result.append(line)
continue
body = m.group(2)
if len(body) <= 100:
result.append(line)
continue
has_math = len(_MATH_SYMBOLS_RE.findall(body)) >= 3
has_exercise = bool(_EXERCISE_TRIGGER_RE.search(body))
if not (has_math or has_exercise):
result.append(line)
continue
nm = _NUMBERED_PREFIX_RE.match(body)
if nm:
result.append(f"**{nm.group(1)}** {nm.group(2)}")
else:
result.append(body)
count += 1
return "\n".join(result), count
# ─── Orchestratore ───────────────────────────────────────────────────────────
def apply_transforms(text: str) -> tuple[str, dict]:
"""
Applica le trasformazioni strutturali al Markdown grezzo.
Restituisce (testo_modificato, statistiche).
L'ordine è semantico: encoding → struttura header → costruzione struttura → testo → rifinitura.
"""
_has_ex = bool(re.search(r"\b(Esercizi|Exercises|Problems|Homework)\b", text, re.IGNORECASE))
_transforms: list[tuple[str | None, object]] = [
("n_simboli_pua_corretti", _t_fix_symbol_font),
("n_immagini_rimosse", _t_remove_images),
("n_br_rimossi", _t_fix_br),
("n_tabsep_rimossi", _t_fix_tabsep),
("n_note_rimosse", _t_remove_footnotes),
("n_accenti_corretti", _t_fix_accents),
("n_moltiplicazioni_corrette", _t_fix_multiplication),
("n_micro_corretti", _t_fix_micro),
("n_simboli_math_rimossi", _t_fix_math_symbols),
("n_formule_rimossi", _t_remove_formula_labels),
("n_dotleader_rimossi", _t_remove_dotleaders),
("n_righe_ricorrenti_rimosse", _t_remove_recurring_lines),
("n_header_concat_fixati", _t_fix_header_concat),
(None, _t_extract_capitolo),
("n_header_numerati_normalizzati", _t_normalize_numbered_headings),
(None, _t_normalize_header_levels),
("n_articoli_estratti", _t_extract_articles),
(None, _t_remove_header_bold),
(None, _t_normalize_allcaps_headers),
("toc_rimosso", _t_remove_toc),
("n_header_allcaps", _t_allcaps_to_headers),
("n_sezioni_numerate", partial(_t_numbered_sections, has_exercises=_has_ex)),
("n_ambienti_matematici", _t_extract_math),
("n_paragrafi_uniti", _t_merge_paragraphs),
(None, _t_normalize_whitespace),
(None, _t_collapse_blank_lines),
("n_versi_ripristinati", _t_restore_poetry_lines),
("n_header_verso_demotati", _t_demote_verse_headers),
(None, _t_remove_urls),
(None, _t_remove_empty_headers),
("n_titoli_uniti", _t_merge_title_headers),
(None, lambda t: (re.sub(r"(?m)^(#{1,6}.+?)\s*\|\s*\d{1,3}\s*$", r"\1", t), 0)),
("n_garbage_headers_rimossi", _t_remove_garbage_headers),
("n_formula_headers_demotati", _t_math_header_demotion),
("n_frontmatter_rimossi", _t_remove_frontmatter),
("n_watermark_rimossi", _t_remove_watermarks),
]
stats: dict = {}
for stat_key, fn in _transforms:
text, n = fn(text)
if stat_key:
stats[stat_key] = stats.get(stat_key, 0) + n
stats["toc_rimosso"] = bool(stats.get("toc_rimosso", 0))
return text, stats