pem.py, x509.py: rm unused code
now that bip70 support is removed, all this is only used to check expiration of self-signed electrum server certs in interface.py
This commit is contained in:
@@ -30,8 +30,6 @@
|
|||||||
|
|
||||||
import binascii
|
import binascii
|
||||||
|
|
||||||
from .x509 import ASN1_Node, bytestr_to_int, decode_OID
|
|
||||||
|
|
||||||
|
|
||||||
def a2b_base64(s):
|
def a2b_base64(s):
|
||||||
try:
|
try:
|
||||||
@@ -40,9 +38,6 @@ def a2b_base64(s):
|
|||||||
raise SyntaxError("base64 error: %s" % e)
|
raise SyntaxError("base64 error: %s" % e)
|
||||||
return b
|
return b
|
||||||
|
|
||||||
def b2a_base64(b):
|
|
||||||
return binascii.b2a_base64(b)
|
|
||||||
|
|
||||||
|
|
||||||
def dePem(s, name):
|
def dePem(s, name):
|
||||||
"""Decode a PEM string into a bytearray of its payload.
|
"""Decode a PEM string into a bytearray of its payload.
|
||||||
@@ -109,83 +104,3 @@ def dePemList(s, name):
|
|||||||
bList.append(retBytes)
|
bList.append(retBytes)
|
||||||
s = s[end+len(postfix) : ]
|
s = s[end+len(postfix) : ]
|
||||||
|
|
||||||
def pem(b, name):
|
|
||||||
"""Encode a payload bytearray into a PEM string.
|
|
||||||
|
|
||||||
The input will be base64 encoded, then wrapped in a PEM prefix/postfix
|
|
||||||
based on the name string, e.g. for name="CERTIFICATE":
|
|
||||||
|
|
||||||
-----BEGIN CERTIFICATE-----
|
|
||||||
MIIBXDCCAUSgAwIBAgIBADANBgkqhkiG9w0BAQUFADAPMQ0wCwYDVQQDEwRUQUNL
|
|
||||||
...
|
|
||||||
KoZIhvcNAQEFBQADAwA5kw==
|
|
||||||
-----END CERTIFICATE-----
|
|
||||||
"""
|
|
||||||
s1 = b2a_base64(b)[:-1] # remove terminating \n
|
|
||||||
s2 = b""
|
|
||||||
while s1:
|
|
||||||
s2 += s1[:64] + b"\n"
|
|
||||||
s1 = s1[64:]
|
|
||||||
s = ("-----BEGIN %s-----\n" % name).encode('ascii') + s2 + \
|
|
||||||
("-----END %s-----\n" % name).encode('ascii')
|
|
||||||
return s
|
|
||||||
|
|
||||||
def pemSniff(inStr, name):
|
|
||||||
searchStr = "-----BEGIN %s-----" % name
|
|
||||||
return searchStr in inStr
|
|
||||||
|
|
||||||
|
|
||||||
def parse_private_key(s):
|
|
||||||
"""Parse a string containing a PEM-encoded <privateKey>."""
|
|
||||||
if pemSniff(s, "PRIVATE KEY"):
|
|
||||||
bytes = dePem(s, "PRIVATE KEY")
|
|
||||||
return _parsePKCS8(bytes)
|
|
||||||
elif pemSniff(s, "RSA PRIVATE KEY"):
|
|
||||||
bytes = dePem(s, "RSA PRIVATE KEY")
|
|
||||||
return _parseSSLeay(bytes)
|
|
||||||
else:
|
|
||||||
raise SyntaxError("Not a PEM private key file")
|
|
||||||
|
|
||||||
|
|
||||||
def _parsePKCS8(_bytes):
|
|
||||||
s = ASN1_Node(_bytes)
|
|
||||||
root = s.root()
|
|
||||||
version_node = s.first_child(root)
|
|
||||||
version = bytestr_to_int(s.get_value_of_type(version_node, 'INTEGER'))
|
|
||||||
if version != 0:
|
|
||||||
raise SyntaxError("Unrecognized PKCS8 version")
|
|
||||||
rsaOID_node = s.next_node(version_node)
|
|
||||||
ii = s.first_child(rsaOID_node)
|
|
||||||
rsaOID = decode_OID(s.get_value_of_type(ii, 'OBJECT IDENTIFIER'))
|
|
||||||
if rsaOID != '1.2.840.113549.1.1.1':
|
|
||||||
raise SyntaxError("Unrecognized AlgorithmIdentifier")
|
|
||||||
privkey_node = s.next_node(rsaOID_node)
|
|
||||||
value = s.get_value_of_type(privkey_node, 'OCTET STRING')
|
|
||||||
return _parseASN1PrivateKey(value)
|
|
||||||
|
|
||||||
|
|
||||||
def _parseSSLeay(bytes):
|
|
||||||
return _parseASN1PrivateKey(ASN1_Node(bytes))
|
|
||||||
|
|
||||||
|
|
||||||
def bytesToNumber(s):
|
|
||||||
return int(binascii.hexlify(s), 16)
|
|
||||||
|
|
||||||
|
|
||||||
def _parseASN1PrivateKey(s):
|
|
||||||
s = ASN1_Node(s)
|
|
||||||
root = s.root()
|
|
||||||
version_node = s.first_child(root)
|
|
||||||
version = bytestr_to_int(s.get_value_of_type(version_node, 'INTEGER'))
|
|
||||||
if version != 0:
|
|
||||||
raise SyntaxError("Unrecognized RSAPrivateKey version")
|
|
||||||
n = s.next_node(version_node)
|
|
||||||
e = s.next_node(n)
|
|
||||||
d = s.next_node(e)
|
|
||||||
p = s.next_node(d)
|
|
||||||
q = s.next_node(p)
|
|
||||||
dP = s.next_node(q)
|
|
||||||
dQ = s.next_node(dP)
|
|
||||||
qInv = s.next_node(dQ)
|
|
||||||
return list(map(lambda x: bytesToNumber(s.get_value_of_type(x, 'INTEGER')), [n, e, d, p, q, dP, dQ, qInv]))
|
|
||||||
|
|
||||||
|
|||||||
+10
-23
@@ -100,18 +100,6 @@ def decode_OID(s):
|
|||||||
return '.'.join(map(str, r))
|
return '.'.join(map(str, r))
|
||||||
|
|
||||||
|
|
||||||
def encode_OID(oid):
|
|
||||||
x = [int(i) for i in oid.split('.')]
|
|
||||||
s = chr(x[0] * 40 + x[1])
|
|
||||||
for i in x[2:]:
|
|
||||||
ss = chr(i % 128)
|
|
||||||
while i > 128:
|
|
||||||
i //= 128
|
|
||||||
ss = chr(128 + i % 128) + ss
|
|
||||||
s += ss
|
|
||||||
return s
|
|
||||||
|
|
||||||
|
|
||||||
class ASN1_Node(bytes):
|
class ASN1_Node(bytes):
|
||||||
def get_node(self, ix):
|
def get_node(self, ix):
|
||||||
# return index of first byte, first content byte and last byte.
|
# return index of first byte, first content byte and last byte.
|
||||||
@@ -315,8 +303,11 @@ class X509(object):
|
|||||||
|
|
||||||
|
|
||||||
@profiler
|
@profiler
|
||||||
def load_certificates(ca_path):
|
def load_ca_certs():
|
||||||
|
# kept for console use
|
||||||
|
import certifi
|
||||||
from . import pem
|
from . import pem
|
||||||
|
ca_path = certifi.where()
|
||||||
ca_list = {}
|
ca_list = {}
|
||||||
ca_keyID = {}
|
ca_keyID = {}
|
||||||
# ca_path = '/tmp/tmp.txt'
|
# ca_path = '/tmp/tmp.txt'
|
||||||
@@ -326,22 +317,18 @@ def load_certificates(ca_path):
|
|||||||
for b in bList:
|
for b in bList:
|
||||||
try:
|
try:
|
||||||
x = X509(b)
|
x = X509(b)
|
||||||
x.check_date()
|
except Exception as e:
|
||||||
except BaseException as e:
|
|
||||||
# with open('/tmp/tmp.txt', 'w') as f:
|
# with open('/tmp/tmp.txt', 'w') as f:
|
||||||
# f.write(pem.pem(b, 'CERTIFICATE').decode('ascii'))
|
# f.write(pem.pem(b, 'CERTIFICATE').decode('ascii'))
|
||||||
_logger.info(f"cert error: {e}")
|
_logger.info(f"cert error: {e}")
|
||||||
continue
|
continue
|
||||||
|
try:
|
||||||
|
x.check_date()
|
||||||
|
except CertificateError as e:
|
||||||
|
_logger.info(f"cert has expired: {e}")
|
||||||
|
continue
|
||||||
fp = x.getFingerprint()
|
fp = x.getFingerprint()
|
||||||
ca_list[fp] = x
|
ca_list[fp] = x
|
||||||
ca_keyID[x.get_keyID()] = fp
|
ca_keyID[x.get_keyID()] = fp
|
||||||
|
|
||||||
return ca_list, ca_keyID
|
return ca_list, ca_keyID
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
import certifi
|
|
||||||
|
|
||||||
ca_path = certifi.where()
|
|
||||||
ca_list, ca_keyID = load_certificates(ca_path)
|
|
||||||
|
|||||||
Reference in New Issue
Block a user