2025-06-19 13:54:05 by wst
python高级需求:
要求使用国密sm4对文件进行加解密;
方法:
代码:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@文件 :sm4.py
@说明 :国密SM4-GCM加密/解密实现
@时间 :2025/06/18 16:03:06
@作者 :wanshitao
@版本 :1.0
'''
import os
import hmac
import time
import json
import base64
from gmssl import sm4, sm3, func
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
class SM4GCM:
def __init__(self, key, iv):
"""
初始化SM4-GCM加密器
:param key: 16字节的密钥
:param iv: 16字节的初始向量
"""
self.key = key
self.iv = iv
def encrypt(self, plaintext, aad=b""):
"""
GCM模式加密
:param plaintext: 明文数据,二进制字符串
:param aad: 附加认证数据(可选)
:return: (ciphertext, tag)
"""
# 初始化加密器
cipher = sm4.CryptSM4()
cipher.set_key(self.key, sm4.SM4_ENCRYPT)
# 加密数据(使用CBC模式)
ciphertext = cipher.crypt_cbc(self.iv, plaintext)
# 计算认证标签
tag = self._compute_gmac(self.key, self.iv, ciphertext, aad)
return ciphertext + tag # 返回密文和认证标签的组合
def decrypt(self, encrypted_data, aad=b""):
"""
GCM模式解密
:param encrypted_data: 二进制制的密文
:param aad: 附加认证数据(可选)
:return: 明文
"""
# 分离加密数据和认证标签
tag = encrypted_data[-32:] # 最后16字节是认证标签
ciphertext = encrypted_data[:-32] # 其余部分是加密数据
# 验证认证标签
expected_tag = self._compute_gmac(self.key, self.iv, ciphertext, aad)
if not self._constant_time_compare(tag, expected_tag):
raise ValueError("Tag verification failed")
# 初始化解密器
cipher = sm4.CryptSM4()
cipher.set_key(self.key, sm4.SM4_DECRYPT)
# 解密数据(使用CBC模式)
plaintext = cipher.crypt_cbc(self.iv, ciphertext)
return plaintext
@staticmethod
def _compute_gmac(key, iv, ciphertext, aad):
"""
计算GMAC认证标签
:param key: 密钥
:param iv: 初始向量
:param ciphertext: 密文
:param aad: 附加认证数据
:return: 认证标签
"""
# 使用HMAC-SM3代替GMAC
data_to_hash = iv + aad + ciphertext
# 创建一个符合hashlib接口的SM3哈希函数
class SM3Hash:
def __init__(self):
self._hash = []
self.digest_size = 32 # SM3哈希值的长度为32字节
self.block_size = 64 # SM3的块大小为64字节
def update(self, msg):
self._hash.extend(func.bytes_to_list(msg))
def digest(self):
return bytes.fromhex(sm3.sm3_hash(self._hash))
def copy(self):
# 创建一个新的SM3Hash对象并复制当前状态
copy_obj = SM3Hash()
copy_obj._hash = self._hash[:]
return copy_obj
# 使用HMAC
hmac_sm3 = hmac.new(key, data_to_hash, digestmod=SM3Hash)
return hmac_sm3.digest()
@staticmethod
def _constant_time_compare(a, b):
"""
恒定时间比较两个字节序列
:param a: 字节序列1
:param b: 字节序列2
:return: 如果相等返回True,否则返回False
"""
if len(a) != len(b):
return False
result = 0
for x, y in zip(a, b):
result |= x ^ y
return result == 0
def generate_rsa_keys(key_size=2048):
"""生成RSA公钥/私钥对"""
key = RSA.generate(key_size)
private_key = key.export_key()
public_key = key.publickey().export_key()
return private_key, public_key
def rsa_encrypt(data, public_key):
"""使用RSA公钥加密数据"""
rsa_key = RSA.import_key(public_key)
cipher = PKCS1_OAEP.new(rsa_key)
return cipher.encrypt(data)
def rsa_decrypt(encrypted_data, private_key):
"""使用RSA私钥解密数据"""
rsa_key = RSA.import_key(private_key)
cipher = PKCS1_OAEP.new(rsa_key)
return cipher.decrypt(encrypted_data)
def encrypt_file(input_path, public_key, output_path=None):
"""
SM4-GCM加密文件
返回:
{
"encrypted_key": "RSA加密的SM4密钥(B64)",
"nonce": "Nonce(B64)",
"encrypted_data": "加密文件路径或B64数据",
"timestamp": "加密时间"
}
"""
start_time = time.time()
# 读取文件内容
with open(input_path, 'rb') as f:
file_data = f.read()
# 生成随机SM4密钥和nonce
sm4_key = os.urandom(16)
nonce = os.urandom(16)
# 使用SM4-GCM模式加密数据
cipher = SM4GCM(sm4_key, nonce)
encrypted_data = cipher.encrypt(file_data) # 包含加密数据+标签
# 用RSA公钥加密SM4密钥
encrypted_sm4_key = rsa_encrypt(sm4_key, public_key)
# 准备结果JSON
result = {
"encrypted_key": base64.b64encode(encrypted_sm4_key).decode('utf-8'),
"nonce": base64.b64encode(nonce).decode('utf-8'),
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"original_size": len(file_data),
"encrypted_size": len(encrypted_data)
}
# 输出到文件或返回Base64数据
if output_path:
with open(output_path, 'wb') as f:
f.write(encrypted_data)
result["encrypted_data"] = output_path
result["source_file"] = input_path
else:
result["encrypted_data"] = base64.b64encode(encrypted_data).decode('utf-8')
duration = time.time() - start_time
result["encryption_time"] = f"{duration:.4f}秒"
return result
def decrypt_file(metadata, private_key, output_path=None):
"""
SM4-GCM解密文件
输入: encrypt_file返回的JSON数据
"""
start_time = time.time()
# 从JSON中提取必要信息
encrypted_sm4_key = base64.b64decode(metadata["encrypted_key"])
nonce = base64.b64decode(metadata["nonce"])
# 获取加密数据
if isinstance(metadata["encrypted_data"], str) and os.path.exists(metadata["encrypted_data"]):
# 从文件读取
with open(metadata["encrypted_data"], 'rb') as f:
encrypted_data = f.read()
elif "encrypted_data" in metadata:
# 从Base64数据读取
encrypted_data = base64.b64decode(metadata["encrypted_data"])
else:
raise ValueError("缺少加密数据")
# 用RSA私钥解密SM4密钥
sm4_key = rsa_decrypt(encrypted_sm4_key, private_key)
# 使用SM4-GCM解密数据
cipher = SM4GCM(sm4_key, nonce)
try:
plaintext = cipher.decrypt(encrypted_data)
except ValueError as e:
result = {"status": "failed", "error": str(e)}
return result
# 结果数据
result = {
"status": "success",
"decryption_time": f"{time.time() - start_time:.4f}秒",
"original_size": metadata["original_size"],
"decrypted_size": len(plaintext)
}
# 写入输出文件或返回Base64数据
if output_path:
with open(output_path, 'wb') as f:
f.write(plaintext)
result["decrypted_file"] = output_path
if "source_file" in metadata:
result["source_file"] = metadata["source_file"]
else:
result["decrypted_data"] = base64.b64encode(plaintext).decode('utf-8')
return result
# ==================== 使用示例 ====================
if __name__ == "__main__":
print("国密SM4-GCM加密方案 (密钥分离版)")
print("加密数据仅包含SM4-GCM输出,密钥通过JSON传输")
# 生成密钥对
print("生成2048位RSA密钥对...")
private_key, public_key = generate_rsa_keys()
# 保存密钥
with open("private_key.pem", "wb") as f:
f.write(private_key)
with open("public_key.pem", "wb") as f:
f.write(public_key)
# 创建测试文件
test_file = "测试文件.txt"
with open(test_file, "w", encoding="utf-8") as f:
f.write("国密SM4-GCM分离密钥方案测试文件\n")
f.write("加密数据文件只包含SM4加密输出\n")
f.write("密钥信息存储在JSON文件中\n")
f.write("=" * 50 + "\n")
f.write("机密级别:绝密\n")
# 加密文件
print("\n加密文件...")
encryption_result = encrypt_file(
input_path=test_file,
public_key=public_key,
output_path="加密数据.bin"
)
# 保存加密元数据
with open("加密信息.json", "w", encoding="utf-8") as f:
json.dump(encryption_result, f, indent=2, ensure_ascii=False)
print("加密完成,加密信息:")
print(json.dumps(encryption_result, indent=2, ensure_ascii=False))
# 解密文件
print("\n解密文件...")
with open("加密信息.json", "r", encoding="utf-8") as f:
metadata = json.load(f)
decryption_result = decrypt_file(
metadata=metadata,
private_key=private_key,
output_path="解密文件.txt"
)
print("解密结果:")
print(json.dumps(decryption_result, indent=2, ensure_ascii=False))
# 验证解密内容
print("\n验证解密内容:")
with open("解密文件.txt", "r", encoding="utf-8") as f:
print(f.read())
print("\n测试篡改检测...")
# 篡改加密文件
with open("加密数据.bin", "rb") as f:
file_data = f.read()
# 在加密数据部分修改一个字节
tampered_data = file_data[:100] + bytes([file_data[100] ^ 0x01]) + file_data[101:]
with open("篡改数据.bin", "wb") as f:
f.write(tampered_data)
# 创建篡改的元数据
tampered_metadata = metadata.copy()
tampered_metadata["encrypted_data"] = "篡改数据.bin"
# 尝试解密被篡改的文件
print("尝试解密被篡改的文件...")
try:
tamper_result = decrypt_file(
metadata=tampered_metadata,
private_key=private_key,
output_path="篡改解密.txt"
)
print("解密结果:", json.dumps(tamper_result, indent=2))
except Exception as e:
print(f"解密失败: {e}")
print("\n所有操作完成")