Compare commits
20 Commits
fdm
...
pinn-inversa
| Author | SHA1 | Date | |
|---|---|---|---|
| 295057e80b | |||
| b65051057a | |||
| c928b98de2 | |||
| 213cd6fba5 | |||
| 51bb8e5fc8 | |||
| b6598fb7d8 | |||
| 5a8cd3ef46 | |||
| f94fd51942 | |||
| b8301a4329 | |||
| 1237a57290 | |||
| e868c47190 | |||
| 649d26cfd4 | |||
| 256945ada3 | |||
| f02c5f2bbe | |||
| 9e77deffd5 | |||
| bca829bd7e | |||
| 98bfc78651 | |||
| fbb0458f69 | |||
| 4f050e80df | |||
| b5553691e8 |
@@ -98,6 +98,7 @@ StyleCopReport.xml
|
||||
*.svclog
|
||||
*.scc
|
||||
.venv
|
||||
.venv*
|
||||
|
||||
# Chutzpah Test files
|
||||
_Chutzpah*
|
||||
|
||||
@@ -6,97 +6,100 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
|
||||
|
||||
Write all git commit messages in Italian.
|
||||
|
||||
## Scope of Work
|
||||
|
||||
**Modifica solo il modulo `fdm/`.** Ignora qualsiasi richiesta riguardante PINN (`model.py`, `engine.py`, `visualizer.py`, `app.py` radice). Se una richiesta coinvolge file PINN, avvisa l'utente e non apportare modifiche.
|
||||
|
||||
## Project Overview
|
||||
|
||||
**Heat Equation PINN** — A Physics-Informed Neural Network that solves the 1D time-varying heat equation with an internal point heat source:
|
||||
|
||||
```
|
||||
∂T/∂t = α ∂²T/∂x² + (α/k) Q(t) δ(x − X_SRC) x ∈ [0, L], t ∈ [0, T_END]
|
||||
```
|
||||
|
||||
where `Q(t) = Q_VAL` if `t ≥ T_STEP` else `0`.
|
||||
|
||||
- **IC:** `T(x, 0) = T0` (uniform)
|
||||
- **BC x=0:** Robin — convection: `−k ∂T/∂x = h (T − T_AMB)`
|
||||
- **BC x=L:** Robin — convection: `−k ∂T/∂x = h (T − T_AMB)`
|
||||
|
||||
No experimental data is needed. A `fdm/` module provides a reference numerical solution (FTCS explicit scheme) used for evaluation and visualization comparison.
|
||||
|
||||
All physical and numerical parameters live in `config.py`.
|
||||
|
||||
## Running
|
||||
|
||||
Always activate the virtual environment first:
|
||||
## Commands
|
||||
|
||||
```bash
|
||||
source .venv/bin/activate
|
||||
source .venv/bin/activate # always first
|
||||
|
||||
python app.py # forward PINN: train / evaluate / visualize
|
||||
python fdm/app.py # FDM reference solver menu
|
||||
python -m inverse.app # inverse PINN menu (to be implemented)
|
||||
|
||||
pytest # all tests
|
||||
pytest -m "not slow" # skip full-training tests
|
||||
pytest tests/test_model.py # single file
|
||||
```
|
||||
|
||||
**PINN:**
|
||||
```bash
|
||||
python app.py # Train / Evaluate (L2 vs FDM) / Visualize
|
||||
```
|
||||
|
||||
**FDM reference solver:**
|
||||
```bash
|
||||
python fdm/app.py # Solve / Heatmap / Animation / Time-series
|
||||
```
|
||||
|
||||
Saved artifacts (git-ignored): `models/best_heat_pinn_model.pth`, HTML plots in `animations/` and `animations/fdm/`.
|
||||
|
||||
To retrain from scratch: `rm models/best_heat_pinn_model.pth` before running option 1.
|
||||
|
||||
## Dependencies
|
||||
|
||||
`requirements.txt` exists. Key packages: `torch`, `numpy`, `plotly`. No `pandas` or `scikit-learn` needed.
|
||||
|
||||
GPU is auto-detected (`cuda` → `mps` → `cpu`) in `engine.py:_get_device()`.
|
||||
Delete `models/best_heat_pinn_model.pth` to retrain from scratch.
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
config.py ← all physical + numerical parameters (edit here to change the problem)
|
||||
app.py ← PINN CLI menu
|
||||
model.py ← HeatPINN + heat_pinn_loss()
|
||||
engine.py ← data sampling, Adam+L-BFGS training, evaluation vs FDM, visualization call
|
||||
visualizer.py ← PINN vs FDM: heatmap, animated T(x), time-series at fixed points
|
||||
fdm/
|
||||
solver.py ← FTCS explicit scheme, Robin on both ends, point source at X_SRC
|
||||
visualizer.py ← same 3 plot types for FDM-only output
|
||||
app.py ← FDM CLI menu
|
||||
config.py ← all physical + numerical parameters
|
||||
model.py ← HeatPINN (5-layer FC) + heat_pinn_loss()
|
||||
engine.py ← prepare_data(), train_model(), evaluate_model()
|
||||
app.py ← forward PINN CLI
|
||||
visualizer.py ← PINN vs FDM plots (Plotly HTML)
|
||||
fdm/solver.py ← FTCS explicit scheme, returns T_matrix[NX, NT]
|
||||
inverse/ ← inverse PINN (to implement — see plan below)
|
||||
tests/ ← pytest suite (42 tests); conftest.py has device, small_data, pinn_model fixtures
|
||||
```
|
||||
|
||||
### Neural Network (`model.py`)
|
||||
## Key design decisions
|
||||
|
||||
`HeatPINN`: 5-layer fully connected, input `(x, t)` → output `T`.
|
||||
|
||||
**Output scaling** — the network predicts a dimensionless perturbation; the `forward()` applies:
|
||||
**Output scaling** (`model.py:forward`):
|
||||
```
|
||||
T = T_AMB + (Q_VAL · L / K) · net(x, t)
|
||||
T = T_AMB + (Q_VAL · L / K) · net(x_norm, t_norm)
|
||||
```
|
||||
This keeps `net` outputs in `[0, 1]` range and ensures gradients `∂T/∂x` are O(1) for the network to learn. Do not remove this scaling.
|
||||
This keeps net outputs in [0,1] and ∂T/∂x at O(1). Do not remove.
|
||||
|
||||
`heat_pinn_loss()` normalizes all four loss terms to O(1) using `T_char = Q_VAL·L/K` and `grad_char = (Q_VAL/K)²`. The PDE residual includes the Gaussian-smoothed source term (σ=0.02) as a continuous approximation to δ(x − X_SRC). Changing physical parameters in `config.py` does not require re-tuning loss weights.
|
||||
**Loss normalization** (`model.py:heat_pinn_loss`): all four terms are scaled to O(1) via `_T_char = Q_VAL·L/K` and `_bc_scale`. Changing physical params in `config.py` does not require retuning weights.
|
||||
|
||||
### Training (`engine.py`)
|
||||
**Collocation clustering** (`engine.py:prepare_data`): 25% extra points near `X_SRC` (source gradient) and `T_STEP` (flux discontinuity). First lever to pull if accuracy is poor: increase `N_F`.
|
||||
|
||||
`prepare_data()` samples collocation points with **deliberate clustering**: extra points near `x=X_SRC` (steep gradient at source) and around `t=T_STEP` (flux step discontinuity). Increasing `N_f` / `N_bc` here is the first lever to pull if accuracy is low.
|
||||
**Training sequence**: Adam (early stopping + ReduceLROnPlateau) → L-BFGS fine-tuning. L-BFGS uses a `_last` closure dict to capture loss components without double-calling the loss outside a grad context.
|
||||
|
||||
`train_model()` runs **Adam first, then L-BFGS fine-tuning**. L-BFGS uses a closure that captures loss components in `_last` dict (avoids calling `heat_pinn_loss` outside an active grad context).
|
||||
**FDM Robin BCs** (`fdm/solver.py`): implicit-like update `T[0] = (T[1] + robin_coeff·T_amb) / (1 + robin_coeff)`. Point source added after BCs: `T[i_src] += Q·α·dt/(k·dx)`.
|
||||
|
||||
`evaluate_model()` runs the FDM solver and downsamples its `(NX, NT)` output to the PINN prediction grid `(100, 100)` for L2 comparison.
|
||||
---
|
||||
|
||||
### FDM Solver (`fdm/solver.py`)
|
||||
## Inverse PINN — implementation plan
|
||||
|
||||
Returns `(T_matrix[NX, NT], x_vals, t_vals)`. Uses:
|
||||
- Robin BC on both ends: `T[0] = (T[1] + robin_coeff·T_amb) / (1 + robin_coeff)`
|
||||
- Point source injected at node `i_src = argmin|x - X_SRC|` after BCs: `T[i_src] += Q·α·dt/(k·dx)`
|
||||
- CFL check at startup (warns, does not crash)
|
||||
Goal: identify unknown physical parameters (`ALPHA`, `K`, `H_CONV`) from sparse noisy temperature measurements. The network learns T(x,t) and the physics parameters simultaneously.
|
||||
|
||||
### Loss Scaling Notes
|
||||
### Files to create (in order)
|
||||
|
||||
If you change `Q_VAL`, `K`, `H_CONV`, or `L` in `config.py`, the normalization in `heat_pinn_loss()` adjusts automatically. If losses diverge, check that `T_char = Q_VAL·L/K` is not near zero.
|
||||
**`inverse/config_inverse.py`**
|
||||
- `N_SENSORS`, `SENSOR_POSITIONS` (list of x positions)
|
||||
- `NOISE_STD` — Gaussian noise std on measurements [°C]
|
||||
- `IDENTIFY = ['alpha', 'k', 'h_conv']`
|
||||
- `ALPHA_INIT`, `K_INIT`, `H_CONV_INIT` — initial guesses (2–5× off from true values)
|
||||
- `EPOCHS_INV`, `LR_ADAM_INV`, `W_DATA = 10.0`
|
||||
- `MODELS_DIR`, `DATA_PATH`
|
||||
|
||||
**`inverse/data.py`**
|
||||
- `generate_measurements(noise_std, sensor_positions)`: call `fdm.solver.solve()` with true params from `config.py`, sample at nearest FDM nodes, add noise, save to `inverse/data/measurements.csv` (columns: `x, t, T`)
|
||||
- `load_measurements(device)`: load CSV → tensors `(x_s, t_s, T_meas)` on device
|
||||
|
||||
**`inverse/model.py`** — `InverseHeatPINN(nn.Module)`
|
||||
- Same 5-layer architecture as `HeatPINN`
|
||||
- Unknown params as log-space `nn.Parameter` (guarantees positivity without constraints):
|
||||
```python
|
||||
self.log_alpha = nn.Parameter(torch.log(torch.tensor(ALPHA_INIT)))
|
||||
self.log_k = nn.Parameter(torch.log(torch.tensor(K_INIT)))
|
||||
self.log_h_conv = nn.Parameter(torch.log(torch.tensor(H_CONV_INIT)))
|
||||
```
|
||||
- Properties `alpha`, `k`, `h_conv` that return `exp(log_*)`
|
||||
- `forward()` uses same output scaling as `HeatPINN` but with `self.k` and `self.alpha`
|
||||
- Never `.detach()` the learned params inside the loss — gradients must flow through them
|
||||
|
||||
**`inverse/loss.py`** — `inverse_heat_pinn_loss(..., x_s, t_s, T_meas)`
|
||||
- Same PDE/IC/BC structure as `heat_pinn_loss()` but uses `model.alpha`, `model.k`, `model.h_conv`
|
||||
- Normalization scales must be computed from the **current learned params** (not config constants), otherwise there is no gradient signal toward the physics params
|
||||
- Adds data fit term: `L_data = mean((T_pred(x_s, t_s) − T_meas)²) / T_char²`
|
||||
- Total: `w_pde·L_pde + w_ic·L_ic + w_bc·L_bc + w_data·L_data`
|
||||
|
||||
**`inverse/engine.py`**
|
||||
- `prepare_data_inverse()`: same clustering strategy as `engine.prepare_data()`
|
||||
- `train_inverse(data, measurements)`: **Adam only** (no L-BFGS — unstable when physics params are learnable because loss curvature differs by orders of magnitude between network weights and physics params); print identified param values every 100 epochs
|
||||
- `evaluate_inverse(model)`: print table of true vs identified params with relative error %; also compute L2 error of T field vs FDM
|
||||
|
||||
**`inverse/app.py`** — CLI menu: (1) Generate measurements, (2) Train, (3) Evaluate, (0) Exit
|
||||
|
||||
**`inverse/__init__.py`** — empty
|
||||
|
||||
### Pitfalls
|
||||
|
||||
- If `W_DATA` is too high, BC/IC are ignored and the net overfits measurements (physics collapses)
|
||||
- Sensors far from x=0 and x=L → poor identification of `H_CONV` (weak boundary signal)
|
||||
- Do not resample sensor points each epoch — `(x_s, t_s, T_meas)` are fixed throughout training
|
||||
|
||||
@@ -0,0 +1,553 @@
|
||||
# Heat Equation PINN
|
||||
|
||||

|
||||

|
||||

|
||||

|
||||
|
||||
Soluzione dell'equazione del calore 1D con sorgente puntuale tramite **Physics-Informed Neural Network (PINN)**, validata contro un solver numerico **FDM** (Finite Difference Method).
|
||||
|
||||
---
|
||||
|
||||
## Indice
|
||||
|
||||
1. [Problema fisico](#1-problema-fisico)
|
||||
2. [Approccio: PINN vs FDM](#2-approccio-pinn-vs-fdm)
|
||||
3. [Struttura del progetto](#3-struttura-del-progetto)
|
||||
4. [Installazione](#4-installazione)
|
||||
5. [Utilizzo](#5-utilizzo)
|
||||
6. [Architettura della rete neurale](#6-architettura-della-rete-neurale)
|
||||
7. [Funzione di loss](#7-funzione-di-loss)
|
||||
8. [Training](#8-training)
|
||||
9. [Solver FDM di riferimento](#9-solver-fdm-di-riferimento)
|
||||
10. [Visualizzazioni](#10-visualizzazioni)
|
||||
11. [Parametri configurabili](#11-parametri-configurabili)
|
||||
12. [Test](#12-test)
|
||||
|
||||
---
|
||||
|
||||
## 1. Problema fisico
|
||||
|
||||
Il progetto risolve l'equazione del calore 1D con una sorgente di calore puntuale interna che si attiva a un istante fissato:
|
||||
|
||||
```
|
||||
∂T/∂t = α ∂²T/∂x² + (α/k) Q(t) δ(x − X_SRC)
|
||||
```
|
||||
|
||||
dove:
|
||||
- `T(x, t)` — temperatura [°C]
|
||||
- `α` — diffusività termica [m²/s]
|
||||
- `k` — conducibilità termica [W/m·K]
|
||||
- `Q(t) = Q_VAL` se `t ≥ T_STEP`, altrimenti `0` — flusso di calore [W/m²]
|
||||
- `δ(x − X_SRC)` — delta di Dirac nella posizione della sorgente
|
||||
- Dominio: `x ∈ [0, L]`, `t ∈ [0, T_END]`
|
||||
|
||||
### Condizioni al contorno (Robin / convezione)
|
||||
|
||||
Entrambe le estremità della barra scambiano calore per convezione con l'ambiente:
|
||||
|
||||
```
|
||||
x = 0: −k ∂T/∂x = h (T − T_AMB) → ∂T/∂x − (h/k)(T − T_AMB) = 0
|
||||
x = L: −k ∂T/∂x = h (T − T_AMB) → ∂T/∂x + (h/k)(T − T_AMB) = 0
|
||||
```
|
||||
|
||||
### Condizione iniziale
|
||||
|
||||
```
|
||||
T(x, 0) = T₀ (temperatura uniforme)
|
||||
```
|
||||
|
||||
### Parametri fisici di default
|
||||
|
||||
| Parametro | Valore | Unità | Significato |
|
||||
|--------------|---------|----------|------------------------------------------|
|
||||
| `ALPHA` | 0.01 | m²/s | Diffusività termica |
|
||||
| `K` | 1.0 | W/m·K | Conducibilità termica |
|
||||
| `L` | 1.0 | m | Lunghezza della barra |
|
||||
| `T0` | 20.0 | °C | Temperatura iniziale uniforme |
|
||||
| `X_SRC` | 0.35 | m | Posizione della sorgente di calore |
|
||||
| `Q_VAL` | 150.0 | W/m² | Intensità del flusso di calore |
|
||||
| `T_STEP` | 0.2 | s | Istante di attivazione della sorgente |
|
||||
| `H_CONV` | 10.0 | W/m²·K | Coefficiente convettivo alle estremità |
|
||||
| `T_AMB` | 20.0 | °C | Temperatura ambiente |
|
||||
| `T_END` | 10.0 | s | Fine della simulazione |
|
||||
|
||||
---
|
||||
|
||||
## 2. Approccio: PINN vs FDM
|
||||
|
||||
### PINN (Physics-Informed Neural Network)
|
||||
|
||||
Una PINN è una rete neurale che impara a soddisfare un'equazione differenziale alle derivate parziali **senza dati sperimentali**. L'addestramento avviene minimizzando una funzione di loss che penalizza le violazioni della PDE, delle condizioni iniziali e delle condizioni al contorno in un insieme di **punti di collocazione** distribuiti nel dominio.
|
||||
|
||||
Vantaggi:
|
||||
- Non richiede una griglia regolare
|
||||
- Può essere addestrata su domini irregolari
|
||||
- La soluzione è una funzione continua e differenziabile ovunque
|
||||
|
||||
In questo progetto la loss ha tre componenti:
|
||||
1. **Residuo PDE** — la rete deve soddisfare l'equazione del calore
|
||||
2. **Condizione iniziale** — la rete deve restituire `T₀` per `t = 0`
|
||||
3. **Condizioni al contorno** — la rete deve soddisfare le Robin BC a `x=0` e `x=L`
|
||||
|
||||
### FDM (Finite Difference Method)
|
||||
|
||||
Il solver FDM (`fdm/`) implementa lo schema **FTCS** (Forward-Time Centered-Space) esplicito su una griglia regolare `NX × NT`. Non usa reti neurali: è una soluzione numerica classica che serve come **riferimento ad alta fedeltà** per validare la PINN.
|
||||
|
||||
La valutazione della PINN consiste nel calcolare l'errore relativo L2 tra la predizione della rete e la soluzione FDM interpolata sulla stessa griglia `100 × 100`.
|
||||
|
||||
---
|
||||
|
||||
## 3. Struttura del progetto
|
||||
|
||||
```
|
||||
pinn/
|
||||
├── config.py ← tutti i parametri fisici e numerici (modificare qui)
|
||||
├── model.py ← HeatPINN (rete neurale) + heat_pinn_loss()
|
||||
├── engine.py ← campionamento dati, training, valutazione vs FDM
|
||||
├── visualizer.py ← grafici PINN vs FDM: heatmap, animazione, serie temporali
|
||||
├── app.py ← CLI interattiva per PINN
|
||||
├── requirements.txt
|
||||
├── pytest.ini
|
||||
├── clear.sh ← script di pulizia artefatti
|
||||
├── fdm/
|
||||
│ ├── solver.py ← schema FTCS esplicito con Robin BC e sorgente puntuale
|
||||
│ ├── visualizer.py ← grafici FDM standalone
|
||||
│ └── app.py ← CLI interattiva per FDM
|
||||
└── tests/
|
||||
├── conftest.py
|
||||
├── test_config.py
|
||||
├── test_model.py
|
||||
├── test_engine_data.py
|
||||
├── test_fdm_solver.py
|
||||
├── test_integration_pinn.py
|
||||
└── test_e2e.py
|
||||
```
|
||||
|
||||
| File | Responsabilità |
|
||||
|--------------------|-------------------------------------------------------------------------|
|
||||
| `config.py` | Unica fonte di verità per tutti i parametri |
|
||||
| `model.py` | Definizione della rete e calcolo della loss fisica |
|
||||
| `engine.py` | Pipeline completa: campionamento → training → valutazione |
|
||||
| `visualizer.py` | Plot interattivi HTML (PINN vs FDM) |
|
||||
| `app.py` | Menu CLI per l'utente |
|
||||
| `fdm/solver.py` | Solver numerico FTCS per la soluzione di riferimento |
|
||||
| `fdm/visualizer.py`| Plot interattivi HTML (FDM standalone) |
|
||||
| `fdm/app.py` | Menu CLI per il solver FDM |
|
||||
|
||||
---
|
||||
|
||||
## 4. Installazione
|
||||
|
||||
**Prerequisiti:** Python 3.10+, `pip`, `virtualenv` (o `venv`).
|
||||
|
||||
```bash
|
||||
git clone <repository-url>
|
||||
cd pinn
|
||||
|
||||
python -m venv .venv
|
||||
source .venv/bin/activate # Linux / macOS
|
||||
# .venv\Scripts\activate # Windows
|
||||
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
Il progetto rileva automaticamente GPU/MPS/CPU all'avvio (vedi [engine.py — device detection](#12-dettagli-implementativi)).
|
||||
|
||||
---
|
||||
|
||||
## 5. Utilizzo
|
||||
|
||||
Attivare sempre il virtual environment prima di eseguire qualsiasi script:
|
||||
|
||||
```bash
|
||||
source .venv/bin/activate
|
||||
```
|
||||
|
||||
### PINN (`app.py`)
|
||||
|
||||
```bash
|
||||
python app.py
|
||||
```
|
||||
|
||||
```
|
||||
1. Addestra nuovo modello
|
||||
2. Valuta vs FDM (L2 error, max error)
|
||||
3. Visualizza (genera 3 file HTML)
|
||||
0. Esci
|
||||
```
|
||||
|
||||
- **Opzione 1** — avvia l'addestramento (Adam + L-BFGS). Chiede il numero di epoche; premi Invio per usare il default (5000). Il modello migliore viene salvato in `models/best_heat_pinn_model.pth`. Per riaddestrare da zero: `rm models/best_heat_pinn_model.pth`.
|
||||
- **Opzione 2** — carica il modello salvato, esegue il solver FDM, stampa l'errore relativo L2 e l'errore massimo assoluto.
|
||||
- **Opzione 3** — genera tre file HTML interattivi in `results/pinn/<timestamp>/`.
|
||||
|
||||
### FDM (`fdm/app.py`)
|
||||
|
||||
```bash
|
||||
python fdm/app.py
|
||||
```
|
||||
|
||||
```
|
||||
1. Risolvi (schema FTCS)
|
||||
2. Visualizza (genera 3 file HTML)
|
||||
0. Esci
|
||||
```
|
||||
|
||||
- **Opzione 1** — esegue il solver e stampa shape della matrice e range di temperatura.
|
||||
- **Opzione 2** — genera tre file HTML in `results/fdm/<timestamp>/`.
|
||||
|
||||
### Test
|
||||
|
||||
```bash
|
||||
pytest tests/ -v # tutti i test (42)
|
||||
pytest tests/test_model.py -v # rete e loss
|
||||
pytest tests/test_engine_data.py -v # campionamento dati
|
||||
pytest tests/test_fdm_solver.py -v # solver FDM
|
||||
pytest tests/test_integration_pinn.py -v # integrazione PINN
|
||||
pytest tests/test_e2e.py -v # workflow completo
|
||||
```
|
||||
|
||||
### Pulizia artefatti
|
||||
|
||||
```bash
|
||||
./clear.sh # menu interattivo per eliminare models/, results/ o entrambi
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 6. Architettura della rete neurale
|
||||
|
||||
`HeatPINN` ([model.py](model.py)) è una rete fully connected a 5 layer:
|
||||
|
||||
```
|
||||
Input (x, t)
|
||||
│
|
||||
[Linear 2→128, Tanh]
|
||||
[Linear 128→128, Tanh] ×3
|
||||
[Linear 128→1]
|
||||
│
|
||||
Output: T
|
||||
```
|
||||
|
||||
La rete riceve le coordinate `(x, t)` e produce un unico scalare: la temperatura `T(x, t)`.
|
||||
|
||||
### Normalizzazione dell'input
|
||||
|
||||
Prima di entrare nella rete, le coordinate vengono normalizzate al range `[0, 1]`:
|
||||
|
||||
```python
|
||||
x_norm = x / L # x ∈ [0, 1]
|
||||
t_norm = t / T_END # t ∈ [0, 1]
|
||||
```
|
||||
|
||||
Questo migliora il condizionamento numerico dell'ottimizzazione.
|
||||
|
||||
### Output scaling
|
||||
|
||||
La rete interna `net` non predice direttamente la temperatura: predice una **perturbazione adimensionale**. La temperatura fisica viene ricostruita con:
|
||||
|
||||
```python
|
||||
T = T_AMB + (Q_VAL * L / K) * net(x_norm, t_norm)
|
||||
```
|
||||
|
||||
dove `T_char = Q_VAL * L / K ≈ 150 °C` è la scala caratteristica di temperatura del problema.
|
||||
|
||||
**Perché questo scaling?**
|
||||
- L'output della rete rimane nell'ordine di `[0, 1]`, rendendo il training più stabile.
|
||||
- I gradienti `∂T/∂x` risultanti sono `O(1)` — la rete può imparare la struttura spaziale senza problemi di scala.
|
||||
- Il termine di fondo `T_AMB` garantisce che la soluzione parta dalla condizione iniziale corretta anche con pesi random.
|
||||
|
||||
**Non rimuovere questo scaling**: senza di esso la rete deve imparare ordini di grandezza diversi tra le condizioni iniziali/al contorno e il gradiente interno, rendendo l'ottimizzazione molto più difficile.
|
||||
|
||||
---
|
||||
|
||||
## 7. Funzione di loss
|
||||
|
||||
`heat_pinn_loss()` ([model.py](model.py)) calcola quattro valori: `(total, L_pde, L_ic, L_bc)`.
|
||||
|
||||
```
|
||||
total = W_PDE * L_pde + W_IC * L_ic + W_BC * L_bc
|
||||
```
|
||||
|
||||
Ogni termine è **normalizzato automaticamente** da scale precompilate che dipendono solo dalle costanti in `config.py`. Cambiare `Q_VAL`, `K`, `H_CONV` o `L` ribilancia automaticamente la loss senza richiedere rituning manuale dei pesi.
|
||||
|
||||
### L_pde — Residuo PDE
|
||||
|
||||
Valutato su `N_F` punti di collocazione `(x_f, t_f)` distribuiti nel dominio:
|
||||
|
||||
```
|
||||
residuo = dT/dt − α d²T/dx² − source(x, t)
|
||||
```
|
||||
|
||||
Il termine sorgente usa un'approssimazione gaussiana al delta di Dirac (il delta non è differenziabile):
|
||||
|
||||
```
|
||||
source(x, t) = (α/k) · Q(t) · G(x)
|
||||
|
||||
G(x) = exp(−0.5 · ((x − X_SRC) / σ)²) / (σ √(2π)) σ = GAUSS_SIGMA = 0.01 m
|
||||
```
|
||||
|
||||
`Q(t)` è la funzione gradino: vale `Q_VAL` per `t ≥ T_STEP`, zero altrimenti.
|
||||
|
||||
I gradienti `dT/dt`, `dT/dx`, `d²T/dx²` sono calcolati con `torch.autograd.grad` — **autodifferenziazione esatta**, non differenze finite.
|
||||
|
||||
Normalizzazione: `_pde_scale = max((T_char/T_END)², src_peak²)` dove `src_peak` è il picco gaussiano.
|
||||
|
||||
### L_ic — Condizione iniziale
|
||||
|
||||
Valutato su `N_IC` punti `(x_ic, 0)`:
|
||||
|
||||
```
|
||||
L_ic = mean( (T(x, 0) − T₀)² ) / T_char²
|
||||
```
|
||||
|
||||
### L_bc — Condizioni al contorno Robin
|
||||
|
||||
Valutato su `N_BC` istanti temporali `t_bc`, applicato a entrambe le estremità:
|
||||
|
||||
```
|
||||
x = 0: ∂T/∂x(0, t) − (h/k)(T(0,t) − T_AMB) = 0
|
||||
x = L: ∂T/∂x(L, t) + (h/k)(T(L,t) − T_AMB) = 0
|
||||
```
|
||||
|
||||
Normalizzazione: `_bc_scale = max(Q_VAL/K, H_CONV·T_char/K)²`
|
||||
|
||||
> **Nota sul segno:** la BC a sinistra (x=0) ha segno negativo davanti al termine convettivo perché il flusso uscente è orientato verso `−x`; a destra (x=L) il segno è positivo perché il flusso uscente è orientato verso `+x`.
|
||||
|
||||
---
|
||||
|
||||
## 8. Training
|
||||
|
||||
Il training è implementato in `train_model()` ([engine.py](engine.py)) e procede in due fasi.
|
||||
|
||||
### Fase 1: Adam
|
||||
|
||||
```
|
||||
Ottimizzatore: Adam, LR = 1e-3
|
||||
Scheduler: ReduceLROnPlateau (factor=0.5, patience=150, min_lr=1e-6)
|
||||
Early stopping: se la loss non migliora di > 1e-7 per 500 epoche consecutive
|
||||
```
|
||||
|
||||
Il modello con la loss più bassa viene salvato a ogni miglioramento in `models/best_heat_pinn_model.pth`.
|
||||
|
||||
### Fase 2: L-BFGS (fine-tuning)
|
||||
|
||||
Al termine dell'Adam, viene caricato il miglior modello e affinato con L-BFGS:
|
||||
|
||||
```
|
||||
Ottimizzatore: L-BFGS, LR=0.1, max_iter=50, history_size=50, strong Wolfe
|
||||
Steps: 200
|
||||
```
|
||||
|
||||
L-BFGS è un ottimizzatore di secondo ordine (quasi-Newton) particolarmente efficace nella fase finale del training PINN perché sfrutta la curvatura della loss per convergere a minimi più precisi di quanto Adam riesca a fare.
|
||||
|
||||
**Meccanismo closure:** L-BFGS richiede di poter rivalutare la loss più volte per iterazione. La funzione `closure()` cattura i componenti della loss in un dizionario `_last` per poterli stampare senza ricalcolare il grafo computazionale fuori dal contesto di `backward()`.
|
||||
|
||||
### Campionamento dei punti di collocazione
|
||||
|
||||
`prepare_data()` genera i punti di collocazione con **clustering deliberato** nelle zone fisicamente più complesse:
|
||||
|
||||
| Zona | Proporzione | Motivazione |
|
||||
|-----------------------------|-------------|------------------------------------------------|
|
||||
| Uniforme `[0,L] × [0,T_END]`| 50% | Copertura generale del dominio |
|
||||
| Intorno a `X_SRC ± 5% L` | 25% | Gradiente ripido in prossimità della sorgente |
|
||||
| Intorno a `T_STEP ± 0.1 s` | 25% | Discontinuità temporale all'attivazione |
|
||||
|
||||
Il clustering aumenta la densità di punti dove la fisica è più difficile da apprendere, senza aumentare il costo computazionale totale.
|
||||
|
||||
---
|
||||
|
||||
## 9. Solver FDM di riferimento
|
||||
|
||||
`fdm/solver.py` implementa lo schema **FTCS** (Forward-Time Centered-Space) esplicito.
|
||||
|
||||
### Schema di avanzamento temporale
|
||||
|
||||
```
|
||||
T[i, n+1] = T[i, n] + r · (T[i+1,n] − 2·T[i,n] + T[i−1,n])
|
||||
|
||||
r = α·dt/dx² (numero di Courant-Friedrichs-Lewy)
|
||||
```
|
||||
|
||||
### Stabilità CFL
|
||||
|
||||
Condizione necessaria per la stabilità dello schema esplicito:
|
||||
|
||||
```
|
||||
r = α·dt/dx² ≤ 0.5
|
||||
```
|
||||
|
||||
Se la condizione è violata, il solver stampa un avvertimento ma non si blocca. Con i parametri di default (`NX=250`, `NT=15000`) la condizione è soddisfatta. Se si riducono `NX` o `NT`, verificare che `r ≤ 0.5`.
|
||||
|
||||
### Condizioni al contorno Robin
|
||||
|
||||
Le BC sono applicate a ogni passo temporale usando uno schema centrato:
|
||||
|
||||
```
|
||||
T[0] = (T[1] + robin_coeff · T_AMB) / (1 + robin_coeff) # x = 0
|
||||
T[-1] = (T[-2] + robin_coeff · T_AMB) / (1 + robin_coeff) # x = L
|
||||
|
||||
robin_coeff = dx · h / k
|
||||
```
|
||||
|
||||
### Iniezione della sorgente puntuale
|
||||
|
||||
Dopo l'applicazione delle BC, la sorgente viene iniettata al nodo più vicino a `X_SRC`:
|
||||
|
||||
```
|
||||
T[i_src] += Q(t) · α · dt / (k · dx)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 10. Visualizzazioni
|
||||
|
||||
Tutti i plot sono **HTML interattivi** generati con Plotly (zoom, hover, slider, play/pause).
|
||||
|
||||
### PINN vs FDM (`visualizer.py`)
|
||||
|
||||
Generati con `python app.py → opzione 3`, salvati in `results/pinn/<timestamp>/`:
|
||||
|
||||
| File | Contenuto |
|
||||
|-------------------|------------------------------------------------------------------------|
|
||||
| `heatmap.html` | Heatmap 2D affiancate PINN vs FDM, stessa scala colori |
|
||||
| `animation.html` | Profilo T(x) animato nel tempo: PINN (blu continuo) vs FDM (rosso tratteggiato) |
|
||||
| `comparison.html` | Serie temporali T(t) nei punti fissi `x=0`, `x=L/2`, `x=L`; linea verticale a `t=T_STEP` |
|
||||
|
||||
### FDM standalone (`fdm/visualizer.py`)
|
||||
|
||||
Generati con `python fdm/app.py → opzione 2`, salvati in `results/fdm/<timestamp>/`:
|
||||
|
||||
| File | Contenuto |
|
||||
|-------------------|------------------------------------------------------------------------|
|
||||
| `heatmap.html` | Heatmap 2D T(x,t) + striscia animata del profilo di temperatura |
|
||||
| `animation.html` | Profilo T(x) animato nel tempo |
|
||||
| `time_series.html`| Serie temporali in 5 punti fissi: `0`, `0.25L`, `0.5L`, `0.75L`, `L` |
|
||||
|
||||
---
|
||||
|
||||
## 11. Parametri configurabili
|
||||
|
||||
Tutti i parametri si trovano in `config.py`. Modificare solo questo file per cambiare il problema o il training.
|
||||
|
||||
### Fisica
|
||||
|
||||
| Parametro | Default | Unità | Descrizione |
|
||||
|--------------|---------|--------|------------------------------------------|
|
||||
| `ALPHA` | 0.01 | m²/s | Diffusività termica |
|
||||
| `K` | 1.0 | W/m·K | Conducibilità termica |
|
||||
| `L` | 1.0 | m | Lunghezza della barra |
|
||||
| `T0` | 20.0 | °C | Temperatura iniziale uniforme |
|
||||
| `X_SRC` | 0.35 | m | Posizione della sorgente di calore |
|
||||
| `Q_VAL` | 150.0 | W/m² | Intensità del flusso di calore |
|
||||
| `T_STEP` | 0.2 | s | Istante di attivazione della sorgente |
|
||||
| `H_CONV` | 10.0 | W/m²·K | Coefficiente convettivo alle estremità |
|
||||
| `T_AMB` | 20.0 | °C | Temperatura ambiente |
|
||||
| `T_END` | 10.0 | s | Fine della simulazione |
|
||||
|
||||
### Griglia FDM
|
||||
|
||||
| Parametro | Default | Descrizione |
|
||||
|--------------|---------|---------------------------------------------------------|
|
||||
| `NX` | 250 | Nodi spaziali (aumentare per maggiore risoluzione) |
|
||||
| `NT` | 15000 | Passi temporali (verificare `r = α·dt/dx² ≤ 0.5`) |
|
||||
| `GAUSS_SIGMA`| 0.01 | Larghezza del picco gaussiano nella loss PINN [m] |
|
||||
|
||||
### Architettura PINN
|
||||
|
||||
| Parametro | Default | Descrizione |
|
||||
|-------------------|---------|----------------------------------------------|
|
||||
| `HIDDEN_SIZE` | 128 | Neuroni per layer nascosto |
|
||||
| `N_HIDDEN_LAYERS` | 4 | Numero di layer nascosti (totale: 5 layer) |
|
||||
|
||||
### Campionamento
|
||||
|
||||
| Parametro | Default | Descrizione |
|
||||
|-----------|---------|--------------------------------------------------------------------|
|
||||
| `N_F` | 6000 | Punti PDE (+ 50% clustering automatico vicino a X_SRC e T_STEP) |
|
||||
| `N_IC` | 400 | Punti condizione iniziale |
|
||||
| `N_BC` | 400 | Punti condizioni al contorno |
|
||||
|
||||
### Training Adam
|
||||
|
||||
| Parametro | Default | Descrizione |
|
||||
|-----------------|---------|-----------------------------------------------------|
|
||||
| `EPOCHS` | 5000 | Epoche massime |
|
||||
| `PATIENCE` | 500 | Early stopping: epoche senza miglioramento |
|
||||
| `LR_ADAM` | 1e-3 | Learning rate iniziale |
|
||||
| `SCHED_FACTOR` | 0.5 | Fattore di riduzione LR (ReduceLROnPlateau) |
|
||||
| `SCHED_PATIENCE`| 150 | Patience per la riduzione LR |
|
||||
| `SCHED_MIN_LR` | 1e-6 | Learning rate minimo |
|
||||
|
||||
### Fine-tuning L-BFGS
|
||||
|
||||
| Parametro | Default | Descrizione |
|
||||
|---------------|---------|--------------------------|
|
||||
| `LR_LBFGS` | 0.1 | Learning rate L-BFGS |
|
||||
| `LBFGS_STEPS` | 200 | Numero di step L-BFGS |
|
||||
|
||||
### Pesi della loss
|
||||
|
||||
| Parametro | Default | Descrizione |
|
||||
|-----------|---------|--------------------------------|
|
||||
| `W_PDE` | 10.0 | Peso residuo PDE |
|
||||
| `W_IC` | 1.0 | Peso condizione iniziale |
|
||||
| `W_BC` | 5.0 | Peso condizioni al contorno |
|
||||
|
||||
> **Se la loss diverge:** verificare che `T_char = Q_VAL * L / K` non sia vicino a zero. Questo valore è la scala caratteristica di temperatura usata per normalizzare tutti i termini.
|
||||
|
||||
---
|
||||
|
||||
## 12. Test
|
||||
|
||||
La test suite è in `tests/` e conta **42 test** organizzati in tre livelli:
|
||||
|
||||
| File | Tipo | Cosa testa |
|
||||
|------------------------------|-------------|---------------------------------------------------|
|
||||
| `test_config.py` | Unit | Validità e coerenza dei parametri in `config.py` |
|
||||
| `test_model.py` | Unit | Shape output, finitezza, loss components |
|
||||
| `test_engine_data.py` | Unit | Campionamento e clustering dei punti |
|
||||
| `test_fdm_solver.py` | Unit | Griglia, CFL, shape output del solver FDM |
|
||||
| `test_integration_pinn.py` | Integration | Caricamento modello, griglia predizione, pipeline loss |
|
||||
| `test_e2e.py` | End-to-end | Workflow completo: train → evaluate → visualize |
|
||||
|
||||
```bash
|
||||
pytest tests/ -v # run tutti i test
|
||||
pytest tests/ -k "model" # solo test con "model" nel nome
|
||||
pytest tests/ --tb=short # traceback breve in caso di fallimento
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Dettagli implementativi
|
||||
|
||||
### Normalizzazione automatica della loss
|
||||
|
||||
Le scale sono precompilate una sola volta a import time in `model.py`:
|
||||
|
||||
```python
|
||||
_T_char = Q_VAL * L / K # ~150 °C
|
||||
_src_peak = ALPHA * Q_VAL / (K * GAUSS_SIGMA * sqrt(2π))
|
||||
_pde_scale = max((_T_char / T_END)², _src_peak²) + 1e-8
|
||||
_bc_scale = max(Q_VAL / K, H_CONV * _T_char / K) ** 2
|
||||
```
|
||||
|
||||
Dividere ogni termine per la sua scala porta tutti i contributi a `O(1)`, rendendo i pesi `W_PDE`, `W_IC`, `W_BC` interpretabili come importanza relativa piuttosto che come fattori di scala assoluti.
|
||||
|
||||
### Rilevamento device
|
||||
|
||||
`engine.py` seleziona automaticamente il device più performante disponibile:
|
||||
|
||||
```python
|
||||
CUDA → MPS (Apple Silicon) → CPU
|
||||
```
|
||||
|
||||
Include un test di funzionamento effettivo della GPU prima di usarla, per evitare fallimenti silenziosi su driver incompleti.
|
||||
|
||||
### Closure L-BFGS
|
||||
|
||||
L-BFGS richiede una funzione `closure()` che esegue `zero_grad`, forward pass, `backward`, e restituisce la loss. I componenti della loss vengono catturati in un dizionario `_last` per permettere il logging a ogni step senza ricalcolare il grafo fuori dal contesto `backward`.
|
||||
|
||||
### Subsampling delle animazioni FDM
|
||||
|
||||
Se `NT > 200`, il visualizer FDM campiona ogni `n`-esimo frame (`n = NT // 200`) per mantenere le animazioni HTML leggere e fluide.
|
||||
@@ -1,72 +1,96 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
RESULTS_DIR="$(dirname "$0")/results/fdm"
|
||||
BASE_DIR="$(dirname "$0")"
|
||||
|
||||
if [[ ! -d "$RESULTS_DIR" ]]; then
|
||||
echo "Nessuna cartella results/fdm trovata."
|
||||
exit 0
|
||||
fi
|
||||
cleanup_dir() {
|
||||
local LABEL="$1"
|
||||
local DIR="$2"
|
||||
|
||||
mapfile -t RUNS < <(find "$RESULTS_DIR" -mindepth 1 -maxdepth 1 -type d | sort)
|
||||
if [[ ! -d "$DIR" ]]; then
|
||||
echo "Nessuna cartella $DIR trovata."
|
||||
return
|
||||
fi
|
||||
|
||||
if [[ ${#RUNS[@]} -eq 0 ]]; then
|
||||
echo "Nessun risultato da cancellare."
|
||||
exit 0
|
||||
fi
|
||||
mapfile -t RUNS < <(find "$DIR" -mindepth 1 -maxdepth 1 -type d | sort)
|
||||
|
||||
echo "Risultati disponibili:"
|
||||
for i in "${!RUNS[@]}"; do
|
||||
printf " %2d. %s\n" "$((i+1))" "$(basename "${RUNS[$i]}")"
|
||||
done
|
||||
echo ""
|
||||
echo "a. Cancella tutti"
|
||||
echo "s. Selezione manuale"
|
||||
echo "0. Annulla"
|
||||
echo ""
|
||||
read -rp "Scelta: " MODE
|
||||
if [[ ${#RUNS[@]} -eq 0 ]]; then
|
||||
echo "Nessun risultato $LABEL da cancellare."
|
||||
return
|
||||
fi
|
||||
|
||||
case "$MODE" in
|
||||
a|A)
|
||||
read -rp "Confermi la cancellazione di ${#RUNS[@]} cartelle? [s/N] " CONFIRM
|
||||
if [[ "${CONFIRM,,}" == "s" ]]; then
|
||||
rm -rf "${RUNS[@]}"
|
||||
echo "Cancellati ${#RUNS[@]} risultati."
|
||||
else
|
||||
echo "Annullato."
|
||||
fi
|
||||
;;
|
||||
s|S)
|
||||
read -rp "Numeri da cancellare (es. 1 3 5): " -a CHOICES
|
||||
TO_DELETE=()
|
||||
for N in "${CHOICES[@]}"; do
|
||||
if [[ "$N" =~ ^[0-9]+$ ]] && (( N >= 1 && N <= ${#RUNS[@]} )); then
|
||||
TO_DELETE+=("${RUNS[$((N-1))]}")
|
||||
echo ""
|
||||
echo "Risultati $LABEL disponibili:"
|
||||
for i in "${!RUNS[@]}"; do
|
||||
printf " %2d. %s\n" "$((i+1))" "$(basename "${RUNS[$i]}")"
|
||||
done
|
||||
echo ""
|
||||
echo "a. Cancella tutti"
|
||||
echo "s. Selezione manuale"
|
||||
echo "0. Annulla"
|
||||
echo ""
|
||||
read -rp "Scelta [$LABEL]: " MODE
|
||||
|
||||
case "$MODE" in
|
||||
a|A)
|
||||
read -rp "Confermi la cancellazione di ${#RUNS[@]} cartelle $LABEL? [s/N] " CONFIRM
|
||||
if [[ "${CONFIRM,,}" == "s" ]]; then
|
||||
rm -rf "${RUNS[@]}"
|
||||
echo "Cancellati ${#RUNS[@]} risultati $LABEL."
|
||||
else
|
||||
echo "Indice non valido: $N (ignorato)"
|
||||
echo "Annullato."
|
||||
fi
|
||||
done
|
||||
if [[ ${#TO_DELETE[@]} -eq 0 ]]; then
|
||||
echo "Nessuna selezione valida."
|
||||
exit 0
|
||||
fi
|
||||
echo "Verranno cancellati:"
|
||||
for D in "${TO_DELETE[@]}"; do
|
||||
echo " - $(basename "$D")"
|
||||
done
|
||||
read -rp "Confermi? [s/N] " CONFIRM
|
||||
if [[ "${CONFIRM,,}" == "s" ]]; then
|
||||
rm -rf "${TO_DELETE[@]}"
|
||||
echo "Cancellati ${#TO_DELETE[@]} risultati."
|
||||
else
|
||||
;;
|
||||
s|S)
|
||||
read -rp "Numeri da cancellare (es. 1 3 5): " -a CHOICES
|
||||
TO_DELETE=()
|
||||
for N in "${CHOICES[@]}"; do
|
||||
if [[ "$N" =~ ^[0-9]+$ ]] && (( N >= 1 && N <= ${#RUNS[@]} )); then
|
||||
TO_DELETE+=("${RUNS[$((N-1))]}")
|
||||
else
|
||||
echo "Indice non valido: $N (ignorato)"
|
||||
fi
|
||||
done
|
||||
if [[ ${#TO_DELETE[@]} -eq 0 ]]; then
|
||||
echo "Nessuna selezione valida."
|
||||
return
|
||||
fi
|
||||
echo "Verranno cancellati:"
|
||||
for D in "${TO_DELETE[@]}"; do
|
||||
echo " - $(basename "$D")"
|
||||
done
|
||||
read -rp "Confermi? [s/N] " CONFIRM
|
||||
if [[ "${CONFIRM,,}" == "s" ]]; then
|
||||
rm -rf "${TO_DELETE[@]}"
|
||||
echo "Cancellati ${#TO_DELETE[@]} risultati $LABEL."
|
||||
else
|
||||
echo "Annullato."
|
||||
fi
|
||||
;;
|
||||
0|"")
|
||||
echo "Annullato."
|
||||
fi
|
||||
;;
|
||||
0|"")
|
||||
echo "Annullato."
|
||||
;;
|
||||
*)
|
||||
echo "Scelta non valida."
|
||||
exit 1
|
||||
;;
|
||||
*)
|
||||
echo "Scelta non valida."
|
||||
;;
|
||||
esac
|
||||
}
|
||||
|
||||
echo "Cosa vuoi ripulire?"
|
||||
echo " 1. Risultati FDM (results/fdm/)"
|
||||
echo " 2. Risultati PINN (results/pinn/)"
|
||||
echo " 3. Entrambi"
|
||||
echo " 0. Annulla"
|
||||
echo ""
|
||||
read -rp "Scelta: " CHOICE
|
||||
|
||||
case "$CHOICE" in
|
||||
1) cleanup_dir "FDM" "$BASE_DIR/results/fdm" ;;
|
||||
2) cleanup_dir "PINN" "$BASE_DIR/results/pinn" ;;
|
||||
3)
|
||||
cleanup_dir "FDM" "$BASE_DIR/results/fdm"
|
||||
cleanup_dir "PINN" "$BASE_DIR/results/pinn"
|
||||
;;
|
||||
0|"") echo "Annullato." ;;
|
||||
*) echo "Scelta non valida." ; exit 1 ;;
|
||||
esac
|
||||
|
||||
@@ -19,3 +19,32 @@ T_END = 10.0 # fine simulazione [s]
|
||||
# Griglia FDM
|
||||
NX = 250 # nodi spaziali
|
||||
NT = 15000 # passi temporali (verifica CFL automatica)
|
||||
|
||||
# Sorgente gaussiana (approssimazione continua del delta di Dirac)
|
||||
GAUSS_SIGMA = 0.01 # larghezza del picco gaussiano [m]
|
||||
|
||||
# Architettura PINN
|
||||
HIDDEN_SIZE = 128 # neuroni per layer nascosto
|
||||
N_HIDDEN_LAYERS = 4 # numero di layer nascosti
|
||||
|
||||
# Sampling punti di collocazione
|
||||
N_F = 6000 # punti PDE (+ 50% clustering automatico vicino a X_SRC e T_STEP)
|
||||
N_IC = 400 # punti condizione iniziale
|
||||
N_BC = 400 # punti condizioni al contorno
|
||||
|
||||
# Training Adam
|
||||
EPOCHS = 5000 # epoche massime
|
||||
PATIENCE = 500 # early stopping
|
||||
LR_ADAM = 1e-3 # learning rate iniziale
|
||||
SCHED_FACTOR = 0.5 # ReduceLROnPlateau: fattore di riduzione
|
||||
SCHED_PATIENCE = 150 # ReduceLROnPlateau: patience
|
||||
SCHED_MIN_LR = 1e-6 # ReduceLROnPlateau: lr minimo
|
||||
|
||||
# Fine-tuning L-BFGS
|
||||
LR_LBFGS = 0.1 # learning rate L-BFGS
|
||||
LBFGS_STEPS = 200 # numero di step L-BFGS
|
||||
|
||||
# Pesi della loss
|
||||
W_PDE = 10.0 # peso residuo PDE
|
||||
W_IC = 1.0 # peso condizione iniziale
|
||||
W_BC = 5.0 # peso condizioni al contorno
|
||||
|
||||
@@ -35,7 +35,10 @@ def _get_device():
|
||||
return torch.device('cpu')
|
||||
|
||||
|
||||
def prepare_data(N_f=4000, N_ic=400, N_bc=400):
|
||||
def prepare_data(N_f=None, N_ic=None, N_bc=None):
|
||||
if N_f is None: N_f = config.N_F
|
||||
if N_ic is None: N_ic = config.N_IC
|
||||
if N_bc is None: N_bc = config.N_BC
|
||||
set_seed(42)
|
||||
device = _get_device()
|
||||
|
||||
@@ -61,19 +64,24 @@ def prepare_data(N_f=4000, N_ic=400, N_bc=400):
|
||||
return {'device': device, 'x_f': x_f, 't_f': t_f, 'x_ic': x_ic, 't_bc': t_bc}
|
||||
|
||||
|
||||
def train_model(data, epochs=5000, patience=100):
|
||||
def train_model(data, epochs=None, patience=None):
|
||||
if epochs is None: epochs = config.EPOCHS
|
||||
if patience is None: patience = config.PATIENCE
|
||||
device = data['device']
|
||||
model = HeatPINN().to(device)
|
||||
optimizer = optim.Adam(model.parameters(), lr=1e-3)
|
||||
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=30, min_lr=1e-6)
|
||||
optimizer = optim.Adam(model.parameters(), lr=config.LR_ADAM)
|
||||
scheduler = ReduceLROnPlateau(optimizer, mode='min',
|
||||
factor=config.SCHED_FACTOR,
|
||||
patience=config.SCHED_PATIENCE,
|
||||
min_lr=config.SCHED_MIN_LR)
|
||||
|
||||
os.makedirs(MODELS_DIR, exist_ok=True)
|
||||
best_loss = float('inf')
|
||||
patience_counter = 0
|
||||
|
||||
print(f"\n--- Heat PINN Training (Adam) on {device} ---")
|
||||
model.train()
|
||||
for epoch in range(epochs):
|
||||
model.train()
|
||||
optimizer.zero_grad()
|
||||
loss, L_pde, L_ic, L_bc = heat_pinn_loss(
|
||||
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
|
||||
@@ -101,33 +109,33 @@ def train_model(data, epochs=5000, patience=100):
|
||||
|
||||
# L-BFGS fine-tuning phase (standard PINN practice for convergence to better minima)
|
||||
print("\n--- L-BFGS fine-tuning ---")
|
||||
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device)
|
||||
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device, weights_only=True)
|
||||
model.load_state_dict(ckpt['state_dict'])
|
||||
|
||||
lbfgs = optim.LBFGS(model.parameters(), lr=0.1, max_iter=50,
|
||||
lbfgs = optim.LBFGS(model.parameters(), lr=config.LR_LBFGS, max_iter=50,
|
||||
history_size=50, tolerance_grad=1e-7, line_search_fn='strong_wolfe')
|
||||
|
||||
_last = {}
|
||||
|
||||
for step in range(20):
|
||||
def closure():
|
||||
lbfgs.zero_grad()
|
||||
loss, L_pde, L_ic, L_bc = heat_pinn_loss(
|
||||
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
|
||||
)
|
||||
loss.backward()
|
||||
_last['loss'] = loss.item()
|
||||
_last['pde'] = L_pde.item()
|
||||
_last['ic'] = L_ic.item()
|
||||
_last['bc'] = L_bc.item()
|
||||
return loss
|
||||
def closure():
|
||||
lbfgs.zero_grad()
|
||||
loss, L_pde, L_ic, L_bc = heat_pinn_loss(
|
||||
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
|
||||
)
|
||||
loss.backward()
|
||||
_last['loss'] = loss.item()
|
||||
_last['pde'] = L_pde.item()
|
||||
_last['ic'] = L_ic.item()
|
||||
_last['bc'] = L_bc.item()
|
||||
return loss
|
||||
|
||||
for step in range(config.LBFGS_STEPS):
|
||||
lbfgs.step(closure)
|
||||
if _last['loss'] < best_loss:
|
||||
best_loss = _last['loss']
|
||||
torch.save({'state_dict': model.state_dict()}, MODEL_SAVE_PATH)
|
||||
if (step + 1) % 5 == 0:
|
||||
print(f"L-BFGS step {step+1}/20 | Loss: {_last['loss']:.6f} "
|
||||
print(f"L-BFGS step {step+1}/{config.LBFGS_STEPS} | Loss: {_last['loss']:.6f} "
|
||||
f"| PDE: {_last['pde']:.6f} | IC: {_last['ic']:.6f} | BC: {_last['bc']:.6f}")
|
||||
|
||||
print("Training complete! Model saved.")
|
||||
@@ -137,7 +145,7 @@ def _load_model(device):
|
||||
if not os.path.exists(MODEL_SAVE_PATH):
|
||||
print("Error: model not found. Train the model first.")
|
||||
return None
|
||||
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device)
|
||||
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device, weights_only=True)
|
||||
model = HeatPINN().to(device)
|
||||
model.load_state_dict(ckpt['state_dict'])
|
||||
model.eval()
|
||||
|
||||
+9
-2
@@ -3,7 +3,7 @@ import os
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from fdm.solver import solve
|
||||
from fdm.solver import solve, save_csv
|
||||
from fdm.visualizer import visualize_fdm
|
||||
|
||||
|
||||
@@ -26,10 +26,11 @@ def main_menu():
|
||||
print("-" * 30)
|
||||
print("1. Risolvi")
|
||||
print("2. Visualizza")
|
||||
print("3. Salva CSV")
|
||||
print("0. Esci")
|
||||
print("-" * 30)
|
||||
|
||||
choice = input("Select an option (0-2): ").strip()
|
||||
choice = input("Select an option (0-3): ").strip()
|
||||
|
||||
if choice == "1":
|
||||
T, x_vals, t_vals = solve()
|
||||
@@ -42,6 +43,12 @@ def main_menu():
|
||||
else:
|
||||
visualize_fdm(T, x_vals, t_vals)
|
||||
|
||||
elif choice == "3":
|
||||
if T is None:
|
||||
print("Eseguire prima l'opzione 1.")
|
||||
else:
|
||||
save_csv(T, x_vals, t_vals, "results/fdm/fdm_solution.csv")
|
||||
|
||||
elif choice == "0":
|
||||
print("Uscita.")
|
||||
sys.exit(0)
|
||||
|
||||
@@ -14,6 +14,7 @@ Returns T_matrix of shape (NX, NT).
|
||||
import sys
|
||||
import os
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
import config
|
||||
@@ -85,3 +86,16 @@ def solve():
|
||||
T_matrix[:, n + 1] = T_cur
|
||||
|
||||
return T_matrix, x_vals, t_vals
|
||||
|
||||
|
||||
def save_csv(T_matrix, x_vals, t_vals, path: str) -> None:
|
||||
"""Save FDM solution to CSV with columns: x, t, T."""
|
||||
xs, ts = np.meshgrid(x_vals, t_vals, indexing="ij")
|
||||
df = pd.DataFrame({
|
||||
"x": xs.ravel(),
|
||||
"t": ts.ravel(),
|
||||
"T": T_matrix.ravel(),
|
||||
})
|
||||
os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)
|
||||
df.to_csv(path, index=False)
|
||||
print(f"Salvato: {path} ({len(df)} righe)")
|
||||
|
||||
+117
@@ -0,0 +1,117 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
|
||||
def print_header():
|
||||
print("=" * 42)
|
||||
print(" Inverse Heat PINN")
|
||||
print(" Identifica: α, k, h_conv da misure")
|
||||
print("=" * 42)
|
||||
|
||||
|
||||
_ALL_PARAMS = ('alpha', 'k', 'h_conv')
|
||||
|
||||
|
||||
def _ask_identify():
|
||||
print(f"\nParametri disponibili: {', '.join(_ALL_PARAMS)}")
|
||||
raw = input("Quali identificare? (invio = tutti, oppure es: alpha k): ").strip()
|
||||
chosen = [p for p in (raw.split() if raw else _ALL_PARAMS) if p in _ALL_PARAMS]
|
||||
invalid = [p for p in raw.split() if p not in _ALL_PARAMS] if raw else []
|
||||
if invalid:
|
||||
print(f" Ignorati (non validi): {invalid}")
|
||||
if not chosen:
|
||||
print(" Nessun parametro valido — uso tutti.")
|
||||
chosen = list(_ALL_PARAMS)
|
||||
print(f" Identifico: {chosen}")
|
||||
|
||||
import config as _cfg
|
||||
_true = {'alpha': _cfg.ALPHA, 'k': _cfg.K, 'h_conv': _cfg.H_CONV}
|
||||
|
||||
init_vals = {}
|
||||
for p in chosen:
|
||||
true_val = _true[p]
|
||||
raw_v = input(f" Valore iniziale per {p} (vero: {true_val}): ").strip()
|
||||
try:
|
||||
init_vals[p] = float(raw_v) if raw_v else true_val
|
||||
except ValueError:
|
||||
init_vals[p] = true_val
|
||||
print(f" → {p} inizia da {init_vals[p]}")
|
||||
|
||||
from inverse.config_inverse import W_PDE, W_IC, W_BC, W_DATA
|
||||
print(f"\nPesi loss (default — PDE:{W_PDE} IC:{W_IC} BC:{W_BC} Data:{W_DATA})")
|
||||
weights = {}
|
||||
for wname, wdefault in [('w_pde', W_PDE), ('w_ic', W_IC), ('w_bc', W_BC), ('w_data', W_DATA)]:
|
||||
raw_w = input(f" {wname} (default {wdefault}): ").strip()
|
||||
try:
|
||||
weights[wname] = float(raw_w) if raw_w else wdefault
|
||||
except ValueError:
|
||||
weights[wname] = wdefault
|
||||
|
||||
raw_ep = input("\nNumero di epoch (default 10000): ").strip()
|
||||
try:
|
||||
epochs = int(raw_ep) if raw_ep else 10000
|
||||
except ValueError:
|
||||
epochs = 10000
|
||||
|
||||
return chosen, init_vals, weights, epochs
|
||||
|
||||
|
||||
def main():
|
||||
print_header()
|
||||
|
||||
from inverse.engine import prepare_data_inverse, _get_device
|
||||
print("\nGenerazione punti di collocazione...")
|
||||
data = prepare_data_inverse()
|
||||
print(f"Pronti — device: {data['device']}\n")
|
||||
|
||||
x_s = None
|
||||
t_s = None
|
||||
T_meas = None
|
||||
|
||||
while True:
|
||||
print("\n" + "-" * 34)
|
||||
print(" MAIN MENU")
|
||||
print("-" * 34)
|
||||
print("1. Carica misure")
|
||||
print("2. Addestra (identifica α, k, h)")
|
||||
print("3. Valuta risultati")
|
||||
print("4. Visualizza")
|
||||
print("0. Esci")
|
||||
print("-" * 34)
|
||||
|
||||
choice = input("Scelta (0-4): ").strip()
|
||||
|
||||
if choice == "1":
|
||||
from inverse.data import load_measurements
|
||||
x_s, t_s, T_meas = load_measurements(data['device'])
|
||||
|
||||
elif choice == "2":
|
||||
if x_s is None:
|
||||
print("Caricare prima le misure (opzione 1).")
|
||||
continue
|
||||
identify, init_vals, weights, epochs = _ask_identify()
|
||||
from inverse.engine import train_inverse
|
||||
try:
|
||||
train_inverse(data, x_s, t_s, T_meas, identify=identify, init_vals=init_vals, epochs=epochs, **weights)
|
||||
except KeyboardInterrupt:
|
||||
print("\nTraining interrotto. Il miglior modello trovato è stato salvato.")
|
||||
|
||||
elif choice == "3":
|
||||
from inverse.engine import evaluate_inverse
|
||||
evaluate_inverse()
|
||||
|
||||
elif choice == "4":
|
||||
from inverse.engine import generate_visualization_inverse
|
||||
generate_visualization_inverse()
|
||||
|
||||
elif choice == "0":
|
||||
print("Uscita.")
|
||||
sys.exit(0)
|
||||
|
||||
else:
|
||||
print("Scelta non valida.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,43 @@
|
||||
import os
|
||||
import sys
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
import config as _cfg
|
||||
|
||||
# Dati di misura
|
||||
MEASUREMENTS_PATH = "results/measurements.csv"
|
||||
|
||||
# Parametri da identificare (valori veri in config.py)
|
||||
IDENTIFY = ['alpha', 'k', 'h_conv']
|
||||
|
||||
# Stime iniziali — deliberatamente lontane dai valori veri
|
||||
ALPHA_INIT = _cfg.ALPHA * 3.0 # vero: 0.01 → inizia a 0.03
|
||||
K_INIT = _cfg.K * 0.4 # vero: 1.0 → inizia a 0.4
|
||||
H_CONV_INIT = _cfg.H_CONV * 2.5 # vero: 10.0 → inizia a 25.0
|
||||
|
||||
# Training
|
||||
EPOCHS_INV = 10000
|
||||
LR_ADAM_INV = 1e-3
|
||||
PATIENCE_INV = 800
|
||||
|
||||
SCHED_FACTOR = 0.5
|
||||
SCHED_PATIENCE = 200
|
||||
SCHED_MIN_LR = 1e-6
|
||||
|
||||
# Pesi loss
|
||||
W_PDE = 1.0
|
||||
W_IC = 1.0
|
||||
W_BC = 5.0
|
||||
W_DATA = 10.0
|
||||
|
||||
# Architettura (stessa del forward PINN)
|
||||
HIDDEN_SIZE = _cfg.HIDDEN_SIZE
|
||||
N_HIDDEN_LAYERS = _cfg.N_HIDDEN_LAYERS
|
||||
|
||||
# Collocation
|
||||
N_F = _cfg.N_F
|
||||
N_IC = _cfg.N_IC
|
||||
N_BC = _cfg.N_BC
|
||||
|
||||
# Output
|
||||
MODELS_DIR = "models"
|
||||
MODEL_SAVE_PATH = "models/best_inverse_pinn.pth"
|
||||
@@ -0,0 +1,25 @@
|
||||
import os
|
||||
import sys
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import pandas as pd
|
||||
import torch
|
||||
|
||||
from inverse.config_inverse import MEASUREMENTS_PATH
|
||||
|
||||
|
||||
def load_measurements(device: torch.device):
|
||||
"""Carica measurements.csv e restituisce tensori (x_s, t_s, T_meas) sul device."""
|
||||
if not os.path.exists(MEASUREMENTS_PATH):
|
||||
raise FileNotFoundError(
|
||||
f"File misure non trovato: {MEASUREMENTS_PATH}\n"
|
||||
"Esegui prima 'python inverse/sample_sensors.py' per generare i dati."
|
||||
)
|
||||
df = pd.read_csv(MEASUREMENTS_PATH)
|
||||
x_s = torch.tensor(df["x"].values, dtype=torch.float32, device=device)
|
||||
t_s = torch.tensor(df["t"].values, dtype=torch.float32, device=device)
|
||||
T_meas = torch.tensor(df["T"].values, dtype=torch.float32, device=device)
|
||||
print(f"Misure caricate: {len(df)} punti da {MEASUREMENTS_PATH}")
|
||||
print(f" Sensori x: {sorted(df['x'].unique().tolist())}")
|
||||
print(f" T range: [{df['T'].min():.2f}, {df['T'].max():.2f}] °C")
|
||||
return x_s, t_s, T_meas
|
||||
@@ -0,0 +1,212 @@
|
||||
import os
|
||||
import sys
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import random
|
||||
import numpy as np
|
||||
import torch
|
||||
import torch.optim as optim
|
||||
from torch.optim.lr_scheduler import ReduceLROnPlateau
|
||||
|
||||
import config
|
||||
from inverse.config_inverse import (
|
||||
N_F, N_IC, N_BC,
|
||||
EPOCHS_INV, LR_ADAM_INV, PATIENCE_INV,
|
||||
SCHED_FACTOR, SCHED_PATIENCE, SCHED_MIN_LR,
|
||||
MODEL_SAVE_PATH, MODELS_DIR,
|
||||
)
|
||||
from inverse.model import InverseHeatPINN
|
||||
from inverse.loss import inverse_heat_pinn_loss
|
||||
|
||||
|
||||
def _get_device():
|
||||
if torch.cuda.is_available():
|
||||
try:
|
||||
t = torch.zeros(2, 2).cuda()
|
||||
torch.mm(t, t)
|
||||
return torch.device('cuda')
|
||||
except RuntimeError:
|
||||
pass
|
||||
if torch.backends.mps.is_available():
|
||||
return torch.device('mps')
|
||||
return torch.device('cpu')
|
||||
|
||||
|
||||
def _set_seed(seed=42):
|
||||
random.seed(seed)
|
||||
np.random.seed(seed)
|
||||
torch.manual_seed(seed)
|
||||
if torch.cuda.is_available():
|
||||
torch.cuda.manual_seed_all(seed)
|
||||
|
||||
|
||||
def prepare_data_inverse():
|
||||
_set_seed(42)
|
||||
device = _get_device()
|
||||
|
||||
x_f = torch.rand(N_F, device=device) * config.L
|
||||
t_f = torch.rand(N_F, device=device) * config.T_END
|
||||
|
||||
# Clustering vicino a X_SRC e T_STEP (stessa strategia del forward PINN)
|
||||
n_extra = N_F // 4
|
||||
x_near = config.X_SRC + (torch.rand(n_extra, device=device) - 0.5) * config.L * 0.1
|
||||
x_near = x_near.clamp(0, config.L)
|
||||
t_near = torch.rand(n_extra, device=device) * config.T_END
|
||||
x_step = torch.rand(n_extra, device=device) * config.L
|
||||
t_step = config.T_STEP + (torch.rand(n_extra, device=device) - 0.5) * 0.1
|
||||
t_step = t_step.clamp(0, config.T_END)
|
||||
|
||||
x_f = torch.cat([x_f, x_near, x_step])
|
||||
t_f = torch.cat([t_f, t_near, t_step])
|
||||
|
||||
x_ic = torch.rand(N_IC, device=device) * config.L
|
||||
t_bc = torch.rand(N_BC, device=device) * config.T_END
|
||||
|
||||
return {'device': device, 'x_f': x_f, 't_f': t_f, 'x_ic': x_ic, 't_bc': t_bc}
|
||||
|
||||
|
||||
def train_inverse(data, x_s, t_s, T_meas, identify=('alpha', 'k', 'h_conv'), init_vals=None, epochs=None,
|
||||
w_pde=None, w_ic=None, w_bc=None, w_data=None):
|
||||
device = data['device']
|
||||
if epochs is None:
|
||||
epochs = EPOCHS_INV
|
||||
model = InverseHeatPINN(identify=identify, init_vals=init_vals).to(device)
|
||||
|
||||
print(f"\n--- Inverse PINN Training (Adam) su {device} ---")
|
||||
print(f"Stime iniziali: α={model.alpha.item():.4f} k={model.k.item():.4f} h={model.h_conv.item():.4f}")
|
||||
print(f"Valori veri: α={config.ALPHA:.4f} k={config.K:.4f} h={config.H_CONV:.4f}\n")
|
||||
|
||||
optimizer = optim.Adam(model.parameters(), lr=LR_ADAM_INV)
|
||||
scheduler = ReduceLROnPlateau(optimizer, mode='min',
|
||||
factor=SCHED_FACTOR,
|
||||
patience=SCHED_PATIENCE,
|
||||
min_lr=SCHED_MIN_LR)
|
||||
|
||||
os.makedirs(MODELS_DIR, exist_ok=True)
|
||||
best_loss = float('inf')
|
||||
patience_counter = 0
|
||||
|
||||
def _save_best(model):
|
||||
torch.save({
|
||||
'state_dict': model.state_dict(),
|
||||
'identify': model.identify,
|
||||
'alpha': model.alpha.item(),
|
||||
'k': model.k.item(),
|
||||
'h_conv': model.h_conv.item(),
|
||||
}, MODEL_SAVE_PATH)
|
||||
|
||||
model.train()
|
||||
try:
|
||||
for epoch in range(epochs):
|
||||
optimizer.zero_grad()
|
||||
loss, L_pde, L_ic, L_bc, L_data = inverse_heat_pinn_loss(
|
||||
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc'],
|
||||
x_s, t_s, T_meas,
|
||||
**{k: v for k, v in {'w_pde': w_pde, 'w_ic': w_ic, 'w_bc': w_bc, 'w_data': w_data}.items() if v is not None},
|
||||
)
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
scheduler.step(loss.item())
|
||||
|
||||
if loss.item() < best_loss - 1e-8:
|
||||
best_loss = loss.item()
|
||||
patience_counter = 0
|
||||
_save_best(model)
|
||||
else:
|
||||
patience_counter += 1
|
||||
|
||||
if patience_counter >= PATIENCE_INV:
|
||||
print(f"Early stopping a epoca {epoch + 1}")
|
||||
break
|
||||
|
||||
if (epoch + 1) % 100 == 0:
|
||||
print(
|
||||
f"Epoch [{epoch+1}/{epochs}] | Loss: {loss.item():.6f} "
|
||||
f"| PDE: {L_pde.item():.5f} IC: {L_ic.item():.5f} "
|
||||
f"BC: {L_bc.item():.5f} Data: {L_data.item():.5f} "
|
||||
f"| α={model.alpha.item():.5f} k={model.k.item():.5f} h={model.h_conv.item():.4f}"
|
||||
)
|
||||
except KeyboardInterrupt:
|
||||
print(f"\nInterrotto a epoca {epoch + 1}. Salvo stato corrente...")
|
||||
_save_best(model)
|
||||
raise
|
||||
|
||||
print("\nTraining completato. Modello salvato.")
|
||||
|
||||
|
||||
def evaluate_inverse():
|
||||
device = _get_device()
|
||||
|
||||
if not os.path.exists(MODEL_SAVE_PATH):
|
||||
print("Modello non trovato. Esegui prima il training.")
|
||||
return
|
||||
|
||||
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device, weights_only=True)
|
||||
model = InverseHeatPINN().to(device)
|
||||
model.load_state_dict(ckpt['state_dict'])
|
||||
model.eval()
|
||||
|
||||
alpha_id = model.alpha.item()
|
||||
k_id = model.k.item()
|
||||
h_conv_id = model.h_conv.item()
|
||||
|
||||
print("\n--- Parametri identificati vs veri ---")
|
||||
print(f"{'Param':<10} {'Vero':>10} {'Identificato':>14} {'Errore %':>10}")
|
||||
print("-" * 48)
|
||||
for name, true_val, id_val in [
|
||||
('alpha', config.ALPHA, alpha_id),
|
||||
('k', config.K, k_id),
|
||||
('h_conv', config.H_CONV, h_conv_id),
|
||||
]:
|
||||
err = abs(id_val - true_val) / true_val * 100
|
||||
print(f"{name:<10} {true_val:>10.5f} {id_val:>14.5f} {err:>9.2f}%")
|
||||
|
||||
# Errore L2 del campo T vs FDM
|
||||
from fdm.solver import solve as fdm_solve
|
||||
T_fdm, x_fdm, t_fdm = fdm_solve()
|
||||
|
||||
nx, nt = 100, 100
|
||||
x_vals = torch.linspace(0, config.L, nx, device=device)
|
||||
t_vals = torch.linspace(0, config.T_END, nt, device=device)
|
||||
xx, tt = torch.meshgrid(x_vals, t_vals, indexing='ij')
|
||||
inp = torch.stack([xx.reshape(-1), tt.reshape(-1)], dim=1)
|
||||
with torch.no_grad():
|
||||
T_pred = model(inp).reshape(nx, nt).cpu().numpy()
|
||||
|
||||
x_idx = np.linspace(0, T_fdm.shape[0] - 1, nx, dtype=int)
|
||||
t_idx = np.linspace(0, T_fdm.shape[1] - 1, nt, dtype=int)
|
||||
T_fdm_ds = T_fdm[np.ix_(x_idx, t_idx)]
|
||||
|
||||
l2_rel = np.sqrt(np.mean((T_pred - T_fdm_ds) ** 2)) / np.sqrt(np.mean(T_fdm_ds ** 2))
|
||||
max_err = np.max(np.abs(T_pred - T_fdm_ds))
|
||||
print(f"\nErrore L2 relativo campo T vs FDM: {l2_rel:.6f}")
|
||||
print(f"Errore massimo assoluto: {max_err:.4f} °C")
|
||||
|
||||
|
||||
def generate_visualization_inverse():
|
||||
device = _get_device()
|
||||
|
||||
if not os.path.exists(MODEL_SAVE_PATH):
|
||||
print("Modello non trovato. Esegui prima il training.")
|
||||
return
|
||||
|
||||
ckpt = torch.load(MODEL_SAVE_PATH, map_location=device, weights_only=True)
|
||||
identify = ckpt.get('identify', ['alpha', 'k', 'h_conv'])
|
||||
model = InverseHeatPINN(identify=identify).to(device)
|
||||
model.load_state_dict(ckpt['state_dict'])
|
||||
model.eval()
|
||||
|
||||
nx, nt = 100, 100
|
||||
x_vals = torch.linspace(0, config.L, nx, device=device)
|
||||
t_vals = torch.linspace(0, config.T_END, nt, device=device)
|
||||
xx, tt = torch.meshgrid(x_vals, t_vals, indexing='ij')
|
||||
with torch.no_grad():
|
||||
T_pred = model(torch.stack([xx.reshape(-1), tt.reshape(-1)], dim=1)).reshape(nx, nt).cpu().numpy()
|
||||
|
||||
from fdm.solver import solve as fdm_solve
|
||||
T_fdm, _, _ = fdm_solve()
|
||||
|
||||
identified_params = {'α': model.alpha.item(), 'k': model.k.item(), 'h': model.h_conv.item()}
|
||||
|
||||
from inverse.visualizer import visualize_inverse
|
||||
visualize_inverse(T_pred, x_vals.cpu().numpy(), t_vals.cpu().numpy(), T_fdm, identified_params)
|
||||
@@ -0,0 +1,64 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import torch
|
||||
import config
|
||||
from inverse.config_inverse import W_PDE, W_IC, W_BC, W_DATA
|
||||
|
||||
|
||||
def inverse_heat_pinn_loss(model, x_f, t_f, x_ic, t_bc, x_s, t_s, T_meas,
|
||||
w_pde=W_PDE, w_ic=W_IC, w_bc=W_BC, w_data=W_DATA):
|
||||
# Parametri appresi — NON detachare, i gradienti devono fluire
|
||||
alpha = model.alpha
|
||||
k = model.k
|
||||
h_conv = model.h_conv
|
||||
|
||||
# Scale calcolate dai parametri correnti (necessario per segnale di gradiente verso i params)
|
||||
T_char = config.Q_VAL * config.L / k
|
||||
sigma = config.GAUSS_SIGMA
|
||||
src_peak = alpha * config.Q_VAL / (k * sigma * (2 * torch.pi) ** 0.5)
|
||||
pde_scale = torch.clamp((T_char / config.T_END) ** 2 + src_peak ** 2, min=1e-8)
|
||||
bc_scale = torch.clamp(
|
||||
torch.max((config.Q_VAL / k) ** 2, (h_conv * T_char / k) ** 2), min=1e-8
|
||||
)
|
||||
|
||||
# --- PDE residual ---
|
||||
x_f = x_f.detach().requires_grad_(True)
|
||||
t_f = t_f.detach().requires_grad_(True)
|
||||
T_f = model(torch.stack([x_f, t_f], dim=1))
|
||||
dT_dt, dT_dx = torch.autograd.grad(T_f.sum(), [t_f, x_f], create_graph=True)
|
||||
d2T_dx2 = torch.autograd.grad(dT_dx.sum(), x_f, create_graph=True)[0]
|
||||
|
||||
Q_t = torch.where(t_f >= config.T_STEP,
|
||||
torch.tensor(config.Q_VAL, device=t_f.device, dtype=t_f.dtype),
|
||||
torch.tensor(0.0, device=t_f.device, dtype=t_f.dtype))
|
||||
gauss = torch.exp(-0.5 * ((x_f - config.X_SRC) / sigma) ** 2) / (sigma * (2 * torch.pi) ** 0.5)
|
||||
source = (alpha / k) * Q_t * gauss
|
||||
|
||||
L_pde = ((dT_dt - alpha * d2T_dx2 - source) ** 2).mean() / pde_scale
|
||||
|
||||
# --- IC: T(x, 0) = T0 ---
|
||||
T_ic_pred = model(torch.stack([x_ic, torch.zeros_like(x_ic)], dim=1))
|
||||
L_ic = ((T_ic_pred - config.T0) ** 2).mean() / (T_char ** 2 + 1e-8)
|
||||
|
||||
# --- BC x=0: -k dT/dx = h(T - T_amb) → dT/dx - h/k*(T - T_amb) = 0 ---
|
||||
x_left = torch.zeros(t_bc.shape[0], device=t_bc.device).requires_grad_(True)
|
||||
T_left = model(torch.stack([x_left, t_bc.detach()], dim=1))
|
||||
dT_dx_left = torch.autograd.grad(T_left.sum(), x_left, create_graph=True)[0]
|
||||
L_bc_left = ((dT_dx_left - (h_conv / k) * (T_left.squeeze() - config.T_AMB)) ** 2).mean() / bc_scale
|
||||
|
||||
# --- BC x=L: k dT/dx = -h(T - T_amb) → dT/dx + h/k*(T - T_amb) = 0 ---
|
||||
x_right = torch.full((t_bc.shape[0],), config.L, device=t_bc.device).requires_grad_(True)
|
||||
T_right = model(torch.stack([x_right, t_bc.detach()], dim=1))
|
||||
dT_dx_right = torch.autograd.grad(T_right.sum(), x_right, create_graph=True)[0]
|
||||
L_bc_right = ((dT_dx_right + (h_conv / k) * (T_right.squeeze() - config.T_AMB)) ** 2).mean() / bc_scale
|
||||
|
||||
L_bc = L_bc_left + L_bc_right
|
||||
|
||||
# --- Data fit ---
|
||||
T_pred_s = model(torch.stack([x_s, t_s], dim=1)).squeeze()
|
||||
L_data = ((T_pred_s - T_meas) ** 2).mean() / (T_char ** 2 + 1e-8)
|
||||
|
||||
total = w_pde * L_pde + w_ic * L_ic + w_bc * L_bc + w_data * L_data
|
||||
return total, L_pde, L_ic, L_bc, L_data
|
||||
@@ -0,0 +1,55 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
import config
|
||||
from inverse.config_inverse import (
|
||||
HIDDEN_SIZE, N_HIDDEN_LAYERS,
|
||||
ALPHA_INIT, K_INIT, H_CONV_INIT,
|
||||
)
|
||||
|
||||
|
||||
class InverseHeatPINN(nn.Module):
|
||||
def __init__(self, identify=('alpha', 'k', 'h_conv'), init_vals=None):
|
||||
super().__init__()
|
||||
layers = [nn.Linear(2, HIDDEN_SIZE), nn.Tanh()]
|
||||
for _ in range(N_HIDDEN_LAYERS - 1):
|
||||
layers += [nn.Linear(HIDDEN_SIZE, HIDDEN_SIZE), nn.Tanh()]
|
||||
layers.append(nn.Linear(HIDDEN_SIZE, 1))
|
||||
self.net = nn.Sequential(*layers)
|
||||
self.identify = list(identify)
|
||||
|
||||
if init_vals is None:
|
||||
init_vals = {}
|
||||
|
||||
_inits = {'alpha': ALPHA_INIT, 'k': K_INIT, 'h_conv': H_CONV_INIT}
|
||||
_true = {'alpha': config.ALPHA, 'k': config.K, 'h_conv': config.H_CONV}
|
||||
|
||||
for name in ('alpha', 'k', 'h_conv'):
|
||||
val = init_vals.get(name, _inits[name]) if name in identify else _true[name]
|
||||
log_val = torch.log(torch.tensor(float(val)))
|
||||
if name in identify:
|
||||
setattr(self, f'log_{name}', nn.Parameter(log_val))
|
||||
else:
|
||||
self.register_buffer(f'log_{name}', log_val)
|
||||
|
||||
@property
|
||||
def alpha(self):
|
||||
return torch.exp(self.log_alpha)
|
||||
|
||||
@property
|
||||
def k(self):
|
||||
return torch.exp(self.log_k)
|
||||
|
||||
@property
|
||||
def h_conv(self):
|
||||
return torch.exp(self.log_h_conv)
|
||||
|
||||
def forward(self, xt):
|
||||
x = xt[:, :1] / config.L
|
||||
t = xt[:, 1:] / config.T_END
|
||||
# Output scaling identica al forward PINN, ma usa i parametri appresi
|
||||
T_char = config.T_AMB + (config.Q_VAL * config.L / self.k)
|
||||
return config.T_AMB + (config.Q_VAL * config.L / self.k) * self.net(torch.cat([x, t], dim=1))
|
||||
@@ -0,0 +1,112 @@
|
||||
"""
|
||||
Campionamento sensori dalla soluzione FDM.
|
||||
|
||||
Legge results/fdm/fdm_solution.csv (o lancia il solver FDM se non esiste),
|
||||
chiede le posizioni dei sensori e la frequenza di campionamento, salva
|
||||
results/fdm/measurements.csv con colonne: x, t, T.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
FDM_CSV = "results/fdm/fdm_solution.csv"
|
||||
OUT_CSV = "results/measurements.csv"
|
||||
|
||||
|
||||
def _load_or_solve():
|
||||
if os.path.exists(FDM_CSV):
|
||||
print(f"Carico soluzione FDM da {FDM_CSV} ...")
|
||||
df = pd.read_csv(FDM_CSV)
|
||||
x_vals = np.sort(df["x"].unique())
|
||||
t_vals = np.sort(df["t"].unique())
|
||||
T_matrix = df.pivot(index="x", columns="t", values="T").values
|
||||
return T_matrix, x_vals, t_vals
|
||||
else:
|
||||
print(f"{FDM_CSV} non trovato. Lancio il solver FDM ...")
|
||||
from fdm.solver import solve, save_csv
|
||||
T_matrix, x_vals, t_vals = solve()
|
||||
save_csv(T_matrix, x_vals, t_vals, FDM_CSV)
|
||||
return T_matrix, x_vals, t_vals
|
||||
|
||||
|
||||
def _ask_sensors(L: float) -> list[float]:
|
||||
print(f"\nDominio spaziale: [0, {L}] m")
|
||||
print("Inserisci le posizioni dei sensori separate da virgola (es: 0.2, 0.5, 0.8):")
|
||||
while True:
|
||||
raw = input(" x sensori [m]: ").strip()
|
||||
try:
|
||||
positions = [float(v.strip()) for v in raw.split(",") if v.strip()]
|
||||
if not positions:
|
||||
raise ValueError
|
||||
out_of_range = [p for p in positions if p < 0 or p > L]
|
||||
if out_of_range:
|
||||
print(f" Valori fuori range [0, {L}]: {out_of_range}. Riprova.")
|
||||
continue
|
||||
return positions
|
||||
except ValueError:
|
||||
print(" Input non valido. Usa numeri separati da virgola.")
|
||||
|
||||
|
||||
def _ask_freq(dt_fdm: float) -> float:
|
||||
max_freq = 1.0 / dt_fdm
|
||||
print(f"\nFrequenza massima (passo FDM): {max_freq:.2f} Hz")
|
||||
print("Inserisci la frequenza di campionamento [Hz]:")
|
||||
while True:
|
||||
raw = input(" f [Hz]: ").strip()
|
||||
try:
|
||||
f = float(raw)
|
||||
if f <= 0:
|
||||
raise ValueError
|
||||
if f > max_freq:
|
||||
print(f" Frequenza troppo alta (max {max_freq:.2f} Hz). Viene usata la frequenza massima.")
|
||||
f = max_freq
|
||||
return f
|
||||
except ValueError:
|
||||
print(" Input non valido. Inserisci un numero positivo.")
|
||||
|
||||
|
||||
def main():
|
||||
print("=" * 40)
|
||||
print(" Campionamento sensori FDM")
|
||||
print("=" * 40)
|
||||
|
||||
import config
|
||||
T_matrix, x_vals, t_vals = _load_or_solve()
|
||||
|
||||
sensor_positions = _ask_sensors(config.L)
|
||||
freq = _ask_freq(t_vals[1] - t_vals[0])
|
||||
|
||||
# Indici temporali al passo di campionamento
|
||||
dt_sample = 1.0 / freq
|
||||
t_sampled = np.arange(t_vals[0], t_vals[-1] + dt_sample * 0.5, dt_sample)
|
||||
t_indices = [int(np.argmin(np.abs(t_vals - t))) for t in t_sampled]
|
||||
t_indices = sorted(set(t_indices))
|
||||
|
||||
# Campionamento
|
||||
rows = []
|
||||
for x_s in sensor_positions:
|
||||
i_x = int(np.argmin(np.abs(x_vals - x_s)))
|
||||
x_actual = x_vals[i_x]
|
||||
for i_t in t_indices:
|
||||
rows.append({"x": x_actual, "t": t_vals[i_t], "T": T_matrix[i_x, i_t]})
|
||||
|
||||
df_out = pd.DataFrame(rows).sort_values(["x", "t"]).reset_index(drop=True)
|
||||
|
||||
os.makedirs(os.path.dirname(os.path.abspath(OUT_CSV)), exist_ok=True)
|
||||
df_out.to_csv(OUT_CSV, index=False)
|
||||
|
||||
n_sensors = len(sensor_positions)
|
||||
n_times = len(t_indices)
|
||||
print(f"\nSalvato: {OUT_CSV}")
|
||||
print(f" Sensori: {n_sensors} | Istanti: {n_times} | Righe totali: {len(df_out)}")
|
||||
print(f" Posizioni effettive: {sorted(df_out['x'].unique().tolist())}")
|
||||
print(f" Intervallo T: [{df_out['T'].min():.2f}, {df_out['T'].max():.2f}] °C")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,118 @@
|
||||
import os
|
||||
import sys
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from datetime import datetime
|
||||
import numpy as np
|
||||
import plotly.graph_objects as go
|
||||
from plotly.subplots import make_subplots
|
||||
import config
|
||||
|
||||
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
|
||||
def visualize_inverse(T_pred, x_vals, t_vals, T_fdm, identified_params: dict):
|
||||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
out_dir = os.path.join(BASE_DIR, 'results', 'inverse', timestamp)
|
||||
os.makedirs(out_dir, exist_ok=True)
|
||||
|
||||
x_indices = np.linspace(0, T_fdm.shape[0] - 1, len(x_vals), dtype=int)
|
||||
t_indices = np.linspace(0, T_fdm.shape[1] - 1, len(t_vals), dtype=int)
|
||||
T_fdm_ds = T_fdm[np.ix_(x_indices, t_indices)]
|
||||
|
||||
param_str = ' | '.join(f'{k}={v:.4f}' for k, v in identified_params.items())
|
||||
subtitle = f'Parametri identificati: {param_str}'
|
||||
|
||||
colorscale = 'Hot_r'
|
||||
zmin = float(min(np.min(T_pred), np.min(T_fdm_ds)))
|
||||
zmax = float(max(np.max(T_pred), np.max(T_fdm_ds)))
|
||||
|
||||
# --- Heatmap ---
|
||||
fig_map = make_subplots(
|
||||
rows=1, cols=2,
|
||||
subplot_titles=["Inverse PINN T(x,t)", "FDM Reference T(x,t)"],
|
||||
shared_yaxes=True,
|
||||
)
|
||||
fig_map.add_trace(go.Heatmap(
|
||||
z=T_pred.T, x=x_vals, y=t_vals,
|
||||
colorscale=colorscale, zmin=zmin, zmax=zmax,
|
||||
colorbar=dict(x=0.46, title='T [°C]'),
|
||||
), row=1, col=1)
|
||||
fig_map.add_trace(go.Heatmap(
|
||||
z=T_fdm_ds.T, x=x_vals, y=t_vals,
|
||||
colorscale=colorscale, zmin=zmin, zmax=zmax,
|
||||
colorbar=dict(x=1.01, title='T [°C]'),
|
||||
), row=1, col=2)
|
||||
fig_map.update_xaxes(title_text='x')
|
||||
fig_map.update_yaxes(title_text='t', row=1, col=1)
|
||||
fig_map.update_layout(title_text=f'Inverse PINN vs FDM<br><sup>{subtitle}</sup>', height=450)
|
||||
map_path = os.path.join(out_dir, 'heatmap.html')
|
||||
fig_map.write_html(map_path)
|
||||
print(f"Heatmap saved → {map_path}")
|
||||
|
||||
# --- Animated profile T(x) ---
|
||||
n_frames = len(t_vals)
|
||||
frames = [
|
||||
go.Frame(
|
||||
data=[
|
||||
go.Scatter(x=x_vals, y=T_pred[:, i], mode='lines',
|
||||
line=dict(color='royalblue', width=2), name='Inverse PINN'),
|
||||
go.Scatter(x=x_vals, y=T_fdm_ds[:, i], mode='lines',
|
||||
line=dict(color='firebrick', width=2, dash='dash'), name='FDM'),
|
||||
],
|
||||
name=str(i),
|
||||
layout=go.Layout(title_text=f'Inverse PINN vs FDM | t = {t_vals[i]:.3f}'),
|
||||
)
|
||||
for i in range(n_frames)
|
||||
]
|
||||
fig_anim = go.Figure(
|
||||
data=frames[0].data,
|
||||
layout=go.Layout(
|
||||
title=f'Inverse PINN vs FDM | t = {t_vals[0]:.3f}',
|
||||
xaxis=dict(title='x [m]', range=[-0.02, config.L + 0.02]),
|
||||
yaxis=dict(title='T [°C]', range=[zmin - 1, zmax + 1]),
|
||||
legend=dict(x=0.75, y=0.95),
|
||||
updatemenus=[dict(
|
||||
type='buttons', showactive=False, y=1.15, x=0.5, xanchor='center',
|
||||
buttons=[
|
||||
dict(label='▶ Play', method='animate',
|
||||
args=[None, dict(frame=dict(duration=40, redraw=False),
|
||||
fromcurrent=True, mode='immediate')]),
|
||||
dict(label='⏸ Pause', method='animate',
|
||||
args=[[None], dict(frame=dict(duration=0, redraw=False),
|
||||
mode='immediate')]),
|
||||
],
|
||||
)],
|
||||
sliders=[dict(
|
||||
steps=[dict(method='animate', args=[[str(i)],
|
||||
dict(mode='immediate', frame=dict(duration=0, redraw=False))],
|
||||
label=f'{t_vals[i]:.2f}') for i in range(n_frames)],
|
||||
transition=dict(duration=0), x=0.05, y=0, len=0.9,
|
||||
currentvalue=dict(prefix='t = ', font=dict(size=14)),
|
||||
)],
|
||||
),
|
||||
frames=frames,
|
||||
)
|
||||
anim_path = os.path.join(out_dir, 'animation.html')
|
||||
fig_anim.write_html(anim_path)
|
||||
print(f"Animation saved → {anim_path}")
|
||||
|
||||
# --- Time-series a x=0, x=L/2, x=L ---
|
||||
nx = len(x_vals)
|
||||
points = [(0, 'x=0', 'blue'), (nx // 2, 'x=L/2', 'green'), (nx - 1, 'x=L', 'red')]
|
||||
fig_ts = go.Figure()
|
||||
for idx, label, color in points:
|
||||
fig_ts.add_trace(go.Scatter(x=t_vals, y=T_pred[idx, :], mode='lines',
|
||||
line=dict(color=color, width=2), name=f'Inv.PINN {label}'))
|
||||
fig_ts.add_trace(go.Scatter(x=t_vals, y=T_fdm_ds[idx, :], mode='lines',
|
||||
line=dict(color=color, width=2, dash='dash'), name=f'FDM {label}'))
|
||||
fig_ts.add_vline(x=config.T_STEP, line=dict(color='red', dash='dash', width=1.5),
|
||||
annotation_text='Heat ON', annotation_position='top right')
|
||||
fig_ts.update_layout(
|
||||
title=f'Inverse PINN vs FDM — Time Series<br><sup>{subtitle}</sup>',
|
||||
xaxis_title='t', yaxis_title='T(x,t)',
|
||||
legend=dict(x=0.01, y=0.99), height=500,
|
||||
)
|
||||
comparison_path = os.path.join(out_dir, 'comparison.html')
|
||||
fig_ts.write_html(comparison_path)
|
||||
print(f"Comparison saved → {comparison_path}")
|
||||
@@ -6,58 +6,66 @@ import config
|
||||
class HeatPINN(nn.Module):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.net = nn.Sequential(
|
||||
nn.Linear(2, 128), nn.Tanh(),
|
||||
nn.Linear(128, 128), nn.Tanh(),
|
||||
nn.Linear(128, 128), nn.Tanh(),
|
||||
nn.Linear(128, 128), nn.Tanh(),
|
||||
nn.Linear(128, 1),
|
||||
)
|
||||
h = config.HIDDEN_SIZE
|
||||
layers = [nn.Linear(2, h), nn.Tanh()]
|
||||
for _ in range(config.N_HIDDEN_LAYERS - 1):
|
||||
layers += [nn.Linear(h, h), nn.Tanh()]
|
||||
layers.append(nn.Linear(h, 1))
|
||||
self.net = nn.Sequential(*layers)
|
||||
|
||||
def forward(self, x):
|
||||
# Output scaled to physical range: T_AMB + (Q*L/K) * net
|
||||
# net learns dimensionless perturbation in [0,1] range
|
||||
T_scale = config.T_AMB + (config.Q_VAL * config.L / config.K) * self.net(x)
|
||||
return T_scale
|
||||
def forward(self, xt):
|
||||
x = xt[:, :1] / config.L
|
||||
t = xt[:, 1:] / config.T_END
|
||||
return config.T_AMB + (config.Q_VAL * config.L / config.K) * self.net(torch.cat([x, t], dim=1))
|
||||
|
||||
|
||||
def heat_pinn_loss(model, x_f, t_f, x_ic, t_bc, w_pde=1.0, w_ic=1.0, w_bc=10.0):
|
||||
# Characteristic scales for normalization
|
||||
T_char = config.Q_VAL * config.L / config.K # ~50 °C — temperature scale
|
||||
grad_char = (config.Q_VAL / config.K) ** 2 # ~2500 — gradient scale²
|
||||
# Precomputed loss scales (depend only on config constants)
|
||||
_T_char = config.Q_VAL * config.L / config.K # ~150 °C — temperature scale
|
||||
# _pde_scale covers both dT/dt and the Gaussian source peak (dominant with small sigma)
|
||||
_src_peak = config.ALPHA * config.Q_VAL / (config.K * config.GAUSS_SIGMA * (2 * 3.141592653589793) ** 0.5)
|
||||
_pde_scale = max((_T_char / config.T_END) ** 2, _src_peak ** 2) + 1e-8
|
||||
# Robin BC residual scale: max(dT/dx, H_CONV/K * T_char) — convective term dominates when H*L/K >> 1
|
||||
_bc_scale = max(config.Q_VAL / config.K,
|
||||
config.H_CONV * _T_char / config.K) ** 2
|
||||
|
||||
|
||||
def heat_pinn_loss(model, x_f, t_f, x_ic, t_bc,
|
||||
w_pde=None, w_ic=None, w_bc=None):
|
||||
if w_pde is None: w_pde = config.W_PDE
|
||||
if w_ic is None: w_ic = config.W_IC
|
||||
if w_bc is None: w_bc = config.W_BC
|
||||
T_char = _T_char
|
||||
|
||||
# PDE residual: dT/dt - alpha * d2T/dx2 - source(x,t) = 0 (normalized by T_char/t_char)
|
||||
x_f = x_f.detach().requires_grad_(True)
|
||||
t_f = t_f.detach().requires_grad_(True)
|
||||
T_f = model(torch.stack([x_f, t_f], dim=1))
|
||||
dT_dt = torch.autograd.grad(T_f.sum(), t_f, create_graph=True)[0]
|
||||
dT_dx = torch.autograd.grad(T_f.sum(), x_f, create_graph=True)[0]
|
||||
dT_dt, dT_dx = torch.autograd.grad(T_f.sum(), [t_f, x_f], create_graph=True)
|
||||
d2T_dx2 = torch.autograd.grad(dT_dx.sum(), x_f, create_graph=True)[0]
|
||||
sigma = 0.02
|
||||
Q_t_f = torch.where(t_f >= config.T_STEP,
|
||||
torch.tensor(config.Q_VAL, device=t_f.device, dtype=t_f.dtype),
|
||||
torch.tensor(0.0, device=t_f.device, dtype=t_f.dtype))
|
||||
sigma = config.GAUSS_SIGMA
|
||||
gauss = torch.exp(-0.5 * ((x_f - config.X_SRC) / sigma) ** 2) / (sigma * (2 * torch.pi) ** 0.5)
|
||||
source_term = (config.ALPHA / config.K) * Q_t_f * gauss
|
||||
pde_scale = (T_char / config.T_END) ** 2 + 1e-8
|
||||
L_pde = ((dT_dt - config.ALPHA * d2T_dx2 - source_term) ** 2).mean() / pde_scale
|
||||
L_pde = ((dT_dt - config.ALPHA * d2T_dx2 - source_term) ** 2).mean() / _pde_scale
|
||||
|
||||
# IC: T(x, 0) = T0 — normalized by T_char²
|
||||
T_ic_pred = model(torch.stack([x_ic, torch.zeros_like(x_ic)], dim=1))
|
||||
T_ic_true = torch.full_like(T_ic_pred, config.T0)
|
||||
L_ic = ((T_ic_pred - T_ic_true) ** 2).mean() / (T_char ** 2 + 1e-8)
|
||||
L_ic = ((T_ic_pred - T_ic_true) ** 2).mean() / (_T_char ** 2 + 1e-8)
|
||||
|
||||
# BC x=0: Robin — dT/dx + H_CONV/K * (T(0,t) - T_AMB) = 0
|
||||
x_left = torch.zeros(t_bc.shape[0], device=t_bc.device).requires_grad_(True)
|
||||
T_left = model(torch.stack([x_left, t_bc.detach()], dim=1))
|
||||
dT_dx_left = torch.autograd.grad(T_left.sum(), x_left, create_graph=True)[0]
|
||||
L_bc_left = ((dT_dx_left + (config.H_CONV / config.K) * (T_left.squeeze() - config.T_AMB)) ** 2).mean() / grad_char
|
||||
L_bc_left = ((dT_dx_left - (config.H_CONV / config.K) * (T_left.squeeze() - config.T_AMB)) ** 2).mean() / _bc_scale
|
||||
|
||||
# BC x=L: Robin — dT/dx + H_CONV/K * (T(L,t) - T_AMB) = 0
|
||||
x_right = torch.full((t_bc.shape[0],), config.L, device=t_bc.device).requires_grad_(True)
|
||||
T_right = model(torch.stack([x_right, t_bc.detach()], dim=1))
|
||||
dT_dx_right = torch.autograd.grad(T_right.sum(), x_right, create_graph=True)[0]
|
||||
L_bc_right = ((dT_dx_right + (config.H_CONV / config.K) * (T_right.squeeze() - config.T_AMB)) ** 2).mean() / grad_char
|
||||
L_bc_right = ((dT_dx_right + (config.H_CONV / config.K) * (T_right.squeeze() - config.T_AMB)) ** 2).mean() / _bc_scale
|
||||
|
||||
L_bc = L_bc_left + L_bc_right
|
||||
total = w_pde * L_pde + w_ic * L_ic + w_bc * L_bc
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
[pytest]
|
||||
testpaths = tests
|
||||
addopts = -v --tb=short
|
||||
markers =
|
||||
slow: test lenti (training completo) — escludi con -m "not slow"
|
||||
@@ -1,4 +1,5 @@
|
||||
torch>=2.0.0
|
||||
pytest>=7.0.0
|
||||
pandas>=2.0.0
|
||||
numpy>=1.24.0
|
||||
scikit-learn>=1.3.0
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def device():
|
||||
from engine import _get_device
|
||||
return _get_device()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def small_data():
|
||||
from engine import prepare_data
|
||||
return prepare_data(N_f=200, N_ic=50, N_bc=50)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pinn_model(device):
|
||||
from model import HeatPINN
|
||||
return HeatPINN().to(device)
|
||||
@@ -0,0 +1,73 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import math
|
||||
import config
|
||||
|
||||
|
||||
def test_x_src_within_domain():
|
||||
assert 0.0 <= config.X_SRC <= config.L
|
||||
|
||||
|
||||
def test_t_step_before_t_end():
|
||||
assert 0.0 < config.T_STEP < config.T_END
|
||||
|
||||
|
||||
def test_gauss_sigma_positive():
|
||||
assert config.GAUSS_SIGMA > 0.0
|
||||
|
||||
|
||||
def test_physics_positive():
|
||||
assert config.ALPHA > 0.0
|
||||
assert config.K > 0.0
|
||||
assert config.H_CONV > 0.0
|
||||
assert config.L > 0.0
|
||||
assert config.T_END > 0.0
|
||||
|
||||
|
||||
def test_training_hyperparameters_positive():
|
||||
assert config.PATIENCE > 0
|
||||
assert config.EPOCHS > 0
|
||||
assert config.LR_ADAM > 0.0
|
||||
assert config.SCHED_MIN_LR > 0.0
|
||||
assert config.SCHED_FACTOR > 0.0
|
||||
assert config.SCHED_PATIENCE > 0
|
||||
|
||||
|
||||
def test_lr_ordering():
|
||||
"""Min LR deve essere inferiore all'LR iniziale."""
|
||||
assert config.SCHED_MIN_LR < config.LR_ADAM
|
||||
|
||||
|
||||
def test_sched_patience_lt_patience():
|
||||
"""Lo scheduler deve poter agire prima che scatti l'early stopping."""
|
||||
assert config.SCHED_PATIENCE < config.PATIENCE
|
||||
|
||||
|
||||
def test_cfl_stability():
|
||||
"""La griglia FDM deve soddisfare la condizione CFL (r ≤ 0.5)."""
|
||||
dx = config.L / (config.NX - 1)
|
||||
dt = config.T_END / (config.NT - 1)
|
||||
r = config.ALPHA * dt / dx ** 2
|
||||
assert r <= 0.5, f"CFL violata: r={r:.4f} > 0.5"
|
||||
|
||||
|
||||
def test_grid_dimensions():
|
||||
assert config.NX >= 2
|
||||
assert config.NT >= 2
|
||||
assert config.N_F >= 1
|
||||
assert config.N_IC >= 1
|
||||
assert config.N_BC >= 1
|
||||
|
||||
|
||||
def test_pde_scale_covers_source_peak():
|
||||
"""_pde_scale in model.py deve coprire il picco gaussiano della sorgente."""
|
||||
from model import _pde_scale
|
||||
src_peak = config.ALPHA * config.Q_VAL / (
|
||||
config.K * config.GAUSS_SIGMA * math.sqrt(2 * math.pi)
|
||||
)
|
||||
assert _pde_scale >= src_peak ** 2 - 1e-6, (
|
||||
f"_pde_scale={_pde_scale:.1f} < src_peak²={src_peak**2:.1f}: "
|
||||
"la loss PDE non è normalizzata correttamente"
|
||||
)
|
||||
@@ -0,0 +1,87 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import pytest
|
||||
import numpy as np
|
||||
import torch
|
||||
import config
|
||||
|
||||
|
||||
# ── FDM ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_fdm_full_run():
|
||||
"""Il solver FDM produce un campo di temperatura fisicamente corretto."""
|
||||
from fdm.solver import solve
|
||||
T, x, t = solve()
|
||||
|
||||
assert T.shape == (config.NX, config.NT)
|
||||
assert np.isfinite(T).all()
|
||||
assert T[:, -1].mean() > config.T0 # la sorgente ha scaldato il dominio
|
||||
assert T.max() > config.T0 # picco sopra la IC
|
||||
assert T.min() >= config.T0 - 1e-6 # nessun raffreddamento sotto T0 (no sorgenti fredde)
|
||||
|
||||
|
||||
def test_fdm_visualizer_creates_html(tmp_path, monkeypatch):
|
||||
"""Il visualizer FDM scrive almeno un file HTML senza errori."""
|
||||
import fdm.visualizer as fdm_vis
|
||||
monkeypatch.setattr(fdm_vis, 'BASE_DIR', str(tmp_path))
|
||||
|
||||
from fdm.solver import solve
|
||||
T, x, t = solve()
|
||||
fdm_vis.visualize_fdm(T, x, t)
|
||||
|
||||
html_files = list(tmp_path.rglob('*.html'))
|
||||
assert len(html_files) >= 1, "Nessun file HTML generato dal visualizer FDM"
|
||||
|
||||
|
||||
def test_pinn_visualizer_creates_html(tmp_path, monkeypatch):
|
||||
"""Il visualizer PINN scrive i tre file HTML senza errori."""
|
||||
import visualizer as pinn_vis
|
||||
monkeypatch.setattr(pinn_vis, 'BASE_DIR', str(tmp_path))
|
||||
|
||||
from fdm.solver import solve as fdm_solve
|
||||
T_fdm, _, _ = fdm_solve()
|
||||
|
||||
nx, nt = 20, 20
|
||||
x_vals = np.linspace(0, config.L, nx)
|
||||
t_vals = np.linspace(0, config.T_END, nt)
|
||||
T_pred = np.full((nx, nt), config.T_AMB) # predizione costante (dummy)
|
||||
|
||||
pinn_vis.visualize_heat_field(T_pred, x_vals, t_vals, T_fdm)
|
||||
|
||||
html_files = list(tmp_path.rglob('*.html'))
|
||||
assert len(html_files) == 3, f"Attesi 3 HTML, trovati {len(html_files)}"
|
||||
|
||||
|
||||
# ── PINN training (lento) ─────────────────────────────────────────────────────
|
||||
|
||||
@pytest.mark.slow
|
||||
def test_pinn_training_saves_checkpoint(tmp_path, monkeypatch):
|
||||
"""Training per 30 epoche: il checkpoint viene salvato."""
|
||||
import engine
|
||||
save_path = str(tmp_path / 'model.pth')
|
||||
monkeypatch.setattr(engine, 'MODEL_SAVE_PATH', save_path)
|
||||
monkeypatch.setattr(engine, 'MODELS_DIR', str(tmp_path))
|
||||
|
||||
from engine import prepare_data, train_model
|
||||
data = prepare_data(N_f=300, N_ic=100, N_bc=100)
|
||||
train_model(data, epochs=30, patience=30)
|
||||
|
||||
assert os.path.exists(save_path)
|
||||
ckpt = torch.load(save_path, map_location='cpu', weights_only=True)
|
||||
assert 'state_dict' in ckpt
|
||||
|
||||
|
||||
@pytest.mark.slow
|
||||
def test_pinn_evaluate_after_training(tmp_path, monkeypatch):
|
||||
"""evaluate_model gira senza errori dopo un training minimo."""
|
||||
import engine
|
||||
save_path = str(tmp_path / 'model.pth')
|
||||
monkeypatch.setattr(engine, 'MODEL_SAVE_PATH', save_path)
|
||||
monkeypatch.setattr(engine, 'MODELS_DIR', str(tmp_path))
|
||||
|
||||
from engine import prepare_data, train_model, evaluate_model
|
||||
data = prepare_data(N_f=300, N_ic=100, N_bc=100)
|
||||
train_model(data, epochs=30, patience=30)
|
||||
evaluate_model(data)
|
||||
@@ -0,0 +1,67 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import torch
|
||||
import config
|
||||
from engine import set_seed, _get_device, prepare_data
|
||||
|
||||
|
||||
def test_set_seed_reproducibility():
|
||||
set_seed(42)
|
||||
r1 = torch.rand(10)
|
||||
set_seed(42)
|
||||
r2 = torch.rand(10)
|
||||
torch.testing.assert_close(r1, r2)
|
||||
|
||||
|
||||
def test_get_device_valid():
|
||||
device = _get_device()
|
||||
assert isinstance(device, torch.device)
|
||||
assert device.type in ('cpu', 'cuda', 'mps')
|
||||
|
||||
|
||||
def test_prepare_data_keys():
|
||||
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
|
||||
assert set(data.keys()) == {'device', 'x_f', 't_f', 'x_ic', 't_bc'}
|
||||
|
||||
|
||||
def test_prepare_data_shapes():
|
||||
N_f, N_ic, N_bc = 100, 50, 50
|
||||
data = prepare_data(N_f=N_f, N_ic=N_ic, N_bc=N_bc)
|
||||
# engine.py aggiunge 2 * (N_f // 4) punti di clustering
|
||||
expected_f = N_f + 2 * (N_f // 4)
|
||||
assert data['x_f'].shape == (expected_f,)
|
||||
assert data['t_f'].shape == (expected_f,)
|
||||
assert data['x_ic'].shape == (N_ic,)
|
||||
assert data['t_bc'].shape == (N_bc,)
|
||||
|
||||
|
||||
def test_prepare_data_x_bounds():
|
||||
data = prepare_data(N_f=500, N_ic=100, N_bc=100)
|
||||
assert data['x_f'].min().item() >= 0.0
|
||||
assert data['x_f'].max().item() <= config.L
|
||||
assert data['x_ic'].min().item() >= 0.0
|
||||
assert data['x_ic'].max().item() <= config.L
|
||||
|
||||
|
||||
def test_prepare_data_t_bounds():
|
||||
data = prepare_data(N_f=500, N_ic=100, N_bc=100)
|
||||
assert data['t_f'].min().item() >= 0.0
|
||||
assert data['t_f'].max().item() <= config.T_END
|
||||
|
||||
|
||||
def test_prepare_data_device_consistency():
|
||||
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
|
||||
expected = data['device'].type
|
||||
for key in ('x_f', 't_f', 'x_ic', 't_bc'):
|
||||
assert data[key].device.type == expected, f"{key} sul device sbagliato"
|
||||
|
||||
|
||||
def test_prepare_data_deterministic():
|
||||
"""Due chiamate con lo stesso seed (fissato in prepare_data) producono dati identici."""
|
||||
d1 = prepare_data(N_f=100, N_ic=50, N_bc=50)
|
||||
d2 = prepare_data(N_f=100, N_ic=50, N_bc=50)
|
||||
torch.testing.assert_close(d1['x_f'], d2['x_f'])
|
||||
torch.testing.assert_close(d1['t_f'], d2['t_f'])
|
||||
torch.testing.assert_close(d1['x_ic'], d2['x_ic'])
|
||||
@@ -0,0 +1,65 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import torch
|
||||
from engine import prepare_data
|
||||
from model import HeatPINN, heat_pinn_loss
|
||||
|
||||
|
||||
def test_data_to_model_forward():
|
||||
"""prepare_data → forward: shape e device coerenti, nessun NaN."""
|
||||
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
|
||||
model = HeatPINN().to(data['device'])
|
||||
xt = torch.stack([data['x_f'], data['t_f']], dim=1)
|
||||
out = model(xt)
|
||||
assert out.shape == (data['x_f'].shape[0], 1)
|
||||
assert out.device.type == data['device'].type
|
||||
assert torch.isfinite(out).all()
|
||||
|
||||
|
||||
def test_full_loss_pipeline():
|
||||
"""prepare_data → heat_pinn_loss: tutti i componenti finiti e non-negativi."""
|
||||
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
|
||||
model = HeatPINN().to(data['device'])
|
||||
total, L_pde, L_ic, L_bc = heat_pinn_loss(
|
||||
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
|
||||
)
|
||||
for name, v in [('total', total), ('L_pde', L_pde), ('L_ic', L_ic), ('L_bc', L_bc)]:
|
||||
assert torch.isfinite(v), f"{name} non è finita"
|
||||
assert v.item() >= 0.0, f"{name} è negativa"
|
||||
|
||||
|
||||
def test_backward_gradients_finite():
|
||||
"""Il backward della loss non produce NaN/Inf nei parametri."""
|
||||
data = prepare_data(N_f=100, N_ic=50, N_bc=50)
|
||||
model = HeatPINN().to(data['device'])
|
||||
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
|
||||
optimizer.zero_grad()
|
||||
total, _, _, _ = heat_pinn_loss(
|
||||
model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc']
|
||||
)
|
||||
total.backward()
|
||||
for p in model.parameters():
|
||||
assert p.grad is not None
|
||||
assert torch.isfinite(p.grad).all(), "gradiente NaN/Inf"
|
||||
|
||||
|
||||
def test_training_loop_stable():
|
||||
"""20 step di Adam non producono NaN/Inf nei parametri né nella loss."""
|
||||
data = prepare_data(N_f=200, N_ic=100, N_bc=100)
|
||||
model = HeatPINN().to(data['device'])
|
||||
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
|
||||
args = (model, data['x_f'], data['t_f'], data['x_ic'], data['t_bc'])
|
||||
|
||||
for _ in range(20):
|
||||
optimizer.zero_grad()
|
||||
loss, _, _, _ = heat_pinn_loss(*args)
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
|
||||
for p in model.parameters():
|
||||
assert torch.isfinite(p).all(), "parametro NaN/Inf dopo training"
|
||||
|
||||
total, _, _, _ = heat_pinn_loss(*args)
|
||||
assert torch.isfinite(total)
|
||||
@@ -0,0 +1,102 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import torch
|
||||
import config
|
||||
from model import HeatPINN, heat_pinn_loss
|
||||
|
||||
|
||||
# ── HeatPINN.forward ─────────────────────────────────────────────────────────
|
||||
|
||||
def test_forward_output_shape(pinn_model, device):
|
||||
xt = torch.zeros(64, 2, device=device)
|
||||
xt[:, 0] = torch.rand(64) * config.L
|
||||
xt[:, 1] = torch.rand(64) * config.T_END
|
||||
assert pinn_model(xt).shape == (64, 1)
|
||||
|
||||
|
||||
def test_forward_finite(pinn_model, device):
|
||||
xt = torch.zeros(100, 2, device=device)
|
||||
xt[:, 0] = torch.rand(100) * config.L
|
||||
xt[:, 1] = torch.rand(100) * config.T_END
|
||||
assert torch.isfinite(pinn_model(xt)).all()
|
||||
|
||||
|
||||
def test_forward_zero_weights_returns_t_amb(device):
|
||||
"""Con pesi nulli net(x,t)=0 ⇒ forward restituisce T_AMB per ogni input."""
|
||||
model = HeatPINN().to(device)
|
||||
for p in model.parameters():
|
||||
p.data.zero_()
|
||||
xt = torch.zeros(20, 2, device=device)
|
||||
xt[:, 0] = torch.rand(20) * config.L
|
||||
xt[:, 1] = torch.rand(20) * config.T_END
|
||||
out = model(xt)
|
||||
torch.testing.assert_close(out, torch.full_like(out, config.T_AMB), atol=1e-5, rtol=0.0)
|
||||
|
||||
|
||||
def test_forward_t_normalization(device):
|
||||
"""t viene normalizzato a [0,1]: il modello deve restituire output finiti
|
||||
anche a t=T_END senza saturazione di Tanh."""
|
||||
model = HeatPINN().to(device)
|
||||
torch.nn.init.xavier_uniform_(model.net[0].weight)
|
||||
xt = torch.tensor([[0.5, 0.0], [0.5, config.T_END]], device=device)
|
||||
out = model(xt)
|
||||
assert out.shape == (2, 1)
|
||||
assert torch.isfinite(out).all()
|
||||
|
||||
|
||||
# ── heat_pinn_loss ────────────────────────────────────────────────────────────
|
||||
|
||||
def _dummy_inputs(device, n_f=100, n_ic=50, n_bc=50):
|
||||
x_f = torch.rand(n_f, device=device) * config.L
|
||||
t_f = torch.rand(n_f, device=device) * config.T_END
|
||||
x_ic = torch.rand(n_ic, device=device) * config.L
|
||||
t_bc = torch.rand(n_bc, device=device) * config.T_END
|
||||
return x_f, t_f, x_ic, t_bc
|
||||
|
||||
|
||||
def test_loss_returns_four_values(pinn_model, device):
|
||||
result = heat_pinn_loss(pinn_model, *_dummy_inputs(device))
|
||||
assert len(result) == 4
|
||||
|
||||
|
||||
def test_loss_components_non_negative(pinn_model, device):
|
||||
total, L_pde, L_ic, L_bc = heat_pinn_loss(pinn_model, *_dummy_inputs(device))
|
||||
assert total.item() >= 0.0
|
||||
assert L_pde.item() >= 0.0
|
||||
assert L_ic.item() >= 0.0
|
||||
assert L_bc.item() >= 0.0
|
||||
|
||||
|
||||
def test_loss_finite(pinn_model, device):
|
||||
for v in heat_pinn_loss(pinn_model, *_dummy_inputs(device)):
|
||||
assert torch.isfinite(v), f"loss non finita: {v}"
|
||||
|
||||
|
||||
def test_loss_weight_doubles_pde_contribution(pinn_model, device):
|
||||
"""Raddoppiare w_pde con w_ic=w_bc=0 deve raddoppiare il totale."""
|
||||
inputs = _dummy_inputs(device)
|
||||
total1, L_pde1, _, _ = heat_pinn_loss(pinn_model, *inputs, w_pde=1.0, w_ic=0.0, w_bc=0.0)
|
||||
total2, L_pde2, _, _ = heat_pinn_loss(pinn_model, *inputs, w_pde=2.0, w_ic=0.0, w_bc=0.0)
|
||||
# L_pde deve essere identico tra le due chiamate (stesso modello, stessi dati)
|
||||
torch.testing.assert_close(L_pde1, L_pde2, atol=1e-5, rtol=1e-4)
|
||||
torch.testing.assert_close(total2, 2.0 * total1, atol=1e-5, rtol=1e-4)
|
||||
|
||||
|
||||
def test_ic_loss_zero_when_net_is_zero(device):
|
||||
"""Con net=0 ⇒ T = T_AMB = T0 ⇒ L_ic = 0."""
|
||||
model = HeatPINN().to(device)
|
||||
for p in model.parameters():
|
||||
p.data.zero_()
|
||||
_, _, L_ic, _ = heat_pinn_loss(model, *_dummy_inputs(device))
|
||||
assert L_ic.item() < 1e-8
|
||||
|
||||
|
||||
def test_bc_loss_zero_when_net_is_zero(device):
|
||||
"""Con net=0 ⇒ T = T_AMB e dT/dx = 0 ⇒ Robin BC soddisfatta ⇒ L_bc = 0."""
|
||||
model = HeatPINN().to(device)
|
||||
for p in model.parameters():
|
||||
p.data.zero_()
|
||||
_, _, _, L_bc = heat_pinn_loss(model, *_dummy_inputs(device))
|
||||
assert L_bc.item() < 1e-8
|
||||
+10
-17
@@ -1,15 +1,17 @@
|
||||
import os
|
||||
from datetime import datetime
|
||||
import numpy as np
|
||||
import plotly.graph_objects as go
|
||||
from plotly.subplots import make_subplots
|
||||
import config
|
||||
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
ANIMATIONS_DIR = os.path.join(BASE_DIR, 'animations')
|
||||
|
||||
|
||||
def visualize_heat_field(T_pred, x_vals, t_vals, T_fdm):
|
||||
os.makedirs(ANIMATIONS_DIR, exist_ok=True)
|
||||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
out_dir = os.path.join(BASE_DIR, 'results', 'pinn', timestamp)
|
||||
os.makedirs(out_dir, exist_ok=True)
|
||||
|
||||
# Downsample T_fdm from shape (NX_fdm, NT_fdm) to match PINN grid
|
||||
nx_pred = len(x_vals)
|
||||
@@ -44,9 +46,9 @@ def visualize_heat_field(T_pred, x_vals, t_vals, T_fdm):
|
||||
fig_map.update_yaxes(title_text='t', row=1, col=1)
|
||||
fig_map.update_layout(title_text='Heat Equation PINN vs FDM', height=450)
|
||||
|
||||
map_path = _next_path('heatmap', '.html')
|
||||
map_path = os.path.join(out_dir, 'heatmap.html')
|
||||
fig_map.write_html(map_path)
|
||||
print(f"Heatmap saved → {map_path}")
|
||||
print(f"Heatmap saved → {map_path}")
|
||||
|
||||
# --- Animated profile T(x) evolving in time ---
|
||||
n_frames = len(t_vals)
|
||||
@@ -92,9 +94,9 @@ def visualize_heat_field(T_pred, x_vals, t_vals, T_fdm):
|
||||
frames=frames,
|
||||
)
|
||||
|
||||
anim_path = _next_path('heat_animation', '.html')
|
||||
anim_path = os.path.join(out_dir, 'animation.html')
|
||||
fig_anim.write_html(anim_path)
|
||||
print(f"Animation saved → {anim_path}")
|
||||
print(f"Animation saved → {anim_path}")
|
||||
|
||||
# --- Time-series comparison at fixed spatial points ---
|
||||
# Spatial indices for x=0, x=L/2, x=L
|
||||
@@ -141,15 +143,6 @@ def visualize_heat_field(T_pred, x_vals, t_vals, T_fdm):
|
||||
height=500,
|
||||
)
|
||||
|
||||
comparison_path = _next_path('comparison', '.html')
|
||||
comparison_path = os.path.join(out_dir, 'comparison.html')
|
||||
fig_ts.write_html(comparison_path)
|
||||
print(f"Time-series saved → {comparison_path}")
|
||||
|
||||
|
||||
def _next_path(prefix, ext):
|
||||
i = 1
|
||||
while True:
|
||||
path = os.path.join(ANIMATIONS_DIR, f'{prefix}_{i:03d}{ext}')
|
||||
if not os.path.exists(path):
|
||||
return path
|
||||
i += 1
|
||||
print(f"Comparison saved → {comparison_path}")
|
||||
|
||||
Reference in New Issue
Block a user