from __future__ import annotations import argparse import importlib import logging import multiprocessing as mp import os import sys import time import config log = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Utilities # --------------------------------------------------------------------------- def _extranonce2(base: str, idx: int) -> str: """Return `base + idx` in hex, preserving the same width.""" return f"{int(base, 16) + idx:0{len(base)}x}" # --------------------------------------------------------------------------- # Worker # --------------------------------------------------------------------------- def _worker(idx: int, base_ex2: str, q: mp.Queue) -> None: """Start a mining process and send structured events to the supervisor.""" try: os.sched_setaffinity(0, {idx}) except (AttributeError, OSError): pass # Workers send structured events through the queue; verbose logs are suppressed logging.basicConfig( level=logging.WARNING, format="%(asctime)s | %(levelname)s | %(name)s | %(message)s", ) main = importlib.import_module("main") try: main.main( event_queue=q, worker_idx=idx, extranonce2=_extranonce2(base_ex2, idx), ) except KeyboardInterrupt: pass # --------------------------------------------------------------------------- # Supervisor # --------------------------------------------------------------------------- def _clear_lines(n: int) -> None: for _ in range(n): sys.stdout.write("\033[F\033[K") sys.stdout.flush() def _aggregate(q: mp.Queue, n: int) -> str: """ Receive structured events from workers and update the dashboard. Return "restart" when a block is found and submitted. """ rates: list[float] = [0.0] * n attempts: list[int] = [0] * n block_hash: str | None = None winner_idx: int | None = None winner_rate: float | None = None t_start = time.time() last_print = 0.0 lines_printed = 0 while True: try: tag, idx, val = q.get(timeout=0.1) if tag == "status": rates[idx] = val["rate"] attempts[idx] = val["attempts"] elif tag == "found": winner_idx = idx winner_rate = val.get("rate") if val else None elif tag == "hash": block_hash = val elif tag == "submit": _clear_lines(lines_printed) elapsed = time.time() - t_start total_att = sum(attempts) avg_rate_k = total_att / elapsed / 1000 if elapsed else 0.0 print("=" * 78) print("[✓] BLOCK FOUND AND SUBMITTED") print(f" • Hash: {block_hash or 'N/D'}") if winner_idx is not None: print(f" • Worker: {winner_idx}") if winner_rate is not None: print(f" • Worker hashrate: {winner_rate:.2f} kH/s") print(f" • Average total hashrate: {avg_rate_k:,.2f} kH/s") print(f" • Total attempts: {total_att:,}") print("=" * 78) return "restart" except Exception: pass # empty queue now = time.time() if now - last_print >= 1.0: if lines_printed > 0: _clear_lines(lines_printed) tot_rate = sum(rates) tot_att = sum(attempts) ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now)) lines = [ f"{ts} | MINING STATUS", "=" * 40, f"Total: {tot_rate:,.2f} kH/s | Attempts: {tot_att:,}", "-" * 40, ] for i in range(n): lines.append(f"Worker {i:<2}: {rates[i]:.2f} kH/s | Attempts: {attempts[i]:,}") print("\n".join(lines), flush=True) lines_printed = len(lines) last_print = now # --------------------------------------------------------------------------- # Start/restart loop # --------------------------------------------------------------------------- def launch(n: int, base_ex2: str) -> None: log.info("Per-process extranonce2:") for i in range(n): log.info(" • Process %d: extranonce2=%s", i, _extranonce2(base_ex2, i)) while True: q = mp.Queue() workers = [ mp.Process(target=_worker, args=(i, base_ex2, q), daemon=True) for i in range(n) ] for p in workers: p.start() try: reason = _aggregate(q, n) finally: for p in workers: if p.is_alive(): p.terminate() for p in workers: p.join() if reason != "restart": break print("\nRestarting workers...\n") time.sleep(1) # --------------------------------------------------------------------------- # CLI entry point # --------------------------------------------------------------------------- def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser("Multiprocess launcher for main.py miner") parser.add_argument( "-n", "--num-procs", type=int, default=config.NUM_PROCESSORS, help=f"Number of workers (default: {config.NUM_PROCESSORS})", ) parser.add_argument( "--base-extranonce2", default=config.EXTRANONCE2, help=f"Hex base for EXTRANONCE2 (default: {config.EXTRANONCE2})", ) return parser.parse_args() if __name__ == "__main__": logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)s | %(name)s | %(message)s", ) mp.set_start_method("spawn", force=True) args = _parse_args() from rpc import test_rpc_connection test_rpc_connection() print(f"\nStarting mining with {args.num_procs} processes (extranonce2 base={args.base_extranonce2})\n") launch(args.num_procs, args.base_extranonce2)