
Technical Audit Document

This document is aimed at security researchers, cryptography auditors, and developers. It precisely describes the implementation details, known limitations, and threat models of the security-related modules in Nephele Workshop.

All core source code is embedded directly in the document for white-box review. Sensitive constants (such as watermark passwords) have been redacted.

This document reflects the state of the Nephele Workshop v0.3.2-alpha codebase.

Tip

All source snippets in this document are released under the MIT License. Copy, modify, and use them commercially as you wish; just retain the copyright notice.


1. Audit Scope

1.1 Audited Files

Digital notarization
  • tools/rights/logic.py: file hashing, batch notarization, deep verification
  • tools/rights/utils.py: Merkle Tree
  • tools/rights/tsa_client.py: RFC 3161 TSA client
  • tools/rights/rights_packer.py: .nep container packing

Rights-enforcement evidence capture
  • tools/rights/url_evidence.py: URL evidence main flow, TLS, CAPTCHA
  • core/browser/session.py: Playwright sessions / screenshots

Invisible watermark
  • tools/packer/watermark_protection.py: fixed-length encoding, round-trip verification, exception fallback
  • tools/packer/logic.py / agent_api.py: business-layer calls
  • core/workers/watermark_worker.py: background extraction thread
  • blind_watermark (PyPI): DWT+DCT+SVD core algorithm

AI metadata detection
  • tools/validator/logic.py: metadata reading, rule matching, evidence grading
  • tools/validator/c2pa_verifier.py: official C2PA SDK adapter, trust-status parsing
  • core/workers/ai_detector_worker.py: batch detection thread

1.2 Product Boundary

The following modules are out of scope for this audit:

  • Payment / licensing (core/license_manager.py, core/payment.py)
  • Authentication / JWT / CAPTCHA integration (core/auth/)
  • AI conversation agent and cloud inference (core/agent_loop.py, nephele-api/)
  • Client updater and SSL pinning (core/updater.py, core/ssl_pinning.py)

Each of these has its own independent security boundary and threat model and is not covered by this document.


2. Digital Notarization Core Implementation

2.1 File Hash Computation

tools/rights/logic.py:calculate_file_hash()

python
def calculate_file_hash(file_path: Path, algorithm: str = 'sha256') -> str:
    if not file_path or not isinstance(file_path, Path):
        raise RightsError(f"无效的文件路径: {file_path}")
    if not file_path.exists():
        raise RightsError(f"文件不存在: {file_path}")
    if not file_path.is_file():
        raise RightsError(f"路径不是文件: {file_path}")
 
    file_size = file_path.stat().st_size
    if file_size > 10 * 1024 * 1024 * 1024:  # 10GB 限制
        raise RightsError(f"文件过大(超过10GB): {file_path}")
 
    hash_obj = hashlib.new(algorithm)
    with open(file_path, 'rb') as f:
        chunk_size = 8192
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            hash_obj.update(chunk)
    return hash_obj.hexdigest()

Audit notes

  • Algorithm: SHA-256; no salt, no key (not an HMAC)
  • Chunking: 8,192-byte streaming reads
  • Limit: files larger than 10 GB are rejected
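
As a property check of the streaming loop above: chunked hashing must produce exactly the same digest as hashing the whole buffer in one call (a minimal self-contained sketch; chunked_sha256 re-states the loop and is not project code):

```python
import hashlib
import tempfile
from pathlib import Path

def chunked_sha256(file_path: Path, chunk_size: int = 8192) -> str:
    # Stream the file in 8 KiB chunks, as calculate_file_hash() does
    h = hashlib.sha256()
    with open(file_path, 'rb') as f:
        while chunk := f.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()

# Streaming must equal the one-shot digest over the same bytes
data = b'nephele' * 10_000
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(data)
assert chunked_sha256(Path(tmp.name)) == hashlib.sha256(data).hexdigest()
```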

2.2 Full Merkle Tree Implementation

tools/rights/utils.py:MerkleTree

python
class MerkleTree:
    """
    Merkle Tree implementation: aggregates multiple file hashes into a single root hash.
 
    Known limitation (second-preimage resistance):
        This implementation does NOT use domain separation prefixes for leaf vs
        internal nodes (i.e. b'\\x00' for leaves, b'\\x01' for internal nodes as
        recommended by RFC 6962 §2.1).  Adding prefixes would change the root hash
        computation and break backward compatibility with all existing .nep files
        and the verification website (verify.arisfusion.com).  A future tree_version
        bump can introduce domain separation; the current version is safe for our
        threat model (user-submitted files, not adversarial tree construction).
    """
 
    def __init__(self, hash_algorithm: str = 'sha256'):
        self.hash_algorithm = hash_algorithm
        self.leaves: List[str] = []
        self.tree: List[List[str]] = []
        self.root_hash: Optional[str] = None
 
    def add_file_hash(self, file_hash: str) -> None:
        self.leaves.append(file_hash)
 
    def build(self) -> str:
        if not self.leaves:
            raise ValueError("Merkle Tree 没有叶子节点")
 
        if len(self.leaves) == 1:
            self.root_hash = self.leaves[0]
            return self.root_hash
 
        current_level = self.leaves.copy()
        self.tree = [current_level]
 
        while len(current_level) > 1:
            next_level = []
            for i in range(0, len(current_level), 2):
                if i + 1 < len(current_level):
                    combined = current_level[i] + current_level[i + 1]
                else:
                    combined = current_level[i] + current_level[i]
 
                hash_obj = hashlib.new(self.hash_algorithm)
                hash_obj.update(combined.encode('utf-8'))
                parent_hash = hash_obj.hexdigest()
                next_level.append(parent_hash)
 
            self.tree.append(next_level)
            current_level = next_level
 
        self.root_hash = current_level[0]
        return self.root_hash
 
    def get_proof(self, leaf_index: int) -> List[Dict]:
        if not self.tree:
            self.build()
 
        if leaf_index >= len(self.leaves):
            raise IndexError(f"叶子节点索引超出范围: {leaf_index}")
 
        proof = []
        current_index = leaf_index
        current_level = 0
 
        while current_level < len(self.tree) - 1:
            level = self.tree[current_level]
 
            if current_index % 2 == 0:
                sibling_index = current_index + 1
                if sibling_index < len(level):
                    proof.append({'hash': level[sibling_index], 'position': 'right'})
                else:
                    proof.append({'hash': level[current_index], 'position': 'right'})
            else:
                sibling_index = current_index - 1
                proof.append({'hash': level[sibling_index], 'position': 'left'})
 
            current_index = current_index // 2
            current_level += 1
 
        return proof
 
    def verify_proof(self, leaf_hash: str, proof: List[Dict], root_hash: str) -> bool:
        current_hash = leaf_hash
 
        for step in proof:
            sibling_hash = step['hash']
            position = step['position']
 
            if position == 'right':
                combined = current_hash + sibling_hash
            else:
                combined = sibling_hash + current_hash
 
            hash_obj = hashlib.new(self.hash_algorithm)
            hash_obj.update(combined.encode('utf-8'))
            current_hash = hash_obj.hexdigest()
 
        return current_hash == root_hash
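
A condensed round trip of the class above, using the same concatenation rules (hex strings, no domain separation); MiniMerkle is an illustrative re-statement for this document, not project code:

```python
import hashlib
from typing import Dict, List

def _h(s: str) -> str:
    return hashlib.sha256(s.encode('utf-8')).hexdigest()

class MiniMerkle:
    """Condensed re-statement of MerkleTree above (no domain separation)."""
    def __init__(self, leaves: List[str]):
        self.leaves = leaves
        self.tree = [leaves[:]]
        level = leaves[:]
        while len(level) > 1:
            nxt = []
            for i in range(0, len(level), 2):
                # An odd trailing node is paired with itself, as in build()
                right = level[i + 1] if i + 1 < len(level) else level[i]
                nxt.append(_h(level[i] + right))
            self.tree.append(nxt)
            level = nxt
        self.root = level[0]

    def proof(self, idx: int) -> List[Dict]:
        out = []
        for level in self.tree[:-1]:
            if idx % 2 == 0:
                sib = level[idx + 1] if idx + 1 < len(level) else level[idx]
                out.append({'hash': sib, 'position': 'right'})
            else:
                out.append({'hash': level[idx - 1], 'position': 'left'})
            idx //= 2
        return out

def verify(leaf: str, proof: List[Dict], root: str) -> bool:
    cur = leaf
    for step in proof:
        cur = _h(cur + step['hash']) if step['position'] == 'right' else _h(step['hash'] + cur)
    return cur == root

t = MiniMerkle(['aa', 'bb', 'cc'])   # odd leaf count: 'cc' pairs with itself
p = t.proof(1)
assert verify('bb', p, t.root)
assert not verify('cc', p, t.root)
```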

Known security limitation (proactively disclosed)

The current implementation does not use the domain-separation prefixes recommended by RFC 6962 §2.1 (no \x00 prefix on leaf nodes, no \x01 on internal nodes). In strongly adversarial settings this leaves a theoretical second-preimage construction open.

Practical risk assessment

  • Under the threat model "a user notarizes their own works", the risk is negligible
  • If the threat model must resist maliciously constructed collisions, the current implementation does not meet that security level
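
For reference, a hypothetical tree_version=2 with RFC 6962-style domain separation could hash as follows (a sketch only, not project code; note it operates on bytes, whereas the current implementation concatenates hex strings):

```python
import hashlib

# RFC 6962 §2.1 domain separation: 0x00 prefixes leaf hashes,
# 0x01 prefixes interior-node hashes
def leaf_hash(data: bytes) -> bytes:
    return hashlib.sha256(b'\x00' + data).digest()

def node_hash(left: bytes, right: bytes) -> bytes:
    return hashlib.sha256(b'\x01' + left + right).digest()

# The prefixes guarantee that a leaf and an interior node hashing the same
# underlying bytes can never collide, which blocks the classic
# second-preimage construction against prefix-free Merkle trees
assert leaf_hash(b'x') != node_hash(b'', b'x')
assert leaf_hash(b'x') != hashlib.sha256(b'x').digest()
```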

2.3 Full TSA Client Implementation

tools/rights/tsa_client.py:TSAClient

python
class TSAClient:
    PROVIDERS = {
        'freetsa': {
            'name': 'FreeTSA',
            'url': 'https://freetsa.org/tsr',
            'hashname': 'sha256',
            'requires_auth': False,
            'legal_strength': 3,
            'price': 0
        },
        'digicert': {
            'name': 'DigiCert',
            'url': 'http://timestamp.digicert.com',
            'hashname': 'sha256',
            'requires_auth': False,
            'legal_strength': 4,
            'price': 0
        },
        'identrust': {
            'name': 'IdenTrust',
            'url': 'http://timestamp.identrust.com',
            'hashname': 'sha256',
            'requires_auth': False,
            'legal_strength': 4,
            'price': 0
        }
    }
 
    FAILOVER_ORDER: List[str] = ['digicert', 'freetsa', 'identrust']
 
    def __init__(self, provider: str = 'freetsa', custom_url: Optional[str] = None,
                 hashname: str = 'sha256', timeout: int = 30):
        if not RFC3161_AVAILABLE:
            raise ImportError("rfc3161ng 库未安装。请运行: pip install rfc3161ng")
 
        if custom_url:
            self.url = custom_url
            self.provider_name = "Custom TSA"
            self.provider_key = None
        elif provider in self.PROVIDERS:
            config = self.PROVIDERS[provider]
            self.url = config['url']
            self.provider_name = config['name']
            self.provider_key = provider
            hashname = config['hashname']
        else:
            raise ValueError(f"未知的 TSA 提供商: {provider}")
 
        self.hashname = hashname
        self.timeout = timeout
 
        try:
            self.stamper = rfc3161ng.RemoteTimestamper(
                url=self.url, hashname=self.hashname, timeout=self.timeout
            )
        except Exception as e:
            self.stamper = None
            self._init_error = str(e)
 
    def _call_with_retry(self, hash_bytes: bytes, max_retries: int = 3) -> bytes:
        providers_to_try = []
        if self.provider_key:
            providers_to_try.append(self.provider_key)
            for p in self.FAILOVER_ORDER:
                if p != self.provider_key:
                    providers_to_try.append(p)
        else:
            providers_to_try = [None]
 
        last_error = None
        for provider_key in providers_to_try:
            if provider_key is not None:
                config = self.PROVIDERS[provider_key]
                url = config['url']
                hashname = config['hashname']
                provider_name = config['name']
            else:
                url = self.url
                hashname = self.hashname
                provider_name = self.provider_name
 
            for attempt in range(max_retries):
                try:
                    stamper = rfc3161ng.RemoteTimestamper(
                        url=url, hashname=hashname, timeout=self.timeout
                    )
                    tsr_token = stamper(digest=hash_bytes)
                    if provider_key is not None:
                        self.provider_name = provider_name
                        self.provider_key = provider_key
                        self.url = url
                        self.stamper = stamper
                    return tsr_token
                except Exception as e:
                    last_error = e
                    if attempt < max_retries - 1:
                        time.sleep(2 ** attempt)
 
        raise Exception(
            f"所有 TSA 提供商均失败: {last_error}"
        )
 
    def timestamp_hash(self, hash_value: str, output_path: Path) -> Dict:
        hash_bytes = bytes.fromhex(hash_value)
        tsr_token = self._call_with_retry(hash_bytes)
 
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'wb') as f:
            f.write(tsr_token)
 
        tsa_timestamp = datetime.now().isoformat()
        tsa_issuer = self.provider_name
        try:
            from asn1crypto import tsp, cms
            signed_data = None
            try:
                ts_resp = tsp.TimeStampResp.load(tsr_token)
                signed_data = ts_resp['time_stamp_token']['content']
            except (ValueError, KeyError, TypeError):
                try:
                    ci = cms.ContentInfo.load(tsr_token)
                    if ci['content_type'].native == 'signed_data':
                        signed_data = ci['content']
                except (ValueError, KeyError, TypeError):
                    pass
 
            if signed_data:
                tst_info = signed_data['encap_content_info']['content'].parsed
                gen_time = tst_info['gen_time'].native
                if gen_time:
                    tsa_timestamp = gen_time.isoformat()
 
                try:
                    signer_infos = signed_data['signer_infos']
                    if signer_infos:
                        sid = signer_infos[0]['sid']
                        if sid.name == 'issuer_and_serial_number':
                            for rdn in sid.chosen['issuer'].chosen:
                                for attr in rdn:
                                    if attr['type'].dotted == '2.5.4.3':
                                        tsa_issuer = attr['value'].native
                                        break
                except (KeyError, IndexError, ValueError):
                    pass
        except ImportError:
            pass
 
        return {
            'success': True,
            'timestamp': tsa_timestamp,
            'hash': hash_value,
            'issuer': tsa_issuer,
            'tsr_path': str(output_path),
            'algorithm': self.hashname.upper(),
        }
 
    def verify_tsr(self, tsr_path: Path, data: Optional[bytes] = None,
                   digest: Optional[bytes] = None) -> Dict:
        with open(tsr_path, 'rb') as f:
            tsr_token = f.read()
 
        if digest is not None:
            verified = rfc3161ng.check_timestamp(tsr_token, digest=digest)
            if verified:
                return {
                    'valid': True,
                    'message': '时间戳验证通过(数据完整性已确认)',
                    'issuer': self._extract_issuer_from_tsr(tsr_token),
                }
            else:
                return {'valid': False, 'message': '时间戳验证失败:哈希值不匹配'}
        elif data is not None:
            verified = rfc3161ng.check_timestamp(tsr_token, data=data)
            if verified:
                return {
                    'valid': True,
                    'message': '时间戳验证通过(数据完整性已确认)',
                    'issuer': self._extract_issuer_from_tsr(tsr_token),
                }
            else:
                return {'valid': False, 'message': '时间戳验证失败:哈希值不匹配'}
        else:
            return self._verify_tsr_structure(tsr_token)

Audit notes

  • Default constructor: provider='freetsa'
  • The UI calls batch_protect_works(tsa_provider='digicert'), so DigiCert is the de facto first choice for users
  • Failover order: ['digicert', 'freetsa', 'identrust']
  • Exponential backoff: sleep(2 ** attempt); because a sleep only runs when another attempt remains, max_retries=3 yields delays of 1 s and 2 s (the 4 s delay never fires)
  • At most 3 attempts per provider
  • In timestamp_hash, the initial fallback value of tsa_timestamp is datetime.now().isoformat() (the local clock); it is replaced by the TSA-asserted time only if asn1crypto parses the token successfully
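
The retry schedule can be stated compactly (a sketch of the timing logic in _call_with_retry; backoff_delays is a hypothetical helper, not project code):

```python
def backoff_delays(max_retries: int = 3) -> list:
    # sleep(2 ** attempt) only runs while another attempt remains
    # (attempt < max_retries - 1), so the last attempt never sleeps:
    # max_retries=3 gives delays of 1 s and 2 s, never 4 s
    return [2 ** attempt for attempt in range(max_retries - 1)]

assert backoff_delays(3) == [1, 2]
assert backoff_delays(5) == [1, 2, 4, 8]
```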

2.4 Batch Notarization Main Flow

tools/rights/logic.py:batch_protect_works() core excerpt

python
def batch_protect_works(
    file_paths: List[Path],
    author_name: str,
    inspiration: Optional[str] = None,
    output_dir: Optional[Path] = None,
    password: Optional[str] = None,
    progress_callback=None,
    tsa_provider: str = 'digicert',
    tsa_timeout: int = 30,
    cert_mode: str = 'simple',
) -> Dict:
    from .utils import build_merkle_tree_from_files
    from .rights_packer import RightsPacker
    from .pdf_generator import PDFGenerator
 
    if output_dir is None:
        output_dir = Path.cwd() / "digital_evidence"
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
 
    # Step 1: Merkle Tree
    tree = build_merkle_tree_from_files(file_paths, progress_callback=hash_progress)
    root_hash = tree.root_hash
    file_hashes = dict(tree.file_hashes)
 
    # Step 2: Perceptual hashing (non-blocking)
    fingerprints = []
    for file_path in file_paths:
        if file_path.suffix.lower() in {'.jpg', '.jpeg', '.png', '.webp', '.bmp'}:
            try:
                from .fingerprint import compute_fingerprint
                fp = compute_fingerprint(file_path, file_sha256=file_hashes.get(str(file_path)))
                fingerprints.append(fp)
            except Exception as e:
                logger.debug("Skipping perceptual hash for %s: %s", file_path.name, e)
 
    # Step 3: Manifest
    packer = RightsPacker(output_dir / "evidence.nep", password=password)
    manifest_data = packer.create_manifest(
        author_name=author_name, inspiration=inspiration,
        works=works, file_hashes=file_hashes
    )
 
    # Step 4: TSA timestamp
    tsa_binary_path = output_dir / "proof.tsa"
    local_json_path = output_dir / "proof.json"
    timestamp_file = tsa_binary_path
 
    try:
        from .tsa_client import TSAClient
        tsa_client = TSAClient(provider=tsa_provider, timeout=tsa_timeout)
        tsa_result = tsa_client.timestamp_hash(root_hash, tsa_binary_path)
 
        if tsa_result['success']:
            timestamp_file = tsa_binary_path
            timestamp_info = {
                'timestamp': tsa_result['timestamp'],
                'hash': root_hash,
                'issuer': tsa_result['issuer'],
                'algorithm': tsa_result['algorithm'],
                'valid': True,
            }
        else:
            timestamp_file = local_json_path
            timestamp_info = {
                'timestamp': datetime.now().isoformat(),
                'hash': root_hash,
                'issuer': 'Nephele Workshop (本地)',
                'local_only': True,
                'error': tsa_result['message']
            }
            with open(local_json_path, 'w', encoding='utf-8') as f:
                json.dump(timestamp_info, f, indent=2, ensure_ascii=False)
    except ImportError:
        timestamp_file = local_json_path
        timestamp_info = {
            'timestamp': datetime.now().isoformat(),
            'hash': root_hash,
            'issuer': 'Nephele Workshop (本地)',
            'local_only': True,
            'note': '需安装 rfc3161ng 库以使用 FreeTSA 服务'
        }
        with open(local_json_path, 'w', encoding='utf-8') as f:
            json.dump(timestamp_info, f, indent=2, ensure_ascii=False)
 
    # Step 5: Manifest hardening
    manifest_data['merkle_root'] = root_hash
    manifest_data['cert_mode'] = cert_mode
 
    works_map = {}
    for idx, fp in enumerate(file_paths):
        works_map[f"works/{idx:03d}_{fp.name}"] = file_hashes.get(str(fp), "")
    manifest_data['works_map'] = works_map
 
    _manifest_for_hash = {k: v for k, v in manifest_data.items() if k != 'manifest_sha256'}
    _manifest_json = json.dumps(_manifest_for_hash, ensure_ascii=False, sort_keys=True, default=str)
    manifest_data['manifest_sha256'] = hashlib.sha256(_manifest_json.encode('utf-8')).hexdigest()
 
    # Step 6: PDF
    pdf_gen = PDFGenerator(pdf_path, cert_mode=cert_mode)
    pdf_gen.generate(
        manifest_data=manifest_data, root_hash=root_hash,
        timestamp_info=timestamp_info, verification_url=verification_url,
        locale="zh_CN", image_paths=image_paths,
    )
 
    # Step 7: Pack .nep
    nep_path = packer.pack(
        manifest_data=manifest_data, thumbnail_path=thumbnail_path,
        timestamp_file=timestamp_file, pdf_report=pdf_path,
        source_files=file_paths,
    )
 
    return {
        'success': True,
        'nep_path': str(nep_path),
        'root_hash': root_hash,
        'file_count': len(file_paths),
        'message': f"数字存证完成,共处理 {len(file_paths)} 个文件"
    }

Key behavior

  • cert_mode defaults to "simple"; it is never auto-upgraded based on detected source files
  • The UI layer (PipelineWindow.qml) explicitly passes "full" when certifySourceFiles.length > 0
  • If rfc3161ng is not installed, the flow is forcibly degraded to a local timestamp rather than aborted
  • manifest_sha256 is computed with its own key excluded, preventing a circular dependency
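
The self-excluding digest from Step 5, shown in isolation (a sketch; seal_manifest is a hypothetical helper name):

```python
import hashlib
import json

def seal_manifest(manifest: dict) -> dict:
    # Hash the manifest with its own digest field excluded, then store the
    # digest under 'manifest_sha256' (mirrors Step 5 above)
    body = {k: v for k, v in manifest.items() if k != 'manifest_sha256'}
    blob = json.dumps(body, ensure_ascii=False, sort_keys=True, default=str)
    manifest['manifest_sha256'] = hashlib.sha256(blob.encode('utf-8')).hexdigest()
    return manifest

m = seal_manifest({'author': 'a', 'merkle_root': 'ff'})

# A verifier recomputes the digest the same way, over all keys except the seal
body = {k: v for k, v in m.items() if k != 'manifest_sha256'}
recomputed = hashlib.sha256(
    json.dumps(body, ensure_ascii=False, sort_keys=True, default=str).encode('utf-8')
).hexdigest()
assert recomputed == m['manifest_sha256']
```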

2.5 .nep Packing Implementation

tools/rights/rights_packer.py:RightsPacker._write_zip_contents()

python
@staticmethod
def _write_zip_contents(zipf, manifest_data, thumbnail_path, timestamp_file,
                        pdf_report, additional_files, source_files):
    # 1. manifest.json
    manifest_json = json.dumps(manifest_data, indent=2, ensure_ascii=False)
    zipf.writestr('manifest.json', manifest_json.encode('utf-8'))
 
    # 2. Thumbnail
    if thumbnail_path and thumbnail_path.exists():
        zipf.write(thumbnail_path, 'thumbnail.jpg')
 
    # 3. Timestamp (original extension kept to distinguish evidence grade)
    if timestamp_file and timestamp_file.exists():
        archive_name = 'proof.tsa' if timestamp_file.suffix == '.tsa' else 'proof.json'
        zipf.write(timestamp_file, archive_name)
 
    # 4. PDF report
    if pdf_report and pdf_report.exists():
        zipf.write(pdf_report, 'VerificationReport.pdf')
 
    # 5. Original works go into works/; index-prefixed names avoid collisions
    if source_files:
        for idx, file_path in enumerate(source_files):
            if isinstance(file_path, str):
                file_path = Path(file_path)
            if file_path.exists() and file_path.is_file():
                zipf.write(file_path, f'works/{idx:03d}_{file_path.name}')
 
    # 6. Additional files
    if additional_files:
        for file_path in additional_files:
            if file_path.exists():
                zipf.write(file_path, f'additional/{file_path.name}')

Password-protection logic (in pack()):

  • pyzipper available and a password set: AESZipFile + WZ_AES (AES-256)
  • pyzipper unavailable but a password set: fall back to a standard ZIP and write a _warning into the manifest
  • No password: standard zipfile.ZipFile
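
The three branches can be sketched as follows (an illustrative pack_nep helper, assuming pyzipper's AESZipFile/WZ_AES API when that library is installed; not project code):

```python
import importlib.util
import json
import zipfile
from pathlib import Path
from typing import Optional

def pack_nep(path: Path, manifest: dict, password: Optional[str]) -> dict:
    # Branch 1: password + pyzipper -> AES-256 encrypted container
    has_pyzipper = importlib.util.find_spec('pyzipper') is not None
    if password and has_pyzipper:
        import pyzipper
        zf = pyzipper.AESZipFile(path, 'w', encryption=pyzipper.WZ_AES)
        zf.setpassword(password.encode('utf-8'))
    else:
        # Branch 2: password requested but pyzipper missing -> degraded,
        # standard ZIP with a warning recorded inside the manifest
        if password and not has_pyzipper:
            manifest['_warning'] = 'pyzipper unavailable: archive is NOT encrypted'
        # Branch 3: no password -> plain zipfile.ZipFile
        zf = zipfile.ZipFile(path, 'w')
    with zf:
        zf.writestr('manifest.json', json.dumps(manifest, ensure_ascii=False))
    return manifest
```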

2.6 Standalone Verifier (verify.arisfusion.com)

The standalone .nep verifier is publicly deployed at verify.arisfusion.com.

Deployment

  • A single HTML file (2,087 lines): no build tooling, no bundling pipeline, no external dependencies
  • Purely client-side computation (SubtleCrypto plus a pure-JS ASN.1 parser)
  • The complete logic can be audited via the browser's "View page source"

Verification chain

  1. The user uploads a .nep, which the browser unpacks locally
  2. SHA-256 is recomputed over works/ and a Merkle Tree is built in lexicographic filename order
  3. proof.tsa (an RFC 3161 TSR ASN.1 structure) is parsed and messageImprint.hashedMessage extracted
  4. The locally computed Merkle root is compared against the digest embedded in the TSR
  5. The TSR's genTime and TSA fields are parsed to display the issuing authority and signing time

Trust boundary

  • The verifier itself issues no timestamps; it only reads the proof.tsa already inside the .nep
  • Cryptographic verification of the TSA signature (certificate chain + public key chain) is, in the current version, a structural comparison plus a TSA public-key fingerprint match; for full CA-chain verification, cross-check with openssl ts -verify or rfc3161ng
  • The source is MIT-licensed; anyone can host a mirror or use it offline (saving the HTML file is enough)
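
Steps 2 and 4 of the verification chain can be reproduced offline in a few lines of Python (a sketch under the stated conventions: leaves are the SHA-256 hex digests of works/ entries in lexicographic filename order, with an odd trailing node duplicated, matching the tree construction in §2.2):

```python
import hashlib

def root_from_leaves(leaves):
    # Pairwise-hash hex-string leaves level by level, duplicating an odd node
    level = list(leaves)
    while len(level) > 1:
        if len(level) % 2:
            level.append(level[-1])
        level = [hashlib.sha256((level[i] + level[i + 1]).encode('utf-8')).hexdigest()
                 for i in range(0, len(level), 2)]
    return level[0]

# Hypothetical works/ entries, sorted lexicographically by archive name
entries = {'works/000_a.png': b'A', 'works/001_b.png': b'B'}
leaves = [hashlib.sha256(data).hexdigest() for _, data in sorted(entries.items())]
local_root = root_from_leaves(leaves)
# Step 4 would compare local_root against messageImprint.hashedMessage from proof.tsa
```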

2.7 Deep Verification Implementation

tools/rights/logic.py:verify_evidence_package()

python
def verify_evidence_package(tsa_path: Path, file_paths: List[Path]) -> Tuple[bool, Dict]:
    from .utils import build_merkle_tree_from_files
 
    if not tsa_path.exists():
        return False, {'valid': False, 'message': f"时间戳文件不存在: {tsa_path}"}
 
    missing = [str(p) for p in file_paths if not p.exists()]
    if missing:
        return False, {'valid': False, 'message': f"原始文件缺失: {', '.join(missing)}"}
 
    # Step 1: Recompute the Merkle Tree
    tree = build_merkle_tree_from_files(file_paths)
    computed_root = tree.root_hash
 
    # Step 2: Local timestamp check
    suffix = tsa_path.suffix.lower()
    if suffix == '.json':
        tsr_result = parse_tsr(tsa_path)
        tsr_hash = tsr_result.get('hash', tsr_result.get('work_identity', ''))
        if computed_root.lower() == tsr_hash.lower():
            return False, {
                'valid': False,
                'message': "本地时间戳无第三方签名,不具备密码学证明力。文件哈希一致但无法证明时间。",
                'root_hash': computed_root,
                'local_only': True,
            }
        else:
            return False, {
                'valid': False,
                'message': "验证失败:Merkle Root 与本地时间戳记录不匹配",
                'computed_root': computed_root,
            }
 
    # Step 3: RFC 3161 TSR: cryptographic signature verification
    computed_digest = bytes.fromhex(computed_root)
    try:
        from .tsa_client import TSAClient
        tsa_client = TSAClient()
        verify_result = tsa_client.verify_tsr(tsa_path, digest=computed_digest)
 
        if verify_result.get('valid'):
            tsr_result = parse_tsr(tsa_path)
            return True, {
                'valid': True,
                'message': f"深度验证通过:{len(file_paths)} 个文件的 Merkle Root 与 TSA 签名匹配",
                'timestamp': tsr_result.get('timestamp'),
                'issuer': tsr_result.get('issuer'),
                'root_hash': computed_root,
            }
        else:
            return False, {
                'valid': False,
                'message': f"TSA 签名验证失败:{verify_result.get('message', 'unknown')}",
                'root_hash': computed_root,
            }
    except ImportError:
        # rfc3161ng not installed; degrade to structural comparison
        tsr_result = parse_tsr(tsa_path)
        tsr_hash_raw = tsr_result.get('hash', '')
        tsr_hash = tsr_hash_raw.split(':', 1)[1] if ':' in tsr_hash_raw else tsr_hash_raw
 
        if computed_root.lower() == tsr_hash.lower():
            return True, {
                'valid': True,
                'message': "结构验证通过(安装 rfc3161ng 可启用密码学签名验证)",
                'partial_verification': True,
                'root_hash': computed_root,
            }
        else:
            return False, {
                'valid': False,
                'message': "验证失败:Merkle Root 与 TSR 记录的哈希不匹配",
                'computed_root': computed_root,
                'tsr_hash': tsr_hash,
            }

3. Rights-Enforcement Evidence Capture: Core Implementation

3.1 Browser Screenshots

core/browser/session.py:BrowserManager._async_screenshot()

python
async def _async_screenshot(self, path: Optional[str]) -> dict:
    import tempfile
    if not path:
        path = os.path.join(tempfile.gettempdir(), "nephele_screenshot.png")
    await self._page.screenshot(path=path, full_page=False)
    return {"success": True, "message": f"截图已保存: {path}", "output_path": path}

Key fact: full_page=False. This is a viewport screenshot, not a scrolling full-page capture; on long pages, content below the fold is not visually captured.


3.2 Evidence Capture Main Flow

tools/rights/url_evidence.py:URLEvidenceCapture.capture() core excerpt

python
def capture(self, url: str, progress_callback=None) -> Dict:
    total_phases = 5
    self._record(f"Starting evidence capture for: {url}")
 
    try:
        # Phase 1: Environment + DNS + TLS
        environment = self.collect_environment()
        dns_info = self.resolve_dns(url)
        tls_info = self.capture_tls_certificate(url)
 
        # Phase 2: Browser capture
        artifacts = self.capture_page(url, progress_callback=None)
 
        # Phase 3: Hash all artifacts
        file_hashes = self.hash_artifacts(artifacts)
 
        # Phase 4: Manifest
        log_path = Path(self.save_log())
        self._log_committed = True
        file_hashes[log_path.name] = {
            "sha256": self._sha256_file(log_path),
            "size": log_path.stat().st_size,
            "type": "operation_log",
        }
 
        manifest = self.generate_manifest(
            target_url=url, environment=environment,
            dns_info=dns_info, artifacts=artifacts,
            file_hashes=file_hashes, tls_info=tls_info,
        )
 
        # Phase 5: Timestamp
        ts_info = self.timestamp_manifest(manifest)
 
        # Phase 6: Package .nep
        nep_path = self._package_nep()
 
        return {
            "success": True,
            "evidence_id": self._evidence_id,
            "output_dir": str(self._output_dir),
            "nep_path": str(nep_path),
            "manifest": manifest,
            "timestamp_info": ts_info,
            "message": f"URL evidence captured"
        }

Log immutability guarantee

  • save_log() is called before the manifest is generated
  • After the write, self._log_committed = True is set
  • From that point on, _record() only appends to the in-memory list and never writes to disk again
  • file_hashes in the manifest includes the log file's SHA-256
  • The manifest hash therefore anchors the "frozen" state of the log

3.3 TLS Certificate Capture

tools/rights/url_evidence.py:URLEvidenceCapture.capture_tls_certificate()

python
def capture_tls_certificate(self, url: str) -> Dict:
    import ssl
    from urllib.parse import urlparse
 
    parsed = urlparse(url if "://" in url else f"https://{url}")
    hostname = parsed.hostname or ""
    port = parsed.port or 443
 
    if not hostname:
        return {"error": "Invalid hostname"}
 
    try:
        ctx = ssl.create_default_context()
        with ctx.wrap_socket(
            socket.socket(socket.AF_INET, socket.SOCK_STREAM),
            server_hostname=hostname,
        ) as sock:
            sock.settimeout(10)
            sock.connect((hostname, port))
            cert = sock.getpeercert()
            cert_der = sock.getpeercert(binary_form=True)
 
        cert_path = self._output_dir / "server_certificate.der"
        cert_path.write_bytes(cert_der)
 
        # ... PEM conversion and field extraction ...
        return cert_info
    except Exception as e:
        return {"error": str(e)}

Failure scenarios: self-signed certificates, ssl.SSLError, and connection timeouts all return {"error": ...} and are non-fatal.


3.4 CAPTCHA Detection and Handling

tools/rights/url_evidence.py:URLEvidenceCapture

python
_CAPTCHA_KEYWORDS = (
    "验证码", "验证", "captcha", "verify", "challenge",
    "human verification", "robot", "机器人",
)
 
def _is_captcha_page(self, title: str, url: str) -> bool:
    text = (title or "").lower()
    url_lower = (url or "").lower()
    for kw in self._CAPTCHA_KEYWORDS:
        if kw in text or kw in url_lower:
            return True
    return False

Handling flow:

  1. Navigate headlessly
  2. Match title keywords to detect a CAPTCHA
  3. Close the headless browser and open a visible one
  4. Navigate again
  5. Poll with time.sleep(2), for up to 120 seconds
  6. On timeout, capture the current state

Blocking risk: the polling loop blocks the current thread via time.sleep(2).
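
The polling loop can be sketched with an injectable clock, which makes both the 120-second cutoff and the thread-blocking behavior easy to see (wait_for_captcha_clear is a hypothetical refactoring, not project code):

```python
import time

def wait_for_captcha_clear(is_captcha, poll=2.0, timeout=120.0,
                           sleep=time.sleep, clock=time.monotonic):
    # sleep() blocks the calling thread, which is exactly the risk noted
    # above; sleep/clock are injectable so the loop is testable without
    # real delays
    deadline = clock() + timeout
    while clock() < deadline:
        if not is_captcha():
            return True   # CAPTCHA cleared within the window
        sleep(poll)
    return False          # window elapsed; caller captures the current state

# Simulated time: a CAPTCHA that never clears exhausts the full 120 s window
t = [0.0]
assert wait_for_captcha_clear(lambda: True,
                              sleep=lambda s: t.__setitem__(0, t[0] + s),
                              clock=lambda: t[0]) is False
assert t[0] >= 120.0
```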


4. Invisible Watermark Technical Audit

4.1 Architecture Overview

The invisible-watermark module is split into four layers:

  • Core library: blind_watermark (PyPI), the DWT+DCT+SVD embed/extract algorithm
  • Engine layer: tools/packer/watermark_protection.py, a wrapper providing fixed-length encoding, round-trip verification, alpha preservation, and exception fallback
  • Business layer: tools/packer/logic.py / agent_api.py, packing-parameter orchestration and stacking of visible and invisible watermarks
  • Worker layer: core/workers/watermark_worker.py, background-thread extraction that avoids blocking the UI

Everything runs locally with zero network dependency.


4.2 Core Library Algorithm (blind_watermark)

Nephele's underlying library is blind_watermark (github.com/guofei9987/blind_watermark), which embeds via a three-stage mixed-domain strategy: DWT (discrete wavelet transform) → DCT (discrete cosine transform) → SVD (singular value decomposition).

4.2.1 The WaterMark Wrapper Class

blind_watermark/blind_watermark.py

python
class WaterMark:
    def __init__(self, password_wm=1, password_img=1, block_shape=(4, 4), mode='common', processes=None):
        self.bwm_core = WaterMarkCore(password_img=password_img, mode=mode, processes=processes)
        self.password_wm = password_wm
        self.wm_bit = None
        self.wm_size = 0
 
    def read_img(self, filename=None, img=None):
        if img is None:
            img = cv2.imread(filename, flags=cv2.IMREAD_UNCHANGED)
        self.bwm_core.read_img_arr(img=img)
        return img
 
    def read_wm(self, wm_content, mode='img'):
        if mode == 'bit':
            self.wm_bit = np.array(wm_content)
        # ... img / str modes omitted ...
 
        self.wm_size = self.wm_bit.size
        # Watermark "encryption": pseudo-random shuffle of the bit sequence, seeded by password_wm
        np.random.RandomState(self.password_wm).shuffle(self.wm_bit)
        self.bwm_core.read_wm(self.wm_bit)
 
    def embed(self, filename=None, compression_ratio=None):
        embed_img = self.bwm_core.embed()
        if filename is not None:
            cv2.imwrite(filename=filename, img=embed_img)
        return embed_img
 
    def extract_decrypt(self, wm_avg):
        # Inverse shuffle: regenerate the same shuffle index from the same seed, then scatter back
        wm_index = np.arange(self.wm_size)
        np.random.RandomState(self.password_wm).shuffle(wm_index)
        wm_avg[wm_index] = wm_avg.copy()
        return wm_avg
 
    def extract(self, filename=None, embed_img=None, wm_shape=None, mode='img'):
        if filename is not None:
            embed_img = cv2.imread(filename, flags=cv2.IMREAD_COLOR)
        self.wm_size = np.array(wm_shape).prod()
 
        if mode in ('str', 'bit'):
            wm_avg = self.bwm_core.extract_with_kmeans(img=embed_img, wm_shape=wm_shape)
        else:
            wm_avg = self.bwm_core.extract(img=embed_img, wm_shape=wm_shape)
 
        wm = self.extract_decrypt(wm_avg=wm_avg)
        return wm

Key facts

  • password_wm drives the pseudo-random scrambling of the watermark bit sequence (np.random.RandomState.shuffle)
  • password_img is passed to WaterMarkCore and drives the image-block selection scramble
  • The "encryption" is in essence a deterministic shuffle from a known seed, not modern cryptographic encryption
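
The shuffle and its inverse can be demonstrated in a few lines (a sketch mirroring read_wm() and extract_decrypt() above; because the permutation depends only on the seed and the array length, knowing password_wm is sufficient to invert it):

```python
import numpy as np

password_wm = 1
bits = np.array([1, 0, 0, 1, 1, 0, 1, 0])

# Embed side: shuffle the bit sequence in place with a seeded RNG
scrambled = bits.copy()
np.random.RandomState(password_wm).shuffle(scrambled)

# Extract side: regenerate the identical permutation over plain indices,
# then scatter the scrambled values back to their original positions
idx = np.arange(bits.size)
np.random.RandomState(password_wm).shuffle(idx)
restored = np.empty_like(bits)
restored[idx] = scrambled

assert np.array_equal(restored, bits)
```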

4.2.2 The WaterMarkCore Engine

blind_watermark/bwm_core.py

python
class WaterMarkCore:
    def __init__(self, password_img=1, mode='common', processes=None):
        self.block_shape = np.array([4, 4])
        self.password_img = password_img
        self.d1, self.d2 = 36, 20  # Quantization steps: larger is more robust but more distorting
        self.pool = AutoPool(mode=mode, processes=processes)

Image preprocessing (read_img_arr)

python
    def read_img_arr(self, img):
        # Handle images with an alpha channel
        self.alpha = None
        if img.shape[2] == 4:
            if img[:, :, 3].min() < 255:
                self.alpha = img[:, :, 3]
                img = img[:, :, :3]
 
        # BGR -> YUV; pad so both dimensions are even (required by the DWT)
        self.img = img.astype(np.float32)
        self.img_shape = self.img.shape[:2]
        self.img_YUV = cv2.copyMakeBorder(
            cv2.cvtColor(self.img, cv2.COLOR_BGR2YUV),
            0, self.img.shape[0] % 2, 0, self.img.shape[1] % 2,
            cv2.BORDER_CONSTANT, value=(0, 0, 0)
        )
 
        # 1-level Haar DWT on each of the Y/U/V channels
        self.ca_shape = [(i + 1) // 2 for i in self.img_shape]
        self.ca_block_shape = (
            self.ca_shape[0] // self.block_shape[0],
            self.ca_shape[1] // self.block_shape[1],
            self.block_shape[0], self.block_shape[1]
        )
 
        for channel in range(3):
            self.ca[channel], self.hvd[channel] = dwt2(
                self.img_YUV[:, :, channel], 'haar'
            )
            # Reshape CA (the approximation coefficients) into a 4D block array
            self.ca_block[channel] = np.lib.stride_tricks.as_strided(
                self.ca[channel].astype(np.float32),
                self.ca_block_shape,
                strides=4 * np.array([
                    self.ca_shape[1] * self.block_shape[0],
                    self.block_shape[1], self.ca_shape[1], 1
                ])
            )

Audit notes

  • Color space: BGR → YUV. The watermark goes into the DWT approximation (CA) sub-band; note that the loop above iterates over all three YUV channels, not only Y (luma)
  • DWT depth: a single Haar level, not a multi-level decomposition
  • Block size: fixed 4×4; the CA sub-band is cut into non-overlapping blocks

4.2.3 Block-Level Embedding (block_add_wm_slow)

python
    def block_add_wm_slow(self, arg):
        block, shuffler, i = arg
        wm_1 = self.wm_bit[i % self.wm_size]
 
        # Step 1: DCT on the 4x4 block
        block_dct = dct(block)
 
        # Step 2: flatten, then permute by shuffler (intra-block scramble)
        block_dct_shuffled = block_dct.flatten()[shuffler].reshape(self.block_shape)
 
        # Step 3: SVD
        u, s, v = svd(block_dct_shuffled)
 
        # Step 4: embed 1 watermark bit into the singular values
        # Quantize s[0] to a multiple of d1, then offset by a quarter step, plus half a step when wm_1 is 1
        s[0] = (s[0] // self.d1 + 1/4 + 1/2 * wm_1) * self.d1
        if self.d2:
            s[1] = (s[1] // self.d2 + 1/4 + 1/2 * wm_1) * self.d2
 
        # Step 5: inverse SVD
        block_dct_flatten = np.dot(u, np.dot(np.diag(s), v)).flatten()
 
        # Step 6: 逆置乱
        block_dct_flatten[shuffler] = block_dct_flatten.copy()
 
        # Step 7: 逆 DCT
        return idct(block_dct_flatten.reshape(self.block_shape))

数学原理

嵌入公式(以 s[0] 为例):

text
s'[0] = (floor(s[0] / d1) + 1/4 + 1/2 * w) * d1

其中 w ∈ {0, 1} 为水印 bit。提取时:

text
w = 1  if (s[0] mod d1) > (d1 / 2)  else 0

d1=36 意味着每个 bit 的量化间隔为 36;单次嵌入对 s[0] 的扰动幅度不超过 0.75 × d1 = 27。
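上述量化公式可以用几行独立 Python 直接验证(示意代码,变量与符号沿用正文,不调用库内真实接口):

```python
# 量化嵌入/提取示意:对单个奇异值按 d1 网格嵌入 1 bit
D1 = 36  # 默认量化步长 d1

def embed_bit(s0: float, w: int, d1: int = D1) -> float:
    # s'[0] = (floor(s[0]/d1) + 1/4 + 1/2*w) * d1
    return (s0 // d1 + 1/4 + 1/2 * w) * d1

def extract_bit(s0: float, d1: int = D1) -> int:
    # 余数落在 (d1/2, d1) 区间即判 1
    return 1 if (s0 % d1) > d1 / 2 else 0

# 任意初值、任意 bit 都能 round-trip
for s0 in (0.0, 17.3, 95.9, 400.5):
    for w in (0, 1):
        assert extract_bit(embed_bit(s0, w)) == w

# 嵌入后余数恒为 0.25*d1 或 0.75*d1,|noise| < d1/4 的扰动不会翻转 bit
assert extract_bit(embed_bit(100.0, 1) + 8.0) == 1
```

嵌入后的余数被固定在 0.25·d1 或 0.75·d1,因此幅度小于 d1/4 = 9 的系数扰动不影响判定,这就是该量化方案鲁棒性的来源。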


4.2.4 块级提取流程(block_get_wm_slow

python
    def block_get_wm_slow(self, args):
        block, shuffler = args
        block_dct_shuffled = dct(block).flatten()[shuffler].reshape(self.block_shape)
        u, s, v = svd(block_dct_shuffled)
 
        # 从 s[0] 提取 bit
        wm = (s[0] % self.d1 > self.d1 / 2) * 1
        if self.d2:
            # s[1] 作为辅助,加权平均
            tmp = (s[1] % self.d2 > self.d2 / 2) * 1
            wm = (wm * 3 + tmp * 1) / 4
        return wm

审计点

  • d2(默认 20)为辅助量化步长,s[0] 权重 3,s[1] 权重 1
  • d2=0 时退化为单奇异值提取

4.2.5 全局嵌入流程(embed

python
    def embed(self):
        self.init_block_index()
        embed_ca = copy.deepcopy(self.ca)
        embed_YUV = [np.array([])] * 3  # 每通道逆 DWT 结果的容器
 
        # 生成块选择置乱序列(跨块置乱)
        self.idx_shuffle = random_strategy1(
            self.password_img, self.block_num,
            self.block_shape[0] * self.block_shape[1]
        )
 
        for channel in range(3):
            # 对每个块并行执行 block_add_wm
            tmp = self.pool.map(self.block_add_wm, [
                (self.ca_block[channel][self.block_index[i]], self.idx_shuffle[i], i)
                for i in range(self.block_num)
            ])
 
            # 写回 4D 数组
            for i in range(self.block_num):
                self.ca_block[channel][self.block_index[i]] = tmp[i]
 
            # 4D -> 2D,拼接回 CA 子带
            self.ca_part[channel] = np.concatenate(np.concatenate(self.ca_block[channel], 1), 1)
            embed_ca[channel][:self.part_shape[0], :self.part_shape[1]] = self.ca_part[channel]
 
            # 逆 DWT
            embed_YUV[channel] = idwt2((embed_ca[channel], self.hvd[channel]), "haar")
 
        # 合并三通道,YUV -> BGR,裁剪回原始尺寸
        embed_img_YUV = np.stack(embed_YUV, axis=2)
        embed_img_YUV = embed_img_YUV[:self.img_shape[0], :self.img_shape[1]]
        embed_img = cv2.cvtColor(embed_img_YUV, cv2.COLOR_YUV2BGR)
        embed_img = np.clip(embed_img, a_min=0, a_max=255)
 
        if self.alpha is not None:
            embed_img = cv2.merge([embed_img.astype(np.uint8), self.alpha])
        return embed_img

关键事实

  • 循环嵌入:水印 bit 序列在 block_num 个块中循环重复嵌入(wm_bit[i % wm_size]
  • 三通道独立:Y/U/V 三个通道各嵌入完整的一份水印,提取时做平均
  • 块内置乱idx_shuffle[i]):每个 4×4 块内部的 16 个 DCT 系数顺序被打乱
  • 跨块顺序block_index):块遍历顺序是固定的行列扫描,未置乱
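循环嵌入带来的冗余可以用一个简化模型演示(示意代码,不调用底层库;提取端对同一 bit 的多次观测取平均,对应库中 wm_avg 的思路):

```python
import numpy as np

wm_size, block_num = 4, 12
wm = np.array([1, 0, 1, 1])

# 嵌入:bit 序列在 block_num 个块中循环重复(wm_bit[i % wm_size])
observations = np.array([wm[i % wm_size] for i in range(block_num)], dtype=float)
observations[3] = 0.0  # 模拟某个块在攻击中 bit 翻转

# 提取:对同一 bit 的所有观测取平均,再按 0.5 阈值判定
idx = np.arange(block_num) % wm_size
wm_avg = np.array([observations[idx == k].mean() for k in range(wm_size)])
recovered = (wm_avg > 0.5).astype(int)
assert recovered.tolist() == [1, 0, 1, 1]  # 单块损坏被多数表决纠正
```

块数越多、水印越短,每个 bit 的重复次数越多,对局部破坏(裁剪、涂抹)的容忍度也越高。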

4.2.6 K-Means 二值化(one_dim_kmeans

python
def one_dim_kmeans(inputs):
    threshold = 0
    e_tol = 10 ** (-6)
    center = [inputs.min(), inputs.max()]
    for i in range(300):
        threshold = (center[0] + center[1]) / 2
        is_class01 = inputs > threshold
        center = [inputs[~is_class01].mean(), inputs[is_class01].mean()]
        if np.abs((center[0] + center[1]) / 2 - threshold) < e_tol:
            threshold = (center[0] + center[1]) / 2
            break
    is_class01 = inputs > threshold
    return is_class01

用于 extract_with_kmeans 模式(Nephele 的 mode="bit" 不经过此路径,直接返回 wm_avg)。
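该函数的效果可独立验证(函数体复刻自上文,仅做可运行性整理),它用两簇均值的中点作为自适应阈值,把带噪的 bit 均值二值化:

```python
import numpy as np

def one_dim_kmeans(inputs):
    # 两中心一维 K-Means:迭代更新两簇均值,用其中点作为阈值
    threshold = 0
    e_tol = 1e-6
    center = [inputs.min(), inputs.max()]
    for _ in range(300):
        threshold = (center[0] + center[1]) / 2
        is_class01 = inputs > threshold
        center = [inputs[~is_class01].mean(), inputs[is_class01].mean()]
        if np.abs((center[0] + center[1]) / 2 - threshold) < e_tol:
            threshold = (center[0] + center[1]) / 2
            break
    return inputs > threshold

noisy = np.array([0.12, 0.18, 0.25, 0.74, 0.83, 0.91])
assert one_dim_kmeans(noisy).tolist() == [False, False, False, True, True, True]
```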


4.2.7 随机置乱策略

python
def random_strategy1(seed, size, block_shape):
    return np.random.RandomState(seed) \
        .random(size=(size, block_shape)) \
        .argsort(axis=1)

生成 size × block_shape 的随机矩阵,按行 argsort 得到每行的置乱索引。对于 password_imgsize = block_numblock_shape = 16
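该函数的两个关键性质可直接验证:同一 seed 输出完全可复现(seed 即 password_img);每行都是 0..block_shape-1 的一个排列(示意代码,函数体复刻自上文):

```python
import numpy as np

def random_strategy1(seed, size, block_shape):
    return np.random.RandomState(seed) \
        .random(size=(size, block_shape)) \
        .argsort(axis=1)

a = random_strategy1(seed=1, size=3, block_shape=16)
b = random_strategy1(seed=1, size=3, block_shape=16)
assert (a == b).all()  # 同一 seed 完全可复现
assert all(sorted(row.tolist()) == list(range(16)) for row in a)  # 每行是一个排列
assert not (a == random_strategy1(seed=2, size=3, block_shape=16)).all()  # 换 seed 置乱不同
```

因此没有正确的 password_img 就无法复现块内置乱序列,提取端得到的只是乱序 bit。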


4.3 引擎包装层源码

Nephele 在底层库之上增加了定长编码、round-trip 验证、Alpha 通道保留和异常回退。

4.3.1 常量与工具函数

tools/packer/watermark_protection.py

python
WATERMARK_BYTES = 32
WATERMARK_BITS = WATERMARK_BYTES * 8
_WM_PASSWORD_IMG = <REDACTED>   # 图片置乱种子
_WM_PASSWORD_WM = <REDACTED>    # 水印加密密钥
 
 
def _text_to_bits(text: str) -> list[int]:
    raw = text.encode("utf-8")[:WATERMARK_BYTES]
    padded = raw.ljust(WATERMARK_BYTES, b"\x00")
    bits = []
    for byte in padded:
        for i in range(7, -1, -1):
            bits.append((byte >> i) & 1)
    return bits
 
 
def _bits_to_text(bits: list) -> str:
    raw = bytearray()
    for i in range(0, len(bits), 8):
        chunk = bits[i:i+8]
        if len(chunk) < 8:
            break
        val = 0
        for b in chunk:
            val = (val << 1) | (1 if b > 0.5 else 0)
        raw.append(val)
    return raw.rstrip(b"\x00").decode("utf-8", errors="replace")

审计点

  • UTF-8 定长截断:超长文本静默截断至 32 字节
  • 阈值判定:提取时 b > 0.5 视为 1,对噪声有一定容忍

4.3.2 嵌入引擎

python
class WatermarkEngine:
    _instance = None
 
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
 
    def embed(self, image: Image.Image, text: str) -> Image.Image:
        try:
            from blind_watermark import WaterMark
 
            rgb = image.convert("RGB")
            alpha = image.split()[3] if image.mode == "RGBA" else None
 
            with tempfile.TemporaryDirectory() as tmpdir:
                orig_path = str(Path(tmpdir) / "orig.png")
                wm_path = str(Path(tmpdir) / "watermarked.png")
                rgb.save(orig_path, format="PNG")
 
                bits = _text_to_bits(text)
 
                bwm = WaterMark(
                    password_img=_WM_PASSWORD_IMG, password_wm=_WM_PASSWORD_WM
                )
                bwm.read_img(orig_path)
                bwm.read_wm(np.array(bits), mode="bit")
                bwm.embed(wm_path)
 
                result_img = Image.open(wm_path).convert("RGB")
 
                # Round-trip verify
                extracted_bits = WaterMark(
                    password_img=_WM_PASSWORD_IMG, password_wm=_WM_PASSWORD_WM
                ).extract(wm_path, wm_shape=WATERMARK_BITS, mode="bit")
                extracted_text = _bits_to_text(extracted_bits)
 
                if extracted_text == text[:WATERMARK_BYTES]:
                    logger.info("[Watermark] Verified: '%s'", extracted_text)
                else:
                    logger.warning(
                        "[Watermark] Verify mismatch: '%s' -> '%s'",
                        text[:WATERMARK_BYTES], extracted_text,
                    )
 
            if alpha:
                result_img = result_img.convert("RGBA")
                result_img.putalpha(alpha)
            return result_img
 
        except Exception as e:
            logger.error("Embed failed: %s", e, exc_info=True)
            return image

审计点

  • 单例模式:WatermarkEngine 为单例,但 blind_watermark.WaterMark 每次新建实例
  • 文件级 I/O:通过 TemporaryDirectory + PNG 临时文件工作,规避 numpy array 模式的 dtype/shape 兼容性 bug
  • 验证失败仅打 warning,仍返回含水印图片
  • Alpha 通道:RGBA 输入先转 RGB 嵌入,完成后恢复 alpha
  • 异常回退:任何异常返回原始 image,调用方无感知失败

4.3.3 提取引擎

python
    def extract(self, image: Image.Image) -> Optional[str]:
        try:
            from blind_watermark import WaterMark
            rgb = image.convert("RGB")
            bit_len = WATERMARK_BITS
 
            with tempfile.TemporaryDirectory() as tmpdir:
                img_path = str(Path(tmpdir) / "check.png")
                rgb.save(img_path, format="PNG")
 
                extracted_bits = WaterMark(
                    password_img=_WM_PASSWORD_IMG, password_wm=_WM_PASSWORD_WM
                ).extract(img_path, wm_shape=bit_len, mode="bit")
 
            text = _bits_to_text(extracted_bits)
            return text.strip() if text.strip() else None
        except Exception as e:
            logger.warning("Extract failed: %s", e)
            return None

审计点

  • 提取失败返回 None,无法区分"图片无水印"与"提取过程出错"
  • 空字符串(全零填充)经 strip() 后同样返回 None

4.3.4 公共 API

python
def protect_image(
    image: Image.Image,
    level: str = "none",
    copyright_info: str = "ARIS",
) -> Image.Image:
    level = LEVEL_ALIASES.get(level, level)
    if level == "invisible":
        return WatermarkEngine().embed(image, copyright_info)
    return image
 
 
def extract_watermark(image: Image.Image) -> Optional[str]:
    return WatermarkEngine().extract(image)

4.4 业务层调用

tools/packer/agent_api.py:pack_image()

python
def pack_image(
    input_path: str,
    watermark_path: Optional[str] = None,
    output_dir: Optional[str] = None,
    watermark_mode: str = "center",
    watermark_opacity: float = 0.3,
    preview_max_size: int = 1920,
    thumbnail_max_size: int = 500,
    protection_level: str = "none",
    copyright_info: str = "© ArisFusion Studio",
    output_folder_name: str = "Delivery_Pack",
) -> dict:
    ...
    packer = DeliveryPacker(
        ...,
        protection_level=protection_level,
        copyright_info=copyright_info,
    )
    result_dir, results = packer.process_image(...)
    ...

关键事实

  • protection_level 默认 "none",即默认不启用隐水印
  • copyright_info 最长 32 字节(超长静默截断)

4.5 Worker 层实现

core/workers/watermark_worker.py

python
class WatermarkExtractWorker(QThread):
    finished = Signal(str)  # watermark result or empty string
    logMessage = Signal(str, str)
 
    def run(self):
        try:
            ensure_src_path()
            from PIL import Image
            from tools.packer.watermark_protection import extract_watermark
 
            img_path = Path(self.image_path)
            if not img_path.exists():
                self.finished.emit("")
                return
 
            image = Image.open(img_path)
            watermark = extract_watermark(image)
 
            if watermark:
                self.finished.emit(watermark)
            else:
                self.finished.emit("")
        except Exception as e:
            self.finished.emit("")

4.6 容量与编码

| 编码 | 每字符字节 | 最大字符数 |
| --- | --- | --- |
| ASCII | 1 | 32 |
| CJK (UTF-8) | 3 | 10 |
| 混合 | 1~3 | 视具体字符而定 |
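容量上限可直接从 UTF-8 编码长度推算(示意):

```python
WATERMARK_BYTES = 32  # 定长负载(见 4.3.1)

assert len(("A" * 32).encode("utf-8")) == WATERMARK_BYTES  # ASCII:32 字符正好占满
assert len(("水" * 10).encode("utf-8")) == 30              # CJK:10 字符占 30 字节,可完整容纳
assert len(("水" * 11).encode("utf-8")) > WATERMARK_BYTES  # 第 11 个 CJK 字符将被截断
```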

5. AI 元数据检测白盒源码审计

本节审计 Nephele Workshop 的 AI 元数据 / C2PA 凭据检测 功能。该功能用于读取图片文件中已有的机器可读证据,包括 C2PA 内容凭证、生成工具元数据、平台声明和导出痕迹。它不使用视觉风格分类模型,不把"未检出"解释为"非 AI"。

5.1 架构概述

AI 元数据检测分为四层:

| 层级 | 文件 | 职责 |
| --- | --- | --- |
| 规则层 | tools/validator/logic.py:MetaDataDetector | 元数据读取、规则匹配、证据分级、最终状态输出 |
| C2PA 层 | tools/validator/c2pa_verifier.py | 官方 C2PA SDK 适配、manifest 读取、签名链与信任状态解析 |
| Worker 层 | core/workers/ai_detector_worker.py | 批量检测线程、错误隔离、结果信号 |
| UI 层 | gui/qml/views/AIValidatorView.qml | 将 raw evidence 映射成用户友好的证据标签 |

数据流:

text
用户选择图片
  -> core/workers/ai_detector_worker.py
  -> tools/validator/logic.py:MetaDataDetector.detect()
       ├── Pillow 读取 PNG/JPEG/WebP/TIFF 元数据(PNG info / EXIF)
       ├── tools/validator/c2pa_verifier.py:verify_c2pa_file()(官方 SDK)
       └── 原始字节扫描(JUMBF / APP11 fallback)
  -> 返回 {status, reason, tool, evidence}
  -> UI 映射证据标签

检测结果结构:

python
{
    "status": "ai" | "unknown" | "human" | "error",
    "reason": str,
    "tool": str,
    "evidence": str,
}

注意

status="human" 是历史字段名。UI 不应把它展示为"人类作品实锤",而应展示为"未发现凭据"或"凭据不足"。


5.2 MetaDataDetector 规则层

tools/validator/logic.py:MetaDataDetector 是规则匹配的主类,本身不持有状态,__init__ 为空。

5.2.1 规则常量

python
class MetaDataDetector:
    """
    Detects AI generation metadata from image files using heuristic analysis
    of EXIF, PNG info chunks, and generation parameters.
    """
 
    # 1. 明确的软件签名 (强特征,优先匹配专有字符串)
    # 顺序:Midjourney 先于 Gemini,以避免误判
    AI_SOFTWARE_SIGNATURES = {
        "Midjourney": [
            re.compile(r"job id:\s*[a-f0-9\-]+", re.IGNORECASE),  # 专有 Job ID
            re.compile(r"--ar\s*\d+:\d+", re.IGNORECASE),         # --ar 参数
            re.compile(r"--v\s*\d+", re.IGNORECASE),              # --v 参数
            re.compile(r"--stylize\s*\d+", re.IGNORECASE),        # --stylize
            re.compile(r"midjourney", re.IGNORECASE),             # 显式名称
            re.compile(r"mj v", re.IGNORECASE),
            re.compile(r"mj_", re.IGNORECASE),
        ],
        "ComfyUI": [
            re.compile(r"comfyui", re.IGNORECASE),
            re.compile(r"comfyland", re.IGNORECASE),
        ],
        "Gemini (Google)": [
            re.compile(r"gemini", re.IGNORECASE),
            re.compile(r"google deepmind", re.IGNORECASE),
            re.compile(r"generated by google", re.IGNORECASE),
            re.compile(r"google imagen", re.IGNORECASE),
            re.compile(r"imagen by google", re.IGNORECASE),
            re.compile(r"synthid", re.IGNORECASE),
            re.compile(r"nano banana", re.IGNORECASE),
            re.compile(r"nanobanana", re.IGNORECASE),
        ],
        "DALL-E": [re.compile(r"dall-e", re.IGNORECASE), re.compile(r"dalle", re.IGNORECASE)],
        "NovelAI": [re.compile(r"novelai", re.IGNORECASE), re.compile(r"nai-diffusion", re.IGNORECASE)],
        "InvokeAI": [re.compile(r"invokeai", re.IGNORECASE), re.compile(r"invoke ai", re.IGNORECASE)],
        "Fooocus": [re.compile(r"fooocus", re.IGNORECASE)],
        "Stable Diffusion": [re.compile(r"stable diffusion", re.IGNORECASE), re.compile(r"sd\.?next", re.IGNORECASE), re.compile(r"forge", re.IGNORECASE)],
        "Leonardo.ai": [re.compile(r"leonardo.ai", re.IGNORECASE)],
        "Adobe Firefly": [re.compile(r"adobe firefly", re.IGNORECASE)],
        "Bing Image Creator": [re.compile(r"bing image creator", re.IGNORECASE)],
    }
 
    # 2. 生成参数指纹 (次优先,当没有软件名时)
    GENERATION_PARAM_FINGERPRINTS = [
        (r"Steps:\s*\d+", "Stable Diffusion (Parameters)"),
        (r"CFG scale:\s*[\d\.]+", "Stable Diffusion (Parameters)"),
        (r"Sampler:\s*\w+", "Stable Diffusion (Parameters)"),
        (r"Seed:\s*\d+", "Stable Diffusion (Parameters)"),
        (r"Model hash:\s*[a-f0-9]+", "Stable Diffusion (Parameters)"),
        (r"Model:\s*[^,\n]+", "Stable Diffusion (Parameters)"),
        (r"Negative prompt:", "Stable Diffusion (Parameters)"),
        (r"Size:\s*\d+x\d+", "Stable Diffusion (Parameters)"),
        (r"Clip skip:\s*\d+", "Stable Diffusion (Parameters)"),
        (r"Schedule type:\s*[^,\n]+", "Stable Diffusion (Parameters)"),
        (r"Denoising strength:\s*[\d\.]+", "Stable Diffusion (Parameters)"),
        (r"Hires upscale:\s*[\d\.]+", "Stable Diffusion (Parameters)"),
        (r"DigitalSourceType\s*[:=]\s*(?:http://cv\.iptc\.org/newscodes/digitalsourcetype/)?trainedAlgorithmicMedia", "Generative AI (IPTC/XMP Standard)"),
    ]
 
    RAW_METADATA_SCAN_LIMIT = 128 * 1024 * 1024
 
    C2PA_CONTAINER_MARKERS = [
        b"c2pa", b"jumbf", b"content credentials",
        b"contentcredentials", b"contentauth",
    ]
 
    C2PA_AI_MARKERS = [
        b"trainedalgorithmicmedia",
        b"compositewithtrainedalgorithmicmedia",
        b"algorithmicmedia",
        b"generated by ai", b"ai generated",
        b"adobe firefly", b"google imagen", b"synthid",
        b"dall-e", b"dalle", b"midjourney", b"stable diffusion",
    ]

审计点

  • 规则层级:软件签名(强)→ 参数指纹(次)→ C2PA 容器 + AI marker(强)→ 弱特征(文件名)
  • 顺序敏感AI_SOFTWARE_SIGNATURESdict,Python 3.7+ 保留插入顺序。Midjourney 排在 Gemini (Google) 前,用于阻止 Midjourney 图内引用 Google 工具时被错判为 Gemini。
  • SynthID 歧义synthid 字符串被视为 Gemini 强证据,但这只是字符串匹配,本模块不解码 SynthID 像素水印
  • IPTC 通用标记trainedAlgorithmicMedia 不是 Google 专有;只有在同一文本中同时出现 Google 证据时才归因 Gemini,否则归为 Generative AI (Unknown)
  • 原始扫描上限:128 MB,超过即跳过字节级 fallback(只依赖 Pillow 与 C2PA SDK)。
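顺序敏感性可用一个缩减版签名表演示(示意代码,仅保留两条规则,模式字符串取自上文常量):

```python
import re

# 简化自 AI_SOFTWARE_SIGNATURES:dict 插入顺序即匹配优先级
SIGS = {
    "Midjourney": [re.compile(r"midjourney", re.IGNORECASE)],
    "Gemini (Google)": [re.compile(r"google imagen", re.IGNORECASE)],
}

def attribute(text: str):
    for tool, patterns in SIGS.items():  # Python 3.7+ 保证按插入顺序遍历
        if any(p.search(text) for p in patterns):
            return tool
    return None

# 文本同时引用两家工具时,先注册的 Midjourney 胜出
assert attribute("midjourney v6, inspired by google imagen") == "Midjourney"
assert attribute("made with google imagen") == "Gemini (Google)"
```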

5.2.2 detect() 主流程

python
def detect(self, image_path: str) -> Dict[str, str]:
    img_path = Path(image_path)
    if not img_path.exists():
        return {"status": "error", "reason": "文件不存在", "tool": "", "evidence": ""}
 
    evidence_found = []
    weak_evidence_found = []
    context_evidence_found = []
    detected_tool = None
    extension_mismatch = False
    minimal_web_jpeg = False
 
    try:
        with Image.open(img_path) as img:
            img.load()  # Ensure header is loaded
            actual_format = (img.format or "").upper()
            suffix = img_path.suffix.lower()
            if actual_format == "JPEG" and suffix not in {".jpg", ".jpeg", ".jpe"}:
                extension_mismatch = True
                context_evidence_found.append(f"File extension mismatch: {suffix} file contains JPEG data")
            elif actual_format == "PNG" and suffix != ".png":
                extension_mismatch = True
                context_evidence_found.append(f"File extension mismatch: {suffix} file contains PNG data")
 
            minimal_jpeg_keys = {"jfif", "jfif_density", "jfif_unit", "jfif_version", "progression", "progressive"}
            if actual_format == "JPEG" and set(img.info.keys()).issubset(minimal_jpeg_keys):
                minimal_web_jpeg = True
                context_evidence_found.append("Minimal JPEG metadata only")
 
            # --- 1. Check PNG Info / tEXt chunks ---
            if hasattr(img, 'info') and img.info:
                software_value = img.info.get("Software") or img.info.get("software")
                if isinstance(software_value, bytes):
                    software_text = software_value.decode("utf-8", errors="ignore")
                else:
                    software_text = str(software_value or "")
                software_lower = software_text.lower()
                if "celsys" in software_lower or "clip studio" in software_lower:
                    context_evidence_found.append("Edited/exported by CELSYS/Clip Studio Paint")
 
                if not detected_tool:
                    for key, val in img.info.items():
                        if not isinstance(val, (str, bytes)):
                            continue
                        structured_res = self._detect_structured_generator_metadata(key, val)
                        if structured_res:
                            detected_tool, marker = structured_res
                            evidence_found.append(marker)
                            break
 
                # Case A: A1111 / SD
                if 'parameters' in img.info:
                    val = img.info['parameters']
                    if isinstance(val, str) and ("Steps:" in val or "Prompt" in val):
                        detected_tool = "Stable Diffusion (A1111)"
                        evidence_found.append("Stable Diffusion parameters chunk")
                        res = self._analyze_text(val)
                        if res:
                            detected_tool, marker = res
                            evidence_found.append(f"Parameters: {marker}")
 
                # Case B: ComfyUI (专有检查)
                if not detected_tool and ('workflow' in img.info or 'prompt' in img.info):
                    try:
                        if 'workflow' in img.info:
                            json.loads(img.info['workflow'])
                            evidence_found.append("Valid 'workflow' JSON")
                        if 'prompt' in img.info:
                            json.loads(img.info['prompt'])
                            evidence_found.append("Valid 'prompt' JSON")
                        detected_tool = "ComfyUI"
                    except json.JSONDecodeError:
                        pass
 
                # Case C: Generic Scan (其他 info)
                if not detected_tool:
                    for key, val in img.info.items():
                        if isinstance(val, (str, bytes)):
                            val_str = self._metadata_value_to_text(val)
                            res = self._analyze_text(val_str)
                            if res:
                                detected_tool, marker = res
                                evidence_found.append(f"PNG Info '{key}': {marker}")
                                break
 
                for key, val in img.info.items():
                    if not isinstance(val, (str, bytes)):
                        continue
                    val_str = val.decode("utf-8", errors="ignore") if isinstance(val, bytes) else str(val)
                    if "DigitalSourceType" in val_str and "trainedAlgorithmicMedia" in val_str:
                        marker = f"PNG Info '{key}': IPTC/XMP trainedAlgorithmicMedia"
                        if marker not in evidence_found:
                            evidence_found.append(marker)
 
            # --- 2. Check EXIF / XMP Data ---
            if not detected_tool:
                exif = img.getexif()
                if exif:
                    for tag_id, value in exif.items():
                        tag_name = ExifTags.TAGS.get(tag_id, str(tag_id))
                        if isinstance(value, bytes):
                            try:
                                value_str = value.decode('utf-8', errors='ignore')
                            except:
                                continue
                        else:
                            value_str = str(value)
                        res = self._analyze_text(value_str)
                        if res:
                            detected_tool, marker = res
                            evidence_found.append(f"EXIF {tag_name}: {marker}")
                            break
 
    except Exception as e:
        return {"status": "error", "reason": f"读取错误: {str(e)}", "tool": "", "evidence": ""}
 
    # --- 3. Official C2PA manifest/signature verification ---
    c2pa_available = False
    c2pa_has_manifest = False
    c2pa_claim_generator = ""
    try:
        from tools.validator.c2pa_verifier import verify_c2pa_file
 
        c2pa_result = verify_c2pa_file(img_path)
        c2pa_available = c2pa_result.available
        if c2pa_result.has_manifest:
            c2pa_has_manifest = True
            c2pa_claim_generator = c2pa_result.claim_generator or ""
            evidence_found.append(c2pa_result.evidence_summary())
            if c2pa_result.ai_generated:
                detected_tool = "Generative AI (C2PA Content Credentials)"
        elif c2pa_result.available and c2pa_result.error:
            evidence_found.append(f"C2PA verification error: {c2pa_result.error}")
        elif c2pa_result.available:
            evidence_found.append("No C2PA manifest found")
    except Exception as e:
        evidence_found.append(f"C2PA verification error: {e}")
 
    # --- 4. Raw metadata fallback: C2PA/JUMBF/XMP payloads ---
    if not detected_tool and not c2pa_has_manifest:
        res, has_c2pa_container = self._scan_raw_metadata(img_path)
        if res:
            detected_tool, marker = res
            prefix = "Raw metadata"
            if not c2pa_available and has_c2pa_container:
                prefix = "Raw metadata (official C2PA verifier unavailable)"
            evidence_found.append(f"{prefix}: {marker}")
        elif has_c2pa_container:
            if c2pa_available:
                evidence_found.append("C2PA Content Credentials found, no AI generation marker")
            else:
                evidence_found.append("C2PA Content Credentials found, official verifier unavailable")
 
    # --- 5. Visible Google/Gemini watermark ---
    google_context = "google" in c2pa_claim_generator.lower()
    filename_lower = img_path.name.lower()
    filename_gemini_hint = "gemini" in filename_lower or "google" in filename_lower
    if not detected_tool and (google_context or filename_gemini_hint):
        if self._detect_google_visible_watermark(img_path):
            detected_tool = "Gemini (Google Visible Watermark)"
            evidence_found.append("Visible watermark: Google/Gemini sparkle mark")
 
    # --- 6. Final Fallback: Filename Check ---
    if not detected_tool:
        res = self._analyze_text(filename_lower, include_weak_markers=False)
        if res:
            weak_tool, marker = res
            weak_evidence_found.append(f"Filename suggests {weak_tool}: {marker}")
 
    if extension_mismatch and minimal_web_jpeg:
        context_evidence_found.append("Downloaded file appears re-encoded or metadata-stripped")
 
    # --- Result Construction ---
    if detected_tool:
        return {
            "status": "ai",
            "reason": f"检测到 {detected_tool} 元数据",
            "tool": detected_tool,
            "evidence": "; ".join(evidence_found + context_evidence_found)
        }
    else:
        if weak_evidence_found:
            return {
                "status": "unknown",
                "reason": "AI indicators found, but C2PA credentials do not declare AI generation",
                "tool": "",
                "evidence": "; ".join(evidence_found + context_evidence_found + weak_evidence_found)
            }
        reason = "C2PA Content Credentials do not declare AI generation" if c2pa_has_manifest else "No known AI generation metadata detected"
        return {
            "status": "human",
            "reason": reason,
            "tool": "",
            "evidence": "; ".join(evidence_found + context_evidence_found) if evidence_found or context_evidence_found else "No metadata signatures found"
        }

审计点(执行顺序)

  1. 打开 Pillow 句柄,识别 format / suffix 不一致、最小 JPEG 元数据、CELSYS 导出等上下文证据(不升级状态)
  2. PNG info:parameters → A1111;workflow / prompt JSON → ComfyUI;其他 key 用 _detect_structured_generator_metadata() 识别 NovelAI / InvokeAI / Fooocus;最后通用 _analyze_text() 跑一遍
  3. EXIF:对每个标签做 _analyze_text()
  4. 官方 C2PA SDKverify_c2pa_file):manifest 存在即抓 claim_generator 与 AI marker
  5. 原始字节 fallback:仅在前两步都未命中时执行,读取整个文件字节(≤ 128 MB),搜 c2pa / jumbf / contentauth 容器 + AI marker 字符串
  6. Google 可见水印:仅在"有 Google claim_generator"或"文件名含 gemini / google"时启用,避免对所有图片做视觉扫描
  7. 文件名弱线索:不会触发 ai,最多产生 unknown

状态收敛规则

| 证据组合 | status |
| --- | --- |
| detected_tool 被赋值(任一强证据命中) | ai |
| 无强证据,但有 weak_evidence_found(文件名提示) | unknown |
| 无任何证据 + C2PA manifest 存在但未声明 AI | human(reason: "C2PA Content Credentials do not declare AI generation") |
| 无任何证据 + 无 manifest | human(reason: "No known AI generation metadata detected") |
| Pillow 抛异常 | error |

注意

"无元数据"与"有 C2PA manifest 但声明非 AI"在 status 上都归为 human,只能通过 reason 区分。UI 必须读取 reason,否则会把"元数据被清洗"的图片误展示为"确认非 AI"。


5.2.3 元数据辅助解析

python
def _decode_metadata_bytes(self, data: bytes) -> str:
    """Best-effort decoding for embedded XMP/C2PA text inside binary assets."""
    if not data:
        return ""
    text = data.decode("utf-8", errors="ignore")
    if len(text.strip()) < 8:
        text = data.decode("latin-1", errors="ignore")
    return text
 
def _metadata_value_to_text(self, value: Any) -> str:
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="ignore")
    return str(value)
 
def _json_loads(self, value: str) -> Any:
    try:
        return json.loads(value)
    except Exception:
        return None
 
def _json_has_keys(self, value: Any, keys: set[str]) -> bool:
    if isinstance(value, dict):
        lowered = {str(key).lower() for key in value.keys()}
        if lowered.intersection(keys):
            return True
        return any(self._json_has_keys(item, keys) for item in value.values())
    if isinstance(value, list):
        return any(self._json_has_keys(item, keys) for item in value)
    return False
 
def _detect_structured_generator_metadata(self, key: str, value: Any) -> Optional[Tuple[str, str]]:
    text = self._metadata_value_to_text(value)
    lowered_key = key.lower()
    lowered_text = text.lower()
    parsed = self._json_loads(text)
 
    if "invoke" in lowered_key or "invokeai" in lowered_text or "invoke ai" in lowered_text:
        return "InvokeAI", f"PNG Info '{key}': InvokeAI metadata"
 
    if "fooocus" in lowered_key or "fooocus" in lowered_text:
        return "Fooocus", f"PNG Info '{key}': Fooocus metadata"
 
    if "novelai" in lowered_text or "nai-diffusion" in lowered_text:
        return "NovelAI", f"PNG Info '{key}': NovelAI metadata"
 
    if isinstance(parsed, dict):
        has_generation_keys = self._json_has_keys(
            parsed,
            {"sampler", "sampler_name", "steps", "scale", "cfg_scale", "seed", "model", "model_hash", "uc"},
        )
        has_novelai_shape = self._json_has_keys(parsed, {"uc"}) and self._json_has_keys(parsed, {"sampler", "steps", "scale"})
        if has_novelai_shape:
            return "NovelAI", f"PNG Info '{key}': NovelAI generation JSON"
 
        if "invoke" in lowered_key and has_generation_keys:
            return "InvokeAI", f"PNG Info '{key}': InvokeAI generation JSON"
 
        if lowered_key in {"sd-metadata", "sd_metadata", "generation_data", "generation_data_formatted"} and has_generation_keys:
            return "Stable Diffusion", f"PNG Info '{key}': Stable Diffusion generation JSON"
 
    return None

审计点

  • NovelAI 判据:JSON 中同时含 uc 与(sampler / steps / scale)之一 → NovelAI generation JSON。即使显式的 NovelAI 名称被清洗,图片仍可凭这一形状判据被识别。
  • InvokeAI / SD JSON:必须"key 名 + 生成字段"同时命中,防止任意 JSON 被当成生成元数据。
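NovelAI 形状判据可用一个构造样例验证(json_has_keys 为上文 _json_has_keys 的独立复刻,便于脱离类单独运行):

```python
def json_has_keys(value, keys):
    # 递归检查嵌套 dict/list 中是否出现任一目标 key(key 名小写比较)
    if isinstance(value, dict):
        lowered = {str(k).lower() for k in value}
        if lowered & keys:
            return True
        return any(json_has_keys(v, keys) for v in value.values())
    if isinstance(value, list):
        return any(json_has_keys(v, keys) for v in value)
    return False

# 典型 NovelAI 形状:含 uc(负向提示)且含 sampler/steps/scale 之一
meta = {"Comment": {"uc": "lowres, bad anatomy", "steps": 28, "scale": 11.0}}
assert json_has_keys(meta, {"uc"}) and json_has_keys(meta, {"sampler", "steps", "scale"})

# 普通 JSON 不会命中
assert not json_has_keys({"title": "sketch", "author": "me"}, {"uc"})
```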

5.2.4 原始字节扫描(C2PA/JUMBF fallback)

python
def _analyze_c2pa_bytes(self, data: bytes) -> Optional[Tuple[str, str]]:
    """Detect AI signals embedded in C2PA/JUMBF Content Credentials payloads."""
    if not data:
        return None
 
    lowered = data.lower()
    has_c2pa_container = any(marker in lowered for marker in self.C2PA_CONTAINER_MARKERS)
    if not has_c2pa_container:
        return None
 
    for marker in self.C2PA_AI_MARKERS:
        if marker in lowered:
            return "Generative AI (C2PA Content Credentials)", marker.decode("ascii", errors="ignore")
 
    text = self._decode_metadata_bytes(data)
    res = self._analyze_text(text, include_weak_markers=False)
    if res:
        tool, marker = res
        return tool, f"C2PA payload: {marker}"
 
    return None
 
def _scan_raw_metadata(self, image_path: Path) -> Tuple[Optional[Tuple[str, str]], bool]:
    """
    Scan raw image bytes for metadata payloads Pillow does not expose.
 
    C2PA manifests are commonly stored as JUMBF boxes or JPEG APP11 segments.
    Pillow can open those files without surfacing the manifest through
    Image.info or EXIF, so a raw marker scan is needed as a fallback.
    """
    try:
        size = image_path.stat().st_size
        if size > self.RAW_METADATA_SCAN_LIMIT:
            return None, False
        data = image_path.read_bytes()
    except OSError:
        return None, False
 
    lowered = data.lower()
    has_c2pa_container = any(marker in lowered for marker in self.C2PA_CONTAINER_MARKERS)
    c2pa_res = self._analyze_c2pa_bytes(data)
    if c2pa_res:
        return c2pa_res, has_c2pa_container
 
    text = self._decode_metadata_bytes(data)
    return self._analyze_text(text, include_weak_markers=False), has_c2pa_container

审计点

  • 仅在"官方 C2PA SDK 读不到 manifest"且"PNG info/EXIF 未命中"时执行,避免对所有图片做整文件扫描
  • 扫描整个文件字节,性能开销取决于文件大小;> 128 MB 的文件直接跳过
  • 扫描到 container 但无 AI marker → has_c2pa_container=Truedetect() 把它作为上下文信息附在 evidence 里,不升级状态
  • 该路径对恶意伪造 C2PA 字符串没有防御能力(见 §5.3 官方 SDK 做的签名验证才是可信链路)

5.2.5 文本规则分析

python
def _analyze_text(self, text: str, include_weak_markers: bool = True) -> Optional[Tuple[str, str]]:
    """
    Analyze a string for AI markers.
    Returns: (tool_name, found_marker) or None
 
    检测优先级:
    1. 先检查明确的软件签名(使用 regex 更严格匹配)
    2. 再检查生成参数指纹(如果 IPTC 标记,检查是否有 Google 证据,否则通用)
    """
    if not text:
        return None
 
    text_lower = text.lower()
 
    # 1. Check Explicit Software Names (按字典顺序,Midjourney 先)
    weak_patterns = {r"mj_"}
    for tool, patterns in self.AI_SOFTWARE_SIGNATURES.items():
        for pattern in patterns:
            if not include_weak_markers and pattern.pattern in weak_patterns:
                continue
            if pattern.search(text_lower):
                return tool, pattern.pattern
 
    # 2. Check Generation Parameter Fingerprints
    match_count = 0
    evidence = []
    detected_tool = None
    for pattern, tool_name in self.GENERATION_PARAM_FINGERPRINTS:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            if "IPTC/XMP Standard" in tool_name:
                # 通用 IPTC,优先检查是否有 Google 证据
                if any(re.search(p, text_lower) for p in self.AI_SOFTWARE_SIGNATURES["Gemini (Google)"]):
                    return "Gemini (Google)", "IPTC/XMP with Google Evidence"
                else:
                    detected_tool = "Generative AI (Unknown)"
                    evidence.append("IPTC/XMP Signature")
            else:
                match_count += 1
                evidence.append(pattern)
 
    if match_count >= 1:
        return "Stable Diffusion WebUI", "Generation Parameters Detected"
 
    if detected_tool:
        return detected_tool, "; ".join(evidence)
 
    return None

审计点

  • include_weak_markers=False 是文件名扫描模式:mj_ 这种三字符前缀太容易误命中(比如 mj_portrait.jpg),在文件名上下文中被屏蔽
  • 参数指纹匹配 ≥ 1 即升级 SD:单个 Steps: 或 Sampler: 字段就足够判定,宽松但可能对"用户在注释里抄了 SD 参数"的非 AI 图产生误报
  • IPTC 归因分支trainedAlgorithmicMedia 碰到 Google 关键词即归 Gemini,否则标 Generative AI (Unknown)。不会归给 Midjourney / DALL-E
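
上面"命中任意一条即判定"的宽松阈值可以用一个极简示意复现。下面的指纹列表是假设性子集(真实列表见 GENERATION_PARAM_FINGERPRINTS),用来演示该阈值为何会对抄录参数的注释产生误报:

```python
import re

# 假设性的参数指纹子集,仅示意;真实列表见 GENERATION_PARAM_FINGERPRINTS
FINGERPRINTS = [r"steps:\s*\d+", r"sampler:\s*\w+", r"cfg scale:\s*[\d.]+"]

def looks_like_sd_parameters(text: str) -> bool:
    # 对应正文 match_count >= 1 的阈值:任意一条命中即判定
    return any(re.search(p, text, re.IGNORECASE) for p in FINGERPRINTS)

# 真实的 A1111 parameters 片段会命中
assert looks_like_sd_parameters("masterpiece\nSteps: 20, Sampler: Euler a, CFG scale: 7")
# 用户在注释里抄了一行参数的非 AI 图同样会命中 → 正文所述误报来源
assert looks_like_sd_parameters("笔记:朋友说他用的是 Steps: 30")
```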

5.2.6 可见 Google/Gemini 水印

python
def _detect_google_visible_watermark(self, image_path: Path) -> bool:
    """
    Detect the visible Google/Gemini sparkle mark often placed near the
    lower-right area of generated images. This is visual evidence, not a
    SynthID decoder.
    """
    try:
        with Image.open(image_path) as img:
            img = img.convert("RGB")
            scale = 512 / max(img.size)
            if scale < 1:
                img = img.resize((round(img.width * scale), round(img.height * scale)))
 
            width, height = img.size
            pix = img.load()
            mask = set()
            for y in range(height // 2, height):
                for x in range(width // 2, width):
                    r, g, b = pix[x, y]
                    saturation = max(r, g, b) - min(r, g, b)
                    luminance = (r * 299 + g * 587 + b * 114) // 1000
                    if 110 <= luminance <= 245 and saturation < 28:
                        if not (luminance > 235 and saturation < 8):
                            mask.add((x, y))
 
            seen = set()
            for pt in list(mask):
                if pt in seen:
                    continue
                stack = [pt]
                seen.add(pt)
                xs = []
                ys = []
                while stack:
                    x, y = stack.pop()
                    xs.append(x)
                    ys.append(y)
                    for nx in (x - 1, x, x + 1):
                        for ny in (y - 1, y, y + 1):
                            npt = (nx, ny)
                            if npt in mask and npt not in seen:
                                seen.add(npt)
                                stack.append(npt)
 
                area = len(xs)
                min_x, max_x = min(xs), max(xs)
                min_y, max_y = min(ys), max(ys)
                comp_w = max_x - min_x + 1
                comp_h = max_y - min_y + 1
                center_x = (min_x + max_x) / 2
                center_y = (min_y + max_y) / 2
                density = area / max(1, comp_w * comp_h)
 
                if not (80 <= area <= 900):
                    continue
                if not (14 <= comp_w <= 60 and 14 <= comp_h <= 80):
                    continue
                if not (0.12 <= density <= 0.70):
                    continue
                if center_x < width * 0.62 or center_y < height * 0.55:
                    continue
 
                # A sparkle mark has a sparse center-heavy diamond shape.
                mid_x = (min_x + max_x) / 2
                mid_y = (min_y + max_y) / 2
                near_center = sum(
                    1 for x, y in zip(xs, ys)
                    if abs(x - mid_x) <= comp_w * 0.25 and abs(y - mid_y) <= comp_h * 0.25
                )
                if near_center / area >= 0.18:
                    return True
    except Exception:
        return False
 
    return False

审计点

  • 不是 SynthID 解码器,纯形态学判别:低饱和、中偏高亮度、右下象限、面积 80–900 px、长宽比约束、中心稠密
  • 长边超过 512 px 的图先缩放到 512 px,以标准化判据
  • 仅在 Google 上下文下启用(见 §5.2.2 流程第 5 步),避免对所有图片跑一遍 O(W·H) 的扫描
  • 漏检场景:白底图、裁剪掉右下角、重压缩导致 sparkle 连通块破碎
  • 误报场景:右下角原本有低饱和装饰元素(月亮、星星、LOGO 等)
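
逐像素的阈值判据可以抽成一个纯函数便于核对边界。mask_hit 为示意命名,阈值与上方源码一致:

```python
def mask_hit(r: int, g: int, b: int) -> bool:
    """复刻源码中像素进入 mask 的判据:中偏高亮度、低饱和,并排除近纯白。"""
    saturation = max(r, g, b) - min(r, g, b)
    luminance = (r * 299 + g * 587 + b * 114) // 1000
    if not (110 <= luminance <= 245 and saturation < 28):
        return False
    return not (luminance > 235 and saturation < 8)

assert mask_hit(180, 180, 185)      # 低饱和、中亮度的灰白像素进入 mask
assert not mask_hit(250, 250, 250)  # 接近纯白,被亮度上限 245 排除
assert not mask_hit(200, 60, 60)    # 高饱和颜色,被 saturation < 28 排除
```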

5.3 C2PA 官方 SDK 适配

tools/validator/c2pa_verifier.py 把可选的 c2pa-python 依赖隔离到单独模块。已安装时,它通过官方 SDK 读取 manifest 并要求 SDK 验证 manifest / 签名链。

5.3.1 C2PAVerificationResult

python
AI_DIGITAL_SOURCE_MARKERS = {
    "trainedalgorithmicmedia",
    "compositewithtrainedalgorithmicmedia",
    "algorithmicmedia",
    "generated by ai",
    "ai generated",
    "adobe firefly",
    "google imagen",
    "synthid",
    "dall-e",
    "dalle",
    "midjourney",
    "stable diffusion",
}
 
 
@dataclass
class C2PAVerificationResult:
    available: bool
    has_manifest: bool = False
    verified: bool | None = None
    trusted: bool | None = None
    validation_state: str = ""
    validation_results: dict[str, Any] | None = None
    manifest_store: dict[str, Any] | None = None
    active_manifest: dict[str, Any] | None = None
    sdk_version: str = ""
    embedded: bool | None = None
    remote_url: str | None = None
    ai_markers: list[str] = field(default_factory=list)
    claim_generator: str = ""
    error: str = ""
    asset_format: str = ""
    extension_mismatch: bool = False
    validation_issues: list[str] = field(default_factory=list)
 
    @property
    def ai_generated(self) -> bool:
        return bool(self.ai_markers)
 
    def evidence_summary(self) -> str:
        if not self.has_manifest:
            if self.available:
                parts = ["No C2PA manifest found"]
                if self.asset_format:
                    parts.append(f"asset_format={self.asset_format}")
                if self.extension_mismatch:
                    parts.append("extension_mismatch=true")
                return "; ".join(parts)
            return self.error or "C2PA verifier unavailable"
 
        parts = ["C2PA manifest found"]
        if self.validation_state:
            parts.append(f"validation_state={self.validation_state}")
        if self.verified is not None:
            parts.append(f"signature_chain={'verified' if self.verified else 'failed'}")
        if self.trusted is not None:
            parts.append(f"trust={'trusted' if self.trusted else 'untrusted'}")
        if self.claim_generator:
            parts.append(f"claim_generator={self.claim_generator}")
        if self.ai_markers:
            parts.append("ai_markers=" + ",".join(self.ai_markers[:4]))
        if self.validation_issues:
            parts.append("validation_issues=" + ",".join(self.validation_issues[:4]))
        if self.remote_url:
            parts.append(f"remote_manifest={self.remote_url}")
        if self.sdk_version:
            parts.append(f"c2pa_sdk={self.sdk_version}")
        return "; ".join(parts)

审计点

  • verified / trusted 是 bool | None 三值:None = 无法判定,不是失败
  • ai_generated 是派生属性:凡是在 manifest store 任意深度的字符串中命中 AI_DIGITAL_SOURCE_MARKERS 即为 True,不要求签名 / trust 验证通过
  • evidence_summary() 是 UI 摘要,最多列 4 个 AI marker 和 4 个 validation issue

5.3.2 verify_c2pa_file 入口与 SDK 配置

python
def _make_context(c2pa_module: Any) -> Any:
    settings = {
        "verify": {
            "verify_after_reading": True,
            "verify_trust": True,
            "verify_timestamp_trust": True,
            "ocsp_fetch": True,
            "remote_manifest_fetch": True,
        },
        "trust": {
            "verify_trust_list": True,
        },
    }
    context_cls = getattr(c2pa_module, "Context", None)
    if context_cls and hasattr(context_cls, "from_dict"):
        return context_cls.from_dict(settings)
    if hasattr(c2pa_module, "load_settings"):
        c2pa_module.load_settings(settings)
    return None
 
 
def _detect_asset_format(path: Path) -> tuple[str, bool]:
    suffix = path.suffix.lower()
    try:
        header = path.read_bytes()[:16]
    except OSError:
        return "", False
 
    asset_format = ""
    if header.startswith(b"\x89PNG\r\n\x1a\n"):
        asset_format = "png"
    elif header.startswith(b"\xff\xd8\xff"):
        asset_format = "jpeg"
    elif header[:4] == b"RIFF" and header[8:12] == b"WEBP":
        asset_format = "webp"
 
    expected_suffixes = {
        "png": {".png"},
        "jpeg": {".jpg", ".jpeg", ".jpe"},
        "webp": {".webp"},
    }
    mismatch = bool(asset_format and suffix and suffix not in expected_suffixes.get(asset_format, set()))
    return asset_format, mismatch
 
 
def verify_c2pa_file(image_path: str | Path) -> C2PAVerificationResult:
    path = Path(image_path)
 
    try:
        import c2pa
    except Exception as exc:
        return C2PAVerificationResult(
            available=False,
            error=f"c2pa-python unavailable: {exc}",
        )
 
    result = C2PAVerificationResult(available=True)
    try:
        result.asset_format, result.extension_mismatch = _detect_asset_format(path)
        result.sdk_version = str(c2pa.sdk_version()) if hasattr(c2pa, "sdk_version") else ""
        context = _make_context(c2pa)
        if result.extension_mismatch and result.asset_format:
            with path.open("rb") as stream:
                try:
                    reader = c2pa.Reader.try_create(result.asset_format, stream, None, context)
                except TypeError:
                    reader = c2pa.Reader.try_create(result.asset_format, stream)
        else:
            try:
                reader = c2pa.Reader.try_create(str(path), None, None, context)
            except TypeError:
                reader = c2pa.Reader.try_create(str(path))
 
        if reader is None:
            return result
 
        with reader:
            result.has_manifest = True
            result.manifest_store = _read_json(reader)
            result.validation_state = str(reader.get_validation_state() or "")
            result.validation_results = reader.get_validation_results() or None
            result.active_manifest = reader.get_active_manifest() or None
            result.embedded = bool(reader.is_embedded())
            result.remote_url = reader.get_remote_url() or None
 
        result.verified, result.trusted = _verification_flags(
            result.validation_state,
            result.validation_results,
        )
        result.validation_issues = _collect_validation_issues(result.validation_results)
        result.ai_markers = _find_ai_markers(result.manifest_store or {})
        result.claim_generator = _extract_claim_generator(result.active_manifest)
        return result
 
    except Exception as exc:
        result.error = str(exc)
        return result

审计点

  • 扩展名错配时的 SDK 回退:如果文件头显示是 JPEG 但后缀是 .png,改用 Reader.try_create(format, stream, ...) 流式接口,告诉 SDK 真实格式。直接传路径会让 SDK 按后缀判断失败。
  • SDK 配置启用:trust list、timestamp trust、OCSP 吊销检查、remote manifest 远端拉取全部开启
  • SDK 版本兼容:两种 try_create 签名都尝试(带 context / 不带),兼容不同版本 c2pa-python
  • reader 为空不是错误:has_manifest 保持 False,available=True,错误字段为空
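
扩展名错配判定所依赖的 magic bytes 判别可以独立核对,逻辑与 _detect_asset_format 中对前 16 字节的检查一致:

```python
def sniff_format(header: bytes) -> str:
    """按文件头 magic bytes 判别真实格式(与正文 _detect_asset_format 相同的规则)。"""
    if header.startswith(b"\x89PNG\r\n\x1a\n"):
        return "png"
    if header.startswith(b"\xff\xd8\xff"):
        return "jpeg"
    if header[:4] == b"RIFF" and header[8:12] == b"WEBP":
        return "webp"
    return ""

# 一张被改成 .png 后缀的 JPEG:按字节判别仍是 jpeg → extension_mismatch=True
assert sniff_format(b"\xff\xd8\xff\xe0\x00\x10JFIF\x00" + b"\x00" * 5) == "jpeg"
assert sniff_format(b"\x89PNG\r\n\x1a\n" + b"\x00" * 8) == "png"
assert sniff_format(b"RIFF\x00\x00\x00\x00WEBPVP8 ") == "webp"
```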

5.3.3 签名链与信任状态分离

python
def _verification_flags(validation_state: str, validation_results: dict[str, Any] | None) -> tuple[bool | None, bool | None]:
    state = (validation_state or "").lower()
    active_results = _active_manifest_results(validation_results) or validation_results
    has_failure = _contains_failure(active_results) if active_results else False
    has_non_trust_failure = _contains_non_trust_failure(active_results) if active_results else False
    has_trust_failure = _contains_trust_failure(active_results) if active_results else False
    has_trust_signal = _contains_trust_signal(active_results) if active_results else False
    active_signature_valid = _has_validation_code(active_results, "claimSignature.validated")
    active_data_hash_valid = _has_validation_code(active_results, "assertion.dataHash.match")
 
    verified: bool | None
    if active_signature_valid and active_data_hash_valid and not has_non_trust_failure:
        verified = True
    elif "valid" in state and "invalid" not in state and not has_non_trust_failure:
        verified = True
    elif "invalid" in state or (has_failure and has_non_trust_failure):
        verified = False
    else:
        verified = None
 
    trusted: bool | None
    if has_trust_failure:
        trusted = False
    elif verified is True and has_trust_signal:
        trusted = True
    else:
        trusted = None
 
    return verified, trusted

审计点

  • active manifest 优先:_active_manifest_results() 先取 validation_results["activeManifest"],取不到才回退到全树
  • "签名有效 + 数据哈希有效" 且 "没有非 trust 类失败" 即 verified=True。即使 validation_state 字段本身含 "invalid"(可能来自 ingredient chain 的问题),也不拉低 active manifest 的签名结论
  • trust 独立评估:trust 失败不会把 verified 打成 False。UI 可以得到 signature_chain=verified; trust=untrusted 这种组合,表示签名本身可验证,但签名证书不在当前信任列表(常见于 Google、OpenAI 尚未纳入默认 trust anchors)

5.3.4 AI marker 与 claim_generator 提取

python
def _iter_strings(value: Any) -> Iterable[str]:
    if isinstance(value, str):
        yield value
    elif isinstance(value, dict):
        for key, item in value.items():
            yield str(key)
            yield from _iter_strings(item)
    elif isinstance(value, list):
        for item in value:
            yield from _iter_strings(item)
 
 
def _find_ai_markers(manifest_store: dict[str, Any]) -> list[str]:
    found: list[str] = []
    seen = set()
    for text in _iter_strings(manifest_store):
        lowered = text.lower()
        for marker in AI_DIGITAL_SOURCE_MARKERS:
            if marker in lowered and marker not in seen:
                seen.add(marker)
                found.append(marker)
    return found
 
 
def _extract_claim_generator(active_manifest: dict[str, Any] | None) -> str:
    if not active_manifest:
        return ""
 
    claim_generator = active_manifest.get("claim_generator")
    if isinstance(claim_generator, str):
        return claim_generator
    if isinstance(claim_generator, dict):
        name = claim_generator.get("name") or claim_generator.get("identifier")
        version = claim_generator.get("version")
        if name and version:
            return f"{name} {version}"
        if name:
            return str(name)
 
    infos = active_manifest.get("claim_generator_info")
    if isinstance(infos, list) and infos:
        first = infos[0]
        if isinstance(first, dict):
            name = first.get("name") or first.get("identifier")
            version = first.get("version")
            if name and version:
                return f"{name} {version}"
            if name:
                return str(name)
 
    return ""

审计点

  • 递归遍历 manifest store 所有字符串(包括 key),大小写无关匹配
  • trainedalgorithmicmedia / algorithmicmedia / synthid 等 C2PA / IPTC 语义被视为强证据
  • 不解码不可见 SynthID 水印:synthid 出现在 manifest store 中代表 C2PA 声明"图片含 SynthID",不代表本地完成了 SynthID 解码
  • claim_generator 兼容三种形态:字符串、dict({name, version} / {identifier, version})、claim_generator_info 数组
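
marker 扫描的行为可以用正文代码的等价拷贝直接验证;下面的 manifest store 结构是手工构造的示例数据,marker 集合取正文集合的一个子集:

```python
from typing import Any, Iterable

AI_MARKERS = {"trainedalgorithmicmedia", "synthid"}  # 正文 AI_DIGITAL_SOURCE_MARKERS 的子集

def iter_strings(value: Any) -> Iterable[str]:
    # 与正文 _iter_strings 相同:递归产出所有字符串,包括 dict 的 key
    if isinstance(value, str):
        yield value
    elif isinstance(value, dict):
        for key, item in value.items():
            yield str(key)
            yield from iter_strings(item)
    elif isinstance(value, list):
        for item in value:
            yield from iter_strings(item)

def find_ai_markers(store: dict) -> list[str]:
    found, seen = [], set()
    for text in iter_strings(store):
        lowered = text.lower()
        for marker in AI_MARKERS:
            if marker in lowered and marker not in seen:
                seen.add(marker)
                found.append(marker)
    return found

# 手工构造的示例 manifest store(非真实 SDK 输出)
store = {"manifests": [{
    "claim_generator": "Google SynthID v1",
    "assertions": [{"data": {"digitalSourceType":
        "http://cv.iptc.org/newscodes/digitalsourcetype/trainedAlgorithmicMedia"}}],
}]}
assert set(find_ai_markers(store)) == {"synthid", "trainedalgorithmicmedia"}
```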

5.3.5 Validation issues

python
def _has_validation_code(value: Any, code_fragment: str) -> bool:
    if isinstance(value, dict):
        code = value.get("code")
        if isinstance(code, str) and code_fragment in code:
            return True
        return any(_has_validation_code(item, code_fragment) for item in value.values())
    if isinstance(value, list):
        return any(_has_validation_code(item, code_fragment) for item in value)
    return False
 
 
def _collect_validation_issues(validation_results: dict[str, Any] | None) -> list[str]:
    issues: list[str] = []
    if not validation_results:
        return issues
    if _has_validation_code(validation_results, "ingredient.malformed"):
        issues.append("ingredient_malformed")
    if _has_validation_code(validation_results, "timeStamp.untrusted"):
        issues.append("timestamp_untrusted")
    return issues

典型 evidence summary 输出:

text
C2PA manifest found;
validation_state=Invalid;
signature_chain=verified;
trust=untrusted;
ai_markers=algorithmicmedia,trainedalgorithmicmedia,synthid;
validation_issues=ingredient_malformed,timestamp_untrusted

审计点

  • validation_state=Invalid 不一定代表当前图片数据被篡改。active manifest 的签名和数据哈希可以有效,同时 ingredient chain 存在问题
  • UI 应展示为"签名链有效 / 证书未在当前信任列表 / 存在链路问题",不是"签名失败"
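
validation code 的递归查找可以用正文逻辑的等价拷贝核对;下面的 validation_results 结构是手工构造的示例,仅示意嵌套查找:

```python
from typing import Any

def has_validation_code(value: Any, fragment: str) -> bool:
    """递归查找 code 字段包含指定片段的条目(与正文 _has_validation_code 逻辑一致)。"""
    if isinstance(value, dict):
        code = value.get("code")
        if isinstance(code, str) and fragment in code:
            return True
        return any(has_validation_code(item, fragment) for item in value.values())
    if isinstance(value, list):
        return any(has_validation_code(item, fragment) for item in value)
    return False

# 手工构造的示例结构,非真实 SDK 输出
results = {"activeManifest": {"failure": [
    {"code": "timeStamp.untrusted", "explanation": "TSA cert not on trust list"},
]}}
assert has_validation_code(results, "timeStamp.untrusted")
assert not has_validation_code(results, "ingredient.malformed")
```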

5.4 Worker 层

core/workers/ai_detector_worker.py

python
class AIDetectorWorker(QThread):
    """Worker thread for AI metadata detection."""
 
    progress = Signal(int, int, str)  # (current, total, filename)
    item_finished = Signal(str, str, str, str, str)  # (path, status, reason, tool, evidence)
    all_finished = Signal()
    model_status = Signal(str)
 
    def __init__(self, file_paths: list):
        super().__init__()
        self.file_paths = file_paths
 
    def run(self):
        """Execute metadata detection in background thread."""
        try:
            from .._utils import ensure_src_path
            ensure_src_path()
 
            from tools.validator.logic import MetaDataDetector
            detector = MetaDataDetector()
 
            self.model_status.emit(_tr("AIDetectorWorker", "扫描元数据..."))
 
            total = len(self.file_paths)
            logger.info("开始检测 %d 个文件", total)
 
            for i, path in enumerate(self.file_paths):
                if self.isInterruptionRequested():
                    break
 
                filename = Path(path).name
                self.progress.emit(i + 1, total, filename)
 
                try:
                    res = detector.detect(path)
                    self.item_finished.emit(
                        path,
                        res["status"],
                        res["reason"],
                        res["tool"] or "",
                        res["evidence"] or ""
                    )
                except Exception as e:
                    logger.error("[MetaDetector] 检测文件出错 %s: %s", path, e)
                    self.item_finished.emit(path, "error", _tr("AIDetectorWorker", "检测出错: %s") % str(e), "", "")
 
            logger.info("[MetaDetector] 检测完成,共 %d 个文件", total)
 
        except Exception as e:
            logger.error("[MetaDetector] Worker error: %s", e, exc_info=True)
        finally:
            self.all_finished.emit()

审计点

  • 逐文件错误隔离:单个文件的异常被捕获并转换为 status="error" 信号,不影响后续文件
  • 可中断:isInterruptionRequested() 允许 UI 取消批量任务
  • Detector 实例复用:整批共享一个 MetaDataDetector,但该类自身无状态(__init__ 为空),无跨文件污染风险
  • 信号载荷:item_finished 每处理完一张图就发射一次,避免结果积压

5.5 证据分级表

| 证据 | 是否触发 ai |
| --- | --- |
| C2PA ai_markers(trainedAlgorithmicMedia / synthid / ...) | 是 |
| ComfyUI workflow / prompt JSON | 是 |
| Stable Diffusion parameters chunk | 是 |
| NovelAI 生成 JSON(uc + sampler/steps/scale) | 是 |
| InvokeAI / Fooocus 元数据 | 是 |
| Midjourney Job ID / --ar / --v / --stylize | 是 |
| IPTC/XMP trainedAlgorithmicMedia | 是 |
| Google 可见 sparkle 水印(Google context 下) | 是 |
| 文件名含平台词 | 否(最多 unknown) |
| 扩展名错配 | 上下文 |
| 最小 JPEG 元数据 | 上下文 |
| CELSYS / Clip Studio 导出标记 | 上下文 |
| 无任何元数据 | 无证据 |

5.6 测试覆盖

tests/test_validator_c2pa.py 当前覆盖:

  • C2PA raw payload AI marker
  • C2PA manifest 无 AI marker 不误报
  • 官方 C2PA verifier mock 驱动检测
  • Google C2PA 无 AI marker 时压制 raw false positive
  • Google / Gemini 可见水印上下文检测
  • Midjourney Job ID 与 IPTC AI source
  • 文件名中单独出现 Gemini 时仅输出 unknown
  • 平台重编码 / .png 后缀 JPEG 不升级 AI
  • A1111 Stable Diffusion parameters
  • NovelAI generation JSON
  • InvokeAI metadata
  • Fooocus metadata
  • active C2PA signature valid but ingredient chain has issue 的分离解释

建议持续加入真实样本回归集:OpenAI、Gemini、Midjourney、ComfyUI、A1111、Forge、Fooocus、NovelAI、InvokeAI、Adobe Firefly、Tusi / Liblib / TensorArt 等平台下载图。


6. 威胁模型总览

6.1 数字存证

| 威胁 | 缓解措施 | 残余风险 |
| --- | --- | --- |
| 用户篡改原始文件 | Merkle Tree 根哈希验证 | 无(篡改必被发现) |
| TSA 私钥泄露 | 多供应商故障转移 | 单一 TSA 泄露不影响历史验证 |
| 本地 JSON 被修改 | 可选 AES-256 加密 | 未加密时可修改,但文件哈希验证仍会暴露 |
| Merkle second-preimage | 无域分隔前缀 | 不满足对抗性碰撞构造 |
| 作者身份伪造 | 用户自声明 | author_name 无第三方验证 |

6.2 维权取证

| 威胁 | 缓解措施 | 残余风险 |
| --- | --- | --- |
| 目标页面删除 | 立即取证 + RFC 3161 | 取证前已删除则无法补救 |
| 本地伪造网页 | TLS 证书抓取 | 仅验证域名证书,不验证内容真实性 |
| 截图被 PS | manifest SHA-256 | 截图本身无法证明"未经过 PS" |
| 浏览器被识别为 bot | stealth + visible fallback | 部分平台仍可能拦截 |
| HAR / 证书缺失 | 多源采集 | 单点失败不会导致整包失效 |
| 超长页面 | 视口截图(full_page=False) | 下方内容不会被 capture |

6.3 隐水印

| 威胁 | 缓解措施 | 残余风险 |
| --- | --- | --- |
| 密码硬编码 | 编译期常量 | 所有用户实例共享同一对密码,逆向获取后可批量提取 |
| 无认证水印 | 双密码系统 | 无法证明"这条水印是我嵌入的",只能证明"图片包含这段文本" |
| 伪造水印 | 密码保密 | 知道密码后可将任意文本嵌入并声称来自 Nephele |
| 去除水印 | 量化嵌入(d1=36) | 重度压缩、旋转、大面积裁切(> 50%)可破坏 |
| 静默截断 | 定长 32 字节编码 | 超长文本被静默截断,用户可能误以为完整嵌入 |
| 验证失败仍输出 | round-trip 检查 | mismatch 时仅记录 warning,图片仍会输出 |
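
"静默截断"一行可以用一个假设性的定长编码示意(encode_fixed 非真实函数名,真实定长编码见 tools/packer/watermark_protection.py):两段不同的超长文本截断后会产生完全相同的载荷,用户无法从结果分辨。

```python
def encode_fixed(text: str, size: int = 32) -> bytes:
    """假设性示意:UTF-8 编码后截断 / 零填充到定长 size 字节。"""
    raw = text.encode("utf-8")[:size]
    return raw.ljust(size, b"\x00")

assert len(encode_fixed("short")) == 32
long_text = "超长水印文本" * 10  # 180 字节,远超 32 字节上限
# 截断是静默的:两段不同文本产生相同载荷,嵌入结果无法区分
assert encode_fixed(long_text) == encode_fixed(long_text + "附加内容")
```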

注意

隐水印在 Nephele 中的定位是辅助溯源工具,不是密码学意义上的数字签名。其核心价值在于"增加盗图者去除水印的成本",而非"提供不可伪造的权属证明"。如需法律级确权,请使用数字存证功能。

6.4 AI 元数据检测

| 威胁 / 场景 | 结果 |
| --- | --- |
| 原始 ComfyUI PNG | 可检出 workflow / prompt |
| 原始 A1111 PNG | 可检出 parameters |
| OpenAI / Google C2PA 图 | 可读取 manifest、AI marker、签名链、trust 状态 |
| Midjourney 保留 Job ID / XMP | 可检出强证据 |
| 平台重编码图(微博 / Twitter / 小红书下载) | 只能提示凭据不足,status=human(reason 区分) |
| 截图 | 原始 metadata 通常丢失,无法判定 |
| 恶意清理元数据 | 无法恢复已删除证据 |
| 恶意伪造非 C2PA 文本元数据 | 无密码学真实性保证,可能误报 |
| 仅视觉风格像 AI | 不判断 |
| 用户在注释里抄 SD 参数的非 AI 图 | 可能误报为 ai |
| 右下角有低饱和装饰物的非 AI 图(Google context 下) | 可能触发 sparkle 误报 |

注意

本功能的审计结论是:适合作为 AI 生成凭据与元数据筛查工具,不应被宣传为通用 AI 图像真伪检测器。"未检出"不等于"非 AI 生成"。


7. 依赖清单与降级行为

| 依赖 | 用途 | 缺失时行为 |
| --- | --- | --- |
| rfc3161ng | TSA 通信 | TSA 完全不可用,强制降级本地 .json |
| asn1crypto | TSR 解析 | 回退到本地时钟和 provider_name |
| pyzipper | .nep AES-256 | 密码不生效,标准 ZIP |
| Pillow | 缩略图 / 图像 I/O / 元数据读取 | 存证流程阻断 / 打包功能阻断 / AI 检测阻断 |
| reportlab | PDF 报告 | 存证流程阻断 |
| qrcode | PDF 二维码 | 回退纯文本 URL |
| playwright | 浏览器 | 取证功能完全不可用 |
| blind_watermark | 隐水印 DWT 嵌入 / 提取 | 隐水印功能完全不可用,返回原图 |
| numpy | 隐水印 bit 数组转换 / 图像处理 | 隐水印功能完全不可用 |
| pywt | 小波变换(blind_watermark 传递依赖) | 隐水印功能完全不可用 |
| c2pa-python | 官方 C2PA SDK | 回退到字节级扫描(无签名验证),evidence 注明 "official C2PA verifier unavailable" |

8. 隐私与网络行为

8.1 默认联网路径

| 功能 | 网络动作 | 是否可关闭 |
| --- | --- | --- |
| 数字存证(TSA) | 向 DigiCert / FreeTSA / IdenTrust 发送 SHA-256 摘要请求 | 可切回纯本地时间戳(降级) |
| 维权取证 | 通过 Playwright 向目标站点发起 HTTPS 请求、TLS 握手、DNS 解析 | 否(功能本质即为联网取证) |
| AI 元数据检测(C2PA SDK) | ocsp_fetch=True + remote_manifest_fetch=True | 当前通过 SDK settings 硬编码开启,无 UI 开关 |
| 隐水印 | 无 | — |

8.2 离线性说明

  • 数字存证:向 TSA 只发送哈希值,不发送原始文件内容

  • 维权取证:按设计发起对目标 URL 的完整请求,这就是取证本身。

  • AI 元数据检测:常规 PNG info / EXIF / 字节扫描在本地完成。C2PA SDK 在验证签名链时可能:

    • 拉取远端 manifest(remote_manifest_fetch
    • 检查证书吊销(ocsp_fetch
    • 验证时间戳 trust(verify_timestamp_trust

    因此产品文案不应笼统宣称"C2PA 验证永远不联网"。如用户需要严格离线模式,应提供关闭 remote manifest / OCSP 的开关(尚未实现)。

  • 隐水印:完全本地,嵌入 / 提取均不触网。
