Compare commits
20 Commits
687ff45512
...
5a8cd3ef46
| Author | SHA1 | Date | |
|---|---|---|---|
| 5a8cd3ef46 | |||
| f94fd51942 | |||
| b8301a4329 | |||
| 1237a57290 | |||
| e868c47190 | |||
| 649d26cfd4 | |||
| 256945ada3 | |||
| f02c5f2bbe | |||
| 9e77deffd5 | |||
| bca829bd7e | |||
| 98bfc78651 | |||
| fbb0458f69 | |||
| 4f050e80df | |||
| b5553691e8 | |||
| a655e53551 | |||
| fa5ce5d19c | |||
| 1a530e86ba | |||
| 54a7e9aed9 | |||
| 19207cabe4 | |||
| b663a89abd |
@@ -98,6 +98,7 @@ StyleCopReport.xml
|
||||
*.svclog
|
||||
*.scc
|
||||
.venv
|
||||
.venv*
|
||||
|
||||
# Chutzpah Test files
|
||||
_Chutzpah*
|
||||
|
||||
@@ -6,16 +6,22 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
|
||||
|
||||
Write all git commit messages in Italian.
|
||||
|
||||
## Scope of Work
|
||||
|
||||
Lavora su tutto il repository: `fdm/`, `model.py`, `engine.py`, `visualizer.py`, `app.py`, `config.py`. Aiuta con migliorie, bugfix e ottimizzazioni su qualsiasi file del progetto.
|
||||
|
||||
## Project Overview
|
||||
|
||||
**Heat Equation PINN** — A Physics-Informed Neural Network that solves the 1D time-varying heat equation with physical boundary conditions:
|
||||
**Heat Equation PINN** — A Physics-Informed Neural Network that solves the 1D time-varying heat equation with an internal point heat source:
|
||||
|
||||
```
|
||||
∂T/∂t = α ∂²T/∂x² x ∈ [0, L], t ∈ [0, T_END]
|
||||
∂T/∂t = α ∂²T/∂x² + (α/k) Q(t) δ(x − X_SRC) x ∈ [0, L], t ∈ [0, T_END]
|
||||
```
|
||||
|
||||
where `Q(t) = Q_VAL` if `t ≥ T_STEP` else `0`.
|
||||
|
||||
- **IC:** `T(x, 0) = T0` (uniform)
|
||||
- **BC x=0:** Neumann — heat flux step: `−k ∂T/∂x = Q(t)` where `Q = Q_VAL` if `t ≥ T_STEP` else `0`
|
||||
- **BC x=0:** Robin — convection: `−k ∂T/∂x = h (T − T_AMB)`
|
||||
- **BC x=L:** Robin — convection: `−k ∂T/∂x = h (T − T_AMB)`
|
||||
|
||||
No experimental data is needed. A `fdm/` module provides a reference numerical solution (FTCS explicit scheme) used for evaluation and visualization comparison.
|
||||
@@ -59,7 +65,7 @@ model.py ← HeatPINN + heat_pinn_loss()
|
||||
engine.py ← data sampling, Adam+L-BFGS training, evaluation vs FDM, visualization call
|
||||
visualizer.py ← PINN vs FDM: heatmap, animated T(x), time-series at fixed points
|
||||
fdm/
|
||||
solver.py ← FTCS explicit scheme, ghost-cell Neumann, explicit Robin
|
||||
solver.py ← FTCS explicit scheme, Robin on both ends, point source at X_SRC
|
||||
visualizer.py ← same 3 plot types for FDM-only output
|
||||
app.py ← FDM CLI menu
|
||||
```
|
||||
@@ -74,11 +80,11 @@ T = T_AMB + (Q_VAL · L / K) · net(x, t)
|
||||
```
|
||||
This keeps `net` outputs in `[0, 1]` range and ensures gradients `∂T/∂x` are O(1) for the network to learn. Do not remove this scaling.
|
||||
|
||||
`heat_pinn_loss()` normalizes all three loss terms to O(1) using `T_char = Q_VAL·L/K` and `grad_char = (Q_VAL/K)²`. Changing physical parameters in `config.py` does not require re-tuning loss weights.
|
||||
`heat_pinn_loss()` normalizes all four loss terms to O(1) using `T_char = Q_VAL·L/K` and `grad_char = (Q_VAL/K)²`. The PDE residual includes the Gaussian-smoothed source term (σ=0.02) as a continuous approximation to δ(x − X_SRC). Changing physical parameters in `config.py` does not require re-tuning loss weights.
|
||||
|
||||
### Training (`engine.py`)
|
||||
|
||||
`prepare_data()` samples collocation points with **deliberate clustering**: extra points near `x=0` (steep Neumann gradient) and around `t=T_STEP` (flux step discontinuity). Increasing `N_f` / `N_bc` here is the first lever to pull if accuracy is low.
|
||||
`prepare_data()` samples collocation points with **deliberate clustering**: extra points near `x=X_SRC` (steep gradient at source) and around `t=T_STEP` (flux step discontinuity). Increasing `N_f` / `N_bc` here is the first lever to pull if accuracy is low.
|
||||
|
||||
`train_model()` runs **Adam first, then L-BFGS fine-tuning**. L-BFGS uses a closure that captures loss components in `_last` dict (avoids calling `heat_pinn_loss` outside an active grad context).
|
||||
|
||||
@@ -87,8 +93,8 @@ This keeps `net` outputs in `[0, 1]` range and ensures gradients `∂T/∂x` are
|
||||
### FDM Solver (`fdm/solver.py`)
|
||||
|
||||
Returns `(T_matrix[NX, NT], x_vals, t_vals)`. Uses:
|
||||
- Ghost cell for Neumann: `T_ghost = T[1] + 2·dx·Q(t)/k`
|
||||
- Explicit Robin at x=L: `T[N] = (T[N−1] + dx·h/k·T_amb) / (1 + dx·h/k)` — uses `T_cur[-2]`, not `T_new[-2]`
|
||||
- Robin BC on both ends: `T[0] = (T[1] + robin_coeff·T_amb) / (1 + robin_coeff)`
|
||||
- Point source injected at node `i_src = argmin|x - X_SRC|` after BCs: `T[i_src] += Q·α·dt/(k·dx)`
|
||||
- CFL check at startup (warns, does not crash)
|
||||
|
||||
### Loss Scaling Notes
|
||||
|
||||
@@ -5,7 +5,7 @@ import engine
|
||||
def print_header():
|
||||
print("=" * 55)
|
||||
print(" Heat Equation PINN — ∂T/∂t = α ∂²T/∂x²")
|
||||
print(" Neumann BC (x=0) + Robin BC (x=L)")
|
||||
print(" Robin BC (x=0, x=L) + point source @ X_SRC")
|
||||
print("=" * 55)
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,96 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
BASE_DIR="$(dirname "$0")"
|
||||
|
||||
cleanup_dir() {
|
||||
local LABEL="$1"
|
||||
local DIR="$2"
|
||||
|
||||
if [[ ! -d "$DIR" ]]; then
|
||||
echo "Nessuna cartella $DIR trovata."
|
||||
return
|
||||
fi
|
||||
|
||||
mapfile -t RUNS < <(find "$DIR" -mindepth 1 -maxdepth 1 -type d | sort)
|
||||
|
||||
if [[ ${#RUNS[@]} -eq 0 ]]; then
|
||||
echo "Nessun risultato $LABEL da cancellare."
|
||||
return
|
||||
fi
|
||||
|
||||
echo ""
|
||||
echo "Risultati $LABEL disponibili:"
|
||||
for i in "${!RUNS[@]}"; do
|
||||
printf " %2d. %s\n" "$((i+1))" "$(basename "${RUNS[$i]}")"
|
||||
done
|
||||
echo ""
|
||||
echo "a. Cancella tutti"
|
||||
echo "s. Selezione manuale"
|
||||
echo "0. Annulla"
|
||||
echo ""
|
||||
read -rp "Scelta [$LABEL]: " MODE
|
||||
|
||||
case "$MODE" in
|
||||
a|A)
|
||||
read -rp "Confermi la cancellazione di ${#RUNS[@]} cartelle $LABEL? [s/N] " CONFIRM
|
||||
if [[ "${CONFIRM,,}" == "s" ]]; then
|
||||
rm -rf "${RUNS[@]}"
|
||||
echo "Cancellati ${#RUNS[@]} risultati $LABEL."
|
||||
else
|
||||
echo "Annullato."
|
||||
fi
|
||||
;;
|
||||
s|S)
|
||||
read -rp "Numeri da cancellare (es. 1 3 5): " -a CHOICES
|
||||
TO_DELETE=()
|
||||
for N in "${CHOICES[@]}"; do
|
||||
if [[ "$N" =~ ^[0-9]+$ ]] && (( N >= 1 && N <= ${#RUNS[@]} )); then
|
||||
TO_DELETE+=("${RUNS[$((N-1))]}")
|
||||
else
|
||||
echo "Indice non valido: $N (ignorato)"
|
||||
fi
|
||||
done
|
||||
if [[ ${#TO_DELETE[@]} -eq 0 ]]; then
|
||||
echo "Nessuna selezione valida."
|
||||
return
|
||||
fi
|
||||
echo "Verranno cancellati:"
|
||||
for D in "${TO_DELETE[@]}"; do
|
||||
echo " - $(basename "$D")"
|
||||
done
|
||||
read -rp "Confermi? [s/N] " CONFIRM
|
||||
if [[ "${CONFIRM,,}" == "s" ]]; then
|
||||
rm -rf "${TO_DELETE[@]}"
|
||||
echo "Cancellati ${#TO_DELETE[@]} risultati $LABEL."
|
||||
else
|
||||
echo "Annullato."
|
||||
fi
|
||||
;;
|
||||
0|"")
|
||||
echo "Annullato."
|
||||
;;
|
||||
*)
|
||||
echo "Scelta non valida."
|
||||
;;
|
||||
esac
|
||||
}
|
||||
|
||||
echo "Cosa vuoi ripulire?"
|
||||
echo " 1. Risultati FDM (results/fdm/)"
|
||||
echo " 2. Risultati PINN (results/pinn/)"
|
||||
echo " 3. Entrambi"
|
||||
echo " 0. Annulla"
|
||||
echo ""
|
||||
read -rp "Scelta: " CHOICE
|
||||
|
||||
case "$CHOICE" in
|
||||
1) cleanup_dir "FDM" "$BASE_DIR/results/fdm" ;;
|
||||
2) cleanup_dir "PINN" "$BASE_DIR/results/pinn" ;;
|
||||
3)
|
||||
cleanup_dir "FDM" "$BASE_DIR/results/fdm"
|
||||
cleanup_dir "PINN" "$BASE_DIR/results/pinn"
|
||||
;;
|
||||
0|"") echo "Annullato." ;;
|
||||
*) echo "Scelta non valida." ; exit 1 ;;
|
||||
esac
|
||||
@@ -19,3 +19,32 @@ T_END = 10.0 # fine simulazione [s]
|
||||
# Griglia FDM
|
||||
NX = 250 # nodi spaziali
|
||||
NT = 15000 # passi temporali (verifica CFL automatica)
|
||||
|
||||
# Sorgente gaussiana (approssimazione continua del delta di Dirac)
|
||||
GAUSS_SIGMA = 0.01 # larghezza del picco gaussiano [m]
|
||||
|
||||
# Architettura PINN
|
||||
HIDDEN_SIZE = 128 # neuroni per layer nascosto
|
||||
N_HIDDEN_LAYERS = 4 # numero di layer nascosti
|
||||
|
||||
# Sampling punti di collocazione
|
||||
N_F = 6000 # punti PDE (+ 50% clustering automatico vicino a X_SRC e T_STEP)
|
||||
N_IC = 400 # punti condizione iniziale
|
||||
N_BC = 400 # punti condizioni al contorno
|
||||
|
||||
# Training Adam
|
||||
EPOCHS = 5000 # epoche massime
|
||||
PATIENCE = 500 # early stopping
|
||||
LR_ADAM = 1e-3 # learning rate iniziale
|
||||
SCHED_FACTOR = 0.5 # ReduceLROnPlateau: fattore di riduzione
|
||||
SCHED_PATIENCE = 150 # ReduceLROnPlateau: patience
|
||||
SCHED_MIN_LR = 1e-6 # ReduceLROnPlateau: lr minimo
|
||||
|
||||
# Fine-tuning L-BFGS
|
||||
LR_LBFGS = 0.1 # learning rate L-BFGS
|
||||
LBFGS_STEPS = 200 # numero di step L-BFGS
|
||||
|
||||
# Pesi della loss
|
||||
W_PDE = 10.0 # peso residuo PDE
|
||||
W_IC = 1.0 # peso condizione iniziale
|
||||
W_BC = 5.0 # peso condizioni al contorno
|
||||
|
||||
@@ -35,7 +35,10 @@ def _get_device():
|
||||
return torch.device('cpu')
|
||||
|
||||
|
||||
def prepare_data(N_f=4000, N_ic=400, N_bc=400):
|
||||
def prepare_data(N_f=None, N_ic=None, N_bc=None):
|
||||
if N_f is None: N_f = config.N_F
|
||||
if N_ic is None: N_ic = config.N_IC
|
||||
if N_bc is None: N_bc = config.N_BC
|
||||
set_seed(42)
|
||||
device = _get_device()
|
||||
|
||||
@@ -43,16 +46,17 @@ def prepare_data(N_f=4000, N_ic=400, N_bc=400):
|
||||
x_f = torch.rand(N_f, device=device) * config.L
|
||||
t_f = torch.rand(N_f, device=device) * config.T_END
|
||||
|
||||
# Extra points near x=0 (steep Neumann gradient) and t=T_STEP (flux step)
|
||||
# Extra points near X_SRC (steep gradient at source) and t=T_STEP (flux step)
|
||||
n_extra = N_f // 4
|
||||
x_near0 = torch.rand(n_extra, device=device) * config.L * 0.1 # x in [0, 0.1]
|
||||
t_near0 = torch.rand(n_extra, device=device) * config.T_END
|
||||
x_near_src = config.X_SRC + (torch.rand(n_extra, device=device) - 0.5) * config.L * 0.1
|
||||
x_near_src = x_near_src.clamp(0, config.L)
|
||||
t_near_src = torch.rand(n_extra, device=device) * config.T_END
|
||||
x_step = torch.rand(n_extra, device=device) * config.L
|
||||
t_step = config.T_STEP + (torch.rand(n_extra, device=device) - 0.5) * 0.1 # t near T_STEP
|
||||
t_step = t_step.clamp(0, config.T_END)
|
||||
|
||||
x_f = torch.cat([x_f, x_near0, x_step])
|
||||
t_f = torch.cat([t_f, t_near0, t_step])
|
||||
x_f = torch.cat([x_f, x_near_src, x_step])
|
||||
t_f = torch.cat([t_f, t_near_src, t_step])
|
||||
|
||||
x_ic = torch.rand(N_ic, device=device) * config.L
|
||||
t_bc = torch.rand(N_bc, device=device) * config.T_END
|
||||
@@ -60,19 +64,24 @@ def prepare_data(N_f=4000, N_ic=400, N_bc=400):
|
||||
return {'device': device, 'x_f': x_f, 't_f': t_f, 'x_ic': x_ic, 't_bc': t_bc}
|
||||
|
||||
|
||||
def train_model(data, epochs=5000, patience=100):
|
||||
def train_model(data, epochs=None, patience=None):
|
||||
if epochs is None: epochs = config.EPOCHS
|
||||
if patience is None: patience = config.PATIENCE
|
||||
device = data['device']
|
||||
model = HeatPINN().to(device)
|
||||
optimizer = optim.Adam(model.parameters(), lr=1e-3)
|
||||
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=30, min_lr=1e-6)
|
||||
optimizer = optim.Adam(model.parameters(), lr=config.LR_ADAM)
|
||||
scheduler = ReduceLROnPlateau(optimizer, mode='min',
|
||||
factor=config.SCHED_FACTOR,
|
||||
patience=config.SCHED_PATIENCE,
|
||||
min_lr=config.SCHED_MIN_LR)
|
||||
|
||||
os.makedirs(MODELS_DIR, exist_ok=True)
|
||||
best_loss = float('inf')
|
||||
patience_counter = 0
|
||||
|
||||
print(f"\n--- Heat PINN Training (Adam) on {device} ---")
|
||||
model.train()
|
||||
for epoch in range(epochs):
|
||||
model.train()
|
||||
optimizer.zero_grad()
|
||||
loss, L_pde, L_ic, L_bc = heat_pinn_loss(
|
||||
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
|
||||
@@ -100,33 +109,33 @@ def train_model(data, epochs=5000, patience=100):
|
||||
|
||||
# L-BFGS fine-tuning phase (standard PINN practice for convergence to better minima)
|
||||
print("\n--- L-BFGS fine-tuning ---")
|
||||
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device)
|
||||
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device, weights_only=True)
|
||||
model.load_state_dict(ckpt['state_dict'])
|
||||
|
||||
lbfgs = optim.LBFGS(model.parameters(), lr=0.1, max_iter=50,
|
||||
lbfgs = optim.LBFGS(model.parameters(), lr=config.LR_LBFGS, max_iter=50,
|
||||
history_size=50, tolerance_grad=1e-7, line_search_fn='strong_wolfe')
|
||||
|
||||
_last = {}
|
||||
|
||||
for step in range(20):
|
||||
def closure():
|
||||
lbfgs.zero_grad()
|
||||
loss, L_pde, L_ic, L_bc = heat_pinn_loss(
|
||||
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
|
||||
)
|
||||
loss.backward()
|
||||
_last['loss'] = loss.item()
|
||||
_last['pde'] = L_pde.item()
|
||||
_last['ic'] = L_ic.item()
|
||||
_last['bc'] = L_bc.item()
|
||||
return loss
|
||||
def closure():
|
||||
lbfgs.zero_grad()
|
||||
loss, L_pde, L_ic, L_bc = heat_pinn_loss(
|
||||
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
|
||||
)
|
||||
loss.backward()
|
||||
_last['loss'] = loss.item()
|
||||
_last['pde'] = L_pde.item()
|
||||
_last['ic'] = L_ic.item()
|
||||
_last['bc'] = L_bc.item()
|
||||
return loss
|
||||
|
||||
for step in range(config.LBFGS_STEPS):
|
||||
lbfgs.step(closure)
|
||||
if _last['loss'] < best_loss:
|
||||
best_loss = _last['loss']
|
||||
torch.save({'state_dict': model.state_dict()}, MODEL_SAVE_PATH)
|
||||
if (step + 1) % 5 == 0:
|
||||
print(f"L-BFGS step {step+1}/20 | Loss: {_last['loss']:.6f} "
|
||||
print(f"L-BFGS step {step+1}/{config.LBFGS_STEPS} | Loss: {_last['loss']:.6f} "
|
||||
f"| PDE: {_last['pde']:.6f} | IC: {_last['ic']:.6f} | BC: {_last['bc']:.6f}")
|
||||
|
||||
print("Training complete! Model saved.")
|
||||
@@ -136,7 +145,7 @@ def _load_model(device):
|
||||
if not os.path.exists(MODEL_SAVE_PATH):
|
||||
print("Error: model not found. Train the model first.")
|
||||
return None
|
||||
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device)
|
||||
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device, weights_only=True)
|
||||
model = HeatPINN().to(device)
|
||||
model.load_state_dict(ckpt['state_dict'])
|
||||
model.eval()
|
||||
@@ -163,10 +172,11 @@ def evaluate_model(data):
|
||||
from fdm.solver import solve as fdm_solve
|
||||
T_fdm, _, _ = fdm_solve()
|
||||
|
||||
# Downsample FDM time axis (NX=100, NT=5000) to match PINN grid (100x100)
|
||||
# Downsample FDM grid (NX, NT) to match PINN prediction grid (100x100)
|
||||
nx, nt = T_pred.shape
|
||||
x_indices = np.linspace(0, T_fdm.shape[0] - 1, nx, dtype=int)
|
||||
t_indices = np.linspace(0, T_fdm.shape[1] - 1, nt, dtype=int)
|
||||
T_fdm_ds = T_fdm[:, t_indices] # (100, 100)
|
||||
T_fdm_ds = T_fdm[np.ix_(x_indices, t_indices)] # (100, 100)
|
||||
|
||||
l2_rel = np.sqrt(np.mean((T_pred - T_fdm_ds) ** 2)) / np.sqrt(np.mean(T_fdm_ds ** 2))
|
||||
max_err = np.max(np.abs(T_pred - T_fdm_ds))
|
||||
|
||||
+4
-7
@@ -24,26 +24,23 @@ def main_menu():
|
||||
print("\n" + "-" * 30)
|
||||
print(" MAIN MENU")
|
||||
print("-" * 30)
|
||||
print("1. Risolvi e salva risultati")
|
||||
print("2. Heatmap T(x,t)")
|
||||
print("3. Animazione T(x) nel tempo")
|
||||
print("4. Grafico T(t) in punti fissi")
|
||||
print("1. Risolvi")
|
||||
print("2. Visualizza")
|
||||
print("0. Esci")
|
||||
print("-" * 30)
|
||||
|
||||
choice = input("Select an option (0-4): ").strip()
|
||||
choice = input("Select an option (0-2): ").strip()
|
||||
|
||||
if choice == "1":
|
||||
T, x_vals, t_vals = solve()
|
||||
print(f"Soluzione completata. Shape T: {T.shape}")
|
||||
print(f"T range: [{T.min():.2f}, {T.max():.2f}] °C")
|
||||
|
||||
elif choice in ("2", "3", "4"):
|
||||
elif choice == "2":
|
||||
if T is None:
|
||||
print("Eseguire prima l'opzione 1.")
|
||||
else:
|
||||
visualize_fdm(T, x_vals, t_vals)
|
||||
print("Grafici salvati in animations/fdm/")
|
||||
|
||||
elif choice == "0":
|
||||
print("Uscita.")
|
||||
|
||||
+102
-39
@@ -1,5 +1,6 @@
|
||||
import sys
|
||||
import os
|
||||
from datetime import datetime
|
||||
import numpy as np
|
||||
import plotly.graph_objects as go
|
||||
|
||||
@@ -7,16 +8,6 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
import config
|
||||
|
||||
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
FDM_ANIM_DIR = os.path.join(BASE_DIR, 'animations', 'fdm')
|
||||
|
||||
|
||||
def _next_path(base_dir, prefix, ext):
|
||||
i = 1
|
||||
while True:
|
||||
path = os.path.join(base_dir, f'{prefix}_{i:03d}{ext}')
|
||||
if not os.path.exists(path):
|
||||
return path
|
||||
i += 1
|
||||
|
||||
|
||||
def visualize_fdm(T_matrix, x_vals, t_vals):
|
||||
@@ -31,55 +22,127 @@ def visualize_fdm(T_matrix, x_vals, t_vals):
|
||||
t_vals : np.ndarray, shape (NT,)
|
||||
Time values.
|
||||
"""
|
||||
os.makedirs(FDM_ANIM_DIR, exist_ok=True)
|
||||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
out_dir = os.path.join(BASE_DIR, 'results', 'fdm', timestamp)
|
||||
os.makedirs(out_dir, exist_ok=True)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 1. Heatmap
|
||||
# 1. heatmap.html: plot statico T(x,t) + striscia 1D animata
|
||||
# Due figure Plotly indipendenti nello stesso file HTML per
|
||||
# evitare conflitti tra animazioni e assi multipli.
|
||||
# ------------------------------------------------------------------
|
||||
zmax = float(np.max(np.abs(T_matrix - config.T0)))
|
||||
# Use symmetric range centred on T0 if there is any variation,
|
||||
# otherwise fall back to the raw range.
|
||||
z_data = T_matrix.T # shape (NT, NX) — rows=time, cols=space
|
||||
zmin_val = float(np.min(T_matrix))
|
||||
zmax_val = float(np.max(T_matrix))
|
||||
|
||||
if zmax > 0:
|
||||
z_center = float(np.mean(T_matrix))
|
||||
z_abs = float(np.max(np.abs(T_matrix - z_center)))
|
||||
zmin_sym = z_center - z_abs
|
||||
zmax_sym = z_center + z_abs
|
||||
else:
|
||||
zmin_sym = float(np.min(T_matrix))
|
||||
zmax_sym = float(np.max(T_matrix))
|
||||
# Subsample frames (shared with animation below)
|
||||
n_frames = len(t_vals)
|
||||
max_frames = 200
|
||||
step = max(1, n_frames // max_frames)
|
||||
frame_indices = list(range(0, n_frames, step))
|
||||
|
||||
fig_heatmap = go.Figure(go.Heatmap(
|
||||
# --- Figura A: heatmap 2D statica (identica all'originale) ---
|
||||
z_data = T_matrix.T # (NT, NX)
|
||||
z_center = float(np.mean(T_matrix))
|
||||
z_abs = float(np.max(np.abs(T_matrix - z_center))) or 1.0
|
||||
fig_static = go.Figure(go.Heatmap(
|
||||
z=z_data,
|
||||
x=x_vals,
|
||||
y=t_vals,
|
||||
colorscale='RdBu_r',
|
||||
zmin=zmin_sym,
|
||||
zmax=zmax_sym,
|
||||
zmin=z_center - z_abs,
|
||||
zmax=z_center + z_abs,
|
||||
colorbar=dict(title='T [°C]'),
|
||||
))
|
||||
fig_heatmap.update_layout(
|
||||
fig_static.update_layout(
|
||||
title='FDM — Heat Equation T(x,t)',
|
||||
xaxis_title='x [m]',
|
||||
yaxis_title='t [s]',
|
||||
height=500,
|
||||
height=480,
|
||||
margin=dict(t=50, b=50, l=70, r=90),
|
||||
)
|
||||
|
||||
heatmap_path = _next_path(FDM_ANIM_DIR, 'heatmap', '.html')
|
||||
fig_heatmap.write_html(heatmap_path)
|
||||
# --- Figura B: striscia 1D animata ---
|
||||
strip_frames = [
|
||||
go.Frame(
|
||||
data=[go.Heatmap(
|
||||
z=T_matrix[:, idx].reshape(1, -1),
|
||||
x=x_vals,
|
||||
y=[0],
|
||||
colorscale='Jet',
|
||||
zmin=zmin_val,
|
||||
zmax=zmax_val,
|
||||
showscale=True,
|
||||
colorbar=dict(title='T [°C]', thickness=18),
|
||||
)],
|
||||
name=str(idx),
|
||||
layout=go.Layout(title_text=f't = {t_vals[idx]:.3f} s'),
|
||||
)
|
||||
for idx in frame_indices
|
||||
]
|
||||
|
||||
fig_strip = go.Figure(
|
||||
data=[go.Heatmap(
|
||||
z=T_matrix[:, frame_indices[0]].reshape(1, -1),
|
||||
x=x_vals,
|
||||
y=[0],
|
||||
colorscale='Jet',
|
||||
zmin=zmin_val,
|
||||
zmax=zmax_val,
|
||||
showscale=True,
|
||||
colorbar=dict(title='T [°C]', thickness=18),
|
||||
)],
|
||||
frames=strip_frames,
|
||||
layout=go.Layout(
|
||||
title=f't = {t_vals[frame_indices[0]]:.3f} s',
|
||||
xaxis=dict(title='x [m]'),
|
||||
yaxis=dict(showticklabels=False, showgrid=False,
|
||||
zeroline=False, fixedrange=True),
|
||||
height=300,
|
||||
margin=dict(t=80, b=130, l=70, r=110),
|
||||
updatemenus=[dict(
|
||||
type='buttons',
|
||||
showactive=False,
|
||||
y=1.22, x=0.5, xanchor='center',
|
||||
buttons=[
|
||||
dict(label='▶ Play', method='animate',
|
||||
args=[None, dict(frame=dict(duration=40, redraw=True),
|
||||
fromcurrent=True, mode='immediate')]),
|
||||
dict(label='⏸ Pause', method='animate',
|
||||
args=[[None], dict(frame=dict(duration=0, redraw=False),
|
||||
mode='immediate')]),
|
||||
],
|
||||
)],
|
||||
sliders=[dict(
|
||||
steps=[
|
||||
dict(method='animate',
|
||||
args=[[str(idx)], dict(mode='immediate',
|
||||
frame=dict(duration=0, redraw=True))],
|
||||
label=f'{t_vals[idx]:.2f}')
|
||||
for idx in frame_indices
|
||||
],
|
||||
transition=dict(duration=0),
|
||||
x=0.05, y=0, len=0.9,
|
||||
currentvalue=dict(prefix='t = ', font=dict(size=14)),
|
||||
)],
|
||||
),
|
||||
)
|
||||
|
||||
# Scrivi entrambe le figure in un unico file HTML
|
||||
html_static = fig_static.to_html(full_html=False, include_plotlyjs='cdn')
|
||||
html_strip = fig_strip.to_html(full_html=False, include_plotlyjs=False)
|
||||
heatmap_path = os.path.join(out_dir, 'heatmap.html')
|
||||
with open(heatmap_path, 'w', encoding='utf-8') as _f:
|
||||
_f.write('<!DOCTYPE html><html><head><meta charset="utf-8"></head><body>\n')
|
||||
_f.write(html_static)
|
||||
_f.write('\n')
|
||||
_f.write(html_strip)
|
||||
_f.write('\n</body></html>')
|
||||
print(f"Heatmap saved → {heatmap_path}")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 2. Animated profile T(x) evolving in time
|
||||
# ------------------------------------------------------------------
|
||||
L = config.L
|
||||
n_frames = len(t_vals)
|
||||
|
||||
# Subsample frames for a manageable animation (max ~200 frames)
|
||||
max_frames = 200
|
||||
step = max(1, n_frames // max_frames)
|
||||
frame_indices = list(range(0, n_frames, step))
|
||||
|
||||
y_min = float(np.min(T_matrix)) - 1.0
|
||||
y_max = float(np.max(T_matrix)) + 1.0
|
||||
@@ -168,7 +231,7 @@ def visualize_fdm(T_matrix, x_vals, t_vals):
|
||||
frames=frames,
|
||||
)
|
||||
|
||||
anim_path = _next_path(FDM_ANIM_DIR, 'animation', '.html')
|
||||
anim_path = os.path.join(out_dir, 'animation.html')
|
||||
fig_anim.write_html(anim_path)
|
||||
print(f"Animation saved → {anim_path}")
|
||||
|
||||
@@ -222,6 +285,6 @@ def visualize_fdm(T_matrix, x_vals, t_vals):
|
||||
height=480,
|
||||
)
|
||||
|
||||
ts_path = _next_path(FDM_ANIM_DIR, 'time_series', '.html')
|
||||
ts_path = os.path.join(out_dir, 'time_series.html')
|
||||
fig_ts.write_html(ts_path)
|
||||
print(f"Time-series saved → {ts_path}")
|
||||
|
||||
@@ -6,55 +6,66 @@ import config
|
||||
class HeatPINN(nn.Module):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.net = nn.Sequential(
|
||||
nn.Linear(2, 128), nn.Tanh(),
|
||||
nn.Linear(128, 128), nn.Tanh(),
|
||||
nn.Linear(128, 128), nn.Tanh(),
|
||||
nn.Linear(128, 128), nn.Tanh(),
|
||||
nn.Linear(128, 1),
|
||||
)
|
||||
h = config.HIDDEN_SIZE
|
||||
layers = [nn.Linear(2, h), nn.Tanh()]
|
||||
for _ in range(config.N_HIDDEN_LAYERS - 1):
|
||||
layers += [nn.Linear(h, h), nn.Tanh()]
|
||||
layers.append(nn.Linear(h, 1))
|
||||
self.net = nn.Sequential(*layers)
|
||||
|
||||
def forward(self, x):
|
||||
# Output scaled to physical range: T_AMB + (Q*L/K) * net
|
||||
# net learns dimensionless perturbation in [0,1] range
|
||||
T_scale = config.T_AMB + (config.Q_VAL * config.L / config.K) * self.net(x)
|
||||
return T_scale
|
||||
def forward(self, xt):
|
||||
x = xt[:, :1] / config.L
|
||||
t = xt[:, 1:] / config.T_END
|
||||
return config.T_AMB + (config.Q_VAL * config.L / config.K) * self.net(torch.cat([x, t], dim=1))
|
||||
|
||||
|
||||
def heat_pinn_loss(model, x_f, t_f, x_ic, t_bc, w_pde=1.0, w_ic=1.0, w_bc=10.0):
|
||||
# Characteristic scales for normalization
|
||||
T_char = config.Q_VAL * config.L / config.K # ~50 °C — temperature scale
|
||||
grad_char = (config.Q_VAL / config.K) ** 2 # ~2500 — gradient scale²
|
||||
# Precomputed loss scales (depend only on config constants)
|
||||
_T_char = config.Q_VAL * config.L / config.K # ~150 °C — temperature scale
|
||||
# _pde_scale covers both dT/dt and the Gaussian source peak (dominant with small sigma)
|
||||
_src_peak = config.ALPHA * config.Q_VAL / (config.K * config.GAUSS_SIGMA * (2 * 3.141592653589793) ** 0.5)
|
||||
_pde_scale = max((_T_char / config.T_END) ** 2, _src_peak ** 2) + 1e-8
|
||||
# Robin BC residual scale: max(dT/dx, H_CONV/K * T_char) — convective term dominates when H*L/K >> 1
|
||||
_bc_scale = max(config.Q_VAL / config.K,
|
||||
config.H_CONV * _T_char / config.K) ** 2
|
||||
|
||||
# PDE residual: dT/dt - alpha * d2T/dx2 = 0 (normalized by T_char/t_char)
|
||||
|
||||
def heat_pinn_loss(model, x_f, t_f, x_ic, t_bc,
|
||||
w_pde=None, w_ic=None, w_bc=None):
|
||||
if w_pde is None: w_pde = config.W_PDE
|
||||
if w_ic is None: w_ic = config.W_IC
|
||||
if w_bc is None: w_bc = config.W_BC
|
||||
T_char = _T_char
|
||||
|
||||
# PDE residual: dT/dt - alpha * d2T/dx2 - source(x,t) = 0 (normalized by T_char/t_char)
|
||||
x_f = x_f.detach().requires_grad_(True)
|
||||
t_f = t_f.detach().requires_grad_(True)
|
||||
T_f = model(torch.stack([x_f, t_f], dim=1))
|
||||
dT_dt = torch.autograd.grad(T_f.sum(), t_f, create_graph=True)[0]
|
||||
dT_dx = torch.autograd.grad(T_f.sum(), x_f, create_graph=True)[0]
|
||||
dT_dt, dT_dx = torch.autograd.grad(T_f.sum(), [t_f, x_f], create_graph=True)
|
||||
d2T_dx2 = torch.autograd.grad(dT_dx.sum(), x_f, create_graph=True)[0]
|
||||
pde_scale = (T_char / config.T_END) ** 2 + 1e-8
|
||||
L_pde = ((dT_dt - config.ALPHA * d2T_dx2) ** 2).mean() / pde_scale
|
||||
Q_t_f = torch.where(t_f >= config.T_STEP,
|
||||
torch.tensor(config.Q_VAL, device=t_f.device, dtype=t_f.dtype),
|
||||
torch.tensor(0.0, device=t_f.device, dtype=t_f.dtype))
|
||||
sigma = config.GAUSS_SIGMA
|
||||
gauss = torch.exp(-0.5 * ((x_f - config.X_SRC) / sigma) ** 2) / (sigma * (2 * torch.pi) ** 0.5)
|
||||
source_term = (config.ALPHA / config.K) * Q_t_f * gauss
|
||||
L_pde = ((dT_dt - config.ALPHA * d2T_dx2 - source_term) ** 2).mean() / _pde_scale
|
||||
|
||||
# IC: T(x, 0) = T0 — normalized by T_char²
|
||||
T_ic_pred = model(torch.stack([x_ic, torch.zeros_like(x_ic)], dim=1))
|
||||
T_ic_true = torch.full_like(T_ic_pred, config.T0)
|
||||
L_ic = ((T_ic_pred - T_ic_true) ** 2).mean() / (T_char ** 2 + 1e-8)
|
||||
L_ic = ((T_ic_pred - T_ic_true) ** 2).mean() / (_T_char ** 2 + 1e-8)
|
||||
|
||||
# BC x=0: Neumann — dT/dx + Q(t)/K = 0
|
||||
# BC x=0: Robin — dT/dx + H_CONV/K * (T(0,t) - T_AMB) = 0
|
||||
x_left = torch.zeros(t_bc.shape[0], device=t_bc.device).requires_grad_(True)
|
||||
T_left = model(torch.stack([x_left, t_bc.detach()], dim=1))
|
||||
dT_dx_left = torch.autograd.grad(T_left.sum(), x_left, create_graph=True)[0]
|
||||
Q_t = torch.where(t_bc >= config.T_STEP,
|
||||
torch.tensor(config.Q_VAL, device=t_bc.device, dtype=t_bc.dtype),
|
||||
torch.tensor(0.0, device=t_bc.device, dtype=t_bc.dtype))
|
||||
L_bc_left = ((dT_dx_left + Q_t / config.K) ** 2).mean() / grad_char
|
||||
L_bc_left = ((dT_dx_left + (config.H_CONV / config.K) * (T_left.squeeze() - config.T_AMB)) ** 2).mean() / _bc_scale
|
||||
|
||||
# BC x=L: Robin — dT/dx + H_CONV/K * (T(L,t) - T_AMB) = 0
|
||||
x_right = torch.full((t_bc.shape[0],), config.L, device=t_bc.device).requires_grad_(True)
|
||||
T_right = model(torch.stack([x_right, t_bc.detach()], dim=1))
|
||||
dT_dx_right = torch.autograd.grad(T_right.sum(), x_right, create_graph=True)[0]
|
||||
L_bc_right = ((dT_dx_right + (config.H_CONV / config.K) * (T_right.squeeze() - config.T_AMB)) ** 2).mean() / grad_char
|
||||
L_bc_right = ((dT_dx_right + (config.H_CONV / config.K) * (T_right.squeeze() - config.T_AMB)) ** 2).mean() / _bc_scale
|
||||
|
||||
L_bc = L_bc_left + L_bc_right
|
||||
total = w_pde * L_pde + w_ic * L_ic + w_bc * L_bc
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
[pytest]
|
||||
testpaths = tests
|
||||
addopts = -v --tb=short
|
||||
markers =
|
||||
slow: test lenti (training completo) — escludi con -m "not slow"
|
||||
@@ -1,4 +1,5 @@
|
||||
torch>=2.0.0
|
||||
pytest>=7.0.0
|
||||
pandas>=2.0.0
|
||||
numpy>=1.24.0
|
||||
scikit-learn>=1.3.0
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def device():
|
||||
from engine import _get_device
|
||||
return _get_device()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def small_data():
|
||||
from engine import prepare_data
|
||||
return prepare_data(N_f=200, N_ic=50, N_bc=50)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pinn_model(device):
|
||||
from model import HeatPINN
|
||||
return HeatPINN().to(device)
|
||||
@@ -0,0 +1,73 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import math
|
||||
import config
|
||||
|
||||
|
||||
def test_x_src_within_domain():
|
||||
assert 0.0 <= config.X_SRC <= config.L
|
||||
|
||||
|
||||
def test_t_step_before_t_end():
|
||||
assert 0.0 < config.T_STEP < config.T_END
|
||||
|
||||
|
||||
def test_gauss_sigma_positive():
|
||||
assert config.GAUSS_SIGMA > 0.0
|
||||
|
||||
|
||||
def test_physics_positive():
|
||||
assert config.ALPHA > 0.0
|
||||
assert config.K > 0.0
|
||||
assert config.H_CONV > 0.0
|
||||
assert config.L > 0.0
|
||||
assert config.T_END > 0.0
|
||||
|
||||
|
||||
def test_training_hyperparameters_positive():
|
||||
assert config.PATIENCE > 0
|
||||
assert config.EPOCHS > 0
|
||||
assert config.LR_ADAM > 0.0
|
||||
assert config.SCHED_MIN_LR > 0.0
|
||||
assert config.SCHED_FACTOR > 0.0
|
||||
assert config.SCHED_PATIENCE > 0
|
||||
|
||||
|
||||
def test_lr_ordering():
|
||||
"""Min LR deve essere inferiore all'LR iniziale."""
|
||||
assert config.SCHED_MIN_LR < config.LR_ADAM
|
||||
|
||||
|
||||
def test_sched_patience_lt_patience():
|
||||
"""Lo scheduler deve poter agire prima che scatti l'early stopping."""
|
||||
assert config.SCHED_PATIENCE < config.PATIENCE
|
||||
|
||||
|
||||
def test_cfl_stability():
|
||||
"""La griglia FDM deve soddisfare la condizione CFL (r ≤ 0.5)."""
|
||||
dx = config.L / (config.NX - 1)
|
||||
dt = config.T_END / (config.NT - 1)
|
||||
r = config.ALPHA * dt / dx ** 2
|
||||
assert r <= 0.5, f"CFL violata: r={r:.4f} > 0.5"
|
||||
|
||||
|
||||
def test_grid_dimensions():
|
||||
assert config.NX >= 2
|
||||
assert config.NT >= 2
|
||||
assert config.N_F >= 1
|
||||
assert config.N_IC >= 1
|
||||
assert config.N_BC >= 1
|
||||
|
||||
|
||||
def test_pde_scale_covers_source_peak():
|
||||
"""_pde_scale in model.py deve coprire il picco gaussiano della sorgente."""
|
||||
from model import _pde_scale
|
||||
src_peak = config.ALPHA * config.Q_VAL / (
|
||||
config.K * config.GAUSS_SIGMA * math.sqrt(2 * math.pi)
|
||||
)
|
||||
assert _pde_scale >= src_peak ** 2 - 1e-6, (
|
||||
f"_pde_scale={_pde_scale:.1f} < src_peak²={src_peak**2:.1f}: "
|
||||
"la loss PDE non è normalizzata correttamente"
|
||||
)
|
||||
@@ -0,0 +1,87 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import pytest
|
||||
import numpy as np
|
||||
import torch
|
||||
import config
|
||||
|
||||
|
||||
# ── FDM ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_fdm_full_run():
|
||||
"""Il solver FDM produce un campo di temperatura fisicamente corretto."""
|
||||
from fdm.solver import solve
|
||||
T, x, t = solve()
|
||||
|
||||
assert T.shape == (config.NX, config.NT)
|
||||
assert np.isfinite(T).all()
|
||||
assert T[:, -1].mean() > config.T0 # la sorgente ha scaldato il dominio
|
||||
assert T.max() > config.T0 # picco sopra la IC
|
||||
assert T.min() >= config.T0 - 1e-6 # nessun raffreddamento sotto T0 (no sorgenti fredde)
|
||||
|
||||
|
||||
def test_fdm_visualizer_creates_html(tmp_path, monkeypatch):
|
||||
"""Il visualizer FDM scrive almeno un file HTML senza errori."""
|
||||
import fdm.visualizer as fdm_vis
|
||||
monkeypatch.setattr(fdm_vis, 'BASE_DIR', str(tmp_path))
|
||||
|
||||
from fdm.solver import solve
|
||||
T, x, t = solve()
|
||||
fdm_vis.visualize_fdm(T, x, t)
|
||||
|
||||
html_files = list(tmp_path.rglob('*.html'))
|
||||
assert len(html_files) >= 1, "Nessun file HTML generato dal visualizer FDM"
|
||||
|
||||
|
||||
def test_pinn_visualizer_creates_html(tmp_path, monkeypatch):
|
||||
"""Il visualizer PINN scrive i tre file HTML senza errori."""
|
||||
import visualizer as pinn_vis
|
||||
monkeypatch.setattr(pinn_vis, 'BASE_DIR', str(tmp_path))
|
||||
|
||||
from fdm.solver import solve as fdm_solve
|
||||
T_fdm, _, _ = fdm_solve()
|
||||
|
||||
nx, nt = 20, 20
|
||||
x_vals = np.linspace(0, config.L, nx)
|
||||
t_vals = np.linspace(0, config.T_END, nt)
|
||||
T_pred = np.full((nx, nt), config.T_AMB) # predizione costante (dummy)
|
||||
|
||||
pinn_vis.visualize_heat_field(T_pred, x_vals, t_vals, T_fdm)
|
||||
|
||||
html_files = list(tmp_path.rglob('*.html'))
|
||||
assert len(html_files) == 3, f"Attesi 3 HTML, trovati {len(html_files)}"
|
||||
|
||||
|
||||
# ── PINN training (lento) ─────────────────────────────────────────────────────
|
||||
|
||||
@pytest.mark.slow
|
||||
def test_pinn_training_saves_checkpoint(tmp_path, monkeypatch):
|
||||
"""Training per 30 epoche: il checkpoint viene salvato."""
|
||||
import engine
|
||||
save_path = str(tmp_path / 'model.pth')
|
||||
monkeypatch.setattr(engine, 'MODEL_SAVE_PATH', save_path)
|
||||
monkeypatch.setattr(engine, 'MODELS_DIR', str(tmp_path))
|
||||
|
||||
from engine import prepare_data, train_model
|
||||
data = prepare_data(N_f=300, N_ic=100, N_bc=100)
|
||||
train_model(data, epochs=30, patience=30)
|
||||
|
||||
assert os.path.exists(save_path)
|
||||
ckpt = torch.load(save_path, map_location='cpu', weights_only=True)
|
||||
assert 'state_dict' in ckpt
|
||||
|
||||
|
||||
@pytest.mark.slow
|
||||
def test_pinn_evaluate_after_training(tmp_path, monkeypatch):
|
||||
"""evaluate_model gira senza errori dopo un training minimo."""
|
||||
import engine
|
||||
save_path = str(tmp_path / 'model.pth')
|
||||
monkeypatch.setattr(engine, 'MODEL_SAVE_PATH', save_path)
|
||||
monkeypatch.setattr(engine, 'MODELS_DIR', str(tmp_path))
|
||||
|
||||
from engine import prepare_data, train_model, evaluate_model
|
||||
data = prepare_data(N_f=300, N_ic=100, N_bc=100)
|
||||
train_model(data, epochs=30, patience=30)
|
||||
evaluate_model(data)
|
||||
@@ -0,0 +1,67 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import torch
|
||||
import config
|
||||
from engine import set_seed, _get_device, prepare_data
|
||||
|
||||
|
||||
def test_set_seed_reproducibility():
|
||||
set_seed(42)
|
||||
r1 = torch.rand(10)
|
||||
set_seed(42)
|
||||
r2 = torch.rand(10)
|
||||
torch.testing.assert_close(r1, r2)
|
||||
|
||||
|
||||
def test_get_device_valid():
|
||||
device = _get_device()
|
||||
assert isinstance(device, torch.device)
|
||||
assert device.type in ('cpu', 'cuda', 'mps')
|
||||
|
||||
|
||||
def test_prepare_data_keys():
|
||||
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
|
||||
assert set(data.keys()) == {'device', 'x_f', 't_f', 'x_ic', 't_bc'}
|
||||
|
||||
|
||||
def test_prepare_data_shapes():
|
||||
N_f, N_ic, N_bc = 100, 50, 50
|
||||
data = prepare_data(N_f=N_f, N_ic=N_ic, N_bc=N_bc)
|
||||
# engine.py aggiunge 2 * (N_f // 4) punti di clustering
|
||||
expected_f = N_f + 2 * (N_f // 4)
|
||||
assert data['x_f'].shape == (expected_f,)
|
||||
assert data['t_f'].shape == (expected_f,)
|
||||
assert data['x_ic'].shape == (N_ic,)
|
||||
assert data['t_bc'].shape == (N_bc,)
|
||||
|
||||
|
||||
def test_prepare_data_x_bounds():
|
||||
data = prepare_data(N_f=500, N_ic=100, N_bc=100)
|
||||
assert data['x_f'].min().item() >= 0.0
|
||||
assert data['x_f'].max().item() <= config.L
|
||||
assert data['x_ic'].min().item() >= 0.0
|
||||
assert data['x_ic'].max().item() <= config.L
|
||||
|
||||
|
||||
def test_prepare_data_t_bounds():
|
||||
data = prepare_data(N_f=500, N_ic=100, N_bc=100)
|
||||
assert data['t_f'].min().item() >= 0.0
|
||||
assert data['t_f'].max().item() <= config.T_END
|
||||
|
||||
|
||||
def test_prepare_data_device_consistency():
|
||||
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
|
||||
expected = data['device'].type
|
||||
for key in ('x_f', 't_f', 'x_ic', 't_bc'):
|
||||
assert data[key].device.type == expected, f"{key} sul device sbagliato"
|
||||
|
||||
|
||||
def test_prepare_data_deterministic():
|
||||
"""Due chiamate con lo stesso seed (fissato in prepare_data) producono dati identici."""
|
||||
d1 = prepare_data(N_f=100, N_ic=50, N_bc=50)
|
||||
d2 = prepare_data(N_f=100, N_ic=50, N_bc=50)
|
||||
torch.testing.assert_close(d1['x_f'], d2['x_f'])
|
||||
torch.testing.assert_close(d1['t_f'], d2['t_f'])
|
||||
torch.testing.assert_close(d1['x_ic'], d2['x_ic'])
|
||||
@@ -0,0 +1,65 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import torch
|
||||
from engine import prepare_data
|
||||
from model import HeatPINN, heat_pinn_loss
|
||||
|
||||
|
||||
def test_data_to_model_forward():
|
||||
"""prepare_data → forward: shape e device coerenti, nessun NaN."""
|
||||
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
|
||||
model = HeatPINN().to(data['device'])
|
||||
xt = torch.stack([data['x_f'], data['t_f']], dim=1)
|
||||
out = model(xt)
|
||||
assert out.shape == (data['x_f'].shape[0], 1)
|
||||
assert out.device.type == data['device'].type
|
||||
assert torch.isfinite(out).all()
|
||||
|
||||
|
||||
def test_full_loss_pipeline():
|
||||
"""prepare_data → heat_pinn_loss: tutti i componenti finiti e non-negativi."""
|
||||
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
|
||||
model = HeatPINN().to(data['device'])
|
||||
total, L_pde, L_ic, L_bc = heat_pinn_loss(
|
||||
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
|
||||
)
|
||||
for name, v in [('total', total), ('L_pde', L_pde), ('L_ic', L_ic), ('L_bc', L_bc)]:
|
||||
assert torch.isfinite(v), f"{name} non è finita"
|
||||
assert v.item() >= 0.0, f"{name} è negativa"
|
||||
|
||||
|
||||
def test_backward_gradients_finite():
|
||||
"""Il backward della loss non produce NaN/Inf nei parametri."""
|
||||
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
|
||||
model = HeatPINN().to(data['device'])
|
||||
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
|
||||
optimizer.zero_grad()
|
||||
total, _, _, _ = heat_pinn_loss(
|
||||
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
|
||||
)
|
||||
total.backward()
|
||||
for p in model.parameters():
|
||||
assert p.grad is not None
|
||||
assert torch.isfinite(p.grad).all(), "gradiente NaN/Inf"
|
||||
|
||||
|
||||
def test_training_loop_stable():
|
||||
"""20 step di Adam non producono NaN/Inf nei parametri né nella loss."""
|
||||
data = prepare_data(N_f=200, N_ic=100, N_bc=100)
|
||||
model = HeatPINN().to(data['device'])
|
||||
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
|
||||
args = (model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc'])
|
||||
|
||||
for _ in range(20):
|
||||
optimizer.zero_grad()
|
||||
loss, _, _, _ = heat_pinn_loss(*args)
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
|
||||
for p in model.parameters():
|
||||
assert torch.isfinite(p).all(), "parametro NaN/Inf dopo training"
|
||||
|
||||
total, _, _, _ = heat_pinn_loss(*args)
|
||||
assert torch.isfinite(total)
|
||||
@@ -0,0 +1,102 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import torch
|
||||
import config
|
||||
from model import HeatPINN, heat_pinn_loss
|
||||
|
||||
|
||||
# ── HeatPINN.forward ─────────────────────────────────────────────────────────
|
||||
|
||||
def test_forward_output_shape(pinn_model, device):
|
||||
xt = torch.zeros(64, 2, device=device)
|
||||
xt[:, 0] = torch.rand(64) * config.L
|
||||
xt[:, 1] = torch.rand(64) * config.T_END
|
||||
assert pinn_model(xt).shape == (64, 1)
|
||||
|
||||
|
||||
def test_forward_finite(pinn_model, device):
|
||||
xt = torch.zeros(100, 2, device=device)
|
||||
xt[:, 0] = torch.rand(100) * config.L
|
||||
xt[:, 1] = torch.rand(100) * config.T_END
|
||||
assert torch.isfinite(pinn_model(xt)).all()
|
||||
|
||||
|
||||
def test_forward_zero_weights_returns_t_amb(device):
|
||||
"""Con pesi nulli net(x,t)=0 ⇒ forward restituisce T_AMB per ogni input."""
|
||||
model = HeatPINN().to(device)
|
||||
for p in model.parameters():
|
||||
p.data.zero_()
|
||||
xt = torch.zeros(20, 2, device=device)
|
||||
xt[:, 0] = torch.rand(20) * config.L
|
||||
xt[:, 1] = torch.rand(20) * config.T_END
|
||||
out = model(xt)
|
||||
torch.testing.assert_close(out, torch.full_like(out, config.T_AMB), atol=1e-5, rtol=0.0)
|
||||
|
||||
|
||||
def test_forward_t_normalization(device):
|
||||
"""t viene normalizzato a [0,1]: il modello deve restituire output finiti
|
||||
anche a t=T_END senza saturazione di Tanh."""
|
||||
model = HeatPINN().to(device)
|
||||
torch.nn.init.xavier_uniform_(model.net[0].weight)
|
||||
xt = torch.tensor([[0.5, 0.0], [0.5, config.T_END]], device=device)
|
||||
out = model(xt)
|
||||
assert out.shape == (2, 1)
|
||||
assert torch.isfinite(out).all()
|
||||
|
||||
|
||||
# ── heat_pinn_loss ────────────────────────────────────────────────────────────
|
||||
|
||||
def _dummy_inputs(device, n_f=100, n_ic=50, n_bc=50):
|
||||
x_f = torch.rand(n_f, device=device) * config.L
|
||||
t_f = torch.rand(n_f, device=device) * config.T_END
|
||||
x_ic = torch.rand(n_ic, device=device) * config.L
|
||||
t_bc = torch.rand(n_bc, device=device) * config.T_END
|
||||
return x_f, t_f, x_ic, t_bc
|
||||
|
||||
|
||||
def test_loss_returns_four_values(pinn_model, device):
|
||||
result = heat_pinn_loss(pinn_model, *_dummy_inputs(device))
|
||||
assert len(result) == 4
|
||||
|
||||
|
||||
def test_loss_components_non_negative(pinn_model, device):
|
||||
total, L_pde, L_ic, L_bc = heat_pinn_loss(pinn_model, *_dummy_inputs(device))
|
||||
assert total.item() >= 0.0
|
||||
assert L_pde.item() >= 0.0
|
||||
assert L_ic.item() >= 0.0
|
||||
assert L_bc.item() >= 0.0
|
||||
|
||||
|
||||
def test_loss_finite(pinn_model, device):
|
||||
for v in heat_pinn_loss(pinn_model, *_dummy_inputs(device)):
|
||||
assert torch.isfinite(v), f"loss non finita: {v}"
|
||||
|
||||
|
||||
def test_loss_weight_doubles_pde_contribution(pinn_model, device):
|
||||
"""Raddoppiare w_pde con w_ic=w_bc=0 deve raddoppiare il totale."""
|
||||
inputs = _dummy_inputs(device)
|
||||
total1, L_pde1, _, _ = heat_pinn_loss(pinn_model, *inputs, w_pde=1.0, w_ic=0.0, w_bc=0.0)
|
||||
total2, L_pde2, _, _ = heat_pinn_loss(pinn_model, *inputs, w_pde=2.0, w_ic=0.0, w_bc=0.0)
|
||||
# L_pde deve essere identico tra le due chiamate (stesso modello, stessi dati)
|
||||
torch.testing.assert_close(L_pde1, L_pde2, atol=1e-5, rtol=1e-4)
|
||||
torch.testing.assert_close(total2, 2.0 * total1, atol=1e-5, rtol=1e-4)
|
||||
|
||||
|
||||
def test_ic_loss_zero_when_net_is_zero(device):
|
||||
"""Con net=0 ⇒ T = T_AMB = T0 ⇒ L_ic = 0."""
|
||||
model = HeatPINN().to(device)
|
||||
for p in model.parameters():
|
||||
p.data.zero_()
|
||||
_, _, L_ic, _ = heat_pinn_loss(model, *_dummy_inputs(device))
|
||||
assert L_ic.item() < 1e-8
|
||||
|
||||
|
||||
def test_bc_loss_zero_when_net_is_zero(device):
|
||||
"""Con net=0 ⇒ T = T_AMB e dT/dx = 0 ⇒ Robin BC soddisfatta ⇒ L_bc = 0."""
|
||||
model = HeatPINN().to(device)
|
||||
for p in model.parameters():
|
||||
p.data.zero_()
|
||||
_, _, _, L_bc = heat_pinn_loss(model, *_dummy_inputs(device))
|
||||
assert L_bc.item() < 1e-8
|
||||
+14
-19
@@ -1,20 +1,24 @@
|
||||
import os
|
||||
from datetime import datetime
|
||||
import numpy as np
|
||||
import plotly.graph_objects as go
|
||||
from plotly.subplots import make_subplots
|
||||
import config
|
||||
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
ANIMATIONS_DIR = os.path.join(BASE_DIR, 'animations')
|
||||
|
||||
|
||||
def visualize_heat_field(T_pred, x_vals, t_vals, T_fdm):
|
||||
os.makedirs(ANIMATIONS_DIR, exist_ok=True)
|
||||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
out_dir = os.path.join(BASE_DIR, 'results', 'pinn', timestamp)
|
||||
os.makedirs(out_dir, exist_ok=True)
|
||||
|
||||
# Downsample T_fdm from shape (NX, NT) to (NX, len(t_vals))
|
||||
# Downsample T_fdm from shape (NX_fdm, NT_fdm) to match PINN grid
|
||||
nx_pred = len(x_vals)
|
||||
nt_pred = len(t_vals)
|
||||
x_indices = np.linspace(0, T_fdm.shape[0] - 1, nx_pred, dtype=int)
|
||||
t_indices = np.linspace(0, T_fdm.shape[1] - 1, nt_pred, dtype=int)
|
||||
T_fdm_ds = T_fdm[:, t_indices] # now shape (NX, nt_pred)
|
||||
T_fdm_ds = T_fdm[np.ix_(x_indices, t_indices)]
|
||||
|
||||
# --- Static heatmap: PINN vs FDM ---
|
||||
fig_map = make_subplots(
|
||||
@@ -42,9 +46,9 @@ def visualize_heat_field(T_pred, x_vals, t_vals, T_fdm):
|
||||
fig_map.update_yaxes(title_text='t', row=1, col=1)
|
||||
fig_map.update_layout(title_text='Heat Equation PINN vs FDM', height=450)
|
||||
|
||||
map_path = _next_path('heatmap', '.html')
|
||||
map_path = os.path.join(out_dir, 'heatmap.html')
|
||||
fig_map.write_html(map_path)
|
||||
print(f"Heatmap saved → {map_path}")
|
||||
print(f"Heatmap saved → {map_path}")
|
||||
|
||||
# --- Animated profile T(x) evolving in time ---
|
||||
n_frames = len(t_vals)
|
||||
@@ -90,9 +94,9 @@ def visualize_heat_field(T_pred, x_vals, t_vals, T_fdm):
|
||||
frames=frames,
|
||||
)
|
||||
|
||||
anim_path = _next_path('heat_animation', '.html')
|
||||
anim_path = os.path.join(out_dir, 'animation.html')
|
||||
fig_anim.write_html(anim_path)
|
||||
print(f"Animation saved → {anim_path}")
|
||||
print(f"Animation saved → {anim_path}")
|
||||
|
||||
# --- Time-series comparison at fixed spatial points ---
|
||||
# Spatial indices for x=0, x=L/2, x=L
|
||||
@@ -139,15 +143,6 @@ def visualize_heat_field(T_pred, x_vals, t_vals, T_fdm):
|
||||
height=500,
|
||||
)
|
||||
|
||||
comparison_path = _next_path('comparison', '.html')
|
||||
comparison_path = os.path.join(out_dir, 'comparison.html')
|
||||
fig_ts.write_html(comparison_path)
|
||||
print(f"Time-series saved → {comparison_path}")
|
||||
|
||||
|
||||
def _next_path(prefix, ext):
|
||||
i = 1
|
||||
while True:
|
||||
path = os.path.join(ANIMATIONS_DIR, f'{prefix}_{i:03d}{ext}')
|
||||
if not os.path.exists(path):
|
||||
return path
|
||||
i += 1
|
||||
print(f"Comparison saved → {comparison_path}")
|
||||
|
||||
Reference in New Issue
Block a user