test: aggiunge suite completa — unit, integration ed e2e (42 test)

- pytest.ini: configura testpaths, marker slow, output verboso
- tests/conftest.py: fixture condivise (device, small_data, pinn_model)
- tests/test_config.py: sanità parametri fisici e numerici, CFL, _pde_scale
- tests/test_model.py: HeatPINN.forward e heat_pinn_loss (shape, finiti,
  zero-weight analytici per IC e BC, scaling dei pesi)
- tests/test_engine_data.py: set_seed, _get_device, prepare_data
  (shape, bounds, device consistency, determinismo)
- tests/test_integration_pinn.py: pipeline dati→modello→loss→backward
- tests/test_e2e.py: FDM completo, visualizer FDM/PINN con tmp_path,
  training breve (2 test @slow)
- requirements.txt: aggiunge pytest>=7.0.0

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-14 15:50:54 +02:00
parent 1237a57290
commit b8301a4329
8 changed files with 424 additions and 0 deletions
+5
View File
@@ -0,0 +1,5 @@
[pytest]
testpaths = tests
addopts = -v --tb=short
markers =
slow: test lenti (training completo) — escludi con -m "not slow"
+1
View File
@@ -1,4 +1,5 @@
torch>=2.0.0 torch>=2.0.0
pytest>=7.0.0
pandas>=2.0.0 pandas>=2.0.0
numpy>=1.24.0 numpy>=1.24.0
scikit-learn>=1.3.0 scikit-learn>=1.3.0
+24
View File
@@ -0,0 +1,24 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pytest
import torch
@pytest.fixture(scope="session")
def device():
from engine import _get_device
return _get_device()
@pytest.fixture
def small_data():
from engine import prepare_data
return prepare_data(N_f=200, N_ic=50, N_bc=50)
@pytest.fixture
def pinn_model(device):
from model import HeatPINN
return HeatPINN().to(device)
+73
View File
@@ -0,0 +1,73 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import math
import config
def test_x_src_within_domain():
assert 0.0 <= config.X_SRC <= config.L
def test_t_step_before_t_end():
assert 0.0 < config.T_STEP < config.T_END
def test_gauss_sigma_positive():
assert config.GAUSS_SIGMA > 0.0
def test_physics_positive():
assert config.ALPHA > 0.0
assert config.K > 0.0
assert config.H_CONV > 0.0
assert config.L > 0.0
assert config.T_END > 0.0
def test_training_hyperparameters_positive():
assert config.PATIENCE > 0
assert config.EPOCHS > 0
assert config.LR_ADAM > 0.0
assert config.SCHED_MIN_LR > 0.0
assert config.SCHED_FACTOR > 0.0
assert config.SCHED_PATIENCE > 0
def test_lr_ordering():
"""Min LR deve essere inferiore all'LR iniziale."""
assert config.SCHED_MIN_LR < config.LR_ADAM
def test_sched_patience_lt_patience():
"""Lo scheduler deve poter agire prima che scatti l'early stopping."""
assert config.SCHED_PATIENCE < config.PATIENCE
def test_cfl_stability():
"""La griglia FDM deve soddisfare la condizione CFL (r ≤ 0.5)."""
dx = config.L / (config.NX - 1)
dt = config.T_END / (config.NT - 1)
r = config.ALPHA * dt / dx ** 2
assert r <= 0.5, f"CFL violata: r={r:.4f} > 0.5"
def test_grid_dimensions():
assert config.NX >= 2
assert config.NT >= 2
assert config.N_F >= 1
assert config.N_IC >= 1
assert config.N_BC >= 1
def test_pde_scale_covers_source_peak():
"""_pde_scale in model.py deve coprire il picco gaussiano della sorgente."""
from model import _pde_scale
src_peak = config.ALPHA * config.Q_VAL / (
config.K * config.GAUSS_SIGMA * math.sqrt(2 * math.pi)
)
assert _pde_scale >= src_peak ** 2 - 1e-6, (
f"_pde_scale={_pde_scale:.1f} < src_peak²={src_peak**2:.1f}: "
"la loss PDE non è normalizzata correttamente"
)
+87
View File
@@ -0,0 +1,87 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pytest
import numpy as np
import torch
import config
# ── FDM ───────────────────────────────────────────────────────────────────────
def test_fdm_full_run():
"""Il solver FDM produce un campo di temperatura fisicamente corretto."""
from fdm.solver import solve
T, x, t = solve()
assert T.shape == (config.NX, config.NT)
assert np.isfinite(T).all()
assert T[:, -1].mean() > config.T0 # la sorgente ha scaldato il dominio
assert T.max() > config.T0 # picco sopra la IC
assert T.min() >= config.T0 - 1e-6 # nessun raffreddamento sotto T0 (no sorgenti fredde)
def test_fdm_visualizer_creates_html(tmp_path, monkeypatch):
"""Il visualizer FDM scrive almeno un file HTML senza errori."""
import fdm.visualizer as fdm_vis
monkeypatch.setattr(fdm_vis, 'BASE_DIR', str(tmp_path))
from fdm.solver import solve
T, x, t = solve()
fdm_vis.visualize_fdm(T, x, t)
html_files = list(tmp_path.rglob('*.html'))
assert len(html_files) >= 1, "Nessun file HTML generato dal visualizer FDM"
def test_pinn_visualizer_creates_html(tmp_path, monkeypatch):
"""Il visualizer PINN scrive i tre file HTML senza errori."""
import visualizer as pinn_vis
monkeypatch.setattr(pinn_vis, 'BASE_DIR', str(tmp_path))
from fdm.solver import solve as fdm_solve
T_fdm, _, _ = fdm_solve()
nx, nt = 20, 20
x_vals = np.linspace(0, config.L, nx)
t_vals = np.linspace(0, config.T_END, nt)
T_pred = np.full((nx, nt), config.T_AMB) # predizione costante (dummy)
pinn_vis.visualize_heat_field(T_pred, x_vals, t_vals, T_fdm)
html_files = list(tmp_path.rglob('*.html'))
assert len(html_files) == 3, f"Attesi 3 HTML, trovati {len(html_files)}"
# ── PINN training (lento) ─────────────────────────────────────────────────────
@pytest.mark.slow
def test_pinn_training_saves_checkpoint(tmp_path, monkeypatch):
"""Training per 30 epoche: il checkpoint viene salvato."""
import engine
save_path = str(tmp_path / 'model.pth')
monkeypatch.setattr(engine, 'MODEL_SAVE_PATH', save_path)
monkeypatch.setattr(engine, 'MODELS_DIR', str(tmp_path))
from engine import prepare_data, train_model
data = prepare_data(N_f=300, N_ic=100, N_bc=100)
train_model(data, epochs=30, patience=30)
assert os.path.exists(save_path)
ckpt = torch.load(save_path, map_location='cpu', weights_only=True)
assert 'state_dict' in ckpt
@pytest.mark.slow
def test_pinn_evaluate_after_training(tmp_path, monkeypatch):
"""evaluate_model gira senza errori dopo un training minimo."""
import engine
save_path = str(tmp_path / 'model.pth')
monkeypatch.setattr(engine, 'MODEL_SAVE_PATH', save_path)
monkeypatch.setattr(engine, 'MODELS_DIR', str(tmp_path))
from engine import prepare_data, train_model, evaluate_model
data = prepare_data(N_f=300, N_ic=100, N_bc=100)
train_model(data, epochs=30, patience=30)
evaluate_model(data)
+67
View File
@@ -0,0 +1,67 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import torch
import config
from engine import set_seed, _get_device, prepare_data
def test_set_seed_reproducibility():
set_seed(42)
r1 = torch.rand(10)
set_seed(42)
r2 = torch.rand(10)
torch.testing.assert_close(r1, r2)
def test_get_device_valid():
device = _get_device()
assert isinstance(device, torch.device)
assert device.type in ('cpu', 'cuda', 'mps')
def test_prepare_data_keys():
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
assert set(data.keys()) == {'device', 'x_f', 't_f', 'x_ic', 't_bc'}
def test_prepare_data_shapes():
N_f, N_ic, N_bc = 100, 50, 50
data = prepare_data(N_f=N_f, N_ic=N_ic, N_bc=N_bc)
# engine.py aggiunge 2 * (N_f // 4) punti di clustering
expected_f = N_f + 2 * (N_f // 4)
assert data['x_f'].shape == (expected_f,)
assert data['t_f'].shape == (expected_f,)
assert data['x_ic'].shape == (N_ic,)
assert data['t_bc'].shape == (N_bc,)
def test_prepare_data_x_bounds():
data = prepare_data(N_f=500, N_ic=100, N_bc=100)
assert data['x_f'].min().item() >= 0.0
assert data['x_f'].max().item() <= config.L
assert data['x_ic'].min().item() >= 0.0
assert data['x_ic'].max().item() <= config.L
def test_prepare_data_t_bounds():
data = prepare_data(N_f=500, N_ic=100, N_bc=100)
assert data['t_f'].min().item() >= 0.0
assert data['t_f'].max().item() <= config.T_END
def test_prepare_data_device_consistency():
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
expected = data['device'].type
for key in ('x_f', 't_f', 'x_ic', 't_bc'):
assert data[key].device.type == expected, f"{key} sul device sbagliato"
def test_prepare_data_deterministic():
"""Due chiamate con lo stesso seed (fissato in prepare_data) producono dati identici."""
d1 = prepare_data(N_f=100, N_ic=50, N_bc=50)
d2 = prepare_data(N_f=100, N_ic=50, N_bc=50)
torch.testing.assert_close(d1['x_f'], d2['x_f'])
torch.testing.assert_close(d1['t_f'], d2['t_f'])
torch.testing.assert_close(d1['x_ic'], d2['x_ic'])
+65
View File
@@ -0,0 +1,65 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import torch
from engine import prepare_data
from model import HeatPINN, heat_pinn_loss
def test_data_to_model_forward():
"""prepare_data → forward: shape e device coerenti, nessun NaN."""
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
model = HeatPINN().to(data['device'])
xt = torch.stack([data['x_f'], data['t_f']], dim=1)
out = model(xt)
assert out.shape == (data['x_f'].shape[0], 1)
assert out.device.type == data['device'].type
assert torch.isfinite(out).all()
def test_full_loss_pipeline():
"""prepare_data → heat_pinn_loss: tutti i componenti finiti e non-negativi."""
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
model = HeatPINN().to(data['device'])
total, L_pde, L_ic, L_bc = heat_pinn_loss(
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
)
for name, v in [('total', total), ('L_pde', L_pde), ('L_ic', L_ic), ('L_bc', L_bc)]:
assert torch.isfinite(v), f"{name} non è finita"
assert v.item() >= 0.0, f"{name} è negativa"
def test_backward_gradients_finite():
"""Il backward della loss non produce NaN/Inf nei parametri."""
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
model = HeatPINN().to(data['device'])
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
optimizer.zero_grad()
total, _, _, _ = heat_pinn_loss(
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
)
total.backward()
for p in model.parameters():
assert p.grad is not None
assert torch.isfinite(p.grad).all(), "gradiente NaN/Inf"
def test_training_loop_stable():
"""20 step di Adam non producono NaN/Inf nei parametri né nella loss."""
data = prepare_data(N_f=200, N_ic=100, N_bc=100)
model = HeatPINN().to(data['device'])
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
args = (model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc'])
for _ in range(20):
optimizer.zero_grad()
loss, _, _, _ = heat_pinn_loss(*args)
loss.backward()
optimizer.step()
for p in model.parameters():
assert torch.isfinite(p).all(), "parametro NaN/Inf dopo training"
total, _, _, _ = heat_pinn_loss(*args)
assert torch.isfinite(total)
+102
View File
@@ -0,0 +1,102 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import torch
import config
from model import HeatPINN, heat_pinn_loss
# ── HeatPINN.forward ─────────────────────────────────────────────────────────
def test_forward_output_shape(pinn_model, device):
xt = torch.zeros(64, 2, device=device)
xt[:, 0] = torch.rand(64) * config.L
xt[:, 1] = torch.rand(64) * config.T_END
assert pinn_model(xt).shape == (64, 1)
def test_forward_finite(pinn_model, device):
xt = torch.zeros(100, 2, device=device)
xt[:, 0] = torch.rand(100) * config.L
xt[:, 1] = torch.rand(100) * config.T_END
assert torch.isfinite(pinn_model(xt)).all()
def test_forward_zero_weights_returns_t_amb(device):
"""Con pesi nulli net(x,t)=0 ⇒ forward restituisce T_AMB per ogni input."""
model = HeatPINN().to(device)
for p in model.parameters():
p.data.zero_()
xt = torch.zeros(20, 2, device=device)
xt[:, 0] = torch.rand(20) * config.L
xt[:, 1] = torch.rand(20) * config.T_END
out = model(xt)
torch.testing.assert_close(out, torch.full_like(out, config.T_AMB), atol=1e-5, rtol=0.0)
def test_forward_t_normalization(device):
"""t viene normalizzato a [0,1]: il modello deve restituire output finiti
anche a t=T_END senza saturazione di Tanh."""
model = HeatPINN().to(device)
torch.nn.init.xavier_uniform_(model.net[0].weight)
xt = torch.tensor([[0.5, 0.0], [0.5, config.T_END]], device=device)
out = model(xt)
assert out.shape == (2, 1)
assert torch.isfinite(out).all()
# ── heat_pinn_loss ────────────────────────────────────────────────────────────
def _dummy_inputs(device, n_f=100, n_ic=50, n_bc=50):
x_f = torch.rand(n_f, device=device) * config.L
t_f = torch.rand(n_f, device=device) * config.T_END
x_ic = torch.rand(n_ic, device=device) * config.L
t_bc = torch.rand(n_bc, device=device) * config.T_END
return x_f, t_f, x_ic, t_bc
def test_loss_returns_four_values(pinn_model, device):
result = heat_pinn_loss(pinn_model, *_dummy_inputs(device))
assert len(result) == 4
def test_loss_components_non_negative(pinn_model, device):
total, L_pde, L_ic, L_bc = heat_pinn_loss(pinn_model, *_dummy_inputs(device))
assert total.item() >= 0.0
assert L_pde.item() >= 0.0
assert L_ic.item() >= 0.0
assert L_bc.item() >= 0.0
def test_loss_finite(pinn_model, device):
for v in heat_pinn_loss(pinn_model, *_dummy_inputs(device)):
assert torch.isfinite(v), f"loss non finita: {v}"
def test_loss_weight_doubles_pde_contribution(pinn_model, device):
"""Raddoppiare w_pde con w_ic=w_bc=0 deve raddoppiare il totale."""
inputs = _dummy_inputs(device)
total1, L_pde1, _, _ = heat_pinn_loss(pinn_model, *inputs, w_pde=1.0, w_ic=0.0, w_bc=0.0)
total2, L_pde2, _, _ = heat_pinn_loss(pinn_model, *inputs, w_pde=2.0, w_ic=0.0, w_bc=0.0)
# L_pde deve essere identico tra le due chiamate (stesso modello, stessi dati)
torch.testing.assert_close(L_pde1, L_pde2, atol=1e-5, rtol=1e-4)
torch.testing.assert_close(total2, 2.0 * total1, atol=1e-5, rtol=1e-4)
def test_ic_loss_zero_when_net_is_zero(device):
"""Con net=0 ⇒ T = T_AMB = T0 ⇒ L_ic = 0."""
model = HeatPINN().to(device)
for p in model.parameters():
p.data.zero_()
_, _, L_ic, _ = heat_pinn_loss(model, *_dummy_inputs(device))
assert L_ic.item() < 1e-8
def test_bc_loss_zero_when_net_is_zero(device):
"""Con net=0 ⇒ T = T_AMB e dT/dx = 0 ⇒ Robin BC soddisfatta ⇒ L_bc = 0."""
model = HeatPINN().to(device)
for p in model.parameters():
p.data.zero_()
_, _, _, L_bc = heat_pinn_loss(model, *_dummy_inputs(device))
assert L_bc.item() < 1e-8