"""Common utility module with shared functions for the miner project."""

import hashlib
import logging
import threading

import config

log = logging.getLogger(__name__)


def double_sha256(data: bytes) -> bytes:
    """Return SHA-256 applied twice to *data* (the raw 32-byte digest)."""
    return hashlib.sha256(hashlib.sha256(data).digest()).digest()


def encode_varint(value: int) -> str:
    """Encode *value* as a Bitcoin-protocol VarInt and return it as hex text.

    Values below 0xFD are emitted as a single byte. Larger values get a
    one-byte prefix (``fd``, ``fe`` or ``ff``) followed by the value as a
    2-, 4- or 8-byte little-endian integer.

    Raises:
        ValueError: if *value* is larger than 2**64 - 1.
        OverflowError: if *value* is negative (raised by ``int.to_bytes``).
    """
    if value < 0xFD:
        return value.to_bytes(1, "little").hex()
    if value <= 0xFFFF:
        return "fd" + value.to_bytes(2, "little").hex()
    if value <= 0xFFFFFFFF:
        return "fe" + value.to_bytes(4, "little").hex()
    if value <= 0xFFFFFFFFFFFFFFFF:
        return "ff" + value.to_bytes(8, "little").hex()
    raise ValueError("Value exceeds maximum VarInt limit (2^64-1)")


def decode_nbits(nBits: int) -> str:
    """Decode the compact nBits field into a target as zero-padded hex.

    The top byte of *nBits* is the exponent (size in bytes) and the low
    23 bits are the significand; the sign bit (0x00800000) is masked off.
    The target is ``significand * 256 ** (exponent - 3)``.

    Returns:
        The target formatted with at least 64 hex digits. Exponents large
        enough to push the value past 256 bits yield a longer string; no
        overflow check is performed here.
    """
    exponent = (nBits >> 24) & 0xFF
    significand = nBits & 0x007FFFFF
    if exponent <= 3:
        # Small exponents scale the significand *down*. A plain left shift
        # by 8 * (exponent - 3) would be a negative shift count and raise
        # ValueError, so shift right by the complementary amount instead.
        target = significand >> (8 * (3 - exponent))
    else:
        target = significand << (8 * (exponent - 3))
    return f"{target:064x}"


def calculate_target(template, difficulty_factor: float, network: str) -> str:
    """Compute the mining target for *template* as a 64-digit hex string.

    Args:
        template: mapping whose ``"bits"`` entry is the compact target
            encoded as a hexadecimal string.
        difficulty_factor: divisor applied to the maximum target. Only
            honoured on ``"regtest"``, where a negative value is replaced
            by 0.1. On every other network it is forced to 1.0.
        network: network name; only ``"regtest"`` is treated specially.

    Returns:
        The target decoded from ``template["bits"]`` when the effective
        factor is 0; otherwise ``max_target / factor`` truncated to an
        integer and capped at 2**256 - 1.
    """
    # NOTE(review): the `else` is taken to pair with the network check
    # (non-regtest networks ignore the configured factor) - confirm.
    if network == "regtest":
        if difficulty_factor < 0:
            difficulty_factor = 0.1
    else:
        difficulty_factor = 1.0

    # Parsed unconditionally so a malformed template fails regardless of
    # which branch below is taken.
    nBits_int = int(template["bits"], 16)
    original_target = decode_nbits(nBits_int)

    if difficulty_factor == 0:
        return original_target

    # 0xFFFF << 208, the conventional Bitcoin difficulty-1 target.
    max_target = 0x00000000FFFF0000000000000000000000000000000000000000000000000000
    # The division is carried out in floating point, so low-order bits of
    # the result may be lost for factors that are not powers of two.
    target_value = int(max_target / difficulty_factor)
    return f"{min(target_value, (1 << 256) - 1):064x}"


def watchdog_bestblock(
    rpc_conn,
    stop_event: threading.Event,
    new_block_event: threading.Event,
    get_best_block_hash_func,
) -> None:
    """Poll for a new best block and signal the miner when one appears.

    The best-block hash is fetched once up front and then again every
    ``config.CHECK_INTERVAL`` seconds. When a non-empty hash different from
    the last one is seen, both *new_block_event* and *stop_event* are set
    so that current mining is interrupted within the next batch, and the
    function returns.

    Args:
        rpc_conn: connection object passed through unchanged to
            *get_best_block_hash_func*.
        stop_event: setting it from outside ends the polling loop; it is
            also set by this function when a new block is detected.
        new_block_event: set when a new best block is detected.
        get_best_block_hash_func: callable invoked as ``func(rpc_conn)``
            that returns the current best-block hash.

    Errors from the initial fetch are logged and end the watchdog at once;
    errors during polling are logged and polling continues.
    """
    log.info("Watchdog started.")
    try:
        last_hash = get_best_block_hash_func(rpc_conn)
    except Exception as e:
        log.error("Watchdog: unable to get initial hash: %s", e)
        return

    # Event.wait() doubles as the sleep between polls and returns True as
    # soon as stop_event is set, which ends the loop.
    while not stop_event.wait(config.CHECK_INTERVAL):
        try:
            new_hash = get_best_block_hash_func(rpc_conn)
            if new_hash and new_hash != last_hash:
                log.info("New best block: %s", new_hash)
                last_hash = new_hash
                new_block_event.set()
                stop_event.set()  # interrupt current miner
                return
        except Exception as e:
            log.error("Watchdog error: %s", e)

    log.info("Watchdog stopped.")