transaction: (move-only) move serialize_preimage to base class
This commit is contained in:
@@ -970,6 +970,100 @@ class Transaction:
|
|||||||
return b""
|
return b""
|
||||||
raise UnknownTxinType("cannot construct scriptSig")
|
raise UnknownTxinType("cannot construct scriptSig")
|
||||||
|
|
||||||
|
def serialize_preimage(
|
||||||
|
self,
|
||||||
|
txin_index: int,
|
||||||
|
*,
|
||||||
|
sighash_cache: SighashCache = None,
|
||||||
|
) -> bytes:
|
||||||
|
nVersion = int.to_bytes(self.version, length=4, byteorder="little", signed=True)
|
||||||
|
nLocktime = int.to_bytes(self.locktime, length=4, byteorder="little", signed=False)
|
||||||
|
inputs = self.inputs()
|
||||||
|
outputs = self.outputs()
|
||||||
|
txin = inputs[txin_index]
|
||||||
|
sighash = txin.sighash
|
||||||
|
if sighash is None:
|
||||||
|
sighash = Sighash.DEFAULT if txin.is_taproot() else Sighash.ALL
|
||||||
|
if not Sighash.is_valid(sighash, is_taproot=txin.is_taproot()):
|
||||||
|
raise Exception(f"SIGHASH_FLAG ({sighash}) not supported!")
|
||||||
|
if sighash_cache is None:
|
||||||
|
sighash_cache = SighashCache()
|
||||||
|
if txin.is_segwit():
|
||||||
|
if txin.is_taproot():
|
||||||
|
scache = sighash_cache.get_witver1_data_for_tx(self)
|
||||||
|
sighash_epoch = b"\x00"
|
||||||
|
hash_type = int.to_bytes(sighash, length=1, byteorder="little", signed=False)
|
||||||
|
# txdata
|
||||||
|
preimage_txdata = bytearray()
|
||||||
|
preimage_txdata += nVersion
|
||||||
|
preimage_txdata += nLocktime
|
||||||
|
if sighash & 0x80 != Sighash.ANYONECANPAY:
|
||||||
|
preimage_txdata += scache.sha_prevouts
|
||||||
|
preimage_txdata += scache.sha_amounts
|
||||||
|
preimage_txdata += scache.sha_scriptpubkeys
|
||||||
|
preimage_txdata += scache.sha_sequences
|
||||||
|
if sighash & 3 not in (Sighash.NONE, Sighash.SINGLE):
|
||||||
|
preimage_txdata += scache.sha_outputs
|
||||||
|
# inputdata
|
||||||
|
preimage_inputdata = bytearray()
|
||||||
|
spend_type = bytes([0]) # (ext_flag * 2) + annex_present
|
||||||
|
preimage_inputdata += spend_type
|
||||||
|
if sighash & 0x80 == Sighash.ANYONECANPAY:
|
||||||
|
preimage_inputdata += txin.prevout.serialize_to_network()
|
||||||
|
preimage_inputdata += int.to_bytes(txin.value_sats(), length=8, byteorder="little", signed=False)
|
||||||
|
preimage_inputdata += var_int(len(txin.scriptpubkey)) + txin.scriptpubkey
|
||||||
|
preimage_inputdata += int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False)
|
||||||
|
else:
|
||||||
|
preimage_inputdata += int.to_bytes(txin_index, length=4, byteorder="little", signed=False)
|
||||||
|
# TODO sha_annex
|
||||||
|
# outputdata
|
||||||
|
preimage_outputdata = bytearray()
|
||||||
|
if sighash & 3 == Sighash.SINGLE:
|
||||||
|
try:
|
||||||
|
txout = outputs[txin_index]
|
||||||
|
except IndexError:
|
||||||
|
raise Exception("Using SIGHASH_SINGLE without a corresponding output") from None
|
||||||
|
# note: we could cache this to avoid some potential DOS vectors:
|
||||||
|
preimage_outputdata += sha256(txout.serialize_to_network())
|
||||||
|
return bytes(sighash_epoch + hash_type + preimage_txdata + preimage_inputdata + preimage_outputdata)
|
||||||
|
else: # segwit (witness v0)
|
||||||
|
scache = sighash_cache.get_witver0_data_for_tx(self)
|
||||||
|
if not (sighash & Sighash.ANYONECANPAY):
|
||||||
|
hashPrevouts = scache.hashPrevouts
|
||||||
|
else:
|
||||||
|
hashPrevouts = bytes(32)
|
||||||
|
if not (sighash & Sighash.ANYONECANPAY) and (sighash & 0x1f) != Sighash.SINGLE and (sighash & 0x1f) != Sighash.NONE:
|
||||||
|
hashSequence = scache.hashSequence
|
||||||
|
else:
|
||||||
|
hashSequence = bytes(32)
|
||||||
|
if (sighash & 0x1f) != Sighash.SINGLE and (sighash & 0x1f) != Sighash.NONE:
|
||||||
|
hashOutputs = scache.hashOutputs
|
||||||
|
elif (sighash & 0x1f) == Sighash.SINGLE and txin_index < len(outputs):
|
||||||
|
# note: we could cache this to avoid some potential DOS vectors:
|
||||||
|
hashOutputs = sha256d(outputs[txin_index].serialize_to_network())
|
||||||
|
else:
|
||||||
|
hashOutputs = bytes(32)
|
||||||
|
outpoint = txin.prevout.serialize_to_network()
|
||||||
|
preimage_script = self.get_preimage_script(txin)
|
||||||
|
scriptCode = var_int(len(preimage_script)) + preimage_script
|
||||||
|
amount = int.to_bytes(txin.value_sats(), length=8, byteorder="little", signed=False)
|
||||||
|
nSequence = int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False)
|
||||||
|
nHashType = int.to_bytes(sighash, length=4, byteorder="little", signed=False)
|
||||||
|
preimage = nVersion + hashPrevouts + hashSequence + outpoint + scriptCode + amount + nSequence + hashOutputs + nLocktime + nHashType
|
||||||
|
return preimage
|
||||||
|
else: # legacy sighash (pre-segwit)
|
||||||
|
if sighash != Sighash.ALL:
|
||||||
|
raise Exception(f"SIGHASH_FLAG ({sighash}) not supported! (for legacy sighash)")
|
||||||
|
preimage_script = self.get_preimage_script(txin)
|
||||||
|
txins = var_int(len(inputs)) + b"".join(
|
||||||
|
txin.serialize_to_network(script_sig=preimage_script if txin_index==k else b"")
|
||||||
|
for k, txin in enumerate(inputs))
|
||||||
|
txouts = var_int(len(outputs)) + b"".join(o.serialize_to_network() for o in outputs)
|
||||||
|
nHashType = int.to_bytes(sighash, length=4, byteorder="little", signed=False)
|
||||||
|
preimage = nVersion + txins + txouts + nLocktime + nHashType
|
||||||
|
return preimage
|
||||||
|
raise Exception("should not reach this")
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_preimage_script(cls, txin: 'PartialTxInput') -> bytes:
|
def get_preimage_script(cls, txin: 'PartialTxInput') -> bytes:
|
||||||
if txin.witness_script:
|
if txin.witness_script:
|
||||||
@@ -2246,100 +2340,6 @@ class PartialTransaction(Transaction):
|
|||||||
self._outputs.sort(key = lambda o: (o.value, o.scriptpubkey))
|
self._outputs.sort(key = lambda o: (o.value, o.scriptpubkey))
|
||||||
self.invalidate_ser_cache()
|
self.invalidate_ser_cache()
|
||||||
|
|
||||||
def serialize_preimage(
|
|
||||||
self,
|
|
||||||
txin_index: int,
|
|
||||||
*,
|
|
||||||
sighash_cache: SighashCache = None,
|
|
||||||
) -> bytes:
|
|
||||||
nVersion = int.to_bytes(self.version, length=4, byteorder="little", signed=True)
|
|
||||||
nLocktime = int.to_bytes(self.locktime, length=4, byteorder="little", signed=False)
|
|
||||||
inputs = self.inputs()
|
|
||||||
outputs = self.outputs()
|
|
||||||
txin = inputs[txin_index]
|
|
||||||
sighash = txin.sighash
|
|
||||||
if sighash is None:
|
|
||||||
sighash = Sighash.DEFAULT if txin.is_taproot() else Sighash.ALL
|
|
||||||
if not Sighash.is_valid(sighash, is_taproot=txin.is_taproot()):
|
|
||||||
raise Exception(f"SIGHASH_FLAG ({sighash}) not supported!")
|
|
||||||
if sighash_cache is None:
|
|
||||||
sighash_cache = SighashCache()
|
|
||||||
if txin.is_segwit():
|
|
||||||
if txin.is_taproot():
|
|
||||||
scache = sighash_cache.get_witver1_data_for_tx(self)
|
|
||||||
sighash_epoch = b"\x00"
|
|
||||||
hash_type = int.to_bytes(sighash, length=1, byteorder="little", signed=False)
|
|
||||||
# txdata
|
|
||||||
preimage_txdata = bytearray()
|
|
||||||
preimage_txdata += nVersion
|
|
||||||
preimage_txdata += nLocktime
|
|
||||||
if sighash & 0x80 != Sighash.ANYONECANPAY:
|
|
||||||
preimage_txdata += scache.sha_prevouts
|
|
||||||
preimage_txdata += scache.sha_amounts
|
|
||||||
preimage_txdata += scache.sha_scriptpubkeys
|
|
||||||
preimage_txdata += scache.sha_sequences
|
|
||||||
if sighash & 3 not in (Sighash.NONE, Sighash.SINGLE):
|
|
||||||
preimage_txdata += scache.sha_outputs
|
|
||||||
# inputdata
|
|
||||||
preimage_inputdata = bytearray()
|
|
||||||
spend_type = bytes([0]) # (ext_flag * 2) + annex_present
|
|
||||||
preimage_inputdata += spend_type
|
|
||||||
if sighash & 0x80 == Sighash.ANYONECANPAY:
|
|
||||||
preimage_inputdata += txin.prevout.serialize_to_network()
|
|
||||||
preimage_inputdata += int.to_bytes(txin.value_sats(), length=8, byteorder="little", signed=False)
|
|
||||||
preimage_inputdata += var_int(len(txin.scriptpubkey)) + txin.scriptpubkey
|
|
||||||
preimage_inputdata += int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False)
|
|
||||||
else:
|
|
||||||
preimage_inputdata += int.to_bytes(txin_index, length=4, byteorder="little", signed=False)
|
|
||||||
# TODO sha_annex
|
|
||||||
# outputdata
|
|
||||||
preimage_outputdata = bytearray()
|
|
||||||
if sighash & 3 == Sighash.SINGLE:
|
|
||||||
try:
|
|
||||||
txout = outputs[txin_index]
|
|
||||||
except IndexError:
|
|
||||||
raise Exception("Using SIGHASH_SINGLE without a corresponding output") from None
|
|
||||||
# note: we could cache this to avoid some potential DOS vectors:
|
|
||||||
preimage_outputdata += sha256(txout.serialize_to_network())
|
|
||||||
return bytes(sighash_epoch + hash_type + preimage_txdata + preimage_inputdata + preimage_outputdata)
|
|
||||||
else: # segwit (witness v0)
|
|
||||||
scache = sighash_cache.get_witver0_data_for_tx(self)
|
|
||||||
if not (sighash & Sighash.ANYONECANPAY):
|
|
||||||
hashPrevouts = scache.hashPrevouts
|
|
||||||
else:
|
|
||||||
hashPrevouts = bytes(32)
|
|
||||||
if not (sighash & Sighash.ANYONECANPAY) and (sighash & 0x1f) != Sighash.SINGLE and (sighash & 0x1f) != Sighash.NONE:
|
|
||||||
hashSequence = scache.hashSequence
|
|
||||||
else:
|
|
||||||
hashSequence = bytes(32)
|
|
||||||
if (sighash & 0x1f) != Sighash.SINGLE and (sighash & 0x1f) != Sighash.NONE:
|
|
||||||
hashOutputs = scache.hashOutputs
|
|
||||||
elif (sighash & 0x1f) == Sighash.SINGLE and txin_index < len(outputs):
|
|
||||||
# note: we could cache this to avoid some potential DOS vectors:
|
|
||||||
hashOutputs = sha256d(outputs[txin_index].serialize_to_network())
|
|
||||||
else:
|
|
||||||
hashOutputs = bytes(32)
|
|
||||||
outpoint = txin.prevout.serialize_to_network()
|
|
||||||
preimage_script = self.get_preimage_script(txin)
|
|
||||||
scriptCode = var_int(len(preimage_script)) + preimage_script
|
|
||||||
amount = int.to_bytes(txin.value_sats(), length=8, byteorder="little", signed=False)
|
|
||||||
nSequence = int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False)
|
|
||||||
nHashType = int.to_bytes(sighash, length=4, byteorder="little", signed=False)
|
|
||||||
preimage = nVersion + hashPrevouts + hashSequence + outpoint + scriptCode + amount + nSequence + hashOutputs + nLocktime + nHashType
|
|
||||||
return preimage
|
|
||||||
else: # legacy sighash (pre-segwit)
|
|
||||||
if sighash != Sighash.ALL:
|
|
||||||
raise Exception(f"SIGHASH_FLAG ({sighash}) not supported! (for legacy sighash)")
|
|
||||||
preimage_script = self.get_preimage_script(txin)
|
|
||||||
txins = var_int(len(inputs)) + b"".join(
|
|
||||||
txin.serialize_to_network(script_sig=preimage_script if txin_index==k else b"")
|
|
||||||
for k, txin in enumerate(inputs))
|
|
||||||
txouts = var_int(len(outputs)) + b"".join(o.serialize_to_network() for o in outputs)
|
|
||||||
nHashType = int.to_bytes(sighash, length=4, byteorder="little", signed=False)
|
|
||||||
preimage = nVersion + txins + txouts + nLocktime + nHashType
|
|
||||||
return preimage
|
|
||||||
raise Exception("should not reach this")
|
|
||||||
|
|
||||||
def sign(self, keypairs: Mapping[bytes, bytes]) -> None:
|
def sign(self, keypairs: Mapping[bytes, bytes]) -> None:
|
||||||
# keypairs: pubkey_bytes -> secret_bytes
|
# keypairs: pubkey_bytes -> secret_bytes
|
||||||
sighash_cache = SighashCache()
|
sighash_cache = SighashCache()
|
||||||
|
|||||||
Reference in New Issue
Block a user