python文件压缩加解密过程sm4版

2025-06-19 13:54:05 by wst

python高级

需求:

要求使用国密sm4对文件进行加解密;

 

方法:

上一篇文章

 

代码:

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@文件        :sm4.py
@说明        :国密SM4-GCM加密/解密实现
@时间        :2025/06/18 16:03:06
@作者        :wanshitao
@版本        :1.0
'''
import os
import hmac
import time
import json
import base64
from gmssl import sm4, sm3, func
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP


class SM4GCM:
    def __init__(self, key, iv):
        """
        初始化SM4-GCM加密器
        :param key: 16字节的密钥
        :param iv: 16字节的初始向量
        """
        self.key = key
        self.iv = iv

    def encrypt(self, plaintext, aad=b""):
        """
        GCM模式加密
        :param plaintext: 明文数据,二进制字符串
        :param aad: 附加认证数据(可选)
        :return: (ciphertext, tag)
        """
        # 初始化加密器
        cipher = sm4.CryptSM4()
        cipher.set_key(self.key, sm4.SM4_ENCRYPT)
        # 加密数据(使用CBC模式)
        ciphertext = cipher.crypt_cbc(self.iv, plaintext)
        # 计算认证标签
        tag = self._compute_gmac(self.key, self.iv, ciphertext, aad)

        return ciphertext + tag  # 返回密文和认证标签的组合

    def decrypt(self, encrypted_data, aad=b""):
        """
        GCM模式解密
        :param encrypted_data: 二进制制的密文
        :param aad: 附加认证数据(可选)
        :return: 明文
        """
        # 分离加密数据和认证标签
        tag = encrypted_data[-32:]  # 最后16字节是认证标签
        ciphertext = encrypted_data[:-32]  # 其余部分是加密数据

        # 验证认证标签
        expected_tag = self._compute_gmac(self.key, self.iv, ciphertext, aad)
        if not self._constant_time_compare(tag, expected_tag):
            raise ValueError("Tag verification failed")
        # 初始化解密器
        cipher = sm4.CryptSM4()
        cipher.set_key(self.key, sm4.SM4_DECRYPT)
        # 解密数据(使用CBC模式)
        plaintext = cipher.crypt_cbc(self.iv, ciphertext)
        return plaintext

    @staticmethod
    def _compute_gmac(key, iv, ciphertext, aad):
        """
        计算GMAC认证标签
        :param key: 密钥
        :param iv: 初始向量
        :param ciphertext: 密文
        :param aad: 附加认证数据
        :return: 认证标签
        """
        # 使用HMAC-SM3代替GMAC
        data_to_hash = iv + aad + ciphertext
        # 创建一个符合hashlib接口的SM3哈希函数
        class SM3Hash:
            def __init__(self):
                self._hash = []
                self.digest_size = 32  # SM3哈希值的长度为32字节
                self.block_size = 64  # SM3的块大小为64字节

            def update(self, msg):
                self._hash.extend(func.bytes_to_list(msg))

            def digest(self):
                return bytes.fromhex(sm3.sm3_hash(self._hash))

            def copy(self):
                # 创建一个新的SM3Hash对象并复制当前状态
                copy_obj = SM3Hash()
                copy_obj._hash = self._hash[:]
                return copy_obj

        # 使用HMAC
        hmac_sm3 = hmac.new(key, data_to_hash, digestmod=SM3Hash)
        return hmac_sm3.digest()

    @staticmethod
    def _constant_time_compare(a, b):
        """
        恒定时间比较两个字节序列
        :param a: 字节序列1
        :param b: 字节序列2
        :return: 如果相等返回True,否则返回False
        """
        if len(a) != len(b):
            return False
        result = 0
        for x, y in zip(a, b):
            result |= x ^ y
        return result == 0


def generate_rsa_keys(key_size=2048):
    """生成RSA公钥/私钥对"""
    key = RSA.generate(key_size)
    private_key = key.export_key()
    public_key = key.publickey().export_key()
    return private_key, public_key

def rsa_encrypt(data, public_key):
    """使用RSA公钥加密数据"""
    rsa_key = RSA.import_key(public_key)
    cipher = PKCS1_OAEP.new(rsa_key)
    return cipher.encrypt(data)

def rsa_decrypt(encrypted_data, private_key):
    """使用RSA私钥解密数据"""
    rsa_key = RSA.import_key(private_key)
    cipher = PKCS1_OAEP.new(rsa_key)
    return cipher.decrypt(encrypted_data)

def encrypt_file(input_path, public_key, output_path=None):
    """
    SM4-GCM加密文件
    返回:
    {
        "encrypted_key": "RSA加密的SM4密钥(B64)",
        "nonce": "Nonce(B64)",
        "encrypted_data": "加密文件路径或B64数据",
        "timestamp": "加密时间"
    }
    """
    start_time = time.time()

    # 读取文件内容
    with open(input_path, 'rb') as f:
        file_data = f.read()

    # 生成随机SM4密钥和nonce
    sm4_key = os.urandom(16)
    nonce = os.urandom(16)

    # 使用SM4-GCM模式加密数据
    cipher = SM4GCM(sm4_key, nonce)
    encrypted_data = cipher.encrypt(file_data)  # 包含加密数据+标签

    # 用RSA公钥加密SM4密钥
    encrypted_sm4_key = rsa_encrypt(sm4_key, public_key)

    # 准备结果JSON
    result = {
        "encrypted_key": base64.b64encode(encrypted_sm4_key).decode('utf-8'),
        "nonce": base64.b64encode(nonce).decode('utf-8'),
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        "original_size": len(file_data),
        "encrypted_size": len(encrypted_data)
    }

    # 输出到文件或返回Base64数据
    if output_path:
        with open(output_path, 'wb') as f:
            f.write(encrypted_data)
        result["encrypted_data"] = output_path
        result["source_file"] = input_path
    else:
        result["encrypted_data"] = base64.b64encode(encrypted_data).decode('utf-8')

    duration = time.time() - start_time
    result["encryption_time"] = f"{duration:.4f}秒"

    return result

def decrypt_file(metadata, private_key, output_path=None):
    """
    SM4-GCM解密文件
    输入: encrypt_file返回的JSON数据
    """
    start_time = time.time()

    # 从JSON中提取必要信息
    encrypted_sm4_key = base64.b64decode(metadata["encrypted_key"])
    nonce = base64.b64decode(metadata["nonce"])

    # 获取加密数据
    if isinstance(metadata["encrypted_data"], str) and os.path.exists(metadata["encrypted_data"]):
        # 从文件读取
        with open(metadata["encrypted_data"], 'rb') as f:
            encrypted_data = f.read()
    elif "encrypted_data" in metadata:
        # 从Base64数据读取
        encrypted_data = base64.b64decode(metadata["encrypted_data"])
    else:
        raise ValueError("缺少加密数据")

    # 用RSA私钥解密SM4密钥
    sm4_key = rsa_decrypt(encrypted_sm4_key, private_key)

    # 使用SM4-GCM解密数据
    cipher = SM4GCM(sm4_key, nonce)
    try:
        plaintext = cipher.decrypt(encrypted_data)
    except ValueError as e:
        result = {"status": "failed", "error": str(e)}
        return result

    # 结果数据
    result = {
        "status": "success",
        "decryption_time": f"{time.time() - start_time:.4f}秒",
        "original_size": metadata["original_size"],
        "decrypted_size": len(plaintext)
    }

    # 写入输出文件或返回Base64数据
    if output_path:
        with open(output_path, 'wb') as f:
            f.write(plaintext)
        result["decrypted_file"] = output_path
        if "source_file" in metadata:
            result["source_file"] = metadata["source_file"]
    else:
        result["decrypted_data"] = base64.b64encode(plaintext).decode('utf-8')

    return result

# ==================== 使用示例 ====================
if __name__ == "__main__":
    print("国密SM4-GCM加密方案 (密钥分离版)")
    print("加密数据仅包含SM4-GCM输出,密钥通过JSON传输")

    # 生成密钥对
    print("生成2048位RSA密钥对...")
    private_key, public_key = generate_rsa_keys()

    # 保存密钥
    with open("private_key.pem", "wb") as f:
        f.write(private_key)
    with open("public_key.pem", "wb") as f:
        f.write(public_key)

    # 创建测试文件
    test_file = "测试文件.txt"
    with open(test_file, "w", encoding="utf-8") as f:
        f.write("国密SM4-GCM分离密钥方案测试文件\n")
        f.write("加密数据文件只包含SM4加密输出\n")
        f.write("密钥信息存储在JSON文件中\n")
        f.write("=" * 50 + "\n")
        f.write("机密级别:绝密\n")

    # 加密文件
    print("\n加密文件...")
    encryption_result = encrypt_file(
        input_path=test_file,
        public_key=public_key,
        output_path="加密数据.bin"
    )

    # 保存加密元数据
    with open("加密信息.json", "w", encoding="utf-8") as f:
        json.dump(encryption_result, f, indent=2, ensure_ascii=False)

    print("加密完成,加密信息:")
    print(json.dumps(encryption_result, indent=2, ensure_ascii=False))

    # 解密文件
    print("\n解密文件...")
    with open("加密信息.json", "r", encoding="utf-8") as f:
        metadata = json.load(f)

    decryption_result = decrypt_file(
        metadata=metadata,
        private_key=private_key,
        output_path="解密文件.txt"
    )

    print("解密结果:")
    print(json.dumps(decryption_result, indent=2, ensure_ascii=False))

    # 验证解密内容
    print("\n验证解密内容:")
    with open("解密文件.txt", "r", encoding="utf-8") as f:
        print(f.read())

    print("\n测试篡改检测...")
    # 篡改加密文件
    with open("加密数据.bin", "rb") as f:
        file_data = f.read()

    # 在加密数据部分修改一个字节
    tampered_data = file_data[:100] + bytes([file_data[100] ^ 0x01]) + file_data[101:]

    with open("篡改数据.bin", "wb") as f:
        f.write(tampered_data)

    # 创建篡改的元数据
    tampered_metadata = metadata.copy()
    tampered_metadata["encrypted_data"] = "篡改数据.bin"

    # 尝试解密被篡改的文件
    print("尝试解密被篡改的文件...")
    try:
        tamper_result = decrypt_file(
            metadata=tampered_metadata,
            private_key=private_key,
            output_path="篡改解密.txt"
        )
        print("解密结果:", json.dumps(tamper_result, indent=2))
    except Exception as e:
        print(f"解密失败: {e}")

    print("\n所有操作完成")

 


Comments(0) Add Your Comment

Not Comment!