blockchain: generalize difficulty adjustment for per-chain PoW constants

get_target(): replace hardcoded Bitcoin constants (CHUNK_SIZE, 14-day
timespan, module-level MAX_TARGET) with per-chain values from constants.net.
Handle POW_GENESIS_BITS so that chains whose genesis nBits differs from
target_to_bits(MAX_TARGET) return the correct initial difficulty for period 0.
Map checkpoint indices correctly when adj_interval != CHUNK_SIZE.

verify_chunk(): add a separate code path for chains where the retarget
interval is shorter than CHUNK_SIZE (e.g. BTCP: 120 vs 2016).  In this
case multiple retargets can occur within a single chunk; because the headers
are not yet on disk during verification, reading via read_header() would
raise MissingHeader and reject the entire chunk.  Fix by reading from the
in-memory data buffer via a local helper _read_hdr(), and tracking
current_target across period boundaries inline.

can_connect(), chainwork_of_header_at_height(): use adj_interval instead of
CHUNK_SIZE when computing the difficulty-period index so that BTCP's 120-block
retarget windows are respected
This commit is contained in:
2026-04-29 10:08:09 +02:00
parent e0d04af154
commit d1088c036e
+85 -17
View File
@@ -321,20 +321,68 @@ class Blockchain(Logger):
raise InvalidHeader(f"insufficient proof of work: {pow_hash_as_num} vs target {target}")
def verify_chunk(self, index: int, data: bytes) -> None:
adj_interval = constants.net.DIFFICULTY_ADJUSTMENT_INTERVAL
num = len(data) // HEADER_SIZE
start_height = index * CHUNK_SIZE
prev_hash = self.get_hash(start_height - 1)
target = self.get_target(index-1)
if adj_interval == CHUNK_SIZE:
# Standard Bitcoin: one target per chunk, retargets align with chunk boundaries.
target = self.get_target(index - 1)
for i in range(num):
height = start_height + i
try:
expected_header_hash = self.get_hash(height)
except MissingHeader:
expected_header_hash = None
raw_header = data[i*HEADER_SIZE : (i+1)*HEADER_SIZE]
header = deserialize_header(raw_header, index*CHUNK_SIZE + i)
raw_header = data[i * HEADER_SIZE:(i + 1) * HEADER_SIZE]
header = deserialize_header(raw_header, height)
self.verify_header(header, prev_hash, target, expected_header_hash)
prev_hash = hash_header(header)
else:
# Shorter retarget interval (e.g. BTCP 120 blocks): multiple retargets
# can occur within a single chunk. Headers being verified are not yet on
# disk, so we must read them from the data buffer rather than via
# read_header(), which would return None and raise MissingHeader.
def _read_hdr(height: int) -> Optional[dict]:
if height < start_height:
return self.read_header(height)
offset = height - start_height
if 0 <= offset < num:
return deserialize_header(
data[offset * HEADER_SIZE:(offset + 1) * HEADER_SIZE], height)
return None
# Initialise target from the retarget period that covers start_height.
# get_target(-1) returns MAX_TARGET, so period 0 is handled correctly.
current_target = self.get_target(start_height // adj_interval - 1)
for i in range(num):
height = start_height + i
# Retarget at every period boundary (except genesis).
if height > 0 and height % adj_interval == 0:
first_h = _read_hdr(height - adj_interval)
last_h = _read_hdr(height - 1)
if not first_h or not last_h:
raise MissingHeader()
old_target = self.bits_to_target(last_h['bits'])
target_timespan = constants.net.POW_TARGET_TIMESPAN
max_factor = constants.net.MAX_ADJUSTMENT_FACTOR
max_target = constants.net.MAX_TARGET
timespan = last_h['timestamp'] - first_h['timestamp']
timespan = max(timespan, target_timespan // max_factor)
timespan = min(timespan, target_timespan * max_factor)
new_target = min(max_target, (old_target * timespan) // target_timespan)
current_target = self.bits_to_target(self.target_to_bits(new_target))
try:
expected_header_hash = self.get_hash(height)
except MissingHeader:
expected_header_hash = None
raw_header = data[i * HEADER_SIZE:(i + 1) * HEADER_SIZE]
header = deserialize_header(raw_header, start_height + i)
self.verify_header(header, prev_hash, current_target, expected_header_hash)
prev_hash = hash_header(header)
@with_lock
def path(self):
@@ -530,26 +578,44 @@ class Blockchain(Logger):
return hash_header(header)
def get_target(self, index: int) -> int:
# compute target from chunk x, used in chunk x+1
# index: difficulty-adjustment-period index (units of DIFFICULTY_ADJUSTMENT_INTERVAL blocks)
# returns the expected target for period index+1 (i.e. the target computed from period index)
adj_interval = constants.net.DIFFICULTY_ADJUSTMENT_INTERVAL
max_target = constants.net.MAX_TARGET
target_timespan = constants.net.POW_TARGET_TIMESPAN
max_factor = constants.net.MAX_ADJUSTMENT_FACTOR
if constants.net.TESTNET:
return 0
if index == -1:
return MAX_TARGET
if index < len(self.checkpoints):
h, t = self.checkpoints[index]
# Use the genesis nBits if it differs from target_to_bits(MAX_TARGET).
# For Bitcoin they are equal; for BTCP the genesis is 0x1e0ffff0 while
# powLimit encodes to 0x1e0fffff, so we must use the explicit value.
genesis_bits = constants.net.POW_GENESIS_BITS
if genesis_bits is not None:
return self.bits_to_target(genesis_bits)
return max_target
# Checkpoints are indexed in units of CHUNK_SIZE blocks.
# When adj_interval == CHUNK_SIZE (Bitcoin) they map 1:1.
# For other chains (e.g. BTCP, adj_interval=120) we compute the
# checkpoint chunk that covers the start of this period.
if adj_interval == CHUNK_SIZE:
cp_idx = index
else:
cp_idx = (index * adj_interval) // CHUNK_SIZE - 1
if 0 <= cp_idx < len(self.checkpoints):
h, t = self.checkpoints[cp_idx]
return t
# new target
first = self.read_header(index * CHUNK_SIZE)
last = self.read_header((index+1) * CHUNK_SIZE - 1)
# compute new target from the headers spanning period `index`
first = self.read_header(index * adj_interval)
last = self.read_header((index + 1) * adj_interval - 1)
if not first or not last:
raise MissingHeader()
bits = last.get('bits')
target = self.bits_to_target(bits)
nActualTimespan = last.get('timestamp') - first.get('timestamp')
nTargetTimespan = 14 * 24 * 60 * 60
nActualTimespan = max(nActualTimespan, nTargetTimespan // 4)
nActualTimespan = min(nActualTimespan, nTargetTimespan * 4)
new_target = min(MAX_TARGET, (target * nActualTimespan) // nTargetTimespan)
nActualTimespan = max(nActualTimespan, target_timespan // max_factor)
nActualTimespan = min(nActualTimespan, target_timespan * max_factor)
new_target = min(max_target, (target * nActualTimespan) // target_timespan)
# not any target can be represented in 32 bits:
new_target = self.bits_to_target(self.target_to_bits(new_target))
return new_target
@@ -594,8 +660,9 @@ class Blockchain(Logger):
def chainwork_of_header_at_height(self, height: int) -> int:
"""work done by single header at given height"""
chunk_idx = height // CHUNK_SIZE - 1
target = self.get_target(chunk_idx)
adj_interval = constants.net.DIFFICULTY_ADJUSTMENT_INTERVAL
period_idx = height // adj_interval - 1
target = self.get_target(period_idx)
work = ((2 ** 256 - target - 1) // (target + 1)) + 1
return work
@@ -641,7 +708,8 @@ class Blockchain(Logger):
if prev_hash != header.get('prev_block_hash'):
return False
try:
target = self.get_target(height // CHUNK_SIZE - 1)
adj_interval = constants.net.DIFFICULTY_ADJUSTMENT_INTERVAL
target = self.get_target(height // adj_interval - 1)
except MissingHeader:
return False
try: