Technical Audit Document
This document is written for security researchers, cryptography auditors, and developers. It precisely describes the implementation details, known limitations, and threat model of the security-related modules in Nephele Workshop.
All core source code is included inline for white-box review. Sensitive constants (such as watermark passwords) have been redacted.
This document corresponds to the state of the Nephele Workshop v0.5.2-beta codebase.
提示
All source code snippets in this document are released under the MIT License. You are free to copy, modify, and use them commercially; simply retain the copyright notice.
1. Audit Scope
1.1 List of Files Under Audit
| Feature | File | Audit Scope |
|---|---|---|
| Digital Timestamping | tools/rights/logic.py | File hashing, batch timestamping, deep verification |
tools/rights/utils.py | Merkle Tree | |
tools/rights/tsa_client.py | RFC 3161 TSA client | |
tools/rights/rights_packer.py | .nep container packaging | |
| Infringement Evidence Capture | tools/rights/url_evidence.py | URL evidence main flow, TLS, CAPTCHA |
core/browser/session.py | Playwright session / screenshot | |
| Invisible Watermark | tools/packer/watermark_protection.py | Fixed-length encoding, round-trip verification, exception fallback |
tools/packer/logic.py / agent_api.py | Business-layer invocation | |
core/workers/watermark_worker.py | Background extraction thread | |
blind_watermark (PyPI) | DWT+DCT+SVD underlying algorithm | |
| AI Metadata Detection | tools/validator/logic.py | Metadata reading, rule matching, evidence grading |
tools/validator/c2pa_verifier.py | Official C2PA SDK adapter, trust-state parsing | |
core/workers/ai_detector_worker.py | Batch detection thread |
1.2 Product Boundary
This audit does not cover:
- Payment / licensing modules (
core/license_manager.py,core/payment.py) - Authentication / JWT / CAPTCHA integration (
core/auth/) - The AI conversation Agent and cloud inference (
core/agent_loop.py,nephele-api/) - Client updates and SSL pinning (
core/updater.py,core/ssl_pinning.py)
Each of these modules has its own independent security boundary and threat model, which fall outside the scope of this document.
2. Digital Timestamping Core Implementation
2.1 File Hash Computation
def calculate_file_hash(file_path: Path, algorithm: str = 'sha256') -> str:
"""
计算文件的哈希值(极简版,仅用于非确权场景)
Args:
file_path: 文件路径
algorithm: 哈希算法,默认 'sha256'
Returns:
文件的十六进制哈希值
Raises:
RightsError: 文件不存在或读取失败
"""
if not file_path or not isinstance(file_path, Path):
raise RightsError(f"无效的文件路径: {file_path}")
if not file_path.exists():
raise RightsError(f"文件不存在: {file_path}")
if not file_path.is_file():
raise RightsError(f"路径不是文件: {file_path}")
try:
file_size = file_path.stat().st_size
if file_size > 10 * 1024 * 1024 * 1024: # 10GB 限制
raise RightsError(f"文件过大(超过10GB): {file_path}")
hash_obj = hashlib.new(algorithm)
with open(file_path, 'rb') as f:
chunk_size = 8192
while True:
chunk = f.read(chunk_size)
if not chunk:
break
hash_obj.update(chunk)
return hash_obj.hexdigest()
except PermissionError:
raise RightsError(f"没有权限读取文件: {file_path}")
except OSError as e:
raise RightsError(f"读取文件失败: {file_path}, 错误: {str(e)}")
except Exception as e:
raise RightsError(f"计算文件哈希失败: {file_path}, 错误: {str(e)}")Audit points:
- Algorithm: SHA-256, no salt, no key (not HMAC)
- Chunking: 8,192 bytes, streamed
- Upper limit: 10 GB, rejected if exceeded
2.2 Full Merkle Tree Implementation
class MerkleTree:
"""
Merkle Tree 实现,用于将多个文件的哈希值聚合成单个根哈希
优势:
- 支持 100+ 文件批量处理
- 单个根哈希可代表整个批次
- 节省 TSA 调用成本(1 次调用 vs N 次调用)
Known limitation (second-preimage resistance):
This implementation does NOT use domain separation prefixes for leaf vs
internal nodes (i.e. b'\\x00' for leaves, b'\\x01' for internal nodes as
recommended by RFC 6962 §2.1). Adding prefixes would change the root hash
computation and break backward compatibility with all existing .nep files
and the verification website (verify.arisfusion.com). A future tree_version
bump can introduce domain separation; the current version is safe for our
threat model (user-submitted files, not adversarial tree construction).
"""
def __init__(self, hash_algorithm: str = 'sha256'):
"""
初始化 Merkle Tree
Args:
hash_algorithm: 哈希算法,默认 'sha256'
"""
self.hash_algorithm = hash_algorithm
self.leaves: List[str] = []
self.tree: List[List[str]] = []
self.root_hash: Optional[str] = None
def add_leaf(self, data: bytes) -> str:
"""
添加叶子节点(文件哈希)
Args:
data: 文件数据或哈希值(bytes)
Returns:
叶子节点的哈希值
"""
hash_obj = hashlib.new(self.hash_algorithm)
hash_obj.update(data)
leaf_hash = hash_obj.hexdigest()
self.leaves.append(leaf_hash)
return leaf_hash
def add_file_hash(self, file_hash: str) -> None:
"""
直接添加文件哈希值(已计算好的)
Args:
file_hash: 文件的十六进制哈希值
"""
self.leaves.append(file_hash)
def build(self) -> str:
"""
构建 Merkle Tree 并返回根哈希
Returns:
根哈希值(十六进制字符串)
"""
if not self.leaves:
raise ValueError("Merkle Tree 没有叶子节点")
# 如果只有一个叶子节点,直接返回
if len(self.leaves) == 1:
self.root_hash = self.leaves[0]
return self.root_hash
# 构建树:从叶子节点开始,逐层向上
current_level = self.leaves.copy()
self.tree = [current_level]
while len(current_level) > 1:
next_level = []
# 成对处理节点
for i in range(0, len(current_level), 2):
if i + 1 < len(current_level):
# 两个节点:合并哈希
combined = current_level[i] + current_level[i + 1]
else:
# 奇数个节点:最后一个节点复制后与自己合并
combined = current_level[i] + current_level[i]
# 计算父节点哈希
hash_obj = hashlib.new(self.hash_algorithm)
hash_obj.update(combined.encode('utf-8'))
parent_hash = hash_obj.hexdigest()
next_level.append(parent_hash)
self.tree.append(next_level)
current_level = next_level
# 根哈希是最后一层的唯一节点
self.root_hash = current_level[0]
return self.root_hash
def get_proof(self, leaf_index: int) -> List[Dict]:
"""
获取指定叶子节点的 Merkle Proof(用于验证)
Args:
leaf_index: 叶子节点索引
Returns:
Merkle Proof 路径,每个元素为 {'hash': str, 'position': 'left'|'right'}
position 表示兄弟节点在合并时的位置
"""
if not self.tree:
self.build()
if leaf_index >= len(self.leaves):
raise IndexError(f"叶子节点索引超出范围: {leaf_index}")
proof = []
current_index = leaf_index
current_level = 0
while current_level < len(self.tree) - 1:
level = self.tree[current_level]
# 找到兄弟节点并记录位置
if current_index % 2 == 0:
# 当前是左节点,兄弟在右侧
sibling_index = current_index + 1
if sibling_index < len(level):
proof.append({'hash': level[sibling_index], 'position': 'right'})
else:
# 奇数情况,兄弟是自己(已复制)
proof.append({'hash': level[current_index], 'position': 'right'})
else:
# 当前是右节点,兄弟在左侧
sibling_index = current_index - 1
proof.append({'hash': level[sibling_index], 'position': 'left'})
# 移动到上一层
current_index = current_index // 2
current_level += 1
return proof
def verify_proof(self, leaf_hash: str, proof: List[Dict], root_hash: str) -> bool:
"""
验证 Merkle Proof
Args:
leaf_hash: 叶子节点哈希
proof: Merkle Proof 路径(由 get_proof 返回)
root_hash: 根哈希
Returns:
验证是否通过
"""
current_hash = leaf_hash
for step in proof:
sibling_hash = step['hash']
position = step['position']
# 按照 build() 相同的位置顺序合并:左 + 右
if position == 'right':
combined = current_hash + sibling_hash
else:
combined = sibling_hash + current_hash
hash_obj = hashlib.new(self.hash_algorithm)
hash_obj.update(combined.encode('utf-8'))
current_hash = hash_obj.hexdigest()
return current_hash == root_hashKnown security limitation (disclosed proactively):
The current implementation does not use the domain-separation prefixes recommended by RFC 6962 §2.1 (leaf nodes are not prefixed with \x00, internal nodes are not prefixed with \x01). This means that in an extreme adversarial scenario, a theoretical possibility exists for constructing a second-preimage.
Practical risk assessment:
- When the threat model is "a user generating a timestamp for their own work", the risk is negligible.
- If the threat model requires "resistance to maliciously constructed collisions", the current implementation does not meet that security level.
2.3 Full TSA Client Implementation
class TSAClient:
"""
RFC 3161 时间戳服务客户端
支持的服务:
- FreeTSA (https://freetsa.org/tsr) - 免费,无需注册
- DigiCert (http://timestamp.digicert.com)
- IdenTrust (http://timestamp.identrust.com)
- 其他 RFC 3161 兼容服务
"""
# 预定义的 TSA 服务提供商
PROVIDERS = {
'freetsa': {
'name': 'FreeTSA',
'url': 'https://freetsa.org/tsr',
'hashname': 'sha256',
'description': '免费时间戳服务,国际标准 RFC 3161',
'requires_auth': False,
'legal_strength': 3, # 1-5 评分
'price': 0
},
'digicert': {
'name': 'DigiCert',
'url': 'http://timestamp.digicert.com',
'hashname': 'sha256',
'description': 'DigiCert 免费时间戳服务',
'requires_auth': False,
'legal_strength': 4,
'price': 0
},
'identrust': {
'name': 'IdenTrust',
'url': 'http://timestamp.identrust.com',
'hashname': 'sha256',
'description': 'IdenTrust 免费时间戳服务',
'requires_auth': False,
'legal_strength': 4,
'price': 0
}
}
def __init__(
self,
provider: str = 'freetsa',
custom_url: Optional[str] = None,
hashname: str = 'sha256',
timeout: int = 30
):
"""
初始化 TSA 客户端
Args:
provider: 预定义的服务提供商名称 ('freetsa', 'digicert', 'identrust')
custom_url: 自定义 TSA URL(如果指定,则忽略 provider)
hashname: 哈希算法 ('sha256', 'sha512' 等)
timeout: 请求超时时间(秒)
"""
if not RFC3161_AVAILABLE:
raise ImportError(
"rfc3161ng 库未安装。请运行: pip install rfc3161ng"
)
if custom_url:
self.url = custom_url
self.provider_name = "Custom TSA"
self.provider_key = None
elif provider in self.PROVIDERS:
config = self.PROVIDERS[provider]
self.url = config['url']
self.provider_name = config['name']
self.provider_key = provider
hashname = config['hashname']
else:
raise ValueError(
f"未知的 TSA 提供商: {provider}。"
f"支持的提供商: {', '.join(self.PROVIDERS.keys())}"
)
self.hashname = hashname
self.timeout = timeout
# 初始化 rfc3161ng 时间戳器
# 注意:某些环境可能遇到 SSL 握手问题,这是正常的
# 我们的设计会自动降级到本地哈希
try:
self.stamper = rfc3161ng.RemoteTimestamper(
url=self.url,
hashname=self.hashname,
timeout=self.timeout
)
except Exception as e:
# 如果初始化失败,记录错误但不抛出异常
# 后续调用时会返回失败状态
self.stamper = None
self._init_error = str(e)
FAILOVER_ORDER: List[str] = ['digicert', 'freetsa', 'identrust']
def _call_with_retry(self, hash_bytes: bytes, max_retries: int = 3) -> bytes:
"""
带指数退避和提供商故障转移的 TSA 调用
Args:
hash_bytes: 要签名的哈希值(二进制)
max_retries: 每个提供商的最大重试次数
Returns:
TSR 令牌(二进制)
Raises:
Exception: 所有提供商均失败
"""
# 构建尝试顺序:当前提供商优先,然后是其他提供商
providers_to_try = []
if self.provider_key:
providers_to_try.append(self.provider_key)
for p in self.FAILOVER_ORDER:
if p != self.provider_key:
providers_to_try.append(p)
else:
# 自定义 URL,无故障转移
providers_to_try = [None]
last_error = None
for provider_key in providers_to_try:
if provider_key is not None:
config = self.PROVIDERS[provider_key]
url = config['url']
hashname = config['hashname']
provider_name = config['name']
else:
url = self.url
hashname = self.hashname
provider_name = self.provider_name
for attempt in range(max_retries):
try:
stamper = rfc3161ng.RemoteTimestamper(
url=url,
hashname=hashname,
timeout=self.timeout
)
tsr_token = stamper(digest=hash_bytes)
# 成功后更新当前提供商信息
if provider_key is not None:
self.provider_name = provider_name
self.provider_key = provider_key
self.url = url
self.stamper = stamper
return tsr_token
except Exception as e:
last_error = e
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
raise Exception(
f"所有 TSA 提供商均失败 (尝试: {', '.join(p or 'custom' for p in providers_to_try)}): {last_error}"
)
def timestamp_data(self, data: bytes) -> bytes:
"""
对原始数据生成时间戳
Args:
data: 要加时间戳的数据(二进制)
Returns:
TSR (Time-Stamp Response) 二进制令牌
Raises:
Exception: 时间戳请求失败
"""
try:
# 计算数据的哈希值
hash_obj = hashlib.new(self.hashname)
hash_obj.update(data)
data_hash = hash_obj.digest()
# 调用 TSA 服务获取时间戳(带重试和故障转移)
tsr_token = self._call_with_retry(data_hash)
return tsr_token
except Exception as e:
raise Exception(f"时间戳请求失败: {str(e)}")
def timestamp_file(self, file_path: Path, output_path: Optional[Path] = None) -> Dict:
"""
为文件生成时间戳(流式处理,不将整个文件载入内存)
Args:
file_path: 要加时间戳的文件路径
output_path: TSR 文件输出路径(可选,默认为 file_path.tsr)
Returns:
包含时间戳信息的字典
"""
file_path = Path(file_path)
if not file_path.exists():
return {
'success': False,
'message': f"文件不存在: {file_path}"
}
try:
# 流式计算文件哈希(8KB 分块,避免大文件内存溢出)
hash_obj = hashlib.new(self.hashname)
with open(file_path, 'rb') as f:
while True:
chunk = f.read(8192)
if not chunk:
break
hash_obj.update(chunk)
file_hash = hash_obj.hexdigest()
# 确定输出路径
if output_path is None:
output_path = file_path.parent / f"{file_path.stem}.tsr"
else:
output_path = Path(output_path)
# 委托给 timestamp_hash(处理 TSA 调用、重试和文件写入)
result = self.timestamp_hash(file_hash, output_path)
# 确保返回文件哈希
if result.get('success'):
result['hash'] = file_hash
return result
except Exception as e:
return {
'success': False,
'message': f"时间戳生成失败: {str(e)}"
}
def timestamp_hash(self, hash_value: str, output_path: Path) -> Dict:
"""
为已知的哈希值生成时间戳(用于 Merkle Root 等场景)
Args:
hash_value: 十六进制哈希值字符串
output_path: TSR 文件输出路径
Returns:
包含时间戳信息的字典
"""
try:
# 将十六进制哈希值转换为二进制
hash_bytes = bytes.fromhex(hash_value)
# 获取时间戳令牌(带重试和故障转移)
tsr_token = self._call_with_retry(hash_bytes)
# 保存 TSR 文件
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'wb') as f:
f.write(tsr_token)
# 从 TSR 令牌中提取 TSA 认证时间(而非本机时钟)
tsa_timestamp = datetime.now().isoformat() # fallback
tsa_issuer = self.provider_name
try:
from asn1crypto import tsp, cms
# Try TimeStampResp first, then ContentInfo
signed_data = None
try:
ts_resp = tsp.TimeStampResp.load(tsr_token)
signed_data = ts_resp['time_stamp_token']['content']
except (ValueError, KeyError, TypeError):
try:
ci = cms.ContentInfo.load(tsr_token)
if ci['content_type'].native == 'signed_data':
signed_data = ci['content']
except (ValueError, KeyError, TypeError):
pass
if signed_data:
tst_info = signed_data['encap_content_info']['content'].parsed
gen_time = tst_info['gen_time'].native
if gen_time:
tsa_timestamp = gen_time.isoformat()
# Extract issuer CN from signer info
try:
signer_infos = signed_data['signer_infos']
if signer_infos:
sid = signer_infos[0]['sid']
if sid.name == 'issuer_and_serial_number':
for rdn in sid.chosen['issuer'].chosen:
for attr in rdn:
if attr['type'].dotted == '2.5.4.3':
tsa_issuer = attr['value'].native
break
except (KeyError, IndexError, ValueError):
pass
except ImportError:
pass # asn1crypto not available, use fallback values
return {
'success': True,
'timestamp': tsa_timestamp,
'hash': hash_value,
'issuer': tsa_issuer,
'tsr_path': str(output_path),
'algorithm': self.hashname.upper(),
'message': f"时间戳生成成功,TSR 文件: {output_path.name}"
}
except Exception as e:
return {
'success': False,
'message': f"时间戳生成失败: {str(e)}"
}
def verify_tsr(
self,
tsr_path: Path,
data: Optional[bytes] = None,
digest: Optional[bytes] = None,
) -> Dict:
"""
验证 TSR 时间戳令牌
Args:
tsr_path: TSR 文件路径
data: 原始数据(可选,库内部计算哈希后比对)
digest: 预计算的哈希值(可选,直接与 TSR 中记录的哈希比对)
data 和 digest 二选一;都不传则仅验证 TSR 结构
Returns:
验证结果字典,包含 'valid', 'message', 'issuer' 等字段。
当 data/digest 未提供时,结果包含 'partial_verification': True 标志。
"""
tsr_path = Path(tsr_path)
if not tsr_path.exists():
return {
'valid': False,
'message': f"TSR 文件不存在: {tsr_path}"
}
try:
with open(tsr_path, 'rb') as f:
tsr_token = f.read()
if digest is not None:
# 直接用预计算的哈希比对(用于 Merkle Root 等场景)
verified = rfc3161ng.check_timestamp(tsr_token, digest=digest)
if verified:
return {
'valid': True,
'message': '时间戳验证通过(数据完整性已确认)',
'issuer': self._extract_issuer_from_tsr(tsr_token),
}
else:
return {
'valid': False,
'message': '时间戳验证失败:哈希值不匹配'
}
elif data is not None:
# 传入原始数据,由库内部计算哈希后比对
verified = rfc3161ng.check_timestamp(tsr_token, data=data)
if verified:
return {
'valid': True,
'message': '时间戳验证通过(数据完整性已确认)',
'issuer': self._extract_issuer_from_tsr(tsr_token),
}
else:
return {
'valid': False,
'message': '时间戳验证失败:哈希值不匹配'
}
else:
# 结构验证:无原始数据时,验证 TSR 文件结构有效性
return self._verify_tsr_structure(tsr_token)
except Exception as e:
return {
'valid': False,
'message': f'时间戳验证失败: {str(e)}'
}
def _extract_issuer_from_tsr(self, tsr_token: bytes) -> str:
"""Extract the actual issuer CN from a TSR token using asn1crypto.
Falls back to self.provider_name if parsing fails or asn1crypto
is unavailable. This ensures verify_tsr() reports the real issuer
even after failover to a different TSA provider.
"""
try:
from asn1crypto import tsp, cms
signed_data = None
try:
ts_resp = tsp.TimeStampResp.load(tsr_token)
signed_data = ts_resp['time_stamp_token']['content']
except (ValueError, KeyError, TypeError):
try:
ci = cms.ContentInfo.load(tsr_token)
if ci['content_type'].native == 'signed_data':
signed_data = ci['content']
except (ValueError, KeyError, TypeError):
pass
if signed_data:
signer_infos = signed_data['signer_infos']
if signer_infos:
sid = signer_infos[0]['sid']
if sid.name == 'issuer_and_serial_number':
for rdn in sid.chosen['issuer'].chosen:
for attr in rdn:
if attr['type'].dotted == '2.5.4.3': # CN
return attr['value'].native
except (ImportError, Exception):
pass
return self.provider_name
def _verify_tsr_structure(self, tsr_token: bytes) -> Dict:
"""验证 TSR 令牌的 ASN.1 结构(不验证数据完整性)"""
if len(tsr_token) < 20:
return {'valid': False, 'message': 'TSR 文件过小,可能已损坏'}
if tsr_token[0] != 0x30:
return {'valid': False, 'message': 'TSR 文件不是有效的 ASN.1 DER 格式'}
# 尝试 asn1crypto 深度结构验证
try:
from asn1crypto import tsp, cms
# 格式 1: TimeStampResp
try:
ts_resp = tsp.TimeStampResp.load(tsr_token)
status = ts_resp['status']['status'].native
if status not in ('granted', 'granted_with_mods'):
return {
'valid': False,
'message': f'TSR 状态异常: {status}'
}
return {
'valid': True,
'message': '时间戳结构验证通过(未验证数据完整性)',
'issuer': self.provider_name,
'partial_verification': True
}
except (ValueError, KeyError, TypeError):
pass
# 格式 2: ContentInfo/SignedData (DigiCert 等)
try:
content_info = cms.ContentInfo.load(tsr_token)
if content_info['content_type'].native == 'signed_data':
return {
'valid': True,
'message': '时间戳结构验证通过(未验证数据完整性)',
'issuer': self.provider_name,
'partial_verification': True
}
except (ValueError, KeyError, TypeError):
pass
return {'valid': False, 'message': '无法解析 TSR 结构'}
except ImportError:
# asn1crypto 不可用,基础结构检查已通过(size + 0x30 tag)
return {
'valid': True,
'message': '时间戳基础结构检查通过(安装 asn1crypto 可获得深度验证)',
'issuer': self.provider_name,
'partial_verification': True
}
@classmethod
def get_provider_info(cls, provider: str) -> Optional[Dict]:
"""获取预定义服务提供商的信息"""
return cls.PROVIDERS.get(provider)
@classmethod
def list_providers(cls) -> Dict[str, Dict]:
"""列出所有预定义的服务提供商"""
return cls.PROVIDERS.copy()Audit highlights:
- Default constructor:
provider='freetsa' - The UI calls
batch_protect_works(tsa_provider='digicert'), so the user's actual default is DigiCert - Failover order:
['digicert', 'freetsa', 'identrust'] - Exponential backoff:
sleep(2 ** attempt), i.e. 1s, 2s, 4s - Up to 3 retries per provider
- The initial fallback value of
tsa_timestampintimestamp_hashisdatetime.now().isoformat()(local clock); it is only replaced with the TSA-asserted time afterasn1cryptoparses it successfully
2.4 Batch Timestamping Main Flow
def batch_protect_works(
file_paths: List[Path],
author_name: str,
inspiration: Optional[str] = None,
output_dir: Optional[Path] = None,
password: Optional[str] = None,
progress_callback=None,
tsa_provider: str = 'digicert',
tsa_timeout: int = 30,
cert_mode: str = 'simple',
) -> Dict:
"""
批量保护作品(数字存证核心流程)
流程:
1. 计算所有文件哈希
2. 构建 Merkle Tree
3. 生成 manifest.json
4. 生成缩略图拼贴
5. 调用 TSA 获取时间戳(使用根哈希)
6. 生成 PDF 报告
7. 打包为 .nep 文件
Args:
file_paths: 文件路径列表
author_name: 作者名称
inspiration: 创作灵感(可选)
output_dir: 输出目录
password: .nep 文件密码(可选)
progress_callback: 进度回调 (current, total, message)
Returns:
包含处理结果的字典
"""
from .utils import build_merkle_tree_from_files
from .rights_packer import RightsPacker
from .pdf_generator import PDFGenerator
if not file_paths:
raise RightsError("文件列表为空")
if output_dir is None:
output_dir = Path.cwd() / "digital_evidence"
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
try:
# 步骤 1: 计算文件哈希并构建 Merkle Tree
if progress_callback:
progress_callback(0, len(file_paths), "正在计算文件哈希...")
def hash_progress(current, total):
if progress_callback:
progress_callback(current, total, f"计算哈希: {current}/{total}")
tree = build_merkle_tree_from_files(file_paths, progress_callback=hash_progress)
root_hash = tree.root_hash
image_extensions = {'.jpg', '.jpeg', '.png', '.webp', '.bmp'}
# 步骤 2: 收集文件哈希和作品信息 + 计算感知哈希
# Reuse hashes already computed by Merkle Tree (avoid double I/O)
file_hashes = dict(tree.file_hashes)
works = []
fingerprints = [] # Perceptual hashes for image files
for file_path in file_paths:
file_hash = file_hashes.get(str(file_path), calculate_file_hash(file_path))
# 获取文件创建时间
try:
creation_time = datetime.fromtimestamp(file_path.stat().st_mtime)
except (OSError, ValueError, OverflowError):
creation_time = datetime.now()
works.append({
'title': file_path.stem,
'creation_date': creation_time.isoformat(),
'file_path': str(file_path),
'file_hash': file_hash
})
# Compute perceptual hash for image files (non-blocking, best-effort)
if file_path.suffix.lower() in image_extensions:
try:
from .fingerprint import compute_fingerprint
fp = compute_fingerprint(file_path, file_sha256=file_hash)
fingerprints.append(fp)
except Exception as e:
import logging as _logging
_logging.getLogger(__name__).debug(
"Skipping perceptual hash for %s: %s", file_path.name, e
)
# 步骤 3: 生成 manifest.json
if progress_callback:
progress_callback(len(file_paths), len(file_paths), "正在生成清单...")
packer = RightsPacker(output_dir / "evidence.nep", password=password)
manifest_data = packer.create_manifest(
author_name=author_name,
inspiration=inspiration,
works=works,
file_hashes=file_hashes
)
# 步骤 4: 生成缩略图(仅图片文件)
image_paths = [p for p in file_paths if p.suffix.lower() in image_extensions]
thumbnail_path = None
if image_paths:
if progress_callback:
progress_callback(len(file_paths), len(file_paths), "正在生成缩略图...")
thumbnail_path = output_dir / "thumbnail.jpg"
packer.generate_thumbnail(image_paths, thumbnail_path)
# 步骤 5: 调用 DigiCert 获取真实 RFC 3161 时间戳
if progress_callback:
progress_callback(len(file_paths), len(file_paths), "正在获取 DigiCert 时间戳...")
tsa_binary_path = output_dir / "proof.tsa" # RFC 3161 二进制
local_json_path = output_dir / "proof.json" # 本地降级 JSON
timestamp_file = tsa_binary_path # 最终实际写入的文件
try:
from .tsa_client import TSAClient
# 初始化 TSA 客户端(可配置提供商)
tsa_client = TSAClient(provider=tsa_provider, timeout=tsa_timeout)
# 用 Merkle Root Hash 获取时间戳
tsa_result = tsa_client.timestamp_hash(root_hash, tsa_binary_path)
if tsa_result['success']:
timestamp_file = tsa_binary_path
timestamp_info = {
'timestamp': tsa_result['timestamp'],
'hash': root_hash,
'issuer': tsa_result['issuer'],
'algorithm': tsa_result['algorithm'],
'valid': True,
'tsr_path': str(tsa_binary_path)
}
else:
# TSA 失败,使用本地时间戳(写 .json,不混淆 .tsa)
timestamp_file = local_json_path
timestamp_info = {
'timestamp': datetime.now().isoformat(),
'hash': root_hash,
'issuer': 'Nephele Workshop (本地)',Key behavior:
cert_modedefaults to"simple"and does not automatically detect source-file upgrades- The UI layer (
PipelineWindow.qml) explicitly passes"full"based oncertifySourceFiles.length > 0 rfc3161ngnot installed → forced downgrade to a local timestamp, without blocking the flow- When computing
manifest_sha256, its own key is excluded to prevent a circular dependency
2.5 .nep Packaging Implementation
def _write_zip_contents(zipf, manifest_data, thumbnail_path, timestamp_file,
pdf_report, additional_files, source_files):
"""将内容写入 ZIP 文件(供 pack() 内部使用)"""
# 1. 添加 manifest.json
manifest_json = json.dumps(manifest_data, indent=2, ensure_ascii=False)
zipf.writestr('manifest.json', manifest_json.encode('utf-8'))
# 2. 添加缩略图
if thumbnail_path and thumbnail_path.exists():
zipf.write(thumbnail_path, 'thumbnail.jpg')
# 3. 添加时间戳文件(保持原始扩展名:.tsa=RFC 3161 二进制,.json=本地降级)
if timestamp_file and timestamp_file.exists():
archive_name = 'proof.tsa' if timestamp_file.suffix == '.tsa' else 'proof.json'
zipf.write(timestamp_file, archive_name)
# 4. 添加 PDF 报告
if pdf_report and pdf_report.exists():
zipf.write(pdf_report, 'VerificationReport.pdf')
# 5. 原始作品文件 → works/ 目录
# Use indexed filenames (e.g. works/000_photo.jpg) to avoid collisions
# when multiple source files share the same basename. Must match the
# indexed keys in manifest_data['works_map'].
if source_files:
for idx, file_path in enumerate(source_files):
if isinstance(file_path, str):
file_path = Path(file_path)
if file_path.exists() and file_path.is_file():
zipf.write(file_path, f'works/{idx:03d}_{file_path.name}')
# 6. 添加额外文件
if additional_files:
for file_path in additional_files:
if file_path.exists():
zipf.write(file_path, f'additional/{file_path.name}')
Password-protection logic (the pack() method):
- If
pyzipperis available and a password is set: useAESZipFile+WZ_AES(AES-256) - If
pyzipperis unavailable but a password is set: fall back to standard ZIP and write a_warninginto the manifest - No password: standard
zipfile.ZipFile
2.6 Standalone Verifier (verify.arisfusion.com)
The .nep standalone verifier is publicly deployed at verify.arisfusion.com.
Deployment form:
- A single HTML file (about 2,200 lines), with no build tools, no bundling pipeline, and no external dependencies
- Pure client-side computation (
SubtleCrypto+ a pure-JS ASN.1 parser) - "View Page Source" in the browser is enough to audit the complete logic
Verification chain:
- The user uploads a
.nep→ the browser unpacks it locally - SHA-256 is recomputed over
works/→ a Merkle Tree is built in lexicographic order of file names proof.tsa(the RFC 3161 TSR ASN.1 structure) is parsed →messageImprint.hashedMessageis extracted- The local Merkle Root is compared against the digest embedded in the TSR
- The
genTimeandtsafields in the TSR are parsed to display the issuing authority and the time
Trust boundary:
- The verifier itself does not issue any timestamp; it only reads the
proof.tsaalready present inside the.nep - In the current version, cryptographic verification of the TSA signature (certificate chain + public-key chain) is a structural comparison + TSA public-key fingerprint match; full CA-chain verification is recommended via
openssl ts -verifyorrfc3161ngfor cross-confirmation - The source code is MIT-licensed, so anyone can stand up their own mirror or use it offline (simply save the HTML file)
2.7 Deep Verification Implementation
def verify_evidence_package(
tsa_path: Path,
file_paths: List[Path],
) -> Tuple[bool, Dict]:
"""
深度验证存证包:重算文件哈希 → 重建 Merkle Tree → 与 TSR 中签名的哈希比对。
验证链路:
文件列表 → SHA-256 → Merkle Tree → Root Hash → 与 TSR 内 messageImprint 比对
Args:
tsa_path: .tsa/.tsr 时间戳文件路径
file_paths: 被存证的原始文件路径列表(顺序必须与存证时一致)
Returns:
(是否通过, 验证结果字典)
"""
from .utils import build_merkle_tree_from_files
if not tsa_path.exists():
return False, {'valid': False, 'message': f"时间戳文件不存在: {tsa_path}"}
missing = [str(p) for p in file_paths if not p.exists()]
if missing:
return False, {'valid': False, 'message': f"原始文件缺失: {', '.join(missing)}"}
try:
# Step 1: 重算文件哈希,重建 Merkle Tree
tree = build_merkle_tree_from_files(file_paths)
computed_root = tree.root_hash
# Step 2: 检查是否为本地时间戳(无第三方签名,不具备证明力)
suffix = tsa_path.suffix.lower()
if suffix == '.json':
tsr_result = parse_tsr(tsa_path)
tsr_hash = tsr_result.get('hash', tsr_result.get('work_identity', ''))
if computed_root.lower() == tsr_hash.lower():
return False, {
'valid': False,
'message': "本地时间戳无第三方签名,不具备密码学证明力。文件哈希一致但无法证明时间。",
'root_hash': computed_root,
'local_only': True,
'file_count': len(file_paths),
}
else:
return False, {
'valid': False,
'message': "验证失败:Merkle Root 与本地时间戳记录不匹配",
'computed_root': computed_root,
'file_count': len(file_paths),
}
# Step 3: RFC 3161 TSR — 用 rfc3161ng 做真正的密码学签名验证
computed_digest = bytes.fromhex(computed_root)
try:
from .tsa_client import TSAClient
tsa_client = TSAClient()
verify_result = tsa_client.verify_tsr(tsa_path, digest=computed_digest)
if verify_result.get('valid'):
# 同时解析 TSR 获取时间戳详情(签发时间、签发方)
tsr_result = parse_tsr(tsa_path)
return True, {
'valid': True,
'message': f"深度验证通过:{len(file_paths)} 个文件的 Merkle Root 与 TSA 签名匹配(密码学验证)",
'timestamp': tsr_result.get('timestamp'),
'issuer': tsr_result.get('issuer'),
'root_hash': computed_root,
'file_count': len(file_paths),
}
else:
return False, {
'valid': False,
'message': f"TSA 签名验证失败:{verify_result.get('message', 'unknown')}",
'root_hash': computed_root,
'file_count': len(file_paths),
}
except ImportError:
# rfc3161ng 未安装,降级为结构性比对(明确标注)
tsr_result = parse_tsr(tsa_path)
tsr_hash_raw = tsr_result.get('hash', '')
tsr_hash = tsr_hash_raw.split(':', 1)[1] if ':' in tsr_hash_raw else tsr_hash_raw
if computed_root.lower() == tsr_hash.lower():
return True, {
'valid': True,
'message': f"结构验证通过(安装 rfc3161ng 可启用密码学签名验证)",
'timestamp': tsr_result.get('timestamp'),
'issuer': tsr_result.get('issuer'),
'root_hash': computed_root,
'file_count': len(file_paths),
'partial_verification': True,
}
else:
return False, {
'valid': False,
'message': "验证失败:Merkle Root 与 TSR 记录的哈希不匹配",
'computed_root': computed_root,
'tsr_hash': tsr_hash,
'file_count': len(file_paths),
}
except Exception as e:
return False, {'valid': False, 'message': f"深度验证失败: {e}"}
def batch_protect_works(3. Infringement Evidence Capture Core Implementation
3.1 Browser Screenshot
def screenshot(self, path: Optional[str] = None) -> dict:
"""截图保存"""
if not self._page:
return {"success": False, "message": "浏览器未打开"}
try:
return self._run(self._async_screenshot(path))
except Exception as e:
return {"success": False, "message": f"截图失败: {e}"}
async def _async_screenshot(self, path: Optional[str]) -> dict:
import tempfile
if not path:
path = os.path.join(tempfile.gettempdir(), "nephele_screenshot.png")
await self._page.screenshot(path=path, full_page=False)
return {"success": True, "message": f"截图已保存: {path}", "output_path": path}Key fact: full_page=False. This is a viewport screenshot, not a scrolling full-page screenshot. Content below the fold on very long pages is not visually captured.
3.2 Infringement Evidence Capture Main Flow
def capture(
self,
url: str,
progress_callback=None,
) -> Dict:
"""
Execute the full URL evidence capture pipeline.
Args:
url: Target URL to capture
progress_callback: Optional (step, total, message) callback
Returns:
{
"success": bool,
"evidence_id": str,
"output_dir": str,
"manifest": dict,
"timestamp_info": dict,
"message": str,
}
"""
total_phases = 5
self._record(f"Starting evidence capture for: {url}")
try:
# Phase 1: Environment + DNS + TLS certificate
if progress_callback:
progress_callback(1, total_phases, "Collecting environment info...")
environment = self.collect_environment()
dns_info = self.resolve_dns(url)
tls_info = self.capture_tls_certificate(url)
# Phase 2: Browser capture (navigate + screenshot + HTML + images)
if progress_callback:
progress_callback(2, total_phases, "Capturing page...")
artifacts = self.capture_page(url, progress_callback=None)
# Phase 3: Hash all artifacts (including TLS cert files)
if progress_callback:
progress_callback(3, total_phases, "Computing hashes...")
file_hashes = self.hash_artifacts(artifacts)
# Also hash TLS cert + response headers files
for extra_key in ("der_path", "pem_path"):
p = Path(tls_info.get(extra_key, ""))
if p.exists():
file_hashes[p.name] = {
"sha256": self._sha256_file(p),
"size": p.stat().st_size,
"type": "tls_certificate",
}
resp_headers_path = artifacts.get("response_headers_path")
if resp_headers_path and Path(resp_headers_path).exists():
p = Path(resp_headers_path)
file_hashes[p.name] = {
"sha256": self._sha256_file(p),
"size": p.stat().st_size,
"type": "response_headers",
}
# Phase 4: Generate manifest (save log first so it's included)
log_path = Path(self.save_log())
self._log_committed = True # Log content is now frozen for hashing
file_hashes[log_path.name] = {
"sha256": self._sha256_file(log_path),
"size": log_path.stat().st_size,
"type": "operation_log",
}
if progress_callback:
progress_callback(4, total_phases, "Generating manifest...")
manifest = self.generate_manifest(
target_url=url,
environment=environment,
dns_info=dns_info,
artifacts=artifacts,
file_hashes=file_hashes,
tls_info=tls_info,
)
# Phase 5: Timestamp
if progress_callback:
progress_callback(5, total_phases, "Requesting timestamp...")
ts_info = self.timestamp_manifest(manifest)
# DO NOT re-save operation log — the version already hashed in manifest
# is the authoritative one. Any further _record() calls only live in memory.
image_count = len(artifacts.get("images", []))
# Phase 6: Package as .nep (tamper-proof archive)
# Uses files on disk (which match manifest hashes)
nep_path = self._package_nep()
return {
"success": True,
"evidence_id": self._evidence_id,
"output_dir": str(self._output_dir),
"nep_path": str(nep_path),
"manifest": manifest,
"timestamp_info": ts_info,
"page_title": artifacts.get("page_title", ""),
"image_count": image_count,
"file_count": len(file_hashes),
"message": (
f"URL evidence captured: {len(file_hashes)} files, "
f"{image_count} images, timestamp by {ts_info.get('issuer', 'N/A')}"
),
}
except URLEvidenceError as e:
self._record(f"FATAL: {e}")
if not self._log_committed:
self.save_log()
return {
"success": False,
"evidence_id": self._evidence_id,
"output_dir": str(self._output_dir),
"message": str(e),
}
except Exception as e:
self._record(f"UNEXPECTED ERROR: {e}")
if not self._log_committed:
self.save_log()
logger.exception("URL evidence capture failed")
return {
"success": False,
"evidence_id": self._evidence_id,
"output_dir": str(self._output_dir),
"message": f"Unexpected error: {e}",
}
# ===== Helpers =====
Log immutability guarantee:
save_log()is called before the manifest is generated- After writing,
self._log_committed = Trueis set - Thereafter
_record()only appends to an in-memory list and no longer writes to disk - The
file_hashesin the manifest include the SHA-256 of the log file - Therefore the manifest hash anchors the state of the log after it has been "frozen"
3.3 TLS Certificate Capture
def capture_tls_certificate(self, url: str) -> Dict:
"""
Capture the server's TLS certificate chain.
This proves the connection was made to the authentic server —
you can't forge a CA-signed certificate.
"""
import ssl
from urllib.parse import urlparse
self._record("Capturing TLS server certificate")
parsed = urlparse(url if "://" in url else f"https://{url}")
hostname = parsed.hostname or ""
port = parsed.port or 443
if not hostname:
return {"error": "Invalid hostname"}
try:
ctx = ssl.create_default_context()
with ctx.wrap_socket(
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
server_hostname=hostname,
) as sock:
sock.settimeout(10)
sock.connect((hostname, port))
cert = sock.getpeercert()
cert_der = sock.getpeercert(binary_form=True)
# Save DER certificate to file
cert_path = self._output_dir / "server_certificate.der"
cert_path.write_bytes(cert_der)
# Also save human-readable PEM
import base64
pem_data = (
"-----BEGIN CERTIFICATE-----\n"
+ base64.encodebytes(cert_der).decode()
+ "-----END CERTIFICATE-----\n"
)
pem_path = self._output_dir / "server_certificate.pem"
pem_path.write_text(pem_data, encoding="utf-8")
# Extract key fields
subject = dict(x[0] for x in cert.get("subject", ()))
issuer = dict(x[0] for x in cert.get("issuer", ()))
cert_info = {
"subject_cn": subject.get("commonName", ""),
"issuer_cn": issuer.get("commonName", ""),
"issuer_org": issuer.get("organizationName", ""),
"not_before": cert.get("notBefore", ""),
"not_after": cert.get("notAfter", ""),
"serial_number": cert.get("serialNumber", ""),
"san": [
entry[1]
for entry in cert.get("subjectAltName", ())
if entry[0] == "DNS"
],
"der_path": str(cert_path),
"pem_path": str(pem_path),
"der_sha256": hashlib.sha256(cert_der).hexdigest(),
}
self._record(
f"TLS cert captured: {cert_info['subject_cn']} "
f"(issuer: {cert_info['issuer_org']}, "
f"serial: {cert_info['serial_number'][:16]}...)"
)
return cert_info
except Exception as e:
self._record(f"TLS certificate capture failed: {e}")
return {"error": str(e)}
# ===== Step 2: DNS resolution =====
Failure scenarios: self-signed certificate, ssl.SSLError, connection timeout → returns {"error": ...}, non-fatal.
3.4 CAPTCHA Detection and Handling
_CAPTCHA_KEYWORDS = (
"验证码", "验证", "captcha", "verify", "challenge",
"human verification", "robot", "机器人",
)
def _is_captcha_page(self, title: str, url: str) -> bool:
"""Detect if the loaded page is a CAPTCHA or anti-bot challenge."""
text = (title or "").lower()
url_lower = (url or "").lower()
for kw in self._CAPTCHA_KEYWORDS:
if kw in text or kw in url_lower:
return True
return False
Handling flow:
- Navigate headless
- Detect title keywords → determine CAPTCHA
- Close the headless browser and open a visible browser
- Re-navigate
- Poll with
time.sleep(2), up to 120 seconds - After timeout, capture the current state
Blocking risk: during polling, time.sleep(2) blocks the current thread.
4. Invisible Watermark Technical Audit
4.1 Architecture Overview
The invisible watermark module is split into three layers:
| Layer | File | Responsibility |
|---|---|---|
| Underlying library | blind_watermark (PyPI) | DWT+DCT+SVD core embedding/extraction algorithm |
| Engine layer | tools/packer/watermark_protection.py | Wrapper layer: fixed-length encoding, round-trip verification, alpha preservation, exception fallback |
| Business layer | tools/packer/logic.py / agent_api.py | Packaging-parameter orchestration, overlay of visible and invisible watermarks |
| Worker layer | core/workers/watermark_worker.py | Background-thread extraction to avoid blocking the UI |
Everything runs locally, with zero network dependency.
4.2 Underlying Library Algorithm (blind_watermark)
The underlying library Nephele uses is blind_watermark (github.com/guofei9987/blind_watermark), which adopts a three-stage hybrid-domain embedding strategy: DWT (Discrete Wavelet Transform) → DCT (Discrete Cosine Transform) → SVD (Singular Value Decomposition).
4.2.1 The WaterMark Wrapper Class
blind_watermark/blind_watermark.py
class WaterMark:
def __init__(self, password_wm=1, password_img=1, block_shape=(4, 4), mode='common', processes=None):
self.bwm_core = WaterMarkCore(password_img=password_img, mode=mode, processes=processes)
self.password_wm = password_wm
self.wm_bit = None
self.wm_size = 0
def read_img(self, filename=None, img=None):
if img is None:
img = cv2.imread(filename, flags=cv2.IMREAD_UNCHANGED)
self.bwm_core.read_img_arr(img=img)
return img
def read_wm(self, wm_content, mode='img'):
if mode == 'bit':
self.wm_bit = np.array(wm_content)
# ... img / str 模式省略 ...
self.wm_size = self.wm_bit.size
# 水印加密:用 password_wm 作为种子对 bit 序列做伪随机置乱
np.random.RandomState(self.password_wm).shuffle(self.wm_bit)
self.bwm_core.read_wm(self.wm_bit)
def embed(self, filename=None, compression_ratio=None):
embed_img = self.bwm_core.embed()
if filename is not None:
cv2.imwrite(filename=filename, img=embed_img)
return embed_img
def extract_decrypt(self, wm_avg):
# 逆置乱:根据相同的 seed 生成相同的 shuffle index,再逆序还原
wm_index = np.arange(self.wm_size)
np.random.RandomState(self.password_wm).shuffle(wm_index)
wm_avg[wm_index] = wm_avg.copy()
return wm_avg
def extract(self, filename=None, embed_img=None, wm_shape=None, mode='img'):
if filename is not None:
embed_img = cv2.imread(filename, flags=cv2.IMREAD_COLOR)
self.wm_size = np.array(wm_shape).prod()
if mode in ('str', 'bit'):
wm_avg = self.bwm_core.extract_with_kmeans(img=embed_img, wm_shape=wm_shape)
else:
wm_avg = self.bwm_core.extract(img=embed_img, wm_shape=wm_shape)
wm = self.extract_decrypt(wm_avg=wm_avg)
return wmKey facts:
password_wmis used for the pseudo-random scrambling of the watermark bit sequence (np.random.RandomState.shuffle)password_imgis passed toWaterMarkCoreand is used for the selection-scrambling of image blocks- The encryption is essentially a "deterministic shuffle based on a known seed", not modern cryptographic encryption
4.2.2 The WaterMarkCore Core Engine
blind_watermark/bwm_core.py
class WaterMarkCore:
def __init__(self, password_img=1, mode='common', processes=None):
self.block_shape = np.array([4, 4])
self.password_img = password_img
self.d1, self.d2 = 36, 20 # 量化步长:越大鲁棒性越强,但失真越大
self.pool = AutoPool(mode=mode, processes=processes)Image preprocessing (read_img_arr):
def read_img_arr(self, img):
# 处理透明图
self.alpha = None
if img.shape[2] == 4:
if img[:, :, 3].min() < 255:
self.alpha = img[:, :, 3]
img = img[:, :, :3]
# BGR -> YUV,补白边使像素变偶数(DWT 要求)
self.img = img.astype(np.float32)
self.img_shape = self.img.shape[:2]
self.img_YUV = cv2.copyMakeBorder(
cv2.cvtColor(self.img, cv2.COLOR_BGR2YUV),
0, self.img.shape[0] % 2, 0, self.img.shape[1] % 2,
cv2.BORDER_CONSTANT, value=(0, 0, 0)
)
# 对 Y/U/V 三个通道分别做 1 级 Haar DWT
self.ca_shape = [(i + 1) // 2 for i in self.img_shape]
self.ca_block_shape = (
self.ca_shape[0] // self.block_shape[0],
self.ca_shape[1] // self.block_shape[1],
self.block_shape[0], self.block_shape[1]
)
for channel in range(3):
self.ca[channel], self.hvd[channel] = dwt2(
self.img_YUV[:, :, channel], 'haar'
)
# 将 CA(近似系数)转为 4D 分块数组
self.ca_block[channel] = np.lib.stride_tricks.as_strided(
self.ca[channel].astype(np.float32),
self.ca_block_shape,
strides=4 * np.array([
self.ca_shape[1] * self.block_shape[0],
self.block_shape[1], self.ca_shape[1], 1
])
)Audit points:
- Color space: BGR → YUV; the watermark is embedded in the DWT approximation sub-band of the Y (luminance) channel
- DWT levels: only 1-level Haar, not a multi-level decomposition
- Block size: fixed at
4×4, cutting the CA sub-band into non-overlapping small blocks
4.2.3 Block-Level Embedding Flow (block_add_wm_slow)
def block_add_wm_slow(self, arg):
block, shuffler, i = arg
wm_1 = self.wm_bit[i % self.wm_size]
# Step 1: 对 4x4 块做 DCT
block_dct = dct(block)
# Step 2: flatten 后按 shuffler 打乱顺序(块内置乱)
block_dct_shuffled = block_dct.flatten()[shuffler].reshape(self.block_shape)
# Step 3: SVD 分解
u, s, v = svd(block_dct_shuffled)
# Step 4: 在奇异值上嵌入 1 bit 水印
# 量化公式:把 s[0] 量化到 d1 的整数倍,再根据 wm_1 偏移 1/4 个步长
s[0] = (s[0] // self.d1 + 1/4 + 1/2 * wm_1) * self.d1
if self.d2:
s[1] = (s[1] // self.d2 + 1/4 + 1/2 * wm_1) * self.d2
# Step 5: 逆 SVD
block_dct_flatten = np.dot(u, np.dot(np.diag(s), v)).flatten()
# Step 6: 逆置乱
block_dct_flatten[shuffler] = block_dct_flatten.copy()
# Step 7: 逆 DCT
return idct(block_dct_flatten.reshape(self.block_shape))Mathematical principle:
The embedding formula (using s[0] as an example):
s'[0] = (floor(s[0] / d1) + 1/4 + 1/2 * w) * d1
where w ∈ {0, 1} is the watermark bit. During extraction:
w = 1 if (s[0] mod d1) > (d1 / 2) else 0
d1=36 means the quantization interval per bit is 36, and the maximum perturbation to a coefficient is about 0.75 × d1 = 27.
4.2.4 Block-Level Extraction Flow (block_get_wm_slow)
def block_get_wm_slow(self, args):
block, shuffler = args
block_dct_shuffled = dct(block).flatten()[shuffler].reshape(self.block_shape)
u, s, v = svd(block_dct_shuffled)
# 从 s[0] 提取 bit
wm = (s[0] % self.d1 > self.d1 / 2) * 1
if self.d2:
# s[1] 作为辅助,加权平均
tmp = (s[1] % self.d2 > self.d2 / 2) * 1
wm = (wm * 3 + tmp * 1) / 4
return wmAudit points:
d2(default 20) is the auxiliary quantization step;s[0]has weight 3 ands[1]has weight 1- When
d2=0, it degenerates to single-singular-value extraction
4.2.5 Global Embedding Flow (embed)
def embed(self):
self.init_block_index()
embed_ca = copy.deepcopy(self.ca)
# 生成块选择置乱序列(跨块置乱)
self.idx_shuffle = random_strategy1(
self.password_img, self.block_num,
self.block_shape[0] * self.block_shape[1]
)
for channel in range(3):
# 对每个块并行执行 block_add_wm
tmp = self.pool.map(self.block_add_wm, [
(self.ca_block[channel][self.block_index[i]], self.idx_shuffle[i], i)
for i in range(self.block_num)
])
# 写回 4D 数组
for i in range(self.block_num):
self.ca_block[channel][self.block_index[i]] = tmp[i]
# 4D -> 2D,拼接回 CA 子带
self.ca_part[channel] = np.concatenate(np.concatenate(self.ca_block[channel], 1), 1)
embed_ca[channel][:self.part_shape[0], :self.part_shape[1]] = self.ca_part[channel]
# 逆 DWT
embed_YUV[channel] = idwt2((embed_ca[channel], self.hvd[channel]), "haar")
# 合并三通道,YUV -> BGR,裁剪回原始尺寸
embed_img_YUV = np.stack(embed_YUV, axis=2)
embed_img_YUV = embed_img_YUV[:self.img_shape[0], :self.img_shape[1]]
embed_img = cv2.cvtColor(embed_img_YUV, cv2.COLOR_YUV2BGR)
embed_img = np.clip(embed_img, a_min=0, a_max=255)
if self.alpha is not None:
embed_img = cv2.merge([embed_img.astype(np.uint8), self.alpha])
return embed_imgKey facts:
- Cyclic embedding: the watermark bit sequence is embedded repeatedly in a cycle across the
block_numblocks (wm_bit[i % wm_size]) - Three independent channels: each of the Y/U/V channels embeds a full copy of the watermark; extraction averages them
- In-block scrambling (
idx_shuffle[i]): the order of the 16 DCT coefficients inside each 4×4 block is shuffled - Cross-block order (
block_index): the block traversal order is a fixed row-column scan and is not scrambled
4.2.6 K-Means Binarization (one_dim_kmeans)
def one_dim_kmeans(inputs):
threshold = 0
e_tol = 10 ** (-6)
center = [inputs.min(), inputs.max()]
for i in range(300):
threshold = (center[0] + center[1]) / 2
is_class01 = inputs > threshold
center = [inputs[~is_class01].mean(), inputs[is_class01].mean()]
if np.abs((center[0] + center[1]) / 2 - threshold) < e_tol:
threshold = (center[0] + center[1]) / 2
break
is_class01 = inputs > threshold
return is_class01Used in the extract_with_kmeans mode (Nephele's mode="bit" does not pass through this path; it directly returns wm_avg).
4.2.7 Random Scrambling Strategy
def random_strategy1(seed, size, block_shape):
return np.random.RandomState(seed) \
.random(size=(size, block_shape)) \
.argsort(axis=1)Generates a size × block_shape random matrix and applies argsort per row to obtain each row's scramble index. For password_img, size = block_num and block_shape = 16.
4.3 Engine Wrapper Layer Source
On top of the underlying library, Nephele adds fixed-length encoding, round-trip verification, alpha-channel preservation, and exception fallback.
4.3.1 Constants and Utility Functions
"""
Nephele Workshop - Watermark Protection Module (local)
Embeds/extracts invisible blind watermarks using blind_watermark locally.
No network dependency — works offline.
Uses file-based I/O + bit mode for reliable embedding/extraction
(numpy array mode has known issues with blind_watermark library).
Public API unchanged:
protect_image(image, level, copyright_info) -> Image
extract_watermark(image) -> str | None
save_with_watermark(image, output_path) -> bool
Developer: ArisFusion Studio
"""
import logging
import tempfile
from enum import Enum
from pathlib import Path
from typing import Optional
import numpy as np
from PIL import Image
logger = logging.getLogger(__name__)
# Watermark payload: fixed 32 bytes (256 bits)
# Enough for 10 Chinese chars (UTF-8, 3 bytes each) or 32 ASCII chars
WATERMARK_BYTES = 32
WATERMARK_BITS = WATERMARK_BYTES * 8
_WM_PASSWORD_IMG = 2024
_WM_PASSWORD_WM = 1314
class ProtectionLevel(Enum):
NONE = "none"
INVISIBLE = "invisible"
LEVEL_ALIASES = {"maximum": "invisible"}
def _text_to_bits(text: str) -> list[int]:
"""Convert text to fixed-length bit array via UTF-8."""
raw = text.encode("utf-8")[:WATERMARK_BYTES]
padded = raw.ljust(WATERMARK_BYTES, b"\x00")
bits = []
for byte in padded:
for i in range(7, -1, -1):
bits.append((byte >> i) & 1)
return bits
def _bits_to_text(bits: list) -> str:
"""Convert bit array back to text via UTF-8."""
raw = bytearray()
for i in range(0, len(bits), 8):
chunk = bits[i:i+8]
if len(chunk) < 8:
break
val = 0
for b in chunk:
val = (val << 1) | (1 if b > 0.5 else 0)
raw.append(val)
return raw.rstrip(b"\x00").decode("utf-8", errors="replace")Audit points:
- UTF-8 fixed-length truncation: overly long text is silently truncated to 32 bytes
- Threshold decision: during extraction
b > 0.5is treated as 1, giving some tolerance to noise
4.3.2 Embedding Engine
class WatermarkEngine:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def embed(self, image: Image.Image, text: str) -> Image.Image:
"""Embed invisible watermark using file-based blind_watermark."""
try:
from blind_watermark import WaterMark
rgb = image.convert("RGB")
alpha = image.split()[3] if image.mode == "RGBA" else None
with tempfile.TemporaryDirectory() as tmpdir:
orig_path = str(Path(tmpdir) / "orig.png")
wm_path = str(Path(tmpdir) / "watermarked.png")
rgb.save(orig_path, format="PNG")
bits = _text_to_bits(text)
bwm = WaterMark(password_img=_WM_PASSWORD_IMG, password_wm=_WM_PASSWORD_WM)
bwm.read_img(orig_path)
bwm.read_wm(np.array(bits), mode="bit")
bwm.embed(wm_path)
result_img = Image.open(wm_path).convert("RGB")
# Verify extraction round-trip
extracted_bits = WaterMark(
password_img=_WM_PASSWORD_IMG, password_wm=_WM_PASSWORD_WM
).extract(wm_path, wm_shape=WATERMARK_BITS, mode="bit")
extracted_text = _bits_to_text(extracted_bits)
if extracted_text == text[:WATERMARK_BYTES]:
logger.info("[Watermark] Verified: '%s'", extracted_text)
else:
logger.warning("[Watermark] Verify mismatch: '%s' -> '%s'",
text[:WATERMARK_BYTES], extracted_text)
if alpha:
result_img = result_img.convert("RGBA")
result_img.putalpha(alpha)
return result_img
except Exception as e:
logger.error("Embed failed: %s", e, exc_info=True)
return image
Audit points:
- Singleton pattern:
WatermarkEngineis a singleton, but a newblind_watermark.WaterMarkinstance is created each time - File-level I/O: it works via a
TemporaryDirectory+ a temporary PNG file, sidestepping the dtype/shape compatibility bugs of the numpy-array mode - A verification failure only logs a warning and still returns the watermarked image
- Alpha channel: RGBA input is first converted to RGB for embedding, then alpha is restored afterward
- Exception fallback: any exception returns the original
image, with the caller unaware of the failure
4.3.3 Extraction Engine
def extract(self, image: Image.Image) -> Optional[str]:
"""Extract invisible watermark using file-based blind_watermark."""
try:
from blind_watermark import WaterMark
rgb = image.convert("RGB")
bit_len = WATERMARK_BITS
with tempfile.TemporaryDirectory() as tmpdir:
img_path = str(Path(tmpdir) / "check.png")
rgb.save(img_path, format="PNG")
extracted_bits = WaterMark(
password_img=_WM_PASSWORD_IMG, password_wm=_WM_PASSWORD_WM
).extract(img_path, wm_shape=bit_len, mode="bit")
text = _bits_to_text(extracted_bits)
return text.strip() if text.strip() else None
except Exception as e:
logger.warning("Extract failed: %s", e)
return None
Audit points:
- A failed extraction returns
None, making it impossible to distinguish "the image has no watermark" from "an error occurred during extraction" - An empty string (all-zero padding) likewise returns
Noneafterstrip()
4.3.4 Public API
def protect_image(
image: Image.Image,
level: str = "none",
copyright_info: str = "ARIS"
) -> Image.Image:
level = LEVEL_ALIASES.get(level, level)
if level == "invisible":
return WatermarkEngine().embed(image, copyright_info)
return image
def extract_watermark(image: Image.Image) -> Optional[str]:
return WatermarkEngine().extract(image)
def save_with_watermark(image: Image.Image, output_path: str) -> bool:
try:
image.save(output_path, format='PNG')
return True
except Exception as e:
logger.warning("Save failed: %s", e)
return False4.4 Business-Layer Invocation
def pack_image(
input_path: str,
watermark_path: Optional[str] = None,
output_dir: Optional[str] = None,
watermark_mode: str = "center",
watermark_opacity: float = 0.3,
preview_max_size: int = 1920,
thumbnail_max_size: int = 500,
protection_level: str = "none",
copyright_info: str = "© ArisFusion Studio",
output_folder_name: str = "Delivery_Pack",
) -> dict:
"""One-click image packing: HD + preview + thumbnail."""
try:
src = Path(input_path)
if not src.exists():
return api_err(f"文件不存在: {input_path}")
mode_map = {"center": WatermarkMode.CENTER, "tile": WatermarkMode.TILE}
wm_mode = mode_map.get(watermark_mode, WatermarkMode.CENTER)
wm_path = Path(watermark_path) if watermark_path else None
if wm_path and not wm_path.exists():
return api_err(f"水印文件不存在: {watermark_path}")
out_dir = Path(output_dir) if output_dir else None
packer = DeliveryPacker(
preview_max_size=preview_max_size,
thumbnail_max_size=thumbnail_max_size,
watermark_opacity=watermark_opacity,
protection_level=protection_level,
copyright_info=copyright_info,
output_folder_name=output_folder_name,
)
result_dir, results = packer.process_image(
input_path=src,
watermark_path=wm_path,
output_dir=out_dir,
watermark_mode=wm_mode,
)
file_info = {k: str(v) for k, v in results.items()}
return api_ok(
f"打包完成,共生成 {len(results)} 个文件",
output_path=str(result_dir),
data={"files": file_info},
)
except PackerError as e:
logger.error("打包失败: %s", e)
return api_err(str(e))
except Exception as e:
logger.exception("打包时发生意外错误")
return api_err(f"意外错误: {e}")
Key facts:
protection_leveldefaults to"none", i.e. the invisible watermark is disabled by defaultcopyright_infois at most 32 bytes (silently truncated if longer)
4.5 Worker-Layer Implementation
"""Watermark Extraction Worker"""
from pathlib import Path
from PySide6.QtCore import QThread, Signal, QCoreApplication
_tr = QCoreApplication.translate
from .._utils import ensure_src_path
class WatermarkExtractWorker(QThread):
"""Worker thread for watermark extraction (rivaGan model loading is slow)."""
finished = Signal(str) # watermark result or empty string
logMessage = Signal(str, str) # (message, level)
def __init__(self, image_path: str):
super().__init__()
self.image_path = image_path
def run(self):
"""Extract watermark in background thread."""
try:
ensure_src_path()
from PIL import Image
from tools.packer.watermark_protection import extract_watermark
img_path = Path(self.image_path)
if not img_path.exists():
self.logMessage.emit(_tr("WatermarkExtractWorker", "文件不存在: %s") % self.image_path, "error")
self.finished.emit("")
return
self.logMessage.emit(_tr("WatermarkExtractWorker", "正在提取水印: %s") % img_path.name, "info")
image = Image.open(img_path)
watermark = extract_watermark(image)
if watermark:
self.logMessage.emit(_tr("WatermarkExtractWorker", "提取成功: %s") % watermark, "success")
self.finished.emit(watermark)
else:
self.logMessage.emit("未检测到隐形水印", "warning")
self.finished.emit("")
except Exception as e:
self.logMessage.emit(f"提取失败: {str(e)}", "error")
self.finished.emit("")4.6 Capacity and Encoding
| Encoding | Bytes per character | Max characters |
|---|---|---|
| ASCII | 1 | 32 |
| CJK (UTF-8) | 3 | 10 |
| Mixed | — | Depends on the specific characters |
5. White-Box Source Audit of AI Metadata Detection
This section audits Nephele Workshop's AI metadata / C2PA credential detection feature. This feature reads machine-readable evidence already present in image files, including C2PA content credentials, generation-tool metadata, platform declarations, and export traces. It does not use a visual style classification model, and it does not interpret "not detected" as "not AI".
5.1 Architecture Overview
AI metadata detection is split into four layers:
| Layer | File | Responsibility |
|---|---|---|
| Rule layer | tools/validator/logic.py:MetaDataDetector | Metadata reading, rule matching, evidence grading, final status output |
| C2PA layer | tools/validator/c2pa_verifier.py | Official C2PA SDK adapter, manifest reading, signature-chain and trust-state parsing |
| Worker layer | core/workers/ai_detector_worker.py | Batch detection thread, error isolation, result signals |
| UI layer | gui/qml/views/AIValidatorView.qml | Maps raw evidence into user-friendly evidence labels |
Data flow:
用户选择图片
-> core/workers/ai_detector_worker.py
-> tools/validator/logic.py:MetaDataDetector.detect()
├── Pillow 读取 PNG/JPEG/WebP/TIFF 元数据(PNG info / EXIF)
├── tools/validator/c2pa_verifier.py:verify_c2pa_file()(官方 SDK)
└── 原始字节扫描(JUMBF / APP11 fallback)
-> 返回 {status, reason, tool, evidence}
-> UI 映射证据标签Detection result structure:
{
"status": "ai" | "unknown" | "human" | "error",
"reason": str,
"tool": str,
"evidence": str,
}注意
status="human" is a legacy field name. The UI should not present it as "definitively a human work", but rather as "no credentials found" or "insufficient credentials".
5.2 The MetaDataDetector Rule Layer
tools/validator/logic.py:MetaDataDetector is the main rule-matching class. It holds no state of its own, and __init__ is empty.
5.2.1 Rule Constants
class MetaDataDetector:
"""
Detects AI generation metadata from image files using heuristic analysis
of EXIF, PNG info chunks, and generation parameters.
"""
# 1. 明确的软件签名 (强特征,优先匹配专有字符串)
# 顺序:Midjourney 先于 Gemini,以避免误判
AI_SOFTWARE_SIGNATURES = {
"Midjourney": [
re.compile(r"job id:\s*[a-f0-9\-]+", re.IGNORECASE), # 专有 Job ID
re.compile(r"--ar\s*\d+:\d+", re.IGNORECASE), # --ar 参数
re.compile(r"--v\s*\d+", re.IGNORECASE), # --v 参数
re.compile(r"--stylize\s*\d+", re.IGNORECASE), # --stylize
re.compile(r"midjourney", re.IGNORECASE), # 显式名称
re.compile(r"mj v", re.IGNORECASE),
re.compile(r"mj_", re.IGNORECASE),
],
"ComfyUI": [
re.compile(r"comfyui", re.IGNORECASE),
re.compile(r"comfyland", re.IGNORECASE),
# workflow/prompt JSON 在 detect 中单独处理
],
"Gemini (Google)": [
re.compile(r"gemini", re.IGNORECASE),
re.compile(r"google deepmind", re.IGNORECASE),
re.compile(r"generated by google", re.IGNORECASE),
re.compile(r"google imagen", re.IGNORECASE), # 必须有 "google"
re.compile(r"imagen by google", re.IGNORECASE),
re.compile(r"synthid", re.IGNORECASE), # Google SynthID 水印
re.compile(r"nano banana", re.IGNORECASE), # 你的工具标识
re.compile(r"nanobanana", re.IGNORECASE),
],
"DALL-E": [re.compile(r"dall-e", re.IGNORECASE), re.compile(r"dalle", re.IGNORECASE)],
"NovelAI": [re.compile(r"novelai", re.IGNORECASE), re.compile(r"nai-diffusion", re.IGNORECASE)],
"InvokeAI": [re.compile(r"invokeai", re.IGNORECASE), re.compile(r"invoke ai", re.IGNORECASE)],
"Fooocus": [re.compile(r"fooocus", re.IGNORECASE)],
"Stable Diffusion": [re.compile(r"stable diffusion", re.IGNORECASE), re.compile(r"sd\.?next", re.IGNORECASE), re.compile(r"forge", re.IGNORECASE)],
"Leonardo.ai": [re.compile(r"leonardo.ai", re.IGNORECASE)],
"Adobe Firefly": [re.compile(r"adobe firefly", re.IGNORECASE)],
"Bing Image Creator": [re.compile(r"bing image creator", re.IGNORECASE)],
}
# 2. 生成参数指纹 (次优先,当没有软件名时)
# 移除 Gemini 专有,将 trainedAlgorithmicMedia 作为通用 AI 标记
GENERATION_PARAM_FINGERPRINTS = [
(r"Steps:\s*\d+", "Stable Diffusion (Parameters)"),
(r"CFG scale:\s*[\d\.]+", "Stable Diffusion (Parameters)"),
(r"Sampler:\s*\w+", "Stable Diffusion (Parameters)"),
(r"Seed:\s*\d+", "Stable Diffusion (Parameters)"),
(r"Model hash:\s*[a-f0-9]+", "Stable Diffusion (Parameters)"),
(r"Model:\s*[^,\n]+", "Stable Diffusion (Parameters)"),
(r"Negative prompt:", "Stable Diffusion (Parameters)"),
(r"Size:\s*\d+x\d+", "Stable Diffusion (Parameters)"),
(r"Clip skip:\s*\d+", "Stable Diffusion (Parameters)"),
(r"Schedule type:\s*[^,\n]+", "Stable Diffusion (Parameters)"),
(r"Denoising strength:\s*[\d\.]+", "Stable Diffusion (Parameters)"),
(r"Hires upscale:\s*[\d\.]+", "Stable Diffusion (Parameters)"),
# 更新 regex 以匹配 IPTC/XMP 变体(包括 URL)
(r"DigitalSourceType\s*[:=]\s*(?:http://cv\.iptc\.org/newscodes/digitalsourcetype/)?trainedAlgorithmicMedia", "Generative AI (IPTC/XMP Standard)"),
]
RAW_METADATA_SCAN_LIMIT = 128 * 1024 * 1024
C2PA_CONTAINER_MARKERS = [
b"c2pa",
b"jumbf",
b"content credentials",
b"contentcredentials",
b"contentauth",
]
C2PA_AI_MARKERS = [
b"trainedalgorithmicmedia",
b"compositewithtrainedalgorithmicmedia",
b"algorithmicmedia",
b"generated by ai",
b"ai generated",
b"adobe firefly",
b"google imagen",
b"synthid",
b"dall-e",
b"dalle",
b"midjourney",
b"stable diffusion",
]Audit points:
- Rule hierarchy: software signatures (strong) → parameter fingerprints (secondary) → C2PA container + AI marker (strong) → weak features (file name)
- Order-sensitive:
AI_SOFTWARE_SIGNATURESis adict, and Python 3.7+ preserves insertion order.Midjourneyis listed beforeGemini (Google), to prevent a Midjourney image that references a Google tool internally from being misclassified as Gemini. - SynthID ambiguity: the
synthidstring is treated as strong evidence for Gemini, but this is only a string match — this module does not decode the SynthID pixel watermark. - Generic IPTC marker:
trainedAlgorithmicMediais not exclusive to Google; it is attributed to Gemini only when Google evidence also appears in the same text, otherwise it is classified asGenerative AI (Unknown). - Raw-scan upper limit: 128 MB; beyond that, the byte-level fallback is skipped (relying only on Pillow and the C2PA SDK).
5.2.2 The detect() Main Flow
def detect(self, image_path: str) -> Dict[str, str]:
"""
检测图像文件中的 AI 生成元数据
Args:
image_path: 图像文件路径
Returns:
检测结果字典,包含:
- status: "ai" | "human" | "error"
- reason: 检测原因描述
- tool: 检测到的 AI 工具名称(如果为 AI)
- evidence: 证据描述
"""
img_path = Path(image_path)
if not img_path.exists():
return {"status": "error", "reason": "文件不存在", "tool": "", "evidence": ""}
evidence_found = []
weak_evidence_found = []
context_evidence_found = []
detected_tool = None
extension_mismatch = False
minimal_web_jpeg = False
try:
with Image.open(img_path) as img:
img.load() # Ensure header is loaded
actual_format = (img.format or "").upper()
suffix = img_path.suffix.lower()
if actual_format == "JPEG" and suffix not in {".jpg", ".jpeg", ".jpe"}:
extension_mismatch = True
context_evidence_found.append(f"File extension mismatch: {suffix} file contains JPEG data")
elif actual_format == "PNG" and suffix != ".png":
extension_mismatch = True
context_evidence_found.append(f"File extension mismatch: {suffix} file contains PNG data")
minimal_jpeg_keys = {"jfif", "jfif_density", "jfif_unit", "jfif_version", "progression", "progressive"}
if actual_format == "JPEG" and set(img.info.keys()).issubset(minimal_jpeg_keys):
minimal_web_jpeg = True
context_evidence_found.append("Minimal JPEG metadata only")
# --- 1. Check PNG Info / tEXt chunks ---
if hasattr(img, 'info') and img.info:
software_value = img.info.get("Software") or img.info.get("software")
if isinstance(software_value, bytes):
software_text = software_value.decode("utf-8", errors="ignore")
else:
software_text = str(software_value or "")
software_lower = software_text.lower()
if "celsys" in software_lower or "clip studio" in software_lower:
context_evidence_found.append("Edited/exported by CELSYS/Clip Studio Paint")
if not detected_tool:
for key, val in img.info.items():
if not isinstance(val, (str, bytes)):
continue
structured_res = self._detect_structured_generator_metadata(key, val)
if structured_res:
detected_tool, marker = structured_res
evidence_found.append(marker)
break
# Case A: A1111 / SD
if 'parameters' in img.info:
val = img.info['parameters']
if isinstance(val, str) and ("Steps:" in val or "Prompt" in val):
detected_tool = "Stable Diffusion (A1111)"
evidence_found.append("Stable Diffusion parameters chunk")
res = self._analyze_text(val)
if res:
detected_tool, marker = res
evidence_found.append(f"Parameters: {marker}")
# Case B: ComfyUI (专有检查)
if not detected_tool and ('workflow' in img.info or 'prompt' in img.info):
# 验证是否是 JSON
try:
if 'workflow' in img.info:
json.loads(img.info['workflow'])
evidence_found.append("Valid 'workflow' JSON")
if 'prompt' in img.info:
json.loads(img.info['prompt'])
evidence_found.append("Valid 'prompt' JSON")
detected_tool = "ComfyUI"
except json.JSONDecodeError:
pass # 非 JSON,忽略
# Case C: Generic Scan (其他 info)
if not detected_tool:
for key, val in img.info.items():
if isinstance(val, (str, bytes)):
val_str = self._metadata_value_to_text(val)
res = self._analyze_text(val_str)
if res:
detected_tool, marker = res
evidence_found.append(f"PNG Info '{key}': {marker}")
break
for key, val in img.info.items():
if not isinstance(val, (str, bytes)):
continue
val_str = val.decode("utf-8", errors="ignore") if isinstance(val, bytes) else str(val)
if "DigitalSourceType" in val_str and "trainedAlgorithmicMedia" in val_str:
marker = f"PNG Info '{key}': IPTC/XMP trainedAlgorithmicMedia"
if marker not in evidence_found:
evidence_found.append(marker)
# --- 2. Check EXIF / XMP Data ---
if not detected_tool:
exif = img.getexif()
if exif:
for tag_id, value in exif.items():
tag_name = ExifTags.TAGS.get(tag_id, str(tag_id))
# Handle UserComment or other bytes
if isinstance(value, bytes):
try:
value_str = value.decode('utf-8', errors='ignore')
except:
continue
else:
value_str = str(value)
res = self._analyze_text(value_str)
if res:
detected_tool, marker = res
evidence_found.append(f"EXIF {tag_name}: {marker}")
break
except Exception as e:
return {"status": "error", "reason": f"读取错误: {str(e)}", "tool": "", "evidence": ""}
# --- 3. Official C2PA manifest/signature verification ---
c2pa_available = False
c2pa_has_manifest = False
c2pa_claim_generator = ""
try:
from tools.validator.c2pa_verifier import verify_c2pa_file
c2pa_result = verify_c2pa_file(img_path)
c2pa_available = c2pa_result.available
if c2pa_result.has_manifest:
c2pa_has_manifest = True
c2pa_claim_generator = c2pa_result.claim_generator or ""
evidence_found.append(c2pa_result.evidence_summary())
if c2pa_result.ai_generated:
detected_tool = "Generative AI (C2PA Content Credentials)"
elif c2pa_result.available and c2pa_result.error:
evidence_found.append(f"C2PA verification error: {c2pa_result.error}")
elif c2pa_result.available:
evidence_found.append("No C2PA manifest found")
except Exception as e:
evidence_found.append(f"C2PA verification error: {e}")
# --- 4. Raw metadata fallback: C2PA/JUMBF/XMP payloads ---
if not detected_tool and not c2pa_has_manifest:
res, has_c2pa_container = self._scan_raw_metadata(img_path)
if res:
detected_tool, marker = res
prefix = "Raw metadata"
if not c2pa_available and has_c2pa_container:
prefix = "Raw metadata (official C2PA verifier unavailable)"
evidence_found.append(f"{prefix}: {marker}")
elif has_c2pa_container:
if c2pa_available:
evidence_found.append("C2PA Content Credentials found, no AI generation marker")
else:
evidence_found.append("C2PA Content Credentials found, official verifier unavailable")
# --- 5. Visible Google/Gemini watermark ---
google_context = "google" in c2pa_claim_generator.lower()
filename_lower = img_path.name.lower()
filename_gemini_hint = "gemini" in filename_lower or "google" in filename_lower
if not detected_tool and (google_context or filename_gemini_hint):
if self._detect_google_visible_watermark(img_path):
detected_tool = "Gemini (Google Visible Watermark)"
evidence_found.append("Visible watermark: Google/Gemini sparkle mark")
# --- 6. Final Fallback: Filename Check ---
if not detected_tool:
res = self._analyze_text(filename_lower, include_weak_markers=False)
if res:
weak_tool, marker = res
if c2pa_has_manifest:
weak_evidence_found.append(f"Filename suggests {weak_tool}: {marker}")
else:
weak_evidence_found.append(f"Filename suggests {weak_tool}: {marker}")
if extension_mismatch and minimal_web_jpeg:
context_evidence_found.append("Downloaded file appears re-encoded or metadata-stripped")
# --- Result Construction ---
if detected_tool:
return {
"status": "ai",
"reason": f"检测到 {detected_tool} 元数据",
"tool": detected_tool,
"evidence": "; ".join(evidence_found + context_evidence_found)
}
else:
if weak_evidence_found:
return {
"status": "unknown",
"reason": "AI indicators found, but C2PA credentials do not declare AI generation",
"tool": "",
"evidence": "; ".join(evidence_found + context_evidence_found + weak_evidence_found)
}
reason = "C2PA Content Credentials do not declare AI generation" if c2pa_has_manifest else "No known AI generation metadata detected"
return {
"status": "human",
"reason": reason,
"tool": "",
"evidence": "; ".join(evidence_found + context_evidence_found) if evidence_found or context_evidence_found else "No metadata signatures found"Audit points (execution order):
- Open a Pillow handle and identify contextual evidence such as format/suffix mismatch, minimal JPEG metadata, and CELSYS export (these do not upgrade the status)
- PNG info:
parameters→ A1111;workflow/promptJSON → ComfyUI; other keys are identified via_detect_structured_generator_metadata()as NovelAI / InvokeAI / Fooocus; finally a generic_analyze_text()pass is run - EXIF: run
_analyze_text()on each tag - Official C2PA SDK (
verify_c2pa_file): if a manifest exists, grabclaim_generatorand AI markers - Raw-byte fallback: executed only when the first two steps both miss; it reads the entire file header (≤ 128 MB) and searches for
c2pa / jumbf / contentauthcontainers + AI-marker strings - Google visible watermark: enabled only when "there is a Google
claim_generator" or "the file name contains gemini / google", to avoid a visual scan of every image - Weak file-name hints: never trigger
ai, at most produceunknown
Status convergence rules:
| Evidence combination | status |
|---|---|
detected_tool is assigned (any strong evidence hit) | ai |
No strong evidence, but weak_evidence_found (a file-name hint) | unknown |
| No evidence at all + a C2PA manifest exists but does not declare AI | human (reason: "C2PA Content Credentials do not declare AI generation") |
| No evidence at all + no manifest | human (reason: "No known AI generation metadata detected") |
| Pillow raises an exception | error |
注意
Both "no metadata" and "a C2PA manifest exists but declares non-AI" map to human in terms of status; they can only be distinguished via reason. The UI must read reason, otherwise it will mistakenly present an image whose "metadata has been stripped" as "confirmed non-AI".
5.2.3 Metadata Helper Parsing
def _decode_metadata_bytes(self, data: bytes) -> str:
"""Best-effort decoding for embedded XMP/C2PA text inside binary assets."""
if not data:
return ""
text = data.decode("utf-8", errors="ignore")
if len(text.strip()) < 8:
text = data.decode("latin-1", errors="ignore")
return text
def _metadata_value_to_text(self, value: Any) -> str:
if isinstance(value, bytes):
return value.decode("utf-8", errors="ignore")
return str(value)
def _json_loads(self, value: str) -> Any:
try:
return json.loads(value)
except Exception:
return None
def _json_has_keys(self, value: Any, keys: set[str]) -> bool:
if isinstance(value, dict):
lowered = {str(key).lower() for key in value.keys()}
if lowered.intersection(keys):
return True
return any(self._json_has_keys(item, keys) for item in value.values())
if isinstance(value, list):
return any(self._json_has_keys(item, keys) for item in value)
return False
def _detect_structured_generator_metadata(self, key: str, value: Any) -> Optional[Tuple[str, str]]:
text = self._metadata_value_to_text(value)
lowered_key = key.lower()
lowered_text = text.lower()
parsed = self._json_loads(text)
if "invoke" in lowered_key or "invokeai" in lowered_text or "invoke ai" in lowered_text:
return "InvokeAI", f"PNG Info '{key}': InvokeAI metadata"
if "fooocus" in lowered_key or "fooocus" in lowered_text:
return "Fooocus", f"PNG Info '{key}': Fooocus metadata"
if "novelai" in lowered_text or "nai-diffusion" in lowered_text:
return "NovelAI", f"PNG Info '{key}': NovelAI metadata"
if isinstance(parsed, dict):
has_generation_keys = self._json_has_keys(
parsed,
{"sampler", "sampler_name", "steps", "scale", "cfg_scale", "seed", "model", "model_hash", "uc"},
)
has_novelai_shape = self._json_has_keys(parsed, {"uc"}) and self._json_has_keys(parsed, {"sampler", "steps", "scale"})
if has_novelai_shape:
return "NovelAI", f"PNG Info '{key}': NovelAI generation JSON"
if "invoke" in lowered_key and has_generation_keys:
return "InvokeAI", f"PNG Info '{key}': InvokeAI generation JSON"
if lowered_key in {"sd-metadata", "sd_metadata", "generation_data", "generation_data_formatted"} and has_generation_keys:
return "Stable Diffusion", f"PNG Info '{key}': Stable Diffusion generation JSON"
return None
Audit points:
- NovelAI criterion: a JSON containing both
ucand one of (sampler/steps/scale) → NovelAI generation JSON. This shape-based criterion allows NovelAI to remain identifiable even after explicit names have been stripped. - InvokeAI / SD JSON: both "key name + generation field" must hit simultaneously, to prevent arbitrary JSON from being treated as generation metadata.
5.2.4 Raw-Byte Scan (C2PA/JUMBF fallback)
def _analyze_c2pa_bytes(self, data: bytes) -> Optional[Tuple[str, str]]:
"""Detect AI signals embedded in C2PA/JUMBF Content Credentials payloads."""
if not data:
return None
lowered = data.lower()
has_c2pa_container = any(marker in lowered for marker in self.C2PA_CONTAINER_MARKERS)
if not has_c2pa_container:
return None
for marker in self.C2PA_AI_MARKERS:
if marker in lowered:
return "Generative AI (C2PA Content Credentials)", marker.decode("ascii", errors="ignore")
text = self._decode_metadata_bytes(data)
res = self._analyze_text(text, include_weak_markers=False)
if res:
tool, marker = res
return tool, f"C2PA payload: {marker}"
return None
# PNG ancillary chunks that may carry metadata text. Skip IDAT/PLTE/etc.
PNG_METADATA_CHUNKS = {b"tEXt", b"iTXt", b"zTXt", b"eXIf", b"iCCP", b"caBX", b"jumb"}
# WebP RIFF chunks that may carry metadata.
WEBP_METADATA_CHUNKS = {b"EXIF", b"XMP ", b"ICCP", b"JUMB"}
def _extract_png_metadata(self, data: bytes) -> bytes:
out = bytearray()
pos = 8 # skip signature
n = len(data)
iend_end = n
while pos + 12 <= n:
try:
length = int.from_bytes(data[pos:pos + 4], "big")
chunk_type = data[pos + 4:pos + 8]
except Exception:
break
data_end = pos + 8 + length
if length < 0 or data_end + 4 > n:
break
if chunk_type in self.PNG_METADATA_CHUNKS:
out.extend(data[pos + 8:data_end])
out.append(0)
if chunk_type == b"IEND":
iend_end = data_end + 4
break
pos = data_end + 4 # skip CRC
# Some pipelines append C2PA/JUMBF payloads after IEND. Include any
# trailing bytes verbatim — they cannot be pixel data.
if iend_end < n:
out.extend(data[iend_end:])
return bytes(out)
def _extract_jpeg_metadata(self, data: bytes) -> bytes:
out = bytearray()
pos = 2 # skip SOI (FFD8)
n = len(data)
while pos + 4 <= n:
if data[pos] != 0xFF:
break
# Skip fill bytes
while pos < n and data[pos] == 0xFF:
pos += 1
if pos >= n:
break
marker = data[pos]
pos += 1
# Standalone markers without length payload
if marker == 0xD9: # EOI
break
if marker == 0xDA: # SOS — compressed image data starts here
break
if marker == 0x00 or marker == 0x01 or 0xD0 <= marker <= 0xD8:
continue
if pos + 2 > n:
break
seg_len = int.from_bytes(data[pos:pos + 2], "big")
if seg_len < 2 or pos + seg_len > n:
break
seg_data = data[pos + 2:pos + seg_len]
# APP0..APP15 (E0..EF) and COM (FE) carry text-style metadata
if 0xE0 <= marker <= 0xEF or marker == 0xFE:
out.extend(seg_data)
out.append(0)
pos += seg_len
return bytes(out)
def _extract_webp_metadata(self, data: bytes) -> bytes:
out = bytearray()
n = len(data)
if n < 12:
return b""
pos = 12 # skip RIFF + size + WEBP
while pos + 8 <= n:
fourcc = data[pos:pos + 4]
size = int.from_bytes(data[pos + 4:pos + 8], "little")
if size < 0 or pos + 8 + size > n:
break
if fourcc in self.WEBP_METADATA_CHUNKS:
out.extend(data[pos + 8:pos + 8 + size])
out.append(0)
pos += 8 + size + (size & 1) # chunks are padded to even size
return bytes(out)
def _extract_metadata_blocks(self, data: bytes) -> bytes:
"""
Return bytes from text/metadata-bearing containers only, skipping
pixel/compressed payloads. This prevents random regex hits in raw
IDAT / SOS data from being mistaken for AI markers.
"""
if data.startswith(b"\x89PNG\r\n\x1a\n"):
return self._extract_png_metadata(data)
if data[:2] == b"\xff\xd8":
return self._extract_jpeg_metadata(data)
if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
return self._extract_webp_metadata(data)
# Unknown format: keep legacy behavior (whole file).
return data
def _scan_raw_metadata(self, image_path: Path) -> Tuple[Optional[Tuple[str, str]], bool]:
"""
Scan only metadata-bearing chunks/segments of an image, not pixel data.
C2PA manifests live in PNG `caBX`/`jumb` chunks or JPEG APP11 segments.
Pillow may not surface them through Image.info, so we parse the
container ourselves and feed only those bytes to the text analyzer.
"""
try:
size = image_path.stat().st_size
if size > self.RAW_METADATA_SCAN_LIMIT:
return None, False
data = image_path.read_bytes()
except OSError:
return None, False
metadata_bytes = self._extract_metadata_blocks(data)
lowered = metadata_bytes.lower()
has_c2pa_container = any(marker in lowered for marker in self.C2PA_CONTAINER_MARKERS)
c2pa_res = self._analyze_c2pa_bytes(metadata_bytes)
if c2pa_res:
return c2pa_res, has_c2pa_container
text = self._decode_metadata_bytes(metadata_bytes)
return self._analyze_text(text, include_weak_markers=False), has_c2pa_container
Audit points:
- Executed only when "the official C2PA SDK could not read a manifest" and "PNG info/EXIF produced no hit", to avoid a full-file scan of every image
- It scans the entire file's bytes; performance cost depends on file size; files > 128 MB are skipped directly
- If a container is found but there is no AI marker →
has_c2pa_container=True, anddetect()attaches it as contextual information in the evidence without upgrading the status - This path has no defense against maliciously forged C2PA strings (see §5.3 — the signature verification done by the official SDK is the trustworthy path)
5.2.5 Text Rule Analysis
def _analyze_text(self, text: str, include_weak_markers: bool = True) -> Optional[Tuple[str, str]]:
"""
Analyze a string for AI markers.
Returns: (tool_name, found_marker) or None
检测优先级:
1. 先检查明确的软件签名(使用 regex 更严格匹配)
2. 再检查生成参数指纹(如果 IPTC 标记,检查是否有 Google 证据,否则通用)
"""
if not text:
return None
text_lower = text.lower()
# 1. Check Explicit Software Names (按字典顺序,Midjourney 先)
weak_patterns = {r"mj_"}
for tool, patterns in self.AI_SOFTWARE_SIGNATURES.items():
for pattern in patterns:
if not include_weak_markers and pattern.pattern in weak_patterns:
continue
if pattern.search(text_lower):
return tool, pattern.pattern
# 2. Check Generation Parameter Fingerprints
match_count = 0
evidence = []
detected_tool = None
weak_evidence_found = []
for pattern, tool_name in self.GENERATION_PARAM_FINGERPRINTS:
match = re.search(pattern, text, re.IGNORECASE)
if match:
if "IPTC/XMP Standard" in tool_name:
# 如果是通用 IPTC,检查是否有 Google 证据
if any(re.search(p, text_lower) for p in self.AI_SOFTWARE_SIGNATURES["Gemini (Google)"]):
return "Gemini (Google)", "IPTC/XMP with Google Evidence"
else:
detected_tool = "Generative AI (Unknown)"
evidence.append("IPTC/XMP Signature")
else:
match_count += 1
evidence.append(pattern)
if match_count >= 1:
return "Stable Diffusion WebUI", "Generation Parameters Detected"
if detected_tool:
return detected_tool, "; ".join(evidence)
return None
Audit points:
include_weak_markers=Falseis the file-name scanning mode: a two-character prefix likemj_is far too easy to false-match (for example,mj_portrait.jpg), so it is suppressed in the file-name context- A parameter-fingerprint match of ≥ 1 upgrades to SD: a single
Steps:orSampler:is enough to determine SD — lenient, but it may produce false positives on non-AI images "where the user copied SD parameters into a comment" - IPTC attribution branch:
trainedAlgorithmicMediais attributed to Gemini when it co-occurs with a Google keyword, otherwise markedGenerative AI (Unknown). It is never attributed to Midjourney / DALL-E
5.2.6 Visible Google/Gemini Watermark
def _detect_google_visible_watermark(self, image_path: Path) -> bool:
"""
Detect the visible Google/Gemini sparkle mark often placed near the
lower-right area of generated images. This is visual evidence, not a
SynthID decoder.
"""
try:
with Image.open(image_path) as img:
img = img.convert("RGB")
scale = 512 / max(img.size)
if scale < 1:
img = img.resize((round(img.width * scale), round(img.height * scale)))
width, height = img.size
pix = img.load()
mask = set()
for y in range(height // 2, height):
for x in range(width // 2, width):
r, g, b = pix[x, y]
saturation = max(r, g, b) - min(r, g, b)
luminance = (r * 299 + g * 587 + b * 114) // 1000
if 110 <= luminance <= 245 and saturation < 28:
if not (luminance > 235 and saturation < 8):
mask.add((x, y))
seen = set()
for pt in list(mask):
if pt in seen:
continue
stack = [pt]
seen.add(pt)
xs = []
ys = []
while stack:
x, y = stack.pop()
xs.append(x)
ys.append(y)
for nx in (x - 1, x, x + 1):
for ny in (y - 1, y, y + 1):
npt = (nx, ny)
if npt in mask and npt not in seen:
seen.add(npt)
stack.append(npt)
area = len(xs)
min_x, max_x = min(xs), max(xs)
min_y, max_y = min(ys), max(ys)
comp_w = max_x - min_x + 1
comp_h = max_y - min_y + 1
center_x = (min_x + max_x) / 2
center_y = (min_y + max_y) / 2
density = area / max(1, comp_w * comp_h)
if not (80 <= area <= 900):
continue
if not (14 <= comp_w <= 60 and 14 <= comp_h <= 80):
continue
if not (0.12 <= density <= 0.70):
continue
if center_x < width * 0.62 or center_y < height * 0.55:
continue
# A sparkle mark has a sparse center-heavy diamond shape.
mid_x = (min_x + max_x) / 2
mid_y = (min_y + max_y) / 2
near_center = sum(
1 for x, y in zip(xs, ys)
if abs(x - mid_x) <= comp_w * 0.25 and abs(y - mid_y) <= comp_h * 0.25
)
if near_center / area >= 0.18:
return True
except Exception:
return False
return False
Audit points:
- Not a SynthID decoder, a purely morphological discriminator: low saturation, medium-to-high brightness, lower-right quadrant, area 80–900 px, aspect-ratio constraint, dense center
- The long edge is scaled to 512 px to normalize the criteria
- Enabled only in a Google context (see step 5 of the §5.2.2 flow), to avoid running an O(W·H) scan on every image
- Miss scenarios: a white-background image, the lower-right corner cropped out, or heavy recompression that fragments the sparkle's connected component
- False-positive scenarios: the lower-right corner originally contains a low-saturation decorative element (moon, star, logo, etc.)
5.3 Official C2PA SDK Adapter
tools/validator/c2pa_verifier.py isolates the optional c2pa-python dependency into a separate module. When installed, it reads the manifest via the official SDK and requires the SDK to verify the manifest / signature chain.
5.3.1 C2PAVerificationResult
AI_DIGITAL_SOURCE_MARKERS = {
"trainedalgorithmicmedia",
"compositewithtrainedalgorithmicmedia",
"algorithmicmedia",
"generated by ai",
"ai generated",
"adobe firefly",
"google imagen",
"synthid",
"dall-e",
"dalle",
"midjourney",
"stable diffusion",
}
@dataclass
class C2PAVerificationResult:
available: bool
has_manifest: bool = False
verified: bool | None = None
trusted: bool | None = None
validation_state: str = ""
validation_results: dict[str, Any] | None = None
manifest_store: dict[str, Any] | None = None
active_manifest: dict[str, Any] | None = None
sdk_version: str = ""
embedded: bool | None = None
remote_url: str | None = None
ai_markers: list[str] = field(default_factory=list)
claim_generator: str = ""
error: str = ""
asset_format: str = ""
extension_mismatch: bool = False
validation_issues: list[str] = field(default_factory=list)
@property
def ai_generated(self) -> bool:
return bool(self.ai_markers)
def evidence_summary(self) -> str:
if not self.has_manifest:
if self.available:
parts = ["No C2PA manifest found"]
if self.asset_format:
parts.append(f"asset_format={self.asset_format}")
if self.extension_mismatch:
parts.append("extension_mismatch=true")
return "; ".join(parts)
return self.error or "C2PA verifier unavailable"
parts = ["C2PA manifest found"]
if self.validation_state:
parts.append(f"validation_state={self.validation_state}")
if self.verified is not None:
parts.append(f"signature_chain={'verified' if self.verified else 'failed'}")
if self.trusted is not None:
parts.append(f"trust={'trusted' if self.trusted else 'untrusted'}")
if self.claim_generator:
parts.append(f"claim_generator={self.claim_generator}")
if self.ai_markers:
parts.append("ai_markers=" + ",".join(self.ai_markers[:4]))
if self.validation_issues:
parts.append("validation_issues=" + ",".join(self.validation_issues[:4]))
if self.remote_url:
parts.append(f"remote_manifest={self.remote_url}")
if self.sdk_version:
parts.append(f"c2pa_sdk={self.sdk_version}")
return "; ".join(parts)
Audit points:
verified/trusteduse the three-valuedbool | None:None= undeterminable, not a failureai_generatedis a derived property: it is True wheneverAI_DIGITAL_SOURCE_MARKERSmatches any string at any depth in the manifest store, without requiring a signature / trustevidence_summary()is a UI summary, listing at most 4 AI markers and 4 validation issues
5.3.2 The verify_c2pa_file Entry Point and SDK Configuration
def _read_json(reader: Any) -> dict[str, Any]:
raw = reader.json()
if isinstance(raw, bytes):
raw = raw.decode("utf-8", errors="replace")
return json.loads(raw)
def _make_context(c2pa_module: Any) -> Any:
settings = {
"verify": {
"verify_after_reading": True,
"verify_trust": True,
"verify_timestamp_trust": True,
"ocsp_fetch": True,
"remote_manifest_fetch": True,
},
"trust": {
"verify_trust_list": True,
},
}
context_cls = getattr(c2pa_module, "Context", None)
if context_cls and hasattr(context_cls, "from_dict"):
return context_cls.from_dict(settings)
if hasattr(c2pa_module, "load_settings"):
c2pa_module.load_settings(settings)
return None
def _detect_asset_format(path: Path) -> tuple[str, bool]:
suffix = path.suffix.lower()
try:
header = path.read_bytes()[:16]
except OSError:
return "", False
asset_format = ""
if header.startswith(b"\x89PNG\r\n\x1a\n"):
asset_format = "png"
elif header.startswith(b"\xff\xd8\xff"):
asset_format = "jpeg"
elif header[:4] == b"RIFF" and header[8:12] == b"WEBP":
asset_format = "webp"
expected_suffixes = {
"png": {".png"},
"jpeg": {".jpg", ".jpeg", ".jpe"},
"webp": {".webp"},
}
mismatch = bool(asset_format and suffix and suffix not in expected_suffixes.get(asset_format, set()))
return asset_format, mismatch
def verify_c2pa_file(image_path: str | Path) -> C2PAVerificationResult:
path = Path(image_path)
try:
import c2pa
except Exception as exc:
return C2PAVerificationResult(
available=False,
error=f"c2pa-python unavailable: {exc}",
)
result = C2PAVerificationResult(available=True)
try:
result.asset_format, result.extension_mismatch = _detect_asset_format(path)
result.sdk_version = str(c2pa.sdk_version()) if hasattr(c2pa, "sdk_version") else ""
context = _make_context(c2pa)
if result.extension_mismatch and result.asset_format:
with path.open("rb") as stream:
try:
reader = c2pa.Reader.try_create(result.asset_format, stream, None, context)
except TypeError:
reader = c2pa.Reader.try_create(result.asset_format, stream)
else:
try:
reader = c2pa.Reader.try_create(str(path), None, None, context)
except TypeError:
reader = c2pa.Reader.try_create(str(path))
if reader is None:
return result
with reader:
result.has_manifest = True
result.manifest_store = _read_json(reader)
result.validation_state = str(reader.get_validation_state() or "")
result.validation_results = reader.get_validation_results() or None
result.active_manifest = reader.get_active_manifest() or None
result.embedded = bool(reader.is_embedded())
result.remote_url = reader.get_remote_url() or None
result.verified, result.trusted = _verification_flags(
result.validation_state,
result.validation_results,
)
result.validation_issues = _collect_validation_issues(result.validation_results)
result.ai_markers = _find_ai_markers(result.manifest_store or {})
result.claim_generator = _extract_claim_generator(result.active_manifest)
return result
except Exception as exc:
result.error = str(exc)
return resultAudit points:
- SDK fallback on extension mismatch: if the file header indicates JPEG but the suffix is
.png, it switches to theReader.try_create(format, stream, ...)streaming interface to tell the SDK the real format. Passing the path directly would make the SDK judge by the suffix and fail. - SDK configuration enabled: trust list, timestamp trust, OCSP revocation checking, and remote-manifest fetching are all turned on
- SDK version compatibility: both
try_createsignatures are attempted (with / without context), to be compatible with different versions ofc2pa-python - An empty reader is not an error:
has_manifeststaysFalse,available=True, and the error field is empty
5.3.3 Separation of Signature Chain and Trust State
def _contains_failure(value: Any) -> bool:
if isinstance(value, dict):
for key, item in value.items():
key_lower = str(key).lower()
if key_lower in {"failure", "failures", "error", "errors"} and item:
return True
if _contains_failure(item):
return True
elif isinstance(value, list):
for item in value:
if _contains_failure(item):
return True
elif isinstance(value, str):
lowered = value.lower()
return any(token in lowered for token in ("invalid", "failure", "error", "untrusted"))
return False
def _contains_trust_failure(value: Any) -> bool:
if isinstance(value, dict):
for key, item in value.items():
key_lower = str(key).lower()
if "trust" in key_lower and _contains_failure(item):
return True
if _contains_trust_failure(item):
return True
elif isinstance(value, list):
for item in value:
if _contains_trust_failure(item):
return True
elif isinstance(value, str):
lowered = value.lower()
return "trust" in lowered and any(token in lowered for token in ("invalid", "failure", "error", "untrusted"))
return False
def _contains_non_trust_failure(value: Any) -> bool:
if isinstance(value, dict):
for key, item in value.items():
key_lower = str(key).lower()
if key_lower in {"failure", "failures", "error", "errors"} and item:
if not _contains_only_trust_related(item):
return True
if _contains_non_trust_failure(item):
return True
elif isinstance(value, list):
for item in value:
if _contains_non_trust_failure(item):
return True
elif isinstance(value, str):
lowered = value.lower()
if any(token in lowered for token in ("invalid", "failure", "error")):
return "trust" not in lowered and "untrusted" not in lowered
return False
def _contains_only_trust_related(value: Any) -> bool:
strings = list(_iter_strings(value))
if not strings:
return False
for text in strings:
lowered = text.lower()
if any(token in lowered for token in ("invalid", "failure", "error")):
if "trust" not in lowered and "untrusted" not in lowered:
return False
return any("trust" in text.lower() or "untrusted" in text.lower() for text in strings)
def _contains_trust_signal(value: Any) -> bool:
if isinstance(value, dict):
for key, item in value.items():
if "trust" in str(key).lower():
return True
if _contains_trust_signal(item):
return True
elif isinstance(value, list):
for item in value:
if _contains_trust_signal(item):
return True
elif isinstance(value, str):
return "trust" in value.lower()
return False
def _active_manifest_results(validation_results: dict[str, Any] | None) -> dict[str, Any] | None:
if not isinstance(validation_results, dict):
return None
active = validation_results.get("activeManifest")
return active if isinstance(active, dict) else None
def _has_validation_code(value: Any, code_fragment: str) -> bool:
if isinstance(value, dict):
code = value.get("code")
if isinstance(code, str) and code_fragment in code:
return True
return any(_has_validation_code(item, code_fragment) for item in value.values())
if isinstance(value, list):
return any(_has_validation_code(item, code_fragment) for item in value)
return False
def _collect_validation_issues(validation_results: dict[str, Any] | None) -> list[str]:
issues: list[str] = []
if not validation_results:
return issues
if _has_validation_code(validation_results, "ingredient.malformed"):
issues.append("ingredient_malformed")
if _has_validation_code(validation_results, "timeStamp.untrusted"):
issues.append("timestamp_untrusted")
return issues
def _verification_flags(validation_state: str, validation_results: dict[str, Any] | None) -> tuple[bool | None, bool | None]:
state = (validation_state or "").lower()
active_results = _active_manifest_results(validation_results) or validation_results
has_failure = _contains_failure(active_results) if active_results else False
has_non_trust_failure = _contains_non_trust_failure(active_results) if active_results else False
has_trust_failure = _contains_trust_failure(active_results) if active_results else False
has_trust_signal = _contains_trust_signal(active_results) if active_results else False
active_signature_valid = _has_validation_code(active_results, "claimSignature.validated")
active_data_hash_valid = _has_validation_code(active_results, "assertion.dataHash.match")
verified: bool | None
if active_signature_valid and active_data_hash_valid and not has_non_trust_failure:
verified = True
elif "valid" in state and "invalid" not in state and not has_non_trust_failure:
verified = True
elif "invalid" in state or (has_failure and has_non_trust_failure):
verified = False
else:
verified = None
trusted: bool | None
if has_trust_failure:
trusted = False
elif verified is True and has_trust_signal:
trusted = True
else:
trusted = None
return verified, trusted
Audit points:
- Active manifest first:
_active_manifest_results()first takesvalidation_results["activeManifest"], falling back to the full tree only if that is unavailable - "Signature valid + data hash valid" and "no non-trust-class failures" →
verified=True. Even if thevalidation_statefield itself contains "invalid" (possibly from an issue in the ingredient chain), this does not lower the signature conclusion of the active manifest - Trust evaluated independently: a trust failure does not turn
verifiedinto False. The UI can obtain a combination likesignature_chain=verified; trust=untrusted, indicating that the signature itself is verifiable but the signing certificate is not in the current trust list (common for Google and OpenAI, which are not yet in the default trust anchors)
5.3.4 AI Marker and claim_generator Extraction
def _iter_strings(value: Any) -> Iterable[str]:
if isinstance(value, str):
yield value
elif isinstance(value, dict):
for key, item in value.items():
yield str(key)
yield from _iter_strings(item)
elif isinstance(value, list):
for item in value:
yield from _iter_strings(item)
def _find_ai_markers(manifest_store: dict[str, Any]) -> list[str]:
found: list[str] = []
seen = set()
for text in _iter_strings(manifest_store):
lowered = text.lower()
for marker in AI_DIGITAL_SOURCE_MARKERS:
if marker in lowered and marker not in seen:
seen.add(marker)
found.append(marker)
return found
def _extract_claim_generator(active_manifest: dict[str, Any] | None) -> str:
if not active_manifest:
return ""
claim_generator = active_manifest.get("claim_generator")
if isinstance(claim_generator, str):
return claim_generator
if isinstance(claim_generator, dict):
name = claim_generator.get("name") or claim_generator.get("identifier")
version = claim_generator.get("version")
if name and version:
return f"{name} {version}"
if name:
return str(name)
infos = active_manifest.get("claim_generator_info")
if isinstance(infos, list) and infos:
first = infos[0]
if isinstance(first, dict):
name = first.get("name") or first.get("identifier")
version = first.get("version")
if name and version:
return f"{name} {version}"
if name:
return str(name)
return ""
Audit points:
- Recursively traverses all strings in the manifest store (including keys), matching case-insensitively
- C2PA / IPTC semantics such as
trainedalgorithmicmedia/algorithmicmedia/synthidare treated as strong evidence - Does not decode the invisible SynthID watermark; the presence of
synthidin the manifest store represents a C2PA declaration that "the image contains SynthID", not that local SynthID decoding was performed claim_generatoris compatible with three forms: a string, a dict ({name, version}/{identifier, version}), and aclaim_generator_infoarray
5.3.5 Validation Issues
def _has_validation_code(value: Any, code_fragment: str) -> bool:
if isinstance(value, dict):
code = value.get("code")
if isinstance(code, str) and code_fragment in code:
return True
return any(_has_validation_code(item, code_fragment) for item in value.values())
if isinstance(value, list):
return any(_has_validation_code(item, code_fragment) for item in value)
return False
def _collect_validation_issues(validation_results: dict[str, Any] | None) -> list[str]:
issues: list[str] = []
if not validation_results:
return issues
if _has_validation_code(validation_results, "ingredient.malformed"):
issues.append("ingredient_malformed")
if _has_validation_code(validation_results, "timeStamp.untrusted"):
issues.append("timestamp_untrusted")
return issues
A typical evidence-summary output:
C2PA manifest found;
validation_state=Invalid;
signature_chain=verified;
trust=untrusted;
ai_markers=algorithmicmedia,trainedalgorithmicmedia,synthid;
validation_issues=ingredient_malformed,timestamp_untrustedAudit points:
validation_state=Invaliddoes not necessarily mean the current image data was tampered with. The active manifest's signature and data hash can be valid while the ingredient chain has an issue- The UI should present this as "signature chain valid / certificate not in the current trust list / link issue present", not as "signature failed"
5.4 Worker Layer
"""AI Metadata Detection Worker"""
import logging
from pathlib import Path
from PySide6.QtCore import QThread, Signal, QCoreApplication
_tr = QCoreApplication.translate
logger = logging.getLogger(__name__)
class AIDetectorWorker(QThread):
"""Worker thread for AI metadata detection."""
progress = Signal(int, int, str) # (current, total, filename)
item_finished = Signal(str, str, str, str, str) # (path, status, reason, tool, evidence)
all_finished = Signal()
model_status = Signal(str) # Status for DynamicIsland
def __init__(self, file_paths: list):
super().__init__()
self.file_paths = file_paths
def run(self):
"""Execute metadata detection in background thread."""
try:
from .._utils import ensure_src_path
ensure_src_path()
from tools.validator.logic import MetaDataDetector
detector = MetaDataDetector()
self.model_status.emit(_tr("AIDetectorWorker", "扫描元数据..."))
total = len(self.file_paths)
logger.info("开始检测 %d 个文件", total)
for i, path in enumerate(self.file_paths):
if self.isInterruptionRequested():
break
filename = Path(path).name
self.progress.emit(i + 1, total, filename)
try:
res = detector.detect(path)
self.item_finished.emit(
path,
res["status"],
res["reason"],
res["tool"] or "",
res["evidence"] or ""
)
except Exception as e:
logger.error("[MetaDetector] 检测文件出错 %s: %s", path, e)
self.item_finished.emit(path, "error", _tr("AIDetectorWorker", "检测出错: %s") % str(e), "", "")
logger.info("[MetaDetector] 检测完成,共 %d 个文件", total)
except Exception as e:
logger.error("[MetaDetector] Worker error: %s", e, exc_info=True)
finally:
self.all_finished.emit()
Audit points:
- Per-file error isolation: an exception on a single file is caught and converted into a
status="error"signal, without affecting subsequent files - Interruptible:
isInterruptionRequested()allows the UI to cancel a batch task - Detector instance reuse: the whole batch shares one
MetaDataDetector, but the class itself is stateless (__init__is empty), so there is no risk of cross-file contamination - Signal payload:
item_finishedis emitted one image at a time, to avoid backlog
5.5 Evidence Grading Table
| Evidence | Level | Triggers ai? |
|---|---|---|
C2PA ai_markers (trainedAlgorithmicMedia / synthid / ...) | Strong | Yes |
ComfyUI workflow / prompt JSON | Strong | Yes |
Stable Diffusion parameters chunk | Strong | Yes |
NovelAI generation JSON (uc + sampler/steps/scale) | Strong | Yes |
| InvokeAI / Fooocus metadata | Strong | Yes |
Midjourney Job ID / --ar / --v / --stylize | Strong | Yes |
IPTC/XMP trainedAlgorithmicMedia | Strong | Yes |
| Google visible sparkle watermark (under Google context) | Medium | Yes |
| File name contains a platform word | Weak | No (at most unknown) |
| Extension mismatch | Contextual | No |
| Minimal JPEG metadata | Contextual | No |
| CELSYS / Clip Studio export marker | Contextual | No |
| No metadata at all | No evidence | No |
5.6 Test Coverage
tests/test_validator_c2pa.py currently covers:
- C2PA raw payload AI marker
- C2PA manifest with no AI marker does not false-positive
- Official C2PA verifier mock-driven detection
- Suppression of raw false positives when a Google C2PA image has no AI marker
- Google / Gemini visible-watermark contextual detection
- Midjourney Job ID and IPTC AI source
- A file name alone for Gemini outputs only
unknown - Platform recompression / a
.png-suffixed JPEG does not upgrade to AI - A1111 Stable Diffusion parameters
- NovelAI generation JSON
- InvokeAI metadata
- Fooocus metadata
- The separated explanation of "active C2PA signature valid but the ingredient chain has an issue"
It is recommended to continuously add a real-sample regression set: downloaded images from platforms such as OpenAI, Gemini, Midjourney, ComfyUI, A1111, Forge, Fooocus, NovelAI, InvokeAI, Adobe Firefly, Tusi / Liblib / TensorArt, etc.
6. Threat Model Overview
6.1 Digital Timestamping
| Threat | Mitigation | Residual Risk |
|---|---|---|
| User tampers with the original file | Merkle Tree root-hash verification | None (tampering is always detected) |
| TSA private key leaked | Multi-provider failover | A single TSA leak does not affect historical verification |
| Local JSON modified | Optional AES-256 encryption | Modifiable when unencrypted, but file-hash verification still exposes it |
| Merkle second-preimage | No domain-separation prefix | Does not resist adversarial collision construction |
| Author identity forgery | User self-declaration | author_name has no third-party verification |
6.2 Infringement Evidence Capture
| Threat | Mitigation | Residual Risk |
|---|---|---|
| Target page deleted | Immediate capture + RFC 3161 | If deleted before capture, it cannot be recovered |
| Locally forged web page | TLS certificate capture | Only verifies the domain certificate, not content authenticity |
| Screenshot edited in Photoshop | manifest SHA-256 | A screenshot itself cannot prove "no Photoshopping" |
| Browser identified as a bot | stealth + visible fallback | Some platforms may still block |
| Missing HAR / certificate | Multi-source collection | A single-point failure does not invalidate the whole package |
| Very long page | Viewport screenshot | full_page=False; content below the fold is not captured |
6.3 Invisible Watermark
| Threat | Mitigation | Residual Risk |
|---|---|---|
| Hardcoded password | Compile-time constant | All user instances share the same password pair; once reverse-engineered it can be extracted in bulk |
| Unauthenticated watermark | Dual-password system | Cannot prove "I embedded this watermark", only that "the image contains this text" |
| Forged watermark | Password secrecy | Once the password is known, arbitrary text can be embedded and claimed to come from Nephele |
| Watermark removal | Quantized embedding (d1=36) | Heavy compression, rotation, and large-area cropping (> 50%) can destroy it |
| Silent truncation | Fixed-length 32-byte encoding | Overly long text is silently truncated; the user may mistakenly believe it was fully embedded |
| Output despite verification failure | round-trip check | On mismatch only a warning is logged; the image is still output |
注意
Within Nephele, the invisible watermark is positioned as an auxiliary provenance tool, not a cryptographic digital signature. Its core value lies in "increasing the cost for an image thief to remove the watermark", not in "providing unforgeable proof of ownership". If you need legal-grade proof of ownership, please use the digital timestamping feature.
6.4 AI Metadata Detection
| Threat / Scenario | Result |
|---|---|
| Original ComfyUI PNG | workflow / prompt can be detected |
| Original A1111 PNG | parameters can be detected |
| OpenAI / Google C2PA image | The manifest, AI markers, signature chain, and trust state can be read |
| Midjourney retaining Job ID / XMP | Strong evidence can be detected |
| Platform-recompressed image (downloaded from Weibo / Twitter / Xiaohongshu) | Can only flag insufficient credentials, status=human (distinguished by reason) |
| Screenshot | Original metadata is usually lost, undeterminable |
| Maliciously cleaned metadata | Deleted evidence cannot be recovered |
| Maliciously forged non-C2PA text metadata | No cryptographic authenticity guarantee; may false-positive |
| Only the visual style looks like AI | Not judged |
| Non-AI image where the user copied SD parameters into a comment | May false-positive as ai |
| Non-AI image with a low-saturation decoration in the lower-right corner (under Google context) | May trigger a sparkle false positive |
注意
The audit conclusion for this feature is: it is suitable as an AI-generation-credential and metadata screening tool, and should not be promoted as a general-purpose AI image authenticity detector. "Not detected" is not equivalent to "not AI-generated".
7. Dependency List and Degradation Behavior
| Library | Purpose | Behavior When Missing |
|---|---|---|
rfc3161ng | TSA communication | TSA fully unavailable; forced downgrade to a local .json |
asn1crypto | TSR parsing | Falls back to the local clock and provider_name |
pyzipper | .nep AES-256 | The password has no effect; standard ZIP |
Pillow | Thumbnail / image I/O / metadata reading | Timestamping flow blocked / packaging blocked / AI detection blocked |
reportlab | PDF report | Timestamping flow blocked |
qrcode | PDF QR code | Falls back to a plain-text URL |
playwright | Browser evidence capture | Feature entirely unavailable |
blind_watermark | Invisible-watermark DWT embedding / extraction | Invisible-watermark feature entirely unavailable; returns the original image |
numpy | Invisible-watermark bit-array conversion / image processing | Invisible-watermark feature entirely unavailable |
pywt | Wavelet transform (transitive dependency of blind_watermark) | Invisible-watermark feature entirely unavailable |
c2pa-python | Official C2PA SDK | Falls back to a byte-level scan (no signature verification); evidence notes "official C2PA verifier unavailable" |
8. Privacy and Network Behavior
8.1 Default Network Paths
| Feature | Network Action | Can It Be Disabled? |
|---|---|---|
| Digital timestamping (TSA) | Sends a SHA-256 digest request to DigiCert / FreeTSA / IdenTrust | Can switch back to a purely local timestamp (degraded) |
| Infringement evidence capture | Via Playwright, makes HTTPS requests, a TLS handshake, and DNS resolution to the target site | No (the feature is networked evidence capture by nature) |
| AI metadata detection (C2PA SDK) | ocsp_fetch=True + remote_manifest_fetch=True | Currently hardcoded on via SDK settings, with no UI toggle |
| Invisible watermark | None | — |
8.2 Offline Notes
-
Digital timestamping: only a hash value is sent to the TSA — the original file content is not sent.
-
Infringement evidence capture: by design it makes a full request to the target URL; that is the evidence capture itself.
-
AI metadata detection: regular PNG info / EXIF / byte scanning is done locally. However, while verifying the signature chain, the C2PA SDK may:
- Fetch a remote manifest (
remote_manifest_fetch) - Check certificate revocation (
ocsp_fetch) - Verify timestamp trust (
verify_timestamp_trust)
Therefore product copy should not blanketly claim that "C2PA verification never goes online". If a user needs a strict offline mode, a toggle to disable remote manifest / OCSP should be provided (not yet implemented).
- Fetch a remote manifest (
-
Invisible watermark: entirely local; neither embedding nor extraction touches the network.