Compare commits

...

20 Commits

Author SHA1 Message Date
davide 5a8cd3ef46 fix: normalizza x in forward() dividendo per L
Rende la normalizzazione degli input simmetrica: x/L e t/T_END
entrambi in [0,1], indipendentemente dal valore di L in config.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 16:08:18 +02:00
davide f94fd51942 fix: corregge normalizzazione BC loss con _bc_scale analitico
Il termine convettivo H_CONV/K*(T-T_AMB) domina il residuo Robin di un
fattore H*L/K=10 rispetto al gradiente, rendendo L_bc ~100x sovrastimata
rispetto a L_pde. Sostituisce grad_char=(Q/K)² con _bc_scale=max(Q/K,
H*T_char/K)² (~2.25e6) per bilanciare correttamente i termini della loss.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 16:07:49 +02:00
davide b8301a4329 test: aggiunge suite completa — unit, integration ed e2e (42 test)
- pytest.ini: configura testpaths, marker slow, output verboso
- tests/conftest.py: fixture condivise (device, small_data, pinn_model)
- tests/test_config.py: sanità parametri fisici e numerici, CFL, _pde_scale
- tests/test_model.py: HeatPINN.forward e heat_pinn_loss (shape, finiti,
  zero-weight analytici per IC e BC, scaling dei pesi)
- tests/test_engine_data.py: set_seed, _get_device, prepare_data
  (shape, bounds, device consistency, determinismo)
- tests/test_integration_pinn.py: pipeline dati→modello→loss→backward
- tests/test_e2e.py: FDM completo, visualizer FDM/PINN con tmp_path,
  training breve (2 test @slow)
- requirements.txt: aggiunge pytest>=7.0.0

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 15:50:54 +02:00
davide 1237a57290 clear.sh: aggiunge menu per ripulire risultati PINN oltre a FDM
Estrae la logica interattiva in cleanup_dir() e aggiunge un menu di primo
livello con le opzioni: FDM, PINN, Entrambi, Annulla.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 15:50:42 +02:00
davide e868c47190 PINN: normalizza t in forward(), corregge _pde_scale e aumenta patience
- forward(): divide t per T_END prima di passarlo alla rete, evita saturazione
  di Tanh per t∈[0,10] e migliora la sensibilità temporale del modello
- _pde_scale: include il picco gaussiano della sorgente come denominatore;
  con GAUSS_SIGMA=0.01 il picco (~60 °C/s) supera T_char/T_END (15), rendendo
  la loss PDE non normalizzata senza questa correzione
- PATIENCE 100→500, SCHED_PATIENCE 30→150: il training ha ora spazio per
  convergere prima che l'early stopping o lo scheduler blocchino l'ottimizzatore

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 15:50:36 +02:00
davide 649d26cfd4 config: W_BC 1→5, N_F 4000→6000, GAUSS_SIGMA 0.02→0.01
W_BC=1 causava temperatura sub-ambiente (11.93°C < T_AMB=20°C) perché
i bordi erano poco vincolati; 5 è bilanciamento tra evitare trivial solution
(W_BC=10) e rispettare le BC di Robin (W_BC=1).
N_F aumentato per coprire meglio il dominio temporale tardo dove l'errore
cresceva (max 12.87°C a t=10s).
GAUSS_SIGMA ridotto per avvicinarsi alla sorgente puntuale del FDM e
ridurre il mismatch fisico che causava L2=11.6%.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 15:21:48 +02:00
davide 256945ada3 PINN: ripristina forward() originale, aumenta LBFGS_STEPS e corregge pesi loss
Il training collassava alla soluzione banale T=T_AMB perché W_BC=10 spingeva
Adam a soddisfare le Robin BC (trivialmente, con gradiente zero) sacrificando
la PDE. Fix: W_PDE=10, W_BC=1 così la PDE domina il gradiente fin dal primo
epoch. LBFGS_STEPS: 20→200 perché L-BFGS era l'unico ottimizzatore a fare
progressi reali. forward(): rimossa moltiplicazione t_norm che causava
vanishing gradient su dT/dx e d²T/dx².

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 14:50:18 +02:00
davide f02c5f2bbe PINN: vincolo IC hard — moltiplica output per t_norm per evitare trivial solution
La normalizzazione introdotta (x,t in [0,1]²) rendeva il minimo banale
net=0 (T=T_AMB ovunque) troppo accessibile, causando il collasso del training.
Soluzione: T = T_AMB + T_char * (t/T_END) * net(x_norm, t_norm).
Così T(x,0) = T_AMB per costruzione (vincolo hard) e la rete deve trovare
soluzioni non banali per t>0. La loss IC resta ma è sempre 0.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 14:28:00 +02:00
davide 9e77deffd5 PINN: risolve problemi minori — sigma in config, scale precompilate, closure fuori loop
- config.py: aggiunge GAUSS_SIGMA = 0.02 nella sezione parametri fisici
- model.py: T_char, grad_char, pde_scale diventano costanti di modulo (_T_char,
  _grad_char, _pde_scale) calcolate una sola volta all'import
- engine.py: closure L-BFGS definita una volta sola fuori dal loop

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 14:23:33 +02:00
davide bca829bd7e PINN: sposta model.train() fuori dal loop e aggiunge weights_only a torch.load
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 14:21:17 +02:00
davide 98bfc78651 PINN: normalizza input in [0,1]² e ottimizza autograd
- forward(): divide (x,t) per (L, T_END) prima di passare alla rete,
  così le due dimensioni hanno la stessa scala indipendentemente da T_END
- heat_pinn_loss: calcola dT_dt e dT_dx in un singolo backward pass
  usando autograd.grad con lista [t_f, x_f]

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 14:20:20 +02:00
davide fbb0458f69 PINN: allinea output a results/pinn/ e centralizza parametri in config
- visualizer.py: sostituisce animations/ con results/pinn/TIMESTAMP/,
  nomi fissi (heatmap.html, animation.html, comparison.html) come FDM
- config.py: aggiunge sezioni architettura, sampling, Adam, L-BFGS, loss weights
- model.py: costruisce HeatPINN dinamicamente da HIDDEN_SIZE/N_HIDDEN_LAYERS;
  heat_pinn_loss legge pesi W_PDE/W_IC/W_BC da config
- engine.py: tutti i parametri di training letti da config

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 14:14:11 +02:00
davide 4f050e80df Estende scope di lavoro a tutto il repository
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 13:59:34 +02:00
davide b5553691e8 Merge branch 'fdm': output timestampato, heatmap animata, menu semplificato
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 13:19:50 +02:00
davide a655e53551 FDM: heatmap.html con due grafici indipendenti, statico e striscia animata
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 12:29:27 +02:00
davide fa5ce5d19c Aggiunge clear.sh per pulizia interattiva di results/fdm/
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 12:17:44 +02:00
davide 1a530e86ba FDM: semplifica menu da 4 voci a Risolvi/Visualizza/Esci
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 12:16:17 +02:00
davide 54a7e9aed9 FDM: salva risultati in results/fdm/<timestamp>/ invece di animations/fdm/
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 12:16:13 +02:00
davide 19207cabe4 Aggiunge vincolo scope FDM-only in CLAUDE.md
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 12:16:09 +02:00
davide b663a89abd Allinea PINN alla fisica FDM: sorgente interna e BC Robin bilaterali
- model.py: aggiunge termine sorgente Gaussiana (σ=0.02) nella PDE loss
  per approssimare δ(x − X_SRC); sostituisce BC Neumann a x=0 con Robin
- engine.py: clustering collocation vicino X_SRC anziché x=0;
  downsample FDM su entrambi gli assi spaziale e temporale in evaluate_model()
- visualizer.py: downsample FDM su entrambi gli assi prima del plot
- app.py: aggiorna header con fisica corrente
- CLAUDE.md: aggiorna PDE, BC e note architetturali

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 12:07:14 +02:00
18 changed files with 762 additions and 130 deletions
+1
View File
@@ -98,6 +98,7 @@ StyleCopReport.xml
*.svclog
*.scc
.venv
.venv*
# Chutzpah Test files
_Chutzpah*
+14 -8
View File
@@ -6,16 +6,22 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
Write all git commit messages in Italian.
## Scope of Work
Lavora su tutto il repository: `fdm/`, `model.py`, `engine.py`, `visualizer.py`, `app.py`, `config.py`. Aiuta con migliorie, bugfix e ottimizzazioni su qualsiasi file del progetto.
## Project Overview
**Heat Equation PINN** — A Physics-Informed Neural Network that solves the 1D time-varying heat equation with physical boundary conditions:
**Heat Equation PINN** — A Physics-Informed Neural Network that solves the 1D time-varying heat equation with an internal point heat source:
```
∂T/∂t = α ∂²T/∂x² x ∈ [0, L], t ∈ [0, T_END]
∂T/∂t = α ∂²T/∂x² + (α/k) Q(t) δ(x X_SRC) x ∈ [0, L], t ∈ [0, T_END]
```
where `Q(t) = Q_VAL` if `t ≥ T_STEP` else `0`.
- **IC:** `T(x, 0) = T0` (uniform)
- **BC x=0:** Neumann — heat flux step: `k ∂T/∂x = Q(t)` where `Q = Q_VAL` if `t ≥ T_STEP` else `0`
- **BC x=0:** Robin — convection: `k ∂T/∂x = h (T T_AMB)`
- **BC x=L:** Robin — convection: `k ∂T/∂x = h (T T_AMB)`
No experimental data is needed. A `fdm/` module provides a reference numerical solution (FTCS explicit scheme) used for evaluation and visualization comparison.
@@ -59,7 +65,7 @@ model.py ← HeatPINN + heat_pinn_loss()
engine.py ← data sampling, Adam+L-BFGS training, evaluation vs FDM, visualization call
visualizer.py ← PINN vs FDM: heatmap, animated T(x), time-series at fixed points
fdm/
solver.py ← FTCS explicit scheme, ghost-cell Neumann, explicit Robin
solver.py ← FTCS explicit scheme, Robin on both ends, point source at X_SRC
visualizer.py ← same 3 plot types for FDM-only output
app.py ← FDM CLI menu
```
@@ -74,11 +80,11 @@ T = T_AMB + (Q_VAL · L / K) · net(x, t)
```
This keeps `net` outputs in `[0, 1]` range and ensures gradients `∂T/∂x` are O(1) for the network to learn. Do not remove this scaling.
`heat_pinn_loss()` normalizes all three loss terms to O(1) using `T_char = Q_VAL·L/K` and `grad_char = (Q_VAL/K)²`. Changing physical parameters in `config.py` does not require re-tuning loss weights.
`heat_pinn_loss()` normalizes all four loss terms to O(1) using `T_char = Q_VAL·L/K` and `grad_char = (Q_VAL/K)²`. The PDE residual includes the Gaussian-smoothed source term (σ=0.02) as a continuous approximation to δ(x X_SRC). Changing physical parameters in `config.py` does not require re-tuning loss weights.
### Training (`engine.py`)
`prepare_data()` samples collocation points with **deliberate clustering**: extra points near `x=0` (steep Neumann gradient) and around `t=T_STEP` (flux step discontinuity). Increasing `N_f` / `N_bc` here is the first lever to pull if accuracy is low.
`prepare_data()` samples collocation points with **deliberate clustering**: extra points near `x=X_SRC` (steep gradient at source) and around `t=T_STEP` (flux step discontinuity). Increasing `N_f` / `N_bc` here is the first lever to pull if accuracy is low.
`train_model()` runs **Adam first, then L-BFGS fine-tuning**. L-BFGS uses a closure that captures loss components in `_last` dict (avoids calling `heat_pinn_loss` outside an active grad context).
@@ -87,8 +93,8 @@ This keeps `net` outputs in `[0, 1]` range and ensures gradients `∂T/∂x` are
### FDM Solver (`fdm/solver.py`)
Returns `(T_matrix[NX, NT], x_vals, t_vals)`. Uses:
- Ghost cell for Neumann: `T_ghost = T[1] + 2·dx·Q(t)/k`
- Explicit Robin at x=L: `T[N] = (T[N1] + dx·h/k·T_amb) / (1 + dx·h/k)` — uses `T_cur[-2]`, not `T_new[-2]`
- Robin BC on both ends: `T[0] = (T[1] + robin_coeff·T_amb) / (1 + robin_coeff)`
- Point source injected at node `i_src = argmin|x - X_SRC|` after BCs: `T[i_src] += Q·α·dt/(k·dx)`
- CFL check at startup (warns, does not crash)
### Loss Scaling Notes
+1 -1
View File
@@ -5,7 +5,7 @@ import engine
def print_header():
print("=" * 55)
print(" Heat Equation PINN — ∂T/∂t = α ∂²T/∂x²")
print(" Neumann BC (x=0) + Robin BC (x=L)")
print(" Robin BC (x=0, x=L) + point source @ X_SRC")
print("=" * 55)
Executable
+96
View File
@@ -0,0 +1,96 @@
#!/usr/bin/env bash
set -euo pipefail
BASE_DIR="$(dirname "$0")"
cleanup_dir() {
local LABEL="$1"
local DIR="$2"
if [[ ! -d "$DIR" ]]; then
echo "Nessuna cartella $DIR trovata."
return
fi
mapfile -t RUNS < <(find "$DIR" -mindepth 1 -maxdepth 1 -type d | sort)
if [[ ${#RUNS[@]} -eq 0 ]]; then
echo "Nessun risultato $LABEL da cancellare."
return
fi
echo ""
echo "Risultati $LABEL disponibili:"
for i in "${!RUNS[@]}"; do
printf " %2d. %s\n" "$((i+1))" "$(basename "${RUNS[$i]}")"
done
echo ""
echo "a. Cancella tutti"
echo "s. Selezione manuale"
echo "0. Annulla"
echo ""
read -rp "Scelta [$LABEL]: " MODE
case "$MODE" in
a|A)
read -rp "Confermi la cancellazione di ${#RUNS[@]} cartelle $LABEL? [s/N] " CONFIRM
if [[ "${CONFIRM,,}" == "s" ]]; then
rm -rf "${RUNS[@]}"
echo "Cancellati ${#RUNS[@]} risultati $LABEL."
else
echo "Annullato."
fi
;;
s|S)
read -rp "Numeri da cancellare (es. 1 3 5): " -a CHOICES
TO_DELETE=()
for N in "${CHOICES[@]}"; do
if [[ "$N" =~ ^[0-9]+$ ]] && (( N >= 1 && N <= ${#RUNS[@]} )); then
TO_DELETE+=("${RUNS[$((N-1))]}")
else
echo "Indice non valido: $N (ignorato)"
fi
done
if [[ ${#TO_DELETE[@]} -eq 0 ]]; then
echo "Nessuna selezione valida."
return
fi
echo "Verranno cancellati:"
for D in "${TO_DELETE[@]}"; do
echo " - $(basename "$D")"
done
read -rp "Confermi? [s/N] " CONFIRM
if [[ "${CONFIRM,,}" == "s" ]]; then
rm -rf "${TO_DELETE[@]}"
echo "Cancellati ${#TO_DELETE[@]} risultati $LABEL."
else
echo "Annullato."
fi
;;
0|"")
echo "Annullato."
;;
*)
echo "Scelta non valida."
;;
esac
}
echo "Cosa vuoi ripulire?"
echo " 1. Risultati FDM (results/fdm/)"
echo " 2. Risultati PINN (results/pinn/)"
echo " 3. Entrambi"
echo " 0. Annulla"
echo ""
read -rp "Scelta: " CHOICE
case "$CHOICE" in
1) cleanup_dir "FDM" "$BASE_DIR/results/fdm" ;;
2) cleanup_dir "PINN" "$BASE_DIR/results/pinn" ;;
3)
cleanup_dir "FDM" "$BASE_DIR/results/fdm"
cleanup_dir "PINN" "$BASE_DIR/results/pinn"
;;
0|"") echo "Annullato." ;;
*) echo "Scelta non valida." ; exit 1 ;;
esac
+29
View File
@@ -19,3 +19,32 @@ T_END = 10.0 # fine simulazione [s]
# Griglia FDM
NX = 250 # nodi spaziali
NT = 15000 # passi temporali (verifica CFL automatica)
# Sorgente gaussiana (approssimazione continua del delta di Dirac)
GAUSS_SIGMA = 0.01 # larghezza del picco gaussiano [m]
# Architettura PINN
HIDDEN_SIZE = 128 # neuroni per layer nascosto
N_HIDDEN_LAYERS = 4 # numero di layer nascosti
# Sampling punti di collocazione
N_F = 6000 # punti PDE (+ 50% clustering automatico vicino a X_SRC e T_STEP)
N_IC = 400 # punti condizione iniziale
N_BC = 400 # punti condizioni al contorno
# Training Adam
EPOCHS = 5000 # epoche massime
PATIENCE = 500 # early stopping
LR_ADAM = 1e-3 # learning rate iniziale
SCHED_FACTOR = 0.5 # ReduceLROnPlateau: fattore di riduzione
SCHED_PATIENCE = 150 # ReduceLROnPlateau: patience
SCHED_MIN_LR = 1e-6 # ReduceLROnPlateau: lr minimo
# Fine-tuning L-BFGS
LR_LBFGS = 0.1 # learning rate L-BFGS
LBFGS_STEPS = 200 # numero di step L-BFGS
# Pesi della loss
W_PDE = 10.0 # peso residuo PDE
W_IC = 1.0 # peso condizione iniziale
W_BC = 5.0 # peso condizioni al contorno
+38 -28
View File
@@ -35,7 +35,10 @@ def _get_device():
return torch.device('cpu')
def prepare_data(N_f=4000, N_ic=400, N_bc=400):
def prepare_data(N_f=None, N_ic=None, N_bc=None):
if N_f is None: N_f = config.N_F
if N_ic is None: N_ic = config.N_IC
if N_bc is None: N_bc = config.N_BC
set_seed(42)
device = _get_device()
@@ -43,16 +46,17 @@ def prepare_data(N_f=4000, N_ic=400, N_bc=400):
x_f = torch.rand(N_f, device=device) * config.L
t_f = torch.rand(N_f, device=device) * config.T_END
# Extra points near x=0 (steep Neumann gradient) and t=T_STEP (flux step)
# Extra points near X_SRC (steep gradient at source) and t=T_STEP (flux step)
n_extra = N_f // 4
x_near0 = torch.rand(n_extra, device=device) * config.L * 0.1 # x in [0, 0.1]
t_near0 = torch.rand(n_extra, device=device) * config.T_END
x_near_src = config.X_SRC + (torch.rand(n_extra, device=device) - 0.5) * config.L * 0.1
x_near_src = x_near_src.clamp(0, config.L)
t_near_src = torch.rand(n_extra, device=device) * config.T_END
x_step = torch.rand(n_extra, device=device) * config.L
t_step = config.T_STEP + (torch.rand(n_extra, device=device) - 0.5) * 0.1 # t near T_STEP
t_step = t_step.clamp(0, config.T_END)
x_f = torch.cat([x_f, x_near0, x_step])
t_f = torch.cat([t_f, t_near0, t_step])
x_f = torch.cat([x_f, x_near_src, x_step])
t_f = torch.cat([t_f, t_near_src, t_step])
x_ic = torch.rand(N_ic, device=device) * config.L
t_bc = torch.rand(N_bc, device=device) * config.T_END
@@ -60,19 +64,24 @@ def prepare_data(N_f=4000, N_ic=400, N_bc=400):
return {'device': device, 'x_f': x_f, 't_f': t_f, 'x_ic': x_ic, 't_bc': t_bc}
def train_model(data, epochs=5000, patience=100):
def train_model(data, epochs=None, patience=None):
if epochs is None: epochs = config.EPOCHS
if patience is None: patience = config.PATIENCE
device = data['device']
model = HeatPINN().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=30, min_lr=1e-6)
optimizer = optim.Adam(model.parameters(), lr=config.LR_ADAM)
scheduler = ReduceLROnPlateau(optimizer, mode='min',
factor=config.SCHED_FACTOR,
patience=config.SCHED_PATIENCE,
min_lr=config.SCHED_MIN_LR)
os.makedirs(MODELS_DIR, exist_ok=True)
best_loss = float('inf')
patience_counter = 0
print(f"\n--- Heat PINN Training (Adam) on {device} ---")
model.train()
for epoch in range(epochs):
model.train()
optimizer.zero_grad()
loss, L_pde, L_ic, L_bc = heat_pinn_loss(
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
@@ -100,33 +109,33 @@ def train_model(data, epochs=5000, patience=100):
# L-BFGS fine-tuning phase (standard PINN practice for convergence to better minima)
print("\n--- L-BFGS fine-tuning ---")
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device)
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device, weights_only=True)
model.load_state_dict(ckpt['state_dict'])
lbfgs = optim.LBFGS(model.parameters(), lr=0.1, max_iter=50,
lbfgs = optim.LBFGS(model.parameters(), lr=config.LR_LBFGS, max_iter=50,
history_size=50, tolerance_grad=1e-7, line_search_fn='strong_wolfe')
_last = {}
for step in range(20):
def closure():
lbfgs.zero_grad()
loss, L_pde, L_ic, L_bc = heat_pinn_loss(
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
)
loss.backward()
_last['loss'] = loss.item()
_last['pde'] = L_pde.item()
_last['ic'] = L_ic.item()
_last['bc'] = L_bc.item()
return loss
def closure():
lbfgs.zero_grad()
loss, L_pde, L_ic, L_bc = heat_pinn_loss(
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
)
loss.backward()
_last['loss'] = loss.item()
_last['pde'] = L_pde.item()
_last['ic'] = L_ic.item()
_last['bc'] = L_bc.item()
return loss
for step in range(config.LBFGS_STEPS):
lbfgs.step(closure)
if _last['loss'] < best_loss:
best_loss = _last['loss']
torch.save({'state_dict': model.state_dict()}, MODEL_SAVE_PATH)
if (step + 1) % 5 == 0:
print(f"L-BFGS step {step+1}/20 | Loss: {_last['loss']:.6f} "
print(f"L-BFGS step {step+1}/{config.LBFGS_STEPS} | Loss: {_last['loss']:.6f} "
f"| PDE: {_last['pde']:.6f} | IC: {_last['ic']:.6f} | BC: {_last['bc']:.6f}")
print("Training complete! Model saved.")
@@ -136,7 +145,7 @@ def _load_model(device):
if not os.path.exists(MODEL_SAVE_PATH):
print("Error: model not found. Train the model first.")
return None
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device)
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device, weights_only=True)
model = HeatPINN().to(device)
model.load_state_dict(ckpt['state_dict'])
model.eval()
@@ -163,10 +172,11 @@ def evaluate_model(data):
from fdm.solver import solve as fdm_solve
T_fdm, _, _ = fdm_solve()
# Downsample FDM time axis (NX=100, NT=5000) to match PINN grid (100x100)
# Downsample FDM grid (NX, NT) to match PINN prediction grid (100x100)
nx, nt = T_pred.shape
x_indices = np.linspace(0, T_fdm.shape[0] - 1, nx, dtype=int)
t_indices = np.linspace(0, T_fdm.shape[1] - 1, nt, dtype=int)
T_fdm_ds = T_fdm[:, t_indices] # (100, 100)
T_fdm_ds = T_fdm[np.ix_(x_indices, t_indices)] # (100, 100)
l2_rel = np.sqrt(np.mean((T_pred - T_fdm_ds) ** 2)) / np.sqrt(np.mean(T_fdm_ds ** 2))
max_err = np.max(np.abs(T_pred - T_fdm_ds))
+4 -7
View File
@@ -24,26 +24,23 @@ def main_menu():
print("\n" + "-" * 30)
print(" MAIN MENU")
print("-" * 30)
print("1. Risolvi e salva risultati")
print("2. Heatmap T(x,t)")
print("3. Animazione T(x) nel tempo")
print("4. Grafico T(t) in punti fissi")
print("1. Risolvi")
print("2. Visualizza")
print("0. Esci")
print("-" * 30)
choice = input("Select an option (0-4): ").strip()
choice = input("Select an option (0-2): ").strip()
if choice == "1":
T, x_vals, t_vals = solve()
print(f"Soluzione completata. Shape T: {T.shape}")
print(f"T range: [{T.min():.2f}, {T.max():.2f}] °C")
elif choice in ("2", "3", "4"):
elif choice == "2":
if T is None:
print("Eseguire prima l'opzione 1.")
else:
visualize_fdm(T, x_vals, t_vals)
print("Grafici salvati in animations/fdm/")
elif choice == "0":
print("Uscita.")
+102 -39
View File
@@ -1,5 +1,6 @@
import sys
import os
from datetime import datetime
import numpy as np
import plotly.graph_objects as go
@@ -7,16 +8,6 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import config
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
FDM_ANIM_DIR = os.path.join(BASE_DIR, 'animations', 'fdm')
def _next_path(base_dir, prefix, ext):
i = 1
while True:
path = os.path.join(base_dir, f'{prefix}_{i:03d}{ext}')
if not os.path.exists(path):
return path
i += 1
def visualize_fdm(T_matrix, x_vals, t_vals):
@@ -31,55 +22,127 @@ def visualize_fdm(T_matrix, x_vals, t_vals):
t_vals : np.ndarray, shape (NT,)
Time values.
"""
os.makedirs(FDM_ANIM_DIR, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
out_dir = os.path.join(BASE_DIR, 'results', 'fdm', timestamp)
os.makedirs(out_dir, exist_ok=True)
# ------------------------------------------------------------------
# 1. Heatmap
# 1. heatmap.html: plot statico T(x,t) + striscia 1D animata
# Due figure Plotly indipendenti nello stesso file HTML per
# evitare conflitti tra animazioni e assi multipli.
# ------------------------------------------------------------------
zmax = float(np.max(np.abs(T_matrix - config.T0)))
# Use symmetric range centred on T0 if there is any variation,
# otherwise fall back to the raw range.
z_data = T_matrix.T # shape (NT, NX) — rows=time, cols=space
zmin_val = float(np.min(T_matrix))
zmax_val = float(np.max(T_matrix))
if zmax > 0:
z_center = float(np.mean(T_matrix))
z_abs = float(np.max(np.abs(T_matrix - z_center)))
zmin_sym = z_center - z_abs
zmax_sym = z_center + z_abs
else:
zmin_sym = float(np.min(T_matrix))
zmax_sym = float(np.max(T_matrix))
# Subsample frames (shared with animation below)
n_frames = len(t_vals)
max_frames = 200
step = max(1, n_frames // max_frames)
frame_indices = list(range(0, n_frames, step))
fig_heatmap = go.Figure(go.Heatmap(
# --- Figura A: heatmap 2D statica (identica all'originale) ---
z_data = T_matrix.T # (NT, NX)
z_center = float(np.mean(T_matrix))
z_abs = float(np.max(np.abs(T_matrix - z_center))) or 1.0
fig_static = go.Figure(go.Heatmap(
z=z_data,
x=x_vals,
y=t_vals,
colorscale='RdBu_r',
zmin=zmin_sym,
zmax=zmax_sym,
zmin=z_center - z_abs,
zmax=z_center + z_abs,
colorbar=dict(title='T [°C]'),
))
fig_heatmap.update_layout(
fig_static.update_layout(
title='FDM — Heat Equation T(x,t)',
xaxis_title='x [m]',
yaxis_title='t [s]',
height=500,
height=480,
margin=dict(t=50, b=50, l=70, r=90),
)
heatmap_path = _next_path(FDM_ANIM_DIR, 'heatmap', '.html')
fig_heatmap.write_html(heatmap_path)
# --- Figura B: striscia 1D animata ---
strip_frames = [
go.Frame(
data=[go.Heatmap(
z=T_matrix[:, idx].reshape(1, -1),
x=x_vals,
y=[0],
colorscale='Jet',
zmin=zmin_val,
zmax=zmax_val,
showscale=True,
colorbar=dict(title='T [°C]', thickness=18),
)],
name=str(idx),
layout=go.Layout(title_text=f't = {t_vals[idx]:.3f} s'),
)
for idx in frame_indices
]
fig_strip = go.Figure(
data=[go.Heatmap(
z=T_matrix[:, frame_indices[0]].reshape(1, -1),
x=x_vals,
y=[0],
colorscale='Jet',
zmin=zmin_val,
zmax=zmax_val,
showscale=True,
colorbar=dict(title='T [°C]', thickness=18),
)],
frames=strip_frames,
layout=go.Layout(
title=f't = {t_vals[frame_indices[0]]:.3f} s',
xaxis=dict(title='x [m]'),
yaxis=dict(showticklabels=False, showgrid=False,
zeroline=False, fixedrange=True),
height=300,
margin=dict(t=80, b=130, l=70, r=110),
updatemenus=[dict(
type='buttons',
showactive=False,
y=1.22, x=0.5, xanchor='center',
buttons=[
dict(label='▶ Play', method='animate',
args=[None, dict(frame=dict(duration=40, redraw=True),
fromcurrent=True, mode='immediate')]),
dict(label='⏸ Pause', method='animate',
args=[[None], dict(frame=dict(duration=0, redraw=False),
mode='immediate')]),
],
)],
sliders=[dict(
steps=[
dict(method='animate',
args=[[str(idx)], dict(mode='immediate',
frame=dict(duration=0, redraw=True))],
label=f'{t_vals[idx]:.2f}')
for idx in frame_indices
],
transition=dict(duration=0),
x=0.05, y=0, len=0.9,
currentvalue=dict(prefix='t = ', font=dict(size=14)),
)],
),
)
# Scrivi entrambe le figure in un unico file HTML
html_static = fig_static.to_html(full_html=False, include_plotlyjs='cdn')
html_strip = fig_strip.to_html(full_html=False, include_plotlyjs=False)
heatmap_path = os.path.join(out_dir, 'heatmap.html')
with open(heatmap_path, 'w', encoding='utf-8') as _f:
_f.write('<!DOCTYPE html><html><head><meta charset="utf-8"></head><body>\n')
_f.write(html_static)
_f.write('\n')
_f.write(html_strip)
_f.write('\n</body></html>')
print(f"Heatmap saved → {heatmap_path}")
# ------------------------------------------------------------------
# 2. Animated profile T(x) evolving in time
# ------------------------------------------------------------------
L = config.L
n_frames = len(t_vals)
# Subsample frames for a manageable animation (max ~200 frames)
max_frames = 200
step = max(1, n_frames // max_frames)
frame_indices = list(range(0, n_frames, step))
y_min = float(np.min(T_matrix)) - 1.0
y_max = float(np.max(T_matrix)) + 1.0
@@ -168,7 +231,7 @@ def visualize_fdm(T_matrix, x_vals, t_vals):
frames=frames,
)
anim_path = _next_path(FDM_ANIM_DIR, 'animation', '.html')
anim_path = os.path.join(out_dir, 'animation.html')
fig_anim.write_html(anim_path)
print(f"Animation saved → {anim_path}")
@@ -222,6 +285,6 @@ def visualize_fdm(T_matrix, x_vals, t_vals):
height=480,
)
ts_path = _next_path(FDM_ANIM_DIR, 'time_series', '.html')
ts_path = os.path.join(out_dir, 'time_series.html')
fig_ts.write_html(ts_path)
print(f"Time-series saved → {ts_path}")
+39 -28
View File
@@ -6,55 +6,66 @@ import config
class HeatPINN(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(2, 128), nn.Tanh(),
nn.Linear(128, 128), nn.Tanh(),
nn.Linear(128, 128), nn.Tanh(),
nn.Linear(128, 128), nn.Tanh(),
nn.Linear(128, 1),
)
h = config.HIDDEN_SIZE
layers = [nn.Linear(2, h), nn.Tanh()]
for _ in range(config.N_HIDDEN_LAYERS - 1):
layers += [nn.Linear(h, h), nn.Tanh()]
layers.append(nn.Linear(h, 1))
self.net = nn.Sequential(*layers)
def forward(self, x):
# Output scaled to physical range: T_AMB + (Q*L/K) * net
# net learns dimensionless perturbation in [0,1] range
T_scale = config.T_AMB + (config.Q_VAL * config.L / config.K) * self.net(x)
return T_scale
def forward(self, xt):
x = xt[:, :1] / config.L
t = xt[:, 1:] / config.T_END
return config.T_AMB + (config.Q_VAL * config.L / config.K) * self.net(torch.cat([x, t], dim=1))
def heat_pinn_loss(model, x_f, t_f, x_ic, t_bc, w_pde=1.0, w_ic=1.0, w_bc=10.0):
# Characteristic scales for normalization
T_char = config.Q_VAL * config.L / config.K # ~50 °C — temperature scale
grad_char = (config.Q_VAL / config.K) ** 2 # ~2500 — gradient scale²
# Precomputed loss scales (depend only on config constants)
_T_char = config.Q_VAL * config.L / config.K # ~150 °C — temperature scale
# _pde_scale covers both dT/dt and the Gaussian source peak (dominant with small sigma)
_src_peak = config.ALPHA * config.Q_VAL / (config.K * config.GAUSS_SIGMA * (2 * 3.141592653589793) ** 0.5)
_pde_scale = max((_T_char / config.T_END) ** 2, _src_peak ** 2) + 1e-8
# Robin BC residual scale: max(dT/dx, H_CONV/K * T_char) — convective term dominates when H*L/K >> 1
_bc_scale = max(config.Q_VAL / config.K,
config.H_CONV * _T_char / config.K) ** 2
# PDE residual: dT/dt - alpha * d2T/dx2 = 0 (normalized by T_char/t_char)
def heat_pinn_loss(model, x_f, t_f, x_ic, t_bc,
w_pde=None, w_ic=None, w_bc=None):
if w_pde is None: w_pde = config.W_PDE
if w_ic is None: w_ic = config.W_IC
if w_bc is None: w_bc = config.W_BC
T_char = _T_char
# PDE residual: dT/dt - alpha * d2T/dx2 - source(x,t) = 0 (normalized by T_char/t_char)
x_f = x_f.detach().requires_grad_(True)
t_f = t_f.detach().requires_grad_(True)
T_f = model(torch.stack([x_f, t_f], dim=1))
dT_dt = torch.autograd.grad(T_f.sum(), t_f, create_graph=True)[0]
dT_dx = torch.autograd.grad(T_f.sum(), x_f, create_graph=True)[0]
dT_dt, dT_dx = torch.autograd.grad(T_f.sum(), [t_f, x_f], create_graph=True)
d2T_dx2 = torch.autograd.grad(dT_dx.sum(), x_f, create_graph=True)[0]
pde_scale = (T_char / config.T_END) ** 2 + 1e-8
L_pde = ((dT_dt - config.ALPHA * d2T_dx2) ** 2).mean() / pde_scale
Q_t_f = torch.where(t_f >= config.T_STEP,
torch.tensor(config.Q_VAL, device=t_f.device, dtype=t_f.dtype),
torch.tensor(0.0, device=t_f.device, dtype=t_f.dtype))
sigma = config.GAUSS_SIGMA
gauss = torch.exp(-0.5 * ((x_f - config.X_SRC) / sigma) ** 2) / (sigma * (2 * torch.pi) ** 0.5)
source_term = (config.ALPHA / config.K) * Q_t_f * gauss
L_pde = ((dT_dt - config.ALPHA * d2T_dx2 - source_term) ** 2).mean() / _pde_scale
# IC: T(x, 0) = T0 — normalized by T_char²
T_ic_pred = model(torch.stack([x_ic, torch.zeros_like(x_ic)], dim=1))
T_ic_true = torch.full_like(T_ic_pred, config.T0)
L_ic = ((T_ic_pred - T_ic_true) ** 2).mean() / (T_char ** 2 + 1e-8)
L_ic = ((T_ic_pred - T_ic_true) ** 2).mean() / (_T_char ** 2 + 1e-8)
# BC x=0: Neumann — dT/dx + Q(t)/K = 0
# BC x=0: Robin — dT/dx + H_CONV/K * (T(0,t) - T_AMB) = 0
x_left = torch.zeros(t_bc.shape[0], device=t_bc.device).requires_grad_(True)
T_left = model(torch.stack([x_left, t_bc.detach()], dim=1))
dT_dx_left = torch.autograd.grad(T_left.sum(), x_left, create_graph=True)[0]
Q_t = torch.where(t_bc >= config.T_STEP,
torch.tensor(config.Q_VAL, device=t_bc.device, dtype=t_bc.dtype),
torch.tensor(0.0, device=t_bc.device, dtype=t_bc.dtype))
L_bc_left = ((dT_dx_left + Q_t / config.K) ** 2).mean() / grad_char
L_bc_left = ((dT_dx_left + (config.H_CONV / config.K) * (T_left.squeeze() - config.T_AMB)) ** 2).mean() / _bc_scale
# BC x=L: Robin — dT/dx + H_CONV/K * (T(L,t) - T_AMB) = 0
x_right = torch.full((t_bc.shape[0],), config.L, device=t_bc.device).requires_grad_(True)
T_right = model(torch.stack([x_right, t_bc.detach()], dim=1))
dT_dx_right = torch.autograd.grad(T_right.sum(), x_right, create_graph=True)[0]
L_bc_right = ((dT_dx_right + (config.H_CONV / config.K) * (T_right.squeeze() - config.T_AMB)) ** 2).mean() / grad_char
L_bc_right = ((dT_dx_right + (config.H_CONV / config.K) * (T_right.squeeze() - config.T_AMB)) ** 2).mean() / _bc_scale
L_bc = L_bc_left + L_bc_right
total = w_pde * L_pde + w_ic * L_ic + w_bc * L_bc
+5
View File
@@ -0,0 +1,5 @@
[pytest]
testpaths = tests
addopts = -v --tb=short
markers =
slow: test lenti (training completo) — escludi con -m "not slow"
+1
View File
@@ -1,4 +1,5 @@
torch>=2.0.0
pytest>=7.0.0
pandas>=2.0.0
numpy>=1.24.0
scikit-learn>=1.3.0
+24
View File
@@ -0,0 +1,24 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pytest
import torch
@pytest.fixture(scope="session")
def device():
from engine import _get_device
return _get_device()
@pytest.fixture
def small_data():
from engine import prepare_data
return prepare_data(N_f=200, N_ic=50, N_bc=50)
@pytest.fixture
def pinn_model(device):
from model import HeatPINN
return HeatPINN().to(device)
+73
View File
@@ -0,0 +1,73 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import math
import config
def test_x_src_within_domain():
assert 0.0 <= config.X_SRC <= config.L
def test_t_step_before_t_end():
assert 0.0 < config.T_STEP < config.T_END
def test_gauss_sigma_positive():
assert config.GAUSS_SIGMA > 0.0
def test_physics_positive():
assert config.ALPHA > 0.0
assert config.K > 0.0
assert config.H_CONV > 0.0
assert config.L > 0.0
assert config.T_END > 0.0
def test_training_hyperparameters_positive():
assert config.PATIENCE > 0
assert config.EPOCHS > 0
assert config.LR_ADAM > 0.0
assert config.SCHED_MIN_LR > 0.0
assert config.SCHED_FACTOR > 0.0
assert config.SCHED_PATIENCE > 0
def test_lr_ordering():
"""Min LR deve essere inferiore all'LR iniziale."""
assert config.SCHED_MIN_LR < config.LR_ADAM
def test_sched_patience_lt_patience():
"""Lo scheduler deve poter agire prima che scatti l'early stopping."""
assert config.SCHED_PATIENCE < config.PATIENCE
def test_cfl_stability():
"""La griglia FDM deve soddisfare la condizione CFL (r ≤ 0.5)."""
dx = config.L / (config.NX - 1)
dt = config.T_END / (config.NT - 1)
r = config.ALPHA * dt / dx ** 2
assert r <= 0.5, f"CFL violata: r={r:.4f} > 0.5"
def test_grid_dimensions():
assert config.NX >= 2
assert config.NT >= 2
assert config.N_F >= 1
assert config.N_IC >= 1
assert config.N_BC >= 1
def test_pde_scale_covers_source_peak():
"""_pde_scale in model.py deve coprire il picco gaussiano della sorgente."""
from model import _pde_scale
src_peak = config.ALPHA * config.Q_VAL / (
config.K * config.GAUSS_SIGMA * math.sqrt(2 * math.pi)
)
assert _pde_scale >= src_peak ** 2 - 1e-6, (
f"_pde_scale={_pde_scale:.1f} < src_peak²={src_peak**2:.1f}: "
"la loss PDE non è normalizzata correttamente"
)
+87
View File
@@ -0,0 +1,87 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pytest
import numpy as np
import torch
import config
# ── FDM ───────────────────────────────────────────────────────────────────────
def test_fdm_full_run():
"""Il solver FDM produce un campo di temperatura fisicamente corretto."""
from fdm.solver import solve
T, x, t = solve()
assert T.shape == (config.NX, config.NT)
assert np.isfinite(T).all()
assert T[:, -1].mean() > config.T0 # la sorgente ha scaldato il dominio
assert T.max() > config.T0 # picco sopra la IC
assert T.min() >= config.T0 - 1e-6 # nessun raffreddamento sotto T0 (no sorgenti fredde)
def test_fdm_visualizer_creates_html(tmp_path, monkeypatch):
"""Il visualizer FDM scrive almeno un file HTML senza errori."""
import fdm.visualizer as fdm_vis
monkeypatch.setattr(fdm_vis, 'BASE_DIR', str(tmp_path))
from fdm.solver import solve
T, x, t = solve()
fdm_vis.visualize_fdm(T, x, t)
html_files = list(tmp_path.rglob('*.html'))
assert len(html_files) >= 1, "Nessun file HTML generato dal visualizer FDM"
def test_pinn_visualizer_creates_html(tmp_path, monkeypatch):
"""Il visualizer PINN scrive i tre file HTML senza errori."""
import visualizer as pinn_vis
monkeypatch.setattr(pinn_vis, 'BASE_DIR', str(tmp_path))
from fdm.solver import solve as fdm_solve
T_fdm, _, _ = fdm_solve()
nx, nt = 20, 20
x_vals = np.linspace(0, config.L, nx)
t_vals = np.linspace(0, config.T_END, nt)
T_pred = np.full((nx, nt), config.T_AMB) # predizione costante (dummy)
pinn_vis.visualize_heat_field(T_pred, x_vals, t_vals, T_fdm)
html_files = list(tmp_path.rglob('*.html'))
assert len(html_files) == 3, f"Attesi 3 HTML, trovati {len(html_files)}"
# ── PINN training (lento) ─────────────────────────────────────────────────────
@pytest.mark.slow
def test_pinn_training_saves_checkpoint(tmp_path, monkeypatch):
"""Training per 30 epoche: il checkpoint viene salvato."""
import engine
save_path = str(tmp_path / 'model.pth')
monkeypatch.setattr(engine, 'MODEL_SAVE_PATH', save_path)
monkeypatch.setattr(engine, 'MODELS_DIR', str(tmp_path))
from engine import prepare_data, train_model
data = prepare_data(N_f=300, N_ic=100, N_bc=100)
train_model(data, epochs=30, patience=30)
assert os.path.exists(save_path)
ckpt = torch.load(save_path, map_location='cpu', weights_only=True)
assert 'state_dict' in ckpt
@pytest.mark.slow
def test_pinn_evaluate_after_training(tmp_path, monkeypatch):
"""evaluate_model gira senza errori dopo un training minimo."""
import engine
save_path = str(tmp_path / 'model.pth')
monkeypatch.setattr(engine, 'MODEL_SAVE_PATH', save_path)
monkeypatch.setattr(engine, 'MODELS_DIR', str(tmp_path))
from engine import prepare_data, train_model, evaluate_model
data = prepare_data(N_f=300, N_ic=100, N_bc=100)
train_model(data, epochs=30, patience=30)
evaluate_model(data)
+67
View File
@@ -0,0 +1,67 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import torch
import config
from engine import set_seed, _get_device, prepare_data
def test_set_seed_reproducibility():
set_seed(42)
r1 = torch.rand(10)
set_seed(42)
r2 = torch.rand(10)
torch.testing.assert_close(r1, r2)
def test_get_device_valid():
device = _get_device()
assert isinstance(device, torch.device)
assert device.type in ('cpu', 'cuda', 'mps')
def test_prepare_data_keys():
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
assert set(data.keys()) == {'device', 'x_f', 't_f', 'x_ic', 't_bc'}
def test_prepare_data_shapes():
N_f, N_ic, N_bc = 100, 50, 50
data = prepare_data(N_f=N_f, N_ic=N_ic, N_bc=N_bc)
# engine.py aggiunge 2 * (N_f // 4) punti di clustering
expected_f = N_f + 2 * (N_f // 4)
assert data['x_f'].shape == (expected_f,)
assert data['t_f'].shape == (expected_f,)
assert data['x_ic'].shape == (N_ic,)
assert data['t_bc'].shape == (N_bc,)
def test_prepare_data_x_bounds():
data = prepare_data(N_f=500, N_ic=100, N_bc=100)
assert data['x_f'].min().item() >= 0.0
assert data['x_f'].max().item() <= config.L
assert data['x_ic'].min().item() >= 0.0
assert data['x_ic'].max().item() <= config.L
def test_prepare_data_t_bounds():
data = prepare_data(N_f=500, N_ic=100, N_bc=100)
assert data['t_f'].min().item() >= 0.0
assert data['t_f'].max().item() <= config.T_END
def test_prepare_data_device_consistency():
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
expected = data['device'].type
for key in ('x_f', 't_f', 'x_ic', 't_bc'):
assert data[key].device.type == expected, f"{key} sul device sbagliato"
def test_prepare_data_deterministic():
"""Due chiamate con lo stesso seed (fissato in prepare_data) producono dati identici."""
d1 = prepare_data(N_f=100, N_ic=50, N_bc=50)
d2 = prepare_data(N_f=100, N_ic=50, N_bc=50)
torch.testing.assert_close(d1['x_f'], d2['x_f'])
torch.testing.assert_close(d1['t_f'], d2['t_f'])
torch.testing.assert_close(d1['x_ic'], d2['x_ic'])
+65
View File
@@ -0,0 +1,65 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import torch
from engine import prepare_data
from model import HeatPINN, heat_pinn_loss
def test_data_to_model_forward():
"""prepare_data → forward: shape e device coerenti, nessun NaN."""
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
model = HeatPINN().to(data['device'])
xt = torch.stack([data['x_f'], data['t_f']], dim=1)
out = model(xt)
assert out.shape == (data['x_f'].shape[0], 1)
assert out.device.type == data['device'].type
assert torch.isfinite(out).all()
def test_full_loss_pipeline():
"""prepare_data → heat_pinn_loss: tutti i componenti finiti e non-negativi."""
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
model = HeatPINN().to(data['device'])
total, L_pde, L_ic, L_bc = heat_pinn_loss(
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
)
for name, v in [('total', total), ('L_pde', L_pde), ('L_ic', L_ic), ('L_bc', L_bc)]:
assert torch.isfinite(v), f"{name} non è finita"
assert v.item() >= 0.0, f"{name} è negativa"
def test_backward_gradients_finite():
"""Il backward della loss non produce NaN/Inf nei parametri."""
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
model = HeatPINN().to(data['device'])
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
optimizer.zero_grad()
total, _, _, _ = heat_pinn_loss(
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
)
total.backward()
for p in model.parameters():
assert p.grad is not None
assert torch.isfinite(p.grad).all(), "gradiente NaN/Inf"
def test_training_loop_stable():
"""20 step di Adam non producono NaN/Inf nei parametri né nella loss."""
data = prepare_data(N_f=200, N_ic=100, N_bc=100)
model = HeatPINN().to(data['device'])
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
args = (model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc'])
for _ in range(20):
optimizer.zero_grad()
loss, _, _, _ = heat_pinn_loss(*args)
loss.backward()
optimizer.step()
for p in model.parameters():
assert torch.isfinite(p).all(), "parametro NaN/Inf dopo training"
total, _, _, _ = heat_pinn_loss(*args)
assert torch.isfinite(total)
+102
View File
@@ -0,0 +1,102 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import torch
import config
from model import HeatPINN, heat_pinn_loss
# ── HeatPINN.forward ─────────────────────────────────────────────────────────
def test_forward_output_shape(pinn_model, device):
xt = torch.zeros(64, 2, device=device)
xt[:, 0] = torch.rand(64) * config.L
xt[:, 1] = torch.rand(64) * config.T_END
assert pinn_model(xt).shape == (64, 1)
def test_forward_finite(pinn_model, device):
xt = torch.zeros(100, 2, device=device)
xt[:, 0] = torch.rand(100) * config.L
xt[:, 1] = torch.rand(100) * config.T_END
assert torch.isfinite(pinn_model(xt)).all()
def test_forward_zero_weights_returns_t_amb(device):
"""Con pesi nulli net(x,t)=0 ⇒ forward restituisce T_AMB per ogni input."""
model = HeatPINN().to(device)
for p in model.parameters():
p.data.zero_()
xt = torch.zeros(20, 2, device=device)
xt[:, 0] = torch.rand(20) * config.L
xt[:, 1] = torch.rand(20) * config.T_END
out = model(xt)
torch.testing.assert_close(out, torch.full_like(out, config.T_AMB), atol=1e-5, rtol=0.0)
def test_forward_t_normalization(device):
"""t viene normalizzato a [0,1]: il modello deve restituire output finiti
anche a t=T_END senza saturazione di Tanh."""
model = HeatPINN().to(device)
torch.nn.init.xavier_uniform_(model.net[0].weight)
xt = torch.tensor([[0.5, 0.0], [0.5, config.T_END]], device=device)
out = model(xt)
assert out.shape == (2, 1)
assert torch.isfinite(out).all()
# ── heat_pinn_loss ────────────────────────────────────────────────────────────
def _dummy_inputs(device, n_f=100, n_ic=50, n_bc=50):
x_f = torch.rand(n_f, device=device) * config.L
t_f = torch.rand(n_f, device=device) * config.T_END
x_ic = torch.rand(n_ic, device=device) * config.L
t_bc = torch.rand(n_bc, device=device) * config.T_END
return x_f, t_f, x_ic, t_bc
def test_loss_returns_four_values(pinn_model, device):
result = heat_pinn_loss(pinn_model, *_dummy_inputs(device))
assert len(result) == 4
def test_loss_components_non_negative(pinn_model, device):
total, L_pde, L_ic, L_bc = heat_pinn_loss(pinn_model, *_dummy_inputs(device))
assert total.item() >= 0.0
assert L_pde.item() >= 0.0
assert L_ic.item() >= 0.0
assert L_bc.item() >= 0.0
def test_loss_finite(pinn_model, device):
for v in heat_pinn_loss(pinn_model, *_dummy_inputs(device)):
assert torch.isfinite(v), f"loss non finita: {v}"
def test_loss_weight_doubles_pde_contribution(pinn_model, device):
"""Raddoppiare w_pde con w_ic=w_bc=0 deve raddoppiare il totale."""
inputs = _dummy_inputs(device)
total1, L_pde1, _, _ = heat_pinn_loss(pinn_model, *inputs, w_pde=1.0, w_ic=0.0, w_bc=0.0)
total2, L_pde2, _, _ = heat_pinn_loss(pinn_model, *inputs, w_pde=2.0, w_ic=0.0, w_bc=0.0)
# L_pde deve essere identico tra le due chiamate (stesso modello, stessi dati)
torch.testing.assert_close(L_pde1, L_pde2, atol=1e-5, rtol=1e-4)
torch.testing.assert_close(total2, 2.0 * total1, atol=1e-5, rtol=1e-4)
def test_ic_loss_zero_when_net_is_zero(device):
"""Con net=0 ⇒ T = T_AMB = T0 ⇒ L_ic = 0."""
model = HeatPINN().to(device)
for p in model.parameters():
p.data.zero_()
_, _, L_ic, _ = heat_pinn_loss(model, *_dummy_inputs(device))
assert L_ic.item() < 1e-8
def test_bc_loss_zero_when_net_is_zero(device):
"""Con net=0 ⇒ T = T_AMB e dT/dx = 0 ⇒ Robin BC soddisfatta ⇒ L_bc = 0."""
model = HeatPINN().to(device)
for p in model.parameters():
p.data.zero_()
_, _, _, L_bc = heat_pinn_loss(model, *_dummy_inputs(device))
assert L_bc.item() < 1e-8
+14 -19
View File
@@ -1,20 +1,24 @@
import os
from datetime import datetime
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import config
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
ANIMATIONS_DIR = os.path.join(BASE_DIR, 'animations')
def visualize_heat_field(T_pred, x_vals, t_vals, T_fdm):
os.makedirs(ANIMATIONS_DIR, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
out_dir = os.path.join(BASE_DIR, 'results', 'pinn', timestamp)
os.makedirs(out_dir, exist_ok=True)
# Downsample T_fdm from shape (NX, NT) to (NX, len(t_vals))
# Downsample T_fdm from shape (NX_fdm, NT_fdm) to match PINN grid
nx_pred = len(x_vals)
nt_pred = len(t_vals)
x_indices = np.linspace(0, T_fdm.shape[0] - 1, nx_pred, dtype=int)
t_indices = np.linspace(0, T_fdm.shape[1] - 1, nt_pred, dtype=int)
T_fdm_ds = T_fdm[:, t_indices] # now shape (NX, nt_pred)
T_fdm_ds = T_fdm[np.ix_(x_indices, t_indices)]
# --- Static heatmap: PINN vs FDM ---
fig_map = make_subplots(
@@ -42,9 +46,9 @@ def visualize_heat_field(T_pred, x_vals, t_vals, T_fdm):
fig_map.update_yaxes(title_text='t', row=1, col=1)
fig_map.update_layout(title_text='Heat Equation PINN vs FDM', height=450)
map_path = _next_path('heatmap', '.html')
map_path = os.path.join(out_dir, 'heatmap.html')
fig_map.write_html(map_path)
print(f"Heatmap saved → {map_path}")
print(f"Heatmap saved {map_path}")
# --- Animated profile T(x) evolving in time ---
n_frames = len(t_vals)
@@ -90,9 +94,9 @@ def visualize_heat_field(T_pred, x_vals, t_vals, T_fdm):
frames=frames,
)
anim_path = _next_path('heat_animation', '.html')
anim_path = os.path.join(out_dir, 'animation.html')
fig_anim.write_html(anim_path)
print(f"Animation saved → {anim_path}")
print(f"Animation saved {anim_path}")
# --- Time-series comparison at fixed spatial points ---
# Spatial indices for x=0, x=L/2, x=L
@@ -139,15 +143,6 @@ def visualize_heat_field(T_pred, x_vals, t_vals, T_fdm):
height=500,
)
comparison_path = _next_path('comparison', '.html')
comparison_path = os.path.join(out_dir, 'comparison.html')
fig_ts.write_html(comparison_path)
print(f"Time-series saved → {comparison_path}")
def _next_path(prefix, ext):
i = 1
while True:
path = os.path.join(ANIMATIONS_DIR, f'{prefix}_{i:03d}{ext}')
if not os.path.exists(path):
return path
i += 1
print(f"Comparison saved {comparison_path}")