Files
palladum-lightning/tests/fixtures.py
Rusty Russell 369338347d pytest: add fixture for checking packet sizes.
This requires access to dumpcap.  On Ubuntu, at least, this means you
need to be in the "wireshark" group.

We may also need:
	sudo ethtool -K lo gro off gso off tso off

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
2026-02-18 14:13:25 +10:30

194 lines
6.3 KiB
Python

from utils import TEST_NETWORK, VALGRIND # noqa: F401,F403
from pyln.testing.fixtures import directory, test_base_dir, test_name, chainparams, node_factory, bitcoind, teardown_checks, db_provider, executor, setup_logging, jsonschemas # noqa: F401,F403
from pyln.testing import utils
from utils import COMPAT
from pathlib import Path
import os
import pytest
import re
import shutil
import subprocess
import tempfile
import time
@pytest.fixture
def node_cls():
return LightningNode
class LightningNode(utils.LightningNode):
def __init__(self, *args, **kwargs):
# Yes, we really want to test the local development version, not
# something in out path.
kwargs["executable"] = "lightningd/lightningd"
utils.LightningNode.__init__(self, *args, **kwargs)
# Avoid socket path name too long on Linux
if os.uname()[0] == 'Linux' and \
len(str(self.lightning_dir / TEST_NETWORK / 'lightning-rpc')) >= 108:
self.daemon.opts['rpc-file'] = '/proc/self/cwd/lightning-rpc'
# This is a recent innovation, and we don't want to nail pyln-testing to this version.
self.daemon.opts['dev-crash-after'] = 3600
# We have some valgrind suppressions in the `tests/`
# directory, so we can add these to the valgrind configuration
# (not generally true when running pyln-testing, hence why
# it's being done in this specialization, and not in the
# library).
if self.daemon.cmd_line[0] == 'valgrind':
suppressions_path = Path(__file__).parent / "valgrind-suppressions.txt"
self.daemon.cmd_prefix += [
f"--suppressions={suppressions_path}",
"--gen-suppressions=all"
]
# If we opted into checking the DB statements we will attach the dblog
# plugin before starting the node
check_dblog = os.environ.get("TEST_CHECK_DBSTMTS", None) == "1"
db_type = os.environ.get("TEST_DB_PROVIDER", "sqlite3")
if db_type == 'sqlite3' and check_dblog:
dblog = os.path.join(os.path.dirname(__file__), 'plugins', 'dblog.py')
has_dblog = len([o for o in self.daemon.cmd_line if 'dblog.py' in o]) > 0
if not has_dblog:
# Add as an expanded option so we don't clobber other options.
self.daemon.opts['plugin={}'.format(dblog)] = None
self.daemon.opts['dblog-file'] = 'dblog.sqlite3'
if db_type == 'postgres' and ('disable-plugin', 'bookkeeper') not in self.daemon.opts.items():
accts_db = self.db.provider.get_db('', 'accounts', 0)
self.daemon.opts['bookkeeper-db'] = accts_db.get_dsn()
class CompatLevel(object):
"""An object that encapsulates the compat-level of our build.
"""
def __init__(self):
makefile = os.path.join(os.path.dirname(__file__), "..", "Makefile")
with open(makefile, 'r') as f:
lines = [l for l in f if l.startswith('COMPAT_CFLAGS')]
assert(len(lines) == 1)
line = lines[0]
flags = re.findall(r'COMPAT_V([0-9]+)=1', line)
self.compat_flags = flags
def __call__(self, version):
return COMPAT and version in self.compat_flags
@pytest.fixture
def compat():
return CompatLevel()
def is_compat(version):
compat = CompatLevel()
return compat(version)
def dumpcap_usable():
def have_binary(name):
return shutil.which(name) is not None
if not have_binary("dumpcap") or not have_binary("tshark"):
return False
try:
with tempfile.TemporaryDirectory() as td:
pcap = Path(td) / "probe.pcap"
proc = subprocess.Popen(
[
"dumpcap",
"-i", "lo",
"-w", str(pcap),
"-f", "tcp",
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
time.sleep(0.2)
proc.terminate()
proc.wait(timeout=1)
return pcap.exists() and pcap.stat().st_size > 0
except (PermissionError, subprocess.SubprocessError, OSError):
return False
@pytest.fixture(scope="session")
def have_pcap_tools():
if not dumpcap_usable():
pytest.skip("dumpcap/tshark not available or insufficient privileges")
class TcpCapture:
def __init__(self, tmpdir):
self.tmpdir = Path(tmpdir)
self.pcap = self.tmpdir / "traffic.pcap"
self.proc = None
self.port = None
def start(self, port):
assert self.proc is None, "capture already started"
self.port = int(port)
self.proc = subprocess.Popen(
[
"dumpcap",
"-i", "lo",
"-w", str(self.pcap),
"-f", f"tcp port {self.port}",
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
# allow filter attach
time.sleep(0.2)
def stop(self):
if self.proc:
self.proc.terminate()
self.proc.wait(timeout=2)
self.proc = None
def assert_constant_payload(self):
tshark_cmd = [
"tshark",
"-r", str(self.pcap),
"-Y", "tcp.len > 0",
"-T", "fields",
"-e", "tcp.len",
]
out = subprocess.check_output(tshark_cmd, text=True)
lengths = [int(x) for x in out.splitlines() if x.strip()]
assert lengths, f"No TCP payload packets captured on port {self.port}"
uniq = set(lengths)
assert len(uniq) == 1, (
f"Non-constant TCP payload sizes on port {self.port}: "
f"{sorted(uniq)}:"
+ subprocess.check_output(["tshark", "-r", str(self.pcap)], text=True)
)
@pytest.fixture
def tcp_capture(have_pcap_tools, tmp_path):
# You will need permissions. Most distributions have a group which has
# permissions to use dumpcap:
# $ ls -l /usr/bin/dumpcap
# -rwxr-xr-- 1 root wireshark 229112 Apr 16 2024 /usr/bin/dumpcap
# $ getcap /usr/bin/dumpcap
# /usr/bin/dumpcap cap_net_admin,cap_net_raw=eip
# So you just need to be in the wireshark group.
cap = TcpCapture(tmp_path)
yield cap
cap.stop()
cap.assert_constant_payload()