diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..98f88c8 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,5 @@ +[pytest] +testpaths = tests +addopts = -v --tb=short +markers = + slow: test lenti (training completo) — escludi con -m "not slow" diff --git a/requirements.txt b/requirements.txt index 687263c..e6f5b2f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ torch>=2.0.0 +pytest>=7.0.0 pandas>=2.0.0 numpy>=1.24.0 scikit-learn>=1.3.0 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..5593660 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,24 @@ +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import pytest +import torch + + +@pytest.fixture(scope="session") +def device(): + from engine import _get_device + return _get_device() + + +@pytest.fixture +def small_data(): + from engine import prepare_data + return prepare_data(N_f=200, N_ic=50, N_bc=50) + + +@pytest.fixture +def pinn_model(device): + from model import HeatPINN + return HeatPINN().to(device) diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..27d4634 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,73 @@ +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import math +import config + + +def test_x_src_within_domain(): + assert 0.0 <= config.X_SRC <= config.L + + +def test_t_step_before_t_end(): + assert 0.0 < config.T_STEP < config.T_END + + +def test_gauss_sigma_positive(): + assert config.GAUSS_SIGMA > 0.0 + + +def test_physics_positive(): + assert config.ALPHA > 0.0 + assert config.K > 0.0 + assert config.H_CONV > 0.0 + assert config.L > 0.0 + assert config.T_END > 0.0 + + +def test_training_hyperparameters_positive(): + assert config.PATIENCE > 0 + assert config.EPOCHS > 0 + assert config.LR_ADAM > 0.0 + assert config.SCHED_MIN_LR > 0.0 + assert config.SCHED_FACTOR > 0.0 + assert config.SCHED_PATIENCE > 0 + + +def test_lr_ordering(): + """Min LR deve essere inferiore all'LR iniziale.""" + assert config.SCHED_MIN_LR < config.LR_ADAM + + +def test_sched_patience_lt_patience(): + """Lo scheduler deve poter agire prima che scatti l'early stopping.""" + assert config.SCHED_PATIENCE < config.PATIENCE + + +def test_cfl_stability(): + """La griglia FDM deve soddisfare la condizione CFL (r ≤ 0.5).""" + dx = config.L / (config.NX - 1) + dt = config.T_END / (config.NT - 1) + r = config.ALPHA * dt / dx ** 2 + assert r <= 0.5, f"CFL violata: r={r:.4f} > 0.5" + + +def test_grid_dimensions(): + assert config.NX >= 2 + assert config.NT >= 2 + assert config.N_F >= 1 + assert config.N_IC >= 1 + assert config.N_BC >= 1 + + +def test_pde_scale_covers_source_peak(): + """_pde_scale in model.py deve coprire il picco gaussiano della sorgente.""" + from model import _pde_scale + src_peak = config.ALPHA * config.Q_VAL / ( + config.K * config.GAUSS_SIGMA * math.sqrt(2 * math.pi) + ) + assert _pde_scale >= src_peak ** 2 - 1e-6, ( + f"_pde_scale={_pde_scale:.1f} < src_peak²={src_peak**2:.1f}: " + "la loss PDE non è normalizzata correttamente" + ) diff --git a/tests/test_e2e.py b/tests/test_e2e.py new file mode 100644 index 0000000..e086073 --- /dev/null +++ b/tests/test_e2e.py @@ -0,0 +1,87 @@ +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import pytest +import numpy as np +import torch +import config + + +# ── FDM ─────────────────────────────────────────────────────────────────────── + +def test_fdm_full_run(): + """Il solver FDM produce un campo di temperatura fisicamente corretto.""" + from fdm.solver import solve + T, x, t = solve() + + assert T.shape == (config.NX, config.NT) + assert np.isfinite(T).all() + assert T[:, -1].mean() > config.T0 # la sorgente ha scaldato il dominio + assert T.max() > config.T0 # picco sopra la IC + assert T.min() >= config.T0 - 1e-6 # nessun raffreddamento sotto T0 (no sorgenti fredde) + + +def test_fdm_visualizer_creates_html(tmp_path, monkeypatch): + """Il visualizer FDM scrive almeno un file HTML senza errori.""" + import fdm.visualizer as fdm_vis + monkeypatch.setattr(fdm_vis, 'BASE_DIR', str(tmp_path)) + + from fdm.solver import solve + T, x, t = solve() + fdm_vis.visualize_fdm(T, x, t) + + html_files = list(tmp_path.rglob('*.html')) + assert len(html_files) >= 1, "Nessun file HTML generato dal visualizer FDM" + + +def test_pinn_visualizer_creates_html(tmp_path, monkeypatch): + """Il visualizer PINN scrive i tre file HTML senza errori.""" + import visualizer as pinn_vis + monkeypatch.setattr(pinn_vis, 'BASE_DIR', str(tmp_path)) + + from fdm.solver import solve as fdm_solve + T_fdm, _, _ = fdm_solve() + + nx, nt = 20, 20 + x_vals = np.linspace(0, config.L, nx) + t_vals = np.linspace(0, config.T_END, nt) + T_pred = np.full((nx, nt), config.T_AMB) # predizione costante (dummy) + + pinn_vis.visualize_heat_field(T_pred, x_vals, t_vals, T_fdm) + + html_files = list(tmp_path.rglob('*.html')) + assert len(html_files) == 3, f"Attesi 3 HTML, trovati {len(html_files)}" + + +# ── PINN training (lento) ───────────────────────────────────────────────────── + +@pytest.mark.slow +def test_pinn_training_saves_checkpoint(tmp_path, monkeypatch): + """Training per 30 epoche: il checkpoint viene salvato.""" + import engine + save_path = str(tmp_path / 'model.pth') + monkeypatch.setattr(engine, 'MODEL_SAVE_PATH', save_path) + monkeypatch.setattr(engine, 'MODELS_DIR', str(tmp_path)) + + from engine import prepare_data, train_model + data = prepare_data(N_f=300, N_ic=100, N_bc=100) + train_model(data, epochs=30, patience=30) + + assert os.path.exists(save_path) + ckpt = torch.load(save_path, map_location='cpu', weights_only=True) + assert 'state_dict' in ckpt + + +@pytest.mark.slow +def test_pinn_evaluate_after_training(tmp_path, monkeypatch): + """evaluate_model gira senza errori dopo un training minimo.""" + import engine + save_path = str(tmp_path / 'model.pth') + monkeypatch.setattr(engine, 'MODEL_SAVE_PATH', save_path) + monkeypatch.setattr(engine, 'MODELS_DIR', str(tmp_path)) + + from engine import prepare_data, train_model, evaluate_model + data = prepare_data(N_f=300, N_ic=100, N_bc=100) + train_model(data, epochs=30, patience=30) + evaluate_model(data) diff --git a/tests/test_engine_data.py b/tests/test_engine_data.py new file mode 100644 index 0000000..5737297 --- /dev/null +++ b/tests/test_engine_data.py @@ -0,0 +1,67 @@ +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import torch +import config +from engine import set_seed, _get_device, prepare_data + + +def test_set_seed_reproducibility(): + set_seed(42) + r1 = torch.rand(10) + set_seed(42) + r2 = torch.rand(10) + torch.testing.assert_close(r1, r2) + + +def test_get_device_valid(): + device = _get_device() + assert isinstance(device, torch.device) + assert device.type in ('cpu', 'cuda', 'mps') + + +def test_prepare_data_keys(): + data = prepare_data(N_f=100, N_ic=50, N_bc=50) + assert set(data.keys()) == {'device', 'x_f', 't_f', 'x_ic', 't_bc'} + + +def test_prepare_data_shapes(): + N_f, N_ic, N_bc = 100, 50, 50 + data = prepare_data(N_f=N_f, N_ic=N_ic, N_bc=N_bc) + # engine.py aggiunge 2 * (N_f // 4) punti di clustering + expected_f = N_f + 2 * (N_f // 4) + assert data['x_f'].shape == (expected_f,) + assert data['t_f'].shape == (expected_f,) + assert data['x_ic'].shape == (N_ic,) + assert data['t_bc'].shape == (N_bc,) + + +def test_prepare_data_x_bounds(): + data = prepare_data(N_f=500, N_ic=100, N_bc=100) + assert data['x_f'].min().item() >= 0.0 + assert data['x_f'].max().item() <= config.L + assert data['x_ic'].min().item() >= 0.0 + assert data['x_ic'].max().item() <= config.L + + +def test_prepare_data_t_bounds(): + data = prepare_data(N_f=500, N_ic=100, N_bc=100) + assert data['t_f'].min().item() >= 0.0 + assert data['t_f'].max().item() <= config.T_END + + +def test_prepare_data_device_consistency(): + data = prepare_data(N_f=100, N_ic=50, N_bc=50) + expected = data['device'].type + for key in ('x_f', 't_f', 'x_ic', 't_bc'): + assert data[key].device.type == expected, f"{key} sul device sbagliato" + + +def test_prepare_data_deterministic(): + """Due chiamate con lo stesso seed (fissato in prepare_data) producono dati identici.""" + d1 = prepare_data(N_f=100, N_ic=50, N_bc=50) + d2 = prepare_data(N_f=100, N_ic=50, N_bc=50) + torch.testing.assert_close(d1['x_f'], d2['x_f']) + torch.testing.assert_close(d1['t_f'], d2['t_f']) + torch.testing.assert_close(d1['x_ic'], d2['x_ic']) diff --git a/tests/test_integration_pinn.py b/tests/test_integration_pinn.py new file mode 100644 index 0000000..ee9267e --- /dev/null +++ b/tests/test_integration_pinn.py @@ -0,0 +1,65 @@ +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import torch +from engine import prepare_data +from model import HeatPINN, heat_pinn_loss + + +def test_data_to_model_forward(): + """prepare_data → forward: shape e device coerenti, nessun NaN.""" + data = prepare_data(N_f=100, N_ic=50, N_bc=50) + model = HeatPINN().to(data['device']) + xt = torch.stack([data['x_f'], data['t_f']], dim=1) + out = model(xt) + assert out.shape == (data['x_f'].shape[0], 1) + assert out.device.type == data['device'].type + assert torch.isfinite(out).all() + + +def test_full_loss_pipeline(): + """prepare_data → heat_pinn_loss: tutti i componenti finiti e non-negativi.""" + data = prepare_data(N_f=100, N_ic=50, N_bc=50) + model = HeatPINN().to(data['device']) + total, L_pde, L_ic, L_bc = heat_pinn_loss( + model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc'] + ) + for name, v in [('total', total), ('L_pde', L_pde), ('L_ic', L_ic), ('L_bc', L_bc)]: + assert torch.isfinite(v), f"{name} non è finita" + assert v.item() >= 0.0, f"{name} è negativa" + + +def test_backward_gradients_finite(): + """Il backward della loss non produce NaN/Inf nei parametri.""" + data = prepare_data(N_f=100, N_ic=50, N_bc=50) + model = HeatPINN().to(data['device']) + optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) + optimizer.zero_grad() + total, _, _, _ = heat_pinn_loss( + model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc'] + ) + total.backward() + for p in model.parameters(): + assert p.grad is not None + assert torch.isfinite(p.grad).all(), "gradiente NaN/Inf" + + +def test_training_loop_stable(): + """20 step di Adam non producono NaN/Inf nei parametri né nella loss.""" + data = prepare_data(N_f=200, N_ic=100, N_bc=100) + model = HeatPINN().to(data['device']) + optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) + args = (model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']) + + for _ in range(20): + optimizer.zero_grad() + loss, _, _, _ = heat_pinn_loss(*args) + loss.backward() + optimizer.step() + + for p in model.parameters(): + assert torch.isfinite(p).all(), "parametro NaN/Inf dopo training" + + total, _, _, _ = heat_pinn_loss(*args) + assert torch.isfinite(total) diff --git a/tests/test_model.py b/tests/test_model.py new file mode 100644 index 0000000..0ca83e3 --- /dev/null +++ b/tests/test_model.py @@ -0,0 +1,102 @@ +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import torch +import config +from model import HeatPINN, heat_pinn_loss + + +# ── HeatPINN.forward ───────────────────────────────────────────────────────── + +def test_forward_output_shape(pinn_model, device): + xt = torch.zeros(64, 2, device=device) + xt[:, 0] = torch.rand(64) * config.L + xt[:, 1] = torch.rand(64) * config.T_END + assert pinn_model(xt).shape == (64, 1) + + +def test_forward_finite(pinn_model, device): + xt = torch.zeros(100, 2, device=device) + xt[:, 0] = torch.rand(100) * config.L + xt[:, 1] = torch.rand(100) * config.T_END + assert torch.isfinite(pinn_model(xt)).all() + + +def test_forward_zero_weights_returns_t_amb(device): + """Con pesi nulli net(x,t)=0 ⇒ forward restituisce T_AMB per ogni input.""" + model = HeatPINN().to(device) + for p in model.parameters(): + p.data.zero_() + xt = torch.zeros(20, 2, device=device) + xt[:, 0] = torch.rand(20) * config.L + xt[:, 1] = torch.rand(20) * config.T_END + out = model(xt) + torch.testing.assert_close(out, torch.full_like(out, config.T_AMB), atol=1e-5, rtol=0.0) + + +def test_forward_t_normalization(device): + """t viene normalizzato a [0,1]: il modello deve restituire output finiti + anche a t=T_END senza saturazione di Tanh.""" + model = HeatPINN().to(device) + torch.nn.init.xavier_uniform_(model.net[0].weight) + xt = torch.tensor([[0.5, 0.0], [0.5, config.T_END]], device=device) + out = model(xt) + assert out.shape == (2, 1) + assert torch.isfinite(out).all() + + +# ── heat_pinn_loss ──────────────────────────────────────────────────────────── + +def _dummy_inputs(device, n_f=100, n_ic=50, n_bc=50): + x_f = torch.rand(n_f, device=device) * config.L + t_f = torch.rand(n_f, device=device) * config.T_END + x_ic = torch.rand(n_ic, device=device) * config.L + t_bc = torch.rand(n_bc, device=device) * config.T_END + return x_f, t_f, x_ic, t_bc + + +def test_loss_returns_four_values(pinn_model, device): + result = heat_pinn_loss(pinn_model, *_dummy_inputs(device)) + assert len(result) == 4 + + +def test_loss_components_non_negative(pinn_model, device): + total, L_pde, L_ic, L_bc = heat_pinn_loss(pinn_model, *_dummy_inputs(device)) + assert total.item() >= 0.0 + assert L_pde.item() >= 0.0 + assert L_ic.item() >= 0.0 + assert L_bc.item() >= 0.0 + + +def test_loss_finite(pinn_model, device): + for v in heat_pinn_loss(pinn_model, *_dummy_inputs(device)): + assert torch.isfinite(v), f"loss non finita: {v}" + + +def test_loss_weight_doubles_pde_contribution(pinn_model, device): + """Raddoppiare w_pde con w_ic=w_bc=0 deve raddoppiare il totale.""" + inputs = _dummy_inputs(device) + total1, L_pde1, _, _ = heat_pinn_loss(pinn_model, *inputs, w_pde=1.0, w_ic=0.0, w_bc=0.0) + total2, L_pde2, _, _ = heat_pinn_loss(pinn_model, *inputs, w_pde=2.0, w_ic=0.0, w_bc=0.0) + # L_pde deve essere identico tra le due chiamate (stesso modello, stessi dati) + torch.testing.assert_close(L_pde1, L_pde2, atol=1e-5, rtol=1e-4) + torch.testing.assert_close(total2, 2.0 * total1, atol=1e-5, rtol=1e-4) + + +def test_ic_loss_zero_when_net_is_zero(device): + """Con net=0 ⇒ T = T_AMB = T0 ⇒ L_ic = 0.""" + model = HeatPINN().to(device) + for p in model.parameters(): + p.data.zero_() + _, _, L_ic, _ = heat_pinn_loss(model, *_dummy_inputs(device)) + assert L_ic.item() < 1e-8 + + +def test_bc_loss_zero_when_net_is_zero(device): + """Con net=0 ⇒ T = T_AMB e dT/dx = 0 ⇒ Robin BC soddisfatta ⇒ L_bc = 0.""" + model = HeatPINN().to(device) + for p in model.parameters(): + p.data.zero_() + _, _, _, L_bc = heat_pinn_loss(model, *_dummy_inputs(device)) + assert L_bc.item() < 1e-8