4 Commits

Author SHA1 Message Date
davide 295057e80b feat: implementa PINN inversa per identificazione parametrica (α, k, h_conv)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 16:23:27 +02:00
davide b65051057a feat: aggiunge script campionamento sensori da soluzione FDM
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 15:28:47 +02:00
davide c928b98de2 feat: aggiunge salvataggio CSV soluzione FDM in results/fdm/
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 15:26:59 +02:00
davide 213cd6fba5 docs: aggiorna CLAUDE.md con piano implementazione PINN inversa e architettura semplificata
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 15:26:20 +02:00
12 changed files with 844 additions and 74 deletions
+75 -72
View File
@@ -6,97 +6,100 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
Write all git commit messages in Italian. Write all git commit messages in Italian.
## Scope of Work ## Commands
Lavora su tutto il repository: `fdm/`, `model.py`, `engine.py`, `visualizer.py`, `app.py`, `config.py`. Aiuta con migliorie, bugfix e ottimizzazioni su qualsiasi file del progetto.
## Project Overview
**Heat Equation PINN** — A Physics-Informed Neural Network that solves the 1D time-varying heat equation with an internal point heat source:
```
∂T/∂t = α ∂²T/∂x² + (α/k) Q(t) δ(x X_SRC) x ∈ [0, L], t ∈ [0, T_END]
```
where `Q(t) = Q_VAL` if `t ≥ T_STEP` else `0`.
- **IC:** `T(x, 0) = T0` (uniform)
- **BC x=0:** Robin — convection: `k ∂T/∂x = h (T T_AMB)`
- **BC x=L:** Robin — convection: `k ∂T/∂x = h (T T_AMB)`
No experimental data is needed. A `fdm/` module provides a reference numerical solution (FTCS explicit scheme) used for evaluation and visualization comparison.
All physical and numerical parameters live in `config.py`.
## Running
Always activate the virtual environment first:
```bash ```bash
source .venv/bin/activate source .venv/bin/activate # always first
python app.py # forward PINN: train / evaluate / visualize
python fdm/app.py # FDM reference solver menu
python -m inverse.app # inverse PINN menu (to be implemented)
pytest # all tests
pytest -m "not slow" # skip full-training tests
pytest tests/test_model.py # single file
``` ```
**PINN:** Delete `models/best_heat_pinn_model.pth` to retrain from scratch.
```bash
python app.py # Train / Evaluate (L2 vs FDM) / Visualize
```
**FDM reference solver:**
```bash
python fdm/app.py # Solve / Heatmap / Animation / Time-series
```
Saved artifacts (git-ignored): `models/best_heat_pinn_model.pth`, HTML plots in `animations/` and `animations/fdm/`.
To retrain from scratch: `rm models/best_heat_pinn_model.pth` before running option 1.
## Dependencies
`requirements.txt` exists. Key packages: `torch`, `numpy`, `plotly`. No `pandas` or `scikit-learn` needed.
GPU is auto-detected (`cuda``mps``cpu`) in `engine.py:_get_device()`.
## Architecture ## Architecture
``` ```
config.py ← all physical + numerical parameters (edit here to change the problem) config.py ← all physical + numerical parameters
app.py ← PINN CLI menu model.pyHeatPINN (5-layer FC) + heat_pinn_loss()
model.py ← HeatPINN + heat_pinn_loss() engine.py ← prepare_data(), train_model(), evaluate_model()
engine.py ← data sampling, Adam+L-BFGS training, evaluation vs FDM, visualization call app.py ← forward PINN CLI
visualizer.py ← PINN vs FDM: heatmap, animated T(x), time-series at fixed points visualizer.py ← PINN vs FDM plots (Plotly HTML)
fdm/ fdm/solver.py ← FTCS explicit scheme, returns T_matrix[NX, NT]
solver.py ← FTCS explicit scheme, Robin on both ends, point source at X_SRC inverse/ ← inverse PINN (to implement — see plan below)
visualizer.py ← same 3 plot types for FDM-only output tests/ ← pytest suite (42 tests); conftest.py has device, small_data, pinn_model fixtures
app.py ← FDM CLI menu
``` ```
### Neural Network (`model.py`) ## Key design decisions
`HeatPINN`: 5-layer fully connected, input `(x, t)` → output `T`. **Output scaling** (`model.py:forward`):
**Output scaling** — the network predicts a dimensionless perturbation; the `forward()` applies:
``` ```
T = T_AMB + (Q_VAL · L / K) · net(x, t) T = T_AMB + (Q_VAL · L / K) · net(x_norm, t_norm)
``` ```
This keeps `net` outputs in `[0, 1]` range and ensures gradients `∂T/∂x` are O(1) for the network to learn. Do not remove this scaling. This keeps net outputs in [0,1] and ∂T/∂x at O(1). Do not remove.
`heat_pinn_loss()` normalizes all four loss terms to O(1) using `T_char = Q_VAL·L/K` and `grad_char = (Q_VAL/K)²`. The PDE residual includes the Gaussian-smoothed source term (σ=0.02) as a continuous approximation to δ(x X_SRC). Changing physical parameters in `config.py` does not require re-tuning loss weights. **Loss normalization** (`model.py:heat_pinn_loss`): all four terms are scaled to O(1) via `_T_char = Q_VAL·L/K` and `_bc_scale`. Changing physical params in `config.py` does not require retuning weights.
### Training (`engine.py`) **Collocation clustering** (`engine.py:prepare_data`): 25% extra points near `X_SRC` (source gradient) and `T_STEP` (flux discontinuity). First lever to pull if accuracy is poor: increase `N_F`.
`prepare_data()` samples collocation points with **deliberate clustering**: extra points near `x=X_SRC` (steep gradient at source) and around `t=T_STEP` (flux step discontinuity). Increasing `N_f` / `N_bc` here is the first lever to pull if accuracy is low. **Training sequence**: Adam (early stopping + ReduceLROnPlateau) → L-BFGS fine-tuning. L-BFGS uses a `_last` closure dict to capture loss components without double-calling the loss outside a grad context.
`train_model()` runs **Adam first, then L-BFGS fine-tuning**. L-BFGS uses a closure that captures loss components in `_last` dict (avoids calling `heat_pinn_loss` outside an active grad context). **FDM Robin BCs** (`fdm/solver.py`): implicit-like update `T[0] = (T[1] + robin_coeff·T_amb) / (1 + robin_coeff)`. Point source added after BCs: `T[i_src] += Q·α·dt/(k·dx)`.
`evaluate_model()` runs the FDM solver and downsamples its `(NX, NT)` output to the PINN prediction grid `(100, 100)` for L2 comparison. ---
### FDM Solver (`fdm/solver.py`) ## Inverse PINN — implementation plan
Returns `(T_matrix[NX, NT], x_vals, t_vals)`. Uses: Goal: identify unknown physical parameters (`ALPHA`, `K`, `H_CONV`) from sparse noisy temperature measurements. The network learns T(x,t) and the physics parameters simultaneously.
- Robin BC on both ends: `T[0] = (T[1] + robin_coeff·T_amb) / (1 + robin_coeff)`
- Point source injected at node `i_src = argmin|x - X_SRC|` after BCs: `T[i_src] += Q·α·dt/(k·dx)`
- CFL check at startup (warns, does not crash)
### Loss Scaling Notes ### Files to create (in order)
If you change `Q_VAL`, `K`, `H_CONV`, or `L` in `config.py`, the normalization in `heat_pinn_loss()` adjusts automatically. If losses diverge, check that `T_char = Q_VAL·L/K` is not near zero. **`inverse/config_inverse.py`**
- `N_SENSORS`, `SENSOR_POSITIONS` (list of x positions)
- `NOISE_STD` — Gaussian noise std on measurements [°C]
- `IDENTIFY = ['alpha', 'k', 'h_conv']`
- `ALPHA_INIT`, `K_INIT`, `H_CONV_INIT` — initial guesses (25× off from true values)
- `EPOCHS_INV`, `LR_ADAM_INV`, `W_DATA = 10.0`
- `MODELS_DIR`, `DATA_PATH`
**`inverse/data.py`**
- `generate_measurements(noise_std, sensor_positions)`: call `fdm.solver.solve()` with true params from `config.py`, sample at nearest FDM nodes, add noise, save to `inverse/data/measurements.csv` (columns: `x, t, T`)
- `load_measurements(device)`: load CSV → tensors `(x_s, t_s, T_meas)` on device
**`inverse/model.py`** — `InverseHeatPINN(nn.Module)`
- Same 5-layer architecture as `HeatPINN`
- Unknown params as log-space `nn.Parameter` (guarantees positivity without constraints):
```python
self.log_alpha = nn.Parameter(torch.log(torch.tensor(ALPHA_INIT)))
self.log_k = nn.Parameter(torch.log(torch.tensor(K_INIT)))
self.log_h_conv = nn.Parameter(torch.log(torch.tensor(H_CONV_INIT)))
```
- Properties `alpha`, `k`, `h_conv` that return `exp(log_*)`
- `forward()` uses same output scaling as `HeatPINN` but with `self.k` and `self.alpha`
- Never `.detach()` the learned params inside the loss — gradients must flow through them
**`inverse/loss.py`** — `inverse_heat_pinn_loss(..., x_s, t_s, T_meas)`
- Same PDE/IC/BC structure as `heat_pinn_loss()` but uses `model.alpha`, `model.k`, `model.h_conv`
- Normalization scales must be computed from the **current learned params** (not config constants), otherwise there is no gradient signal toward the physics params
- Adds data fit term: `L_data = mean((T_pred(x_s, t_s) T_meas)²) / T_char²`
- Total: `w_pde·L_pde + w_ic·L_ic + w_bc·L_bc + w_data·L_data`
**`inverse/engine.py`**
- `prepare_data_inverse()`: same clustering strategy as `engine.prepare_data()`
- `train_inverse(data, measurements)`: **Adam only** (no L-BFGS — unstable when physics params are learnable because loss curvature differs by orders of magnitude between network weights and physics params); print identified param values every 100 epochs
- `evaluate_inverse(model)`: print table of true vs identified params with relative error %; also compute L2 error of T field vs FDM
**`inverse/app.py`** — CLI menu: (1) Generate measurements, (2) Train, (3) Evaluate, (0) Exit
**`inverse/__init__.py`** — empty
### Pitfalls
- If `W_DATA` is too high, BC/IC are ignored and the net overfits measurements (physics collapses)
- Sensors far from x=0 and x=L → poor identification of `H_CONV` (weak boundary signal)
- Do not resample sensor points each epoch — `(x_s, t_s, T_meas)` are fixed throughout training
+9 -2
View File
@@ -3,7 +3,7 @@ import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fdm.solver import solve from fdm.solver import solve, save_csv
from fdm.visualizer import visualize_fdm from fdm.visualizer import visualize_fdm
@@ -26,10 +26,11 @@ def main_menu():
print("-" * 30) print("-" * 30)
print("1. Risolvi") print("1. Risolvi")
print("2. Visualizza") print("2. Visualizza")
print("3. Salva CSV")
print("0. Esci") print("0. Esci")
print("-" * 30) print("-" * 30)
choice = input("Select an option (0-2): ").strip() choice = input("Select an option (0-3): ").strip()
if choice == "1": if choice == "1":
T, x_vals, t_vals = solve() T, x_vals, t_vals = solve()
@@ -42,6 +43,12 @@ def main_menu():
else: else:
visualize_fdm(T, x_vals, t_vals) visualize_fdm(T, x_vals, t_vals)
elif choice == "3":
if T is None:
print("Eseguire prima l'opzione 1.")
else:
save_csv(T, x_vals, t_vals, "results/fdm/fdm_solution.csv")
elif choice == "0": elif choice == "0":
print("Uscita.") print("Uscita.")
sys.exit(0) sys.exit(0)
+14
View File
@@ -14,6 +14,7 @@ Returns T_matrix of shape (NX, NT).
import sys import sys
import os import os
import numpy as np import numpy as np
import pandas as pd
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import config import config
@@ -85,3 +86,16 @@ def solve():
T_matrix[:, n + 1] = T_cur T_matrix[:, n + 1] = T_cur
return T_matrix, x_vals, t_vals return T_matrix, x_vals, t_vals
def save_csv(T_matrix, x_vals, t_vals, path: str) -> None:
"""Save FDM solution to CSV with columns: x, t, T."""
xs, ts = np.meshgrid(x_vals, t_vals, indexing="ij")
df = pd.DataFrame({
"x": xs.ravel(),
"t": ts.ravel(),
"T": T_matrix.ravel(),
})
os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)
df.to_csv(path, index=False)
print(f"Salvato: {path} ({len(df)} righe)")
View File
+117
View File
@@ -0,0 +1,117 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
def print_header():
print("=" * 42)
print(" Inverse Heat PINN")
print(" Identifica: α, k, h_conv da misure")
print("=" * 42)
_ALL_PARAMS = ('alpha', 'k', 'h_conv')
def _ask_identify():
print(f"\nParametri disponibili: {', '.join(_ALL_PARAMS)}")
raw = input("Quali identificare? (invio = tutti, oppure es: alpha k): ").strip()
chosen = [p for p in (raw.split() if raw else _ALL_PARAMS) if p in _ALL_PARAMS]
invalid = [p for p in raw.split() if p not in _ALL_PARAMS] if raw else []
if invalid:
print(f" Ignorati (non validi): {invalid}")
if not chosen:
print(" Nessun parametro valido — uso tutti.")
chosen = list(_ALL_PARAMS)
print(f" Identifico: {chosen}")
import config as _cfg
_true = {'alpha': _cfg.ALPHA, 'k': _cfg.K, 'h_conv': _cfg.H_CONV}
init_vals = {}
for p in chosen:
true_val = _true[p]
raw_v = input(f" Valore iniziale per {p} (vero: {true_val}): ").strip()
try:
init_vals[p] = float(raw_v) if raw_v else true_val
except ValueError:
init_vals[p] = true_val
print(f"{p} inizia da {init_vals[p]}")
from inverse.config_inverse import W_PDE, W_IC, W_BC, W_DATA
print(f"\nPesi loss (default — PDE:{W_PDE} IC:{W_IC} BC:{W_BC} Data:{W_DATA})")
weights = {}
for wname, wdefault in [('w_pde', W_PDE), ('w_ic', W_IC), ('w_bc', W_BC), ('w_data', W_DATA)]:
raw_w = input(f" {wname} (default {wdefault}): ").strip()
try:
weights[wname] = float(raw_w) if raw_w else wdefault
except ValueError:
weights[wname] = wdefault
raw_ep = input("\nNumero di epoch (default 10000): ").strip()
try:
epochs = int(raw_ep) if raw_ep else 10000
except ValueError:
epochs = 10000
return chosen, init_vals, weights, epochs
def main():
print_header()
from inverse.engine import prepare_data_inverse, _get_device
print("\nGenerazione punti di collocazione...")
data = prepare_data_inverse()
print(f"Pronti — device: {data['device']}\n")
x_s = None
t_s = None
T_meas = None
while True:
print("\n" + "-" * 34)
print(" MAIN MENU")
print("-" * 34)
print("1. Carica misure")
print("2. Addestra (identifica α, k, h)")
print("3. Valuta risultati")
print("4. Visualizza")
print("0. Esci")
print("-" * 34)
choice = input("Scelta (0-4): ").strip()
if choice == "1":
from inverse.data import load_measurements
x_s, t_s, T_meas = load_measurements(data['device'])
elif choice == "2":
if x_s is None:
print("Caricare prima le misure (opzione 1).")
continue
identify, init_vals, weights, epochs = _ask_identify()
from inverse.engine import train_inverse
try:
train_inverse(data, x_s, t_s, T_meas, identify=identify, init_vals=init_vals, epochs=epochs, **weights)
except KeyboardInterrupt:
print("\nTraining interrotto. Il miglior modello trovato è stato salvato.")
elif choice == "3":
from inverse.engine import evaluate_inverse
evaluate_inverse()
elif choice == "4":
from inverse.engine import generate_visualization_inverse
generate_visualization_inverse()
elif choice == "0":
print("Uscita.")
sys.exit(0)
else:
print("Scelta non valida.")
if __name__ == "__main__":
main()
+43
View File
@@ -0,0 +1,43 @@
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import config as _cfg
# Dati di misura
MEASUREMENTS_PATH = "results/measurements.csv"
# Parametri da identificare (valori veri in config.py)
IDENTIFY = ['alpha', 'k', 'h_conv']
# Stime iniziali — deliberatamente lontane dai valori veri
ALPHA_INIT = _cfg.ALPHA * 3.0 # vero: 0.01 → inizia a 0.03
K_INIT = _cfg.K * 0.4 # vero: 1.0 → inizia a 0.4
H_CONV_INIT = _cfg.H_CONV * 2.5 # vero: 10.0 → inizia a 25.0
# Training
EPOCHS_INV = 10000
LR_ADAM_INV = 1e-3
PATIENCE_INV = 800
SCHED_FACTOR = 0.5
SCHED_PATIENCE = 200
SCHED_MIN_LR = 1e-6
# Pesi loss
W_PDE = 1.0
W_IC = 1.0
W_BC = 5.0
W_DATA = 10.0
# Architettura (stessa del forward PINN)
HIDDEN_SIZE = _cfg.HIDDEN_SIZE
N_HIDDEN_LAYERS = _cfg.N_HIDDEN_LAYERS
# Collocation
N_F = _cfg.N_F
N_IC = _cfg.N_IC
N_BC = _cfg.N_BC
# Output
MODELS_DIR = "models"
MODEL_SAVE_PATH = "models/best_inverse_pinn.pth"
+25
View File
@@ -0,0 +1,25 @@
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pandas as pd
import torch
from inverse.config_inverse import MEASUREMENTS_PATH
def load_measurements(device: torch.device):
"""Carica measurements.csv e restituisce tensori (x_s, t_s, T_meas) sul device."""
if not os.path.exists(MEASUREMENTS_PATH):
raise FileNotFoundError(
f"File misure non trovato: {MEASUREMENTS_PATH}\n"
"Esegui prima 'python inverse/sample_sensors.py' per generare i dati."
)
df = pd.read_csv(MEASUREMENTS_PATH)
x_s = torch.tensor(df["x"].values, dtype=torch.float32, device=device)
t_s = torch.tensor(df["t"].values, dtype=torch.float32, device=device)
T_meas = torch.tensor(df["T"].values, dtype=torch.float32, device=device)
print(f"Misure caricate: {len(df)} punti da {MEASUREMENTS_PATH}")
print(f" Sensori x: {sorted(df['x'].unique().tolist())}")
print(f" T range: [{df['T'].min():.2f}, {df['T'].max():.2f}] °C")
return x_s, t_s, T_meas
+212
View File
@@ -0,0 +1,212 @@
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import random
import numpy as np
import torch
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
import config
from inverse.config_inverse import (
N_F, N_IC, N_BC,
EPOCHS_INV, LR_ADAM_INV, PATIENCE_INV,
SCHED_FACTOR, SCHED_PATIENCE, SCHED_MIN_LR,
MODEL_SAVE_PATH, MODELS_DIR,
)
from inverse.model import InverseHeatPINN
from inverse.loss import inverse_heat_pinn_loss
def _get_device():
if torch.cuda.is_available():
try:
t = torch.zeros(2, 2).cuda()
torch.mm(t, t)
return torch.device('cuda')
except RuntimeError:
pass
if torch.backends.mps.is_available():
return torch.device('mps')
return torch.device('cpu')
def _set_seed(seed=42):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(seed)
def prepare_data_inverse():
_set_seed(42)
device = _get_device()
x_f = torch.rand(N_F, device=device) * config.L
t_f = torch.rand(N_F, device=device) * config.T_END
# Clustering vicino a X_SRC e T_STEP (stessa strategia del forward PINN)
n_extra = N_F // 4
x_near = config.X_SRC + (torch.rand(n_extra, device=device) - 0.5) * config.L * 0.1
x_near = x_near.clamp(0, config.L)
t_near = torch.rand(n_extra, device=device) * config.T_END
x_step = torch.rand(n_extra, device=device) * config.L
t_step = config.T_STEP + (torch.rand(n_extra, device=device) - 0.5) * 0.1
t_step = t_step.clamp(0, config.T_END)
x_f = torch.cat([x_f, x_near, x_step])
t_f = torch.cat([t_f, t_near, t_step])
x_ic = torch.rand(N_IC, device=device) * config.L
t_bc = torch.rand(N_BC, device=device) * config.T_END
return {'device': device, 'x_f': x_f, 't_f': t_f, 'x_ic': x_ic, 't_bc': t_bc}
def train_inverse(data, x_s, t_s, T_meas, identify=('alpha', 'k', 'h_conv'), init_vals=None, epochs=None,
w_pde=None, w_ic=None, w_bc=None, w_data=None):
device = data['device']
if epochs is None:
epochs = EPOCHS_INV
model = InverseHeatPINN(identify=identify, init_vals=init_vals).to(device)
print(f"\n--- Inverse PINN Training (Adam) su {device} ---")
print(f"Stime iniziali: α={model.alpha.item():.4f} k={model.k.item():.4f} h={model.h_conv.item():.4f}")
print(f"Valori veri: α={config.ALPHA:.4f} k={config.K:.4f} h={config.H_CONV:.4f}\n")
optimizer = optim.Adam(model.parameters(), lr=LR_ADAM_INV)
scheduler = ReduceLROnPlateau(optimizer, mode='min',
factor=SCHED_FACTOR,
patience=SCHED_PATIENCE,
min_lr=SCHED_MIN_LR)
os.makedirs(MODELS_DIR, exist_ok=True)
best_loss = float('inf')
patience_counter = 0
def _save_best(model):
torch.save({
'state_dict': model.state_dict(),
'identify': model.identify,
'alpha': model.alpha.item(),
'k': model.k.item(),
'h_conv': model.h_conv.item(),
}, MODEL_SAVE_PATH)
model.train()
try:
for epoch in range(epochs):
optimizer.zero_grad()
loss, L_pde, L_ic, L_bc, L_data = inverse_heat_pinn_loss(
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc'],
x_s, t_s, T_meas,
**{k: v for k, v in {'w_pde': w_pde, 'w_ic': w_ic, 'w_bc': w_bc, 'w_data': w_data}.items() if v is not None},
)
loss.backward()
optimizer.step()
scheduler.step(loss.item())
if loss.item() < best_loss - 1e-8:
best_loss = loss.item()
patience_counter = 0
_save_best(model)
else:
patience_counter += 1
if patience_counter >= PATIENCE_INV:
print(f"Early stopping a epoca {epoch + 1}")
break
if (epoch + 1) % 100 == 0:
print(
f"Epoch [{epoch+1}/{epochs}] | Loss: {loss.item():.6f} "
f"| PDE: {L_pde.item():.5f} IC: {L_ic.item():.5f} "
f"BC: {L_bc.item():.5f} Data: {L_data.item():.5f} "
f"| α={model.alpha.item():.5f} k={model.k.item():.5f} h={model.h_conv.item():.4f}"
)
except KeyboardInterrupt:
print(f"\nInterrotto a epoca {epoch + 1}. Salvo stato corrente...")
_save_best(model)
raise
print("\nTraining completato. Modello salvato.")
def evaluate_inverse():
device = _get_device()
if not os.path.exists(MODEL_SAVE_PATH):
print("Modello non trovato. Esegui prima il training.")
return
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device, weights_only=True)
model = InverseHeatPINN().to(device)
model.load_state_dict(ckpt['state_dict'])
model.eval()
alpha_id = model.alpha.item()
k_id = model.k.item()
h_conv_id = model.h_conv.item()
print("\n--- Parametri identificati vs veri ---")
print(f"{'Param':<10} {'Vero':>10} {'Identificato':>14} {'Errore %':>10}")
print("-" * 48)
for name, true_val, id_val in [
('alpha', config.ALPHA, alpha_id),
('k', config.K, k_id),
('h_conv', config.H_CONV, h_conv_id),
]:
err = abs(id_val - true_val) / true_val * 100
print(f"{name:<10} {true_val:>10.5f} {id_val:>14.5f} {err:>9.2f}%")
# Errore L2 del campo T vs FDM
from fdm.solver import solve as fdm_solve
T_fdm, x_fdm, t_fdm = fdm_solve()
nx, nt = 100, 100
x_vals = torch.linspace(0, config.L, nx, device=device)
t_vals = torch.linspace(0, config.T_END, nt, device=device)
xx, tt = torch.meshgrid(x_vals, t_vals, indexing='ij')
inp = torch.stack([xx.reshape(-1), tt.reshape(-1)], dim=1)
with torch.no_grad():
T_pred = model(inp).reshape(nx, nt).cpu().numpy()
x_idx = np.linspace(0, T_fdm.shape[0] - 1, nx, dtype=int)
t_idx = np.linspace(0, T_fdm.shape[1] - 1, nt, dtype=int)
T_fdm_ds = T_fdm[np.ix_(x_idx, t_idx)]
l2_rel = np.sqrt(np.mean((T_pred - T_fdm_ds) ** 2)) / np.sqrt(np.mean(T_fdm_ds ** 2))
max_err = np.max(np.abs(T_pred - T_fdm_ds))
print(f"\nErrore L2 relativo campo T vs FDM: {l2_rel:.6f}")
print(f"Errore massimo assoluto: {max_err:.4f} °C")
def generate_visualization_inverse():
device = _get_device()
if not os.path.exists(MODEL_SAVE_PATH):
print("Modello non trovato. Esegui prima il training.")
return
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device, weights_only=True)
identify = ckpt.get('identify', ['alpha', 'k', 'h_conv'])
model = InverseHeatPINN(identify=identify).to(device)
model.load_state_dict(ckpt['state_dict'])
model.eval()
nx, nt = 100, 100
x_vals = torch.linspace(0, config.L, nx, device=device)
t_vals = torch.linspace(0, config.T_END, nt, device=device)
xx, tt = torch.meshgrid(x_vals, t_vals, indexing='ij')
with torch.no_grad():
T_pred = model(torch.stack([xx.reshape(-1), tt.reshape(-1)], dim=1)).reshape(nx, nt).cpu().numpy()
from fdm.solver import solve as fdm_solve
T_fdm, _, _ = fdm_solve()
identified_params = {'α': model.alpha.item(), 'k': model.k.item(), 'h': model.h_conv.item()}
from inverse.visualizer import visualize_inverse
visualize_inverse(T_pred, x_vals.cpu().numpy(), t_vals.cpu().numpy(), T_fdm, identified_params)
+64
View File
@@ -0,0 +1,64 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import torch
import config
from inverse.config_inverse import W_PDE, W_IC, W_BC, W_DATA
def inverse_heat_pinn_loss(model, x_f, t_f, x_ic, t_bc, x_s, t_s, T_meas,
w_pde=W_PDE, w_ic=W_IC, w_bc=W_BC, w_data=W_DATA):
# Parametri appresi — NON detachare, i gradienti devono fluire
alpha = model.alpha
k = model.k
h_conv = model.h_conv
# Scale calcolate dai parametri correnti (necessario per segnale di gradiente verso i params)
T_char = config.Q_VAL * config.L / k
sigma = config.GAUSS_SIGMA
src_peak = alpha * config.Q_VAL / (k * sigma * (2 * torch.pi) ** 0.5)
pde_scale = torch.clamp((T_char / config.T_END) ** 2 + src_peak ** 2, min=1e-8)
bc_scale = torch.clamp(
torch.max((config.Q_VAL / k) ** 2, (h_conv * T_char / k) ** 2), min=1e-8
)
# --- PDE residual ---
x_f = x_f.detach().requires_grad_(True)
t_f = t_f.detach().requires_grad_(True)
T_f = model(torch.stack([x_f, t_f], dim=1))
dT_dt, dT_dx = torch.autograd.grad(T_f.sum(), [t_f, x_f], create_graph=True)
d2T_dx2 = torch.autograd.grad(dT_dx.sum(), x_f, create_graph=True)[0]
Q_t = torch.where(t_f >= config.T_STEP,
torch.tensor(config.Q_VAL, device=t_f.device, dtype=t_f.dtype),
torch.tensor(0.0, device=t_f.device, dtype=t_f.dtype))
gauss = torch.exp(-0.5 * ((x_f - config.X_SRC) / sigma) ** 2) / (sigma * (2 * torch.pi) ** 0.5)
source = (alpha / k) * Q_t * gauss
L_pde = ((dT_dt - alpha * d2T_dx2 - source) ** 2).mean() / pde_scale
# --- IC: T(x, 0) = T0 ---
T_ic_pred = model(torch.stack([x_ic, torch.zeros_like(x_ic)], dim=1))
L_ic = ((T_ic_pred - config.T0) ** 2).mean() / (T_char ** 2 + 1e-8)
# --- BC x=0: -k dT/dx = h(T - T_amb) → dT/dx - h/k*(T - T_amb) = 0 ---
x_left = torch.zeros(t_bc.shape[0], device=t_bc.device).requires_grad_(True)
T_left = model(torch.stack([x_left, t_bc.detach()], dim=1))
dT_dx_left = torch.autograd.grad(T_left.sum(), x_left, create_graph=True)[0]
L_bc_left = ((dT_dx_left - (h_conv / k) * (T_left.squeeze() - config.T_AMB)) ** 2).mean() / bc_scale
# --- BC x=L: k dT/dx = -h(T - T_amb) → dT/dx + h/k*(T - T_amb) = 0 ---
x_right = torch.full((t_bc.shape[0],), config.L, device=t_bc.device).requires_grad_(True)
T_right = model(torch.stack([x_right, t_bc.detach()], dim=1))
dT_dx_right = torch.autograd.grad(T_right.sum(), x_right, create_graph=True)[0]
L_bc_right = ((dT_dx_right + (h_conv / k) * (T_right.squeeze() - config.T_AMB)) ** 2).mean() / bc_scale
L_bc = L_bc_left + L_bc_right
# --- Data fit ---
T_pred_s = model(torch.stack([x_s, t_s], dim=1)).squeeze()
L_data = ((T_pred_s - T_meas) ** 2).mean() / (T_char ** 2 + 1e-8)
total = w_pde * L_pde + w_ic * L_ic + w_bc * L_bc + w_data * L_data
return total, L_pde, L_ic, L_bc, L_data
+55
View File
@@ -0,0 +1,55 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import torch
import torch.nn as nn
import config
from inverse.config_inverse import (
HIDDEN_SIZE, N_HIDDEN_LAYERS,
ALPHA_INIT, K_INIT, H_CONV_INIT,
)
class InverseHeatPINN(nn.Module):
def __init__(self, identify=('alpha', 'k', 'h_conv'), init_vals=None):
super().__init__()
layers = [nn.Linear(2, HIDDEN_SIZE), nn.Tanh()]
for _ in range(N_HIDDEN_LAYERS - 1):
layers += [nn.Linear(HIDDEN_SIZE, HIDDEN_SIZE), nn.Tanh()]
layers.append(nn.Linear(HIDDEN_SIZE, 1))
self.net = nn.Sequential(*layers)
self.identify = list(identify)
if init_vals is None:
init_vals = {}
_inits = {'alpha': ALPHA_INIT, 'k': K_INIT, 'h_conv': H_CONV_INIT}
_true = {'alpha': config.ALPHA, 'k': config.K, 'h_conv': config.H_CONV}
for name in ('alpha', 'k', 'h_conv'):
val = init_vals.get(name, _inits[name]) if name in identify else _true[name]
log_val = torch.log(torch.tensor(float(val)))
if name in identify:
setattr(self, f'log_{name}', nn.Parameter(log_val))
else:
self.register_buffer(f'log_{name}', log_val)
@property
def alpha(self):
return torch.exp(self.log_alpha)
@property
def k(self):
return torch.exp(self.log_k)
@property
def h_conv(self):
return torch.exp(self.log_h_conv)
def forward(self, xt):
x = xt[:, :1] / config.L
t = xt[:, 1:] / config.T_END
# Output scaling identica al forward PINN, ma usa i parametri appresi
T_char = config.T_AMB + (config.Q_VAL * config.L / self.k)
return config.T_AMB + (config.Q_VAL * config.L / self.k) * self.net(torch.cat([x, t], dim=1))
+112
View File
@@ -0,0 +1,112 @@
"""
Campionamento sensori dalla soluzione FDM.
Legge results/fdm/fdm_solution.csv (o lancia il solver FDM se non esiste),
chiede le posizioni dei sensori e la frequenza di campionamento, salva
results/fdm/measurements.csv con colonne: x, t, T.
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import numpy as np
import pandas as pd
FDM_CSV = "results/fdm/fdm_solution.csv"
OUT_CSV = "results/measurements.csv"
def _load_or_solve():
if os.path.exists(FDM_CSV):
print(f"Carico soluzione FDM da {FDM_CSV} ...")
df = pd.read_csv(FDM_CSV)
x_vals = np.sort(df["x"].unique())
t_vals = np.sort(df["t"].unique())
T_matrix = df.pivot(index="x", columns="t", values="T").values
return T_matrix, x_vals, t_vals
else:
print(f"{FDM_CSV} non trovato. Lancio il solver FDM ...")
from fdm.solver import solve, save_csv
T_matrix, x_vals, t_vals = solve()
save_csv(T_matrix, x_vals, t_vals, FDM_CSV)
return T_matrix, x_vals, t_vals
def _ask_sensors(L: float) -> list[float]:
print(f"\nDominio spaziale: [0, {L}] m")
print("Inserisci le posizioni dei sensori separate da virgola (es: 0.2, 0.5, 0.8):")
while True:
raw = input(" x sensori [m]: ").strip()
try:
positions = [float(v.strip()) for v in raw.split(",") if v.strip()]
if not positions:
raise ValueError
out_of_range = [p for p in positions if p < 0 or p > L]
if out_of_range:
print(f" Valori fuori range [0, {L}]: {out_of_range}. Riprova.")
continue
return positions
except ValueError:
print(" Input non valido. Usa numeri separati da virgola.")
def _ask_freq(dt_fdm: float) -> float:
max_freq = 1.0 / dt_fdm
print(f"\nFrequenza massima (passo FDM): {max_freq:.2f} Hz")
print("Inserisci la frequenza di campionamento [Hz]:")
while True:
raw = input(" f [Hz]: ").strip()
try:
f = float(raw)
if f <= 0:
raise ValueError
if f > max_freq:
print(f" Frequenza troppo alta (max {max_freq:.2f} Hz). Viene usata la frequenza massima.")
f = max_freq
return f
except ValueError:
print(" Input non valido. Inserisci un numero positivo.")
def main():
print("=" * 40)
print(" Campionamento sensori FDM")
print("=" * 40)
import config
T_matrix, x_vals, t_vals = _load_or_solve()
sensor_positions = _ask_sensors(config.L)
freq = _ask_freq(t_vals[1] - t_vals[0])
# Indici temporali al passo di campionamento
dt_sample = 1.0 / freq
t_sampled = np.arange(t_vals[0], t_vals[-1] + dt_sample * 0.5, dt_sample)
t_indices = [int(np.argmin(np.abs(t_vals - t))) for t in t_sampled]
t_indices = sorted(set(t_indices))
# Campionamento
rows = []
for x_s in sensor_positions:
i_x = int(np.argmin(np.abs(x_vals - x_s)))
x_actual = x_vals[i_x]
for i_t in t_indices:
rows.append({"x": x_actual, "t": t_vals[i_t], "T": T_matrix[i_x, i_t]})
df_out = pd.DataFrame(rows).sort_values(["x", "t"]).reset_index(drop=True)
os.makedirs(os.path.dirname(os.path.abspath(OUT_CSV)), exist_ok=True)
df_out.to_csv(OUT_CSV, index=False)
n_sensors = len(sensor_positions)
n_times = len(t_indices)
print(f"\nSalvato: {OUT_CSV}")
print(f" Sensori: {n_sensors} | Istanti: {n_times} | Righe totali: {len(df_out)}")
print(f" Posizioni effettive: {sorted(df_out['x'].unique().tolist())}")
print(f" Intervallo T: [{df_out['T'].min():.2f}, {df_out['T'].max():.2f}] °C")
if __name__ == "__main__":
main()
+118
View File
@@ -0,0 +1,118 @@
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from datetime import datetime
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import config
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def visualize_inverse(T_pred, x_vals, t_vals, T_fdm, identified_params: dict):
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
out_dir = os.path.join(BASE_DIR, 'results', 'inverse', timestamp)
os.makedirs(out_dir, exist_ok=True)
x_indices = np.linspace(0, T_fdm.shape[0] - 1, len(x_vals), dtype=int)
t_indices = np.linspace(0, T_fdm.shape[1] - 1, len(t_vals), dtype=int)
T_fdm_ds = T_fdm[np.ix_(x_indices, t_indices)]
param_str = ' | '.join(f'{k}={v:.4f}' for k, v in identified_params.items())
subtitle = f'Parametri identificati: {param_str}'
colorscale = 'Hot_r'
zmin = float(min(np.min(T_pred), np.min(T_fdm_ds)))
zmax = float(max(np.max(T_pred), np.max(T_fdm_ds)))
# --- Heatmap ---
fig_map = make_subplots(
rows=1, cols=2,
subplot_titles=["Inverse PINN T(x,t)", "FDM Reference T(x,t)"],
shared_yaxes=True,
)
fig_map.add_trace(go.Heatmap(
z=T_pred.T, x=x_vals, y=t_vals,
colorscale=colorscale, zmin=zmin, zmax=zmax,
colorbar=dict(x=0.46, title='T [°C]'),
), row=1, col=1)
fig_map.add_trace(go.Heatmap(
z=T_fdm_ds.T, x=x_vals, y=t_vals,
colorscale=colorscale, zmin=zmin, zmax=zmax,
colorbar=dict(x=1.01, title='T [°C]'),
), row=1, col=2)
fig_map.update_xaxes(title_text='x')
fig_map.update_yaxes(title_text='t', row=1, col=1)
fig_map.update_layout(title_text=f'Inverse PINN vs FDM<br><sup>{subtitle}</sup>', height=450)
map_path = os.path.join(out_dir, 'heatmap.html')
fig_map.write_html(map_path)
print(f"Heatmap saved → {map_path}")
# --- Animated profile T(x) ---
n_frames = len(t_vals)
frames = [
go.Frame(
data=[
go.Scatter(x=x_vals, y=T_pred[:, i], mode='lines',
line=dict(color='royalblue', width=2), name='Inverse PINN'),
go.Scatter(x=x_vals, y=T_fdm_ds[:, i], mode='lines',
line=dict(color='firebrick', width=2, dash='dash'), name='FDM'),
],
name=str(i),
layout=go.Layout(title_text=f'Inverse PINN vs FDM | t = {t_vals[i]:.3f}'),
)
for i in range(n_frames)
]
fig_anim = go.Figure(
data=frames[0].data,
layout=go.Layout(
title=f'Inverse PINN vs FDM | t = {t_vals[0]:.3f}',
xaxis=dict(title='x [m]', range=[-0.02, config.L + 0.02]),
yaxis=dict(title='T [°C]', range=[zmin - 1, zmax + 1]),
legend=dict(x=0.75, y=0.95),
updatemenus=[dict(
type='buttons', showactive=False, y=1.15, x=0.5, xanchor='center',
buttons=[
dict(label='▶ Play', method='animate',
args=[None, dict(frame=dict(duration=40, redraw=False),
fromcurrent=True, mode='immediate')]),
dict(label='⏸ Pause', method='animate',
args=[[None], dict(frame=dict(duration=0, redraw=False),
mode='immediate')]),
],
)],
sliders=[dict(
steps=[dict(method='animate', args=[[str(i)],
dict(mode='immediate', frame=dict(duration=0, redraw=False))],
label=f'{t_vals[i]:.2f}') for i in range(n_frames)],
transition=dict(duration=0), x=0.05, y=0, len=0.9,
currentvalue=dict(prefix='t = ', font=dict(size=14)),
)],
),
frames=frames,
)
anim_path = os.path.join(out_dir, 'animation.html')
fig_anim.write_html(anim_path)
print(f"Animation saved → {anim_path}")
# --- Time-series a x=0, x=L/2, x=L ---
nx = len(x_vals)
points = [(0, 'x=0', 'blue'), (nx // 2, 'x=L/2', 'green'), (nx - 1, 'x=L', 'red')]
fig_ts = go.Figure()
for idx, label, color in points:
fig_ts.add_trace(go.Scatter(x=t_vals, y=T_pred[idx, :], mode='lines',
line=dict(color=color, width=2), name=f'Inv.PINN {label}'))
fig_ts.add_trace(go.Scatter(x=t_vals, y=T_fdm_ds[idx, :], mode='lines',
line=dict(color=color, width=2, dash='dash'), name=f'FDM {label}'))
fig_ts.add_vline(x=config.T_STEP, line=dict(color='red', dash='dash', width=1.5),
annotation_text='Heat ON', annotation_position='top right')
fig_ts.update_layout(
title=f'Inverse PINN vs FDM — Time Series<br><sup>{subtitle}</sup>',
xaxis_title='t', yaxis_title='T(x,t)',
legend=dict(x=0.01, y=0.99), height=500,
)
comparison_path = os.path.join(out_dir, 'comparison.html')
fig_ts.write_html(comparison_path)
print(f"Comparison saved → {comparison_path}")