Docs/Security

Technical Audit Document

This document is written for security researchers, cryptography auditors, and developers. It precisely describes the implementation details, known limitations, and threat model of the security-related modules in Nephele Workshop.

All core source code is included inline for white-box review. Sensitive constants (such as watermark passwords) have been redacted.

This document corresponds to the state of the Nephele Workshop v0.5.2-beta codebase.

提示

All source code snippets in this document are released under the MIT License. You are free to copy, modify, and use them commercially; simply retain the copyright notice.


1. Audit Scope

1.1 List of Files Under Audit

FeatureFileAudit Scope
Digital Timestampingtools/rights/logic.pyFile hashing, batch timestamping, deep verification
tools/rights/utils.pyMerkle Tree
tools/rights/tsa_client.pyRFC 3161 TSA client
tools/rights/rights_packer.py.nep container packaging
Infringement Evidence Capturetools/rights/url_evidence.pyURL evidence main flow, TLS, CAPTCHA
core/browser/session.pyPlaywright session / screenshot
Invisible Watermarktools/packer/watermark_protection.pyFixed-length encoding, round-trip verification, exception fallback
tools/packer/logic.py / agent_api.pyBusiness-layer invocation
core/workers/watermark_worker.pyBackground extraction thread
blind_watermark (PyPI)DWT+DCT+SVD underlying algorithm
AI Metadata Detectiontools/validator/logic.pyMetadata reading, rule matching, evidence grading
tools/validator/c2pa_verifier.pyOfficial C2PA SDK adapter, trust-state parsing
core/workers/ai_detector_worker.pyBatch detection thread

1.2 Product Boundary

This audit does not cover:

  • Payment / licensing modules (core/license_manager.py, core/payment.py)
  • Authentication / JWT / CAPTCHA integration (core/auth/)
  • The AI conversation Agent and cloud inference (core/agent_loop.py, nephele-api/)
  • Client updates and SSL pinning (core/updater.py, core/ssl_pinning.py)

Each of these modules has its own independent security boundary and threat model, which fall outside the scope of this document.


2. Digital Timestamping Core Implementation

2.1 File Hash Computation

rights/logic.py · L352-L395
GitHub
def calculate_file_hash(file_path: Path, algorithm: str = 'sha256') -> str:
    """
    计算文件的哈希值(极简版,仅用于非确权场景)
    
    Args:
        file_path: 文件路径
        algorithm: 哈希算法,默认 'sha256'
    
    Returns:
        文件的十六进制哈希值
    
    Raises:
        RightsError: 文件不存在或读取失败
    """
    if not file_path or not isinstance(file_path, Path):
        raise RightsError(f"无效的文件路径: {file_path}")
    
    if not file_path.exists():
        raise RightsError(f"文件不存在: {file_path}")
    
    if not file_path.is_file():
        raise RightsError(f"路径不是文件: {file_path}")
    
    try:
        file_size = file_path.stat().st_size
        if file_size > 10 * 1024 * 1024 * 1024:  # 10GB 限制
            raise RightsError(f"文件过大(超过10GB): {file_path}")
        
        hash_obj = hashlib.new(algorithm)
        with open(file_path, 'rb') as f:
            chunk_size = 8192
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                hash_obj.update(chunk)
        
        return hash_obj.hexdigest()
    except PermissionError:
        raise RightsError(f"没有权限读取文件: {file_path}")
    except OSError as e:
        raise RightsError(f"读取文件失败: {file_path}, 错误: {str(e)}")
    except Exception as e:
        raise RightsError(f"计算文件哈希失败: {file_path}, 错误: {str(e)}")
tools/rights/logic.py:calculate_file_hash()

Audit points:

  • Algorithm: SHA-256, no salt, no key (not HMAC)
  • Chunking: 8,192 bytes, streamed
  • Upper limit: 10 GB, rejected if exceeded

2.2 Full Merkle Tree Implementation

rights/utils.py · L13-L185
GitHub
class MerkleTree:
    """
    Merkle Tree 实现,用于将多个文件的哈希值聚合成单个根哈希

    优势:
    - 支持 100+ 文件批量处理
    - 单个根哈希可代表整个批次
    - 节省 TSA 调用成本(1 次调用 vs N 次调用)

    Known limitation (second-preimage resistance):
        This implementation does NOT use domain separation prefixes for leaf vs
        internal nodes (i.e. b'\\x00' for leaves, b'\\x01' for internal nodes as
        recommended by RFC 6962 §2.1).  Adding prefixes would change the root hash
        computation and break backward compatibility with all existing .nep files
        and the verification website (verify.arisfusion.com).  A future tree_version
        bump can introduce domain separation; the current version is safe for our
        threat model (user-submitted files, not adversarial tree construction).
    """
    
    def __init__(self, hash_algorithm: str = 'sha256'):
        """
        初始化 Merkle Tree
        
        Args:
            hash_algorithm: 哈希算法,默认 'sha256'
        """
        self.hash_algorithm = hash_algorithm
        self.leaves: List[str] = []
        self.tree: List[List[str]] = []
        self.root_hash: Optional[str] = None
    
    def add_leaf(self, data: bytes) -> str:
        """
        添加叶子节点(文件哈希)
        
        Args:
            data: 文件数据或哈希值(bytes)
        
        Returns:
            叶子节点的哈希值
        """
        hash_obj = hashlib.new(self.hash_algorithm)
        hash_obj.update(data)
        leaf_hash = hash_obj.hexdigest()
        self.leaves.append(leaf_hash)
        return leaf_hash
    
    def add_file_hash(self, file_hash: str) -> None:
        """
        直接添加文件哈希值(已计算好的)
        
        Args:
            file_hash: 文件的十六进制哈希值
        """
        self.leaves.append(file_hash)
    
    def build(self) -> str:
        """
        构建 Merkle Tree 并返回根哈希
        
        Returns:
            根哈希值(十六进制字符串)
        """
        if not self.leaves:
            raise ValueError("Merkle Tree 没有叶子节点")
        
        # 如果只有一个叶子节点,直接返回
        if len(self.leaves) == 1:
            self.root_hash = self.leaves[0]
            return self.root_hash
        
        # 构建树:从叶子节点开始,逐层向上
        current_level = self.leaves.copy()
        self.tree = [current_level]
        
        while len(current_level) > 1:
            next_level = []
            
            # 成对处理节点
            for i in range(0, len(current_level), 2):
                if i + 1 < len(current_level):
                    # 两个节点:合并哈希
                    combined = current_level[i] + current_level[i + 1]
                else:
                    # 奇数个节点:最后一个节点复制后与自己合并
                    combined = current_level[i] + current_level[i]
                
                # 计算父节点哈希
                hash_obj = hashlib.new(self.hash_algorithm)
                hash_obj.update(combined.encode('utf-8'))
                parent_hash = hash_obj.hexdigest()
                next_level.append(parent_hash)
            
            self.tree.append(next_level)
            current_level = next_level
        
        # 根哈希是最后一层的唯一节点
        self.root_hash = current_level[0]
        return self.root_hash
    
    def get_proof(self, leaf_index: int) -> List[Dict]:
        """
        获取指定叶子节点的 Merkle Proof(用于验证)

        Args:
            leaf_index: 叶子节点索引

        Returns:
            Merkle Proof 路径,每个元素为 {'hash': str, 'position': 'left'|'right'}
            position 表示兄弟节点在合并时的位置
        """
        if not self.tree:
            self.build()

        if leaf_index >= len(self.leaves):
            raise IndexError(f"叶子节点索引超出范围: {leaf_index}")

        proof = []
        current_index = leaf_index
        current_level = 0

        while current_level < len(self.tree) - 1:
            level = self.tree[current_level]

            # 找到兄弟节点并记录位置
            if current_index % 2 == 0:
                # 当前是左节点,兄弟在右侧
                sibling_index = current_index + 1
                if sibling_index < len(level):
                    proof.append({'hash': level[sibling_index], 'position': 'right'})
                else:
                    # 奇数情况,兄弟是自己(已复制)
                    proof.append({'hash': level[current_index], 'position': 'right'})
            else:
                # 当前是右节点,兄弟在左侧
                sibling_index = current_index - 1
                proof.append({'hash': level[sibling_index], 'position': 'left'})

            # 移动到上一层
            current_index = current_index // 2
            current_level += 1

        return proof

    def verify_proof(self, leaf_hash: str, proof: List[Dict], root_hash: str) -> bool:
        """
        验证 Merkle Proof

        Args:
            leaf_hash: 叶子节点哈希
            proof: Merkle Proof 路径(由 get_proof 返回)
            root_hash: 根哈希

        Returns:
            验证是否通过
        """
        current_hash = leaf_hash

        for step in proof:
            sibling_hash = step['hash']
            position = step['position']

            # 按照 build() 相同的位置顺序合并:左 + 右
            if position == 'right':
                combined = current_hash + sibling_hash
            else:
                combined = sibling_hash + current_hash

            hash_obj = hashlib.new(self.hash_algorithm)
            hash_obj.update(combined.encode('utf-8'))
            current_hash = hash_obj.hexdigest()

        return current_hash == root_hash
tools/rights/utils.py:MerkleTree

Known security limitation (disclosed proactively):

The current implementation does not use the domain-separation prefixes recommended by RFC 6962 §2.1 (leaf nodes are not prefixed with \x00, internal nodes are not prefixed with \x01). This means that in an extreme adversarial scenario, a theoretical possibility exists for constructing a second-preimage.

Practical risk assessment:

  • When the threat model is "a user generating a timestamp for their own work", the risk is negligible.
  • If the threat model requires "resistance to maliciously constructed collisions", the current implementation does not meet that security level.

2.3 Full TSA Client Implementation

rights/tsa_client.py · L29-L514
GitHub
class TSAClient:
    """
    RFC 3161 时间戳服务客户端

    支持的服务:
    - FreeTSA (https://freetsa.org/tsr) - 免费,无需注册
    - DigiCert (http://timestamp.digicert.com)
    - IdenTrust (http://timestamp.identrust.com)
    - 其他 RFC 3161 兼容服务
    """

    # 预定义的 TSA 服务提供商
    PROVIDERS = {
        'freetsa': {
            'name': 'FreeTSA',
            'url': 'https://freetsa.org/tsr',
            'hashname': 'sha256',
            'description': '免费时间戳服务,国际标准 RFC 3161',
            'requires_auth': False,
            'legal_strength': 3,  # 1-5 评分
            'price': 0
        },
        'digicert': {
            'name': 'DigiCert',
            'url': 'http://timestamp.digicert.com',
            'hashname': 'sha256',
            'description': 'DigiCert 免费时间戳服务',
            'requires_auth': False,
            'legal_strength': 4,
            'price': 0
        },
        'identrust': {
            'name': 'IdenTrust',
            'url': 'http://timestamp.identrust.com',
            'hashname': 'sha256',
            'description': 'IdenTrust 免费时间戳服务',
            'requires_auth': False,
            'legal_strength': 4,
            'price': 0
        }
    }

    def __init__(
        self,
        provider: str = 'freetsa',
        custom_url: Optional[str] = None,
        hashname: str = 'sha256',
        timeout: int = 30
    ):
        """
        初始化 TSA 客户端

        Args:
            provider: 预定义的服务提供商名称 ('freetsa', 'digicert', 'identrust')
            custom_url: 自定义 TSA URL(如果指定,则忽略 provider)
            hashname: 哈希算法 ('sha256', 'sha512' 等)
            timeout: 请求超时时间(秒)
        """
        if not RFC3161_AVAILABLE:
            raise ImportError(
                "rfc3161ng 库未安装。请运行: pip install rfc3161ng"
            )

        if custom_url:
            self.url = custom_url
            self.provider_name = "Custom TSA"
            self.provider_key = None
        elif provider in self.PROVIDERS:
            config = self.PROVIDERS[provider]
            self.url = config['url']
            self.provider_name = config['name']
            self.provider_key = provider
            hashname = config['hashname']
        else:
            raise ValueError(
                f"未知的 TSA 提供商: {provider}。"
                f"支持的提供商: {', '.join(self.PROVIDERS.keys())}"
            )

        self.hashname = hashname
        self.timeout = timeout

        # 初始化 rfc3161ng 时间戳器
        # 注意:某些环境可能遇到 SSL 握手问题,这是正常的
        # 我们的设计会自动降级到本地哈希
        try:
            self.stamper = rfc3161ng.RemoteTimestamper(
                url=self.url,
                hashname=self.hashname,
                timeout=self.timeout
            )
        except Exception as e:
            # 如果初始化失败,记录错误但不抛出异常
            # 后续调用时会返回失败状态
            self.stamper = None
            self._init_error = str(e)

    FAILOVER_ORDER: List[str] = ['digicert', 'freetsa', 'identrust']

    def _call_with_retry(self, hash_bytes: bytes, max_retries: int = 3) -> bytes:
        """
        带指数退避和提供商故障转移的 TSA 调用

        Args:
            hash_bytes: 要签名的哈希值(二进制)
            max_retries: 每个提供商的最大重试次数

        Returns:
            TSR 令牌(二进制)

        Raises:
            Exception: 所有提供商均失败
        """
        # 构建尝试顺序:当前提供商优先,然后是其他提供商
        providers_to_try = []
        if self.provider_key:
            providers_to_try.append(self.provider_key)
            for p in self.FAILOVER_ORDER:
                if p != self.provider_key:
                    providers_to_try.append(p)
        else:
            # 自定义 URL,无故障转移
            providers_to_try = [None]

        last_error = None
        for provider_key in providers_to_try:
            if provider_key is not None:
                config = self.PROVIDERS[provider_key]
                url = config['url']
                hashname = config['hashname']
                provider_name = config['name']
            else:
                url = self.url
                hashname = self.hashname
                provider_name = self.provider_name

            for attempt in range(max_retries):
                try:
                    stamper = rfc3161ng.RemoteTimestamper(
                        url=url,
                        hashname=hashname,
                        timeout=self.timeout
                    )
                    tsr_token = stamper(digest=hash_bytes)
                    # 成功后更新当前提供商信息
                    if provider_key is not None:
                        self.provider_name = provider_name
                        self.provider_key = provider_key
                        self.url = url
                        self.stamper = stamper
                    return tsr_token
                except Exception as e:
                    last_error = e
                    if attempt < max_retries - 1:
                        time.sleep(2 ** attempt)

        raise Exception(
            f"所有 TSA 提供商均失败 (尝试: {', '.join(p or 'custom' for p in providers_to_try)}): {last_error}"
        )

    def timestamp_data(self, data: bytes) -> bytes:
        """
        对原始数据生成时间戳

        Args:
            data: 要加时间戳的数据(二进制)

        Returns:
            TSR (Time-Stamp Response) 二进制令牌

        Raises:
            Exception: 时间戳请求失败
        """
        try:
            # 计算数据的哈希值
            hash_obj = hashlib.new(self.hashname)
            hash_obj.update(data)
            data_hash = hash_obj.digest()

            # 调用 TSA 服务获取时间戳(带重试和故障转移)
            tsr_token = self._call_with_retry(data_hash)

            return tsr_token
        except Exception as e:
            raise Exception(f"时间戳请求失败: {str(e)}")

    def timestamp_file(self, file_path: Path, output_path: Optional[Path] = None) -> Dict:
        """
        为文件生成时间戳(流式处理,不将整个文件载入内存)

        Args:
            file_path: 要加时间戳的文件路径
            output_path: TSR 文件输出路径(可选,默认为 file_path.tsr)

        Returns:
            包含时间戳信息的字典
        """
        file_path = Path(file_path)

        if not file_path.exists():
            return {
                'success': False,
                'message': f"文件不存在: {file_path}"
            }

        try:
            # 流式计算文件哈希(8KB 分块,避免大文件内存溢出)
            hash_obj = hashlib.new(self.hashname)
            with open(file_path, 'rb') as f:
                while True:
                    chunk = f.read(8192)
                    if not chunk:
                        break
                    hash_obj.update(chunk)

            file_hash = hash_obj.hexdigest()

            # 确定输出路径
            if output_path is None:
                output_path = file_path.parent / f"{file_path.stem}.tsr"
            else:
                output_path = Path(output_path)

            # 委托给 timestamp_hash(处理 TSA 调用、重试和文件写入)
            result = self.timestamp_hash(file_hash, output_path)

            # 确保返回文件哈希
            if result.get('success'):
                result['hash'] = file_hash

            return result

        except Exception as e:
            return {
                'success': False,
                'message': f"时间戳生成失败: {str(e)}"
            }

    def timestamp_hash(self, hash_value: str, output_path: Path) -> Dict:
        """
        为已知的哈希值生成时间戳(用于 Merkle Root 等场景)

        Args:
            hash_value: 十六进制哈希值字符串
            output_path: TSR 文件输出路径

        Returns:
            包含时间戳信息的字典
        """
        try:
            # 将十六进制哈希值转换为二进制
            hash_bytes = bytes.fromhex(hash_value)

            # 获取时间戳令牌(带重试和故障转移)
            tsr_token = self._call_with_retry(hash_bytes)

            # 保存 TSR 文件
            output_path = Path(output_path)
            output_path.parent.mkdir(parents=True, exist_ok=True)
            with open(output_path, 'wb') as f:
                f.write(tsr_token)

            # 从 TSR 令牌中提取 TSA 认证时间(而非本机时钟)
            tsa_timestamp = datetime.now().isoformat()  # fallback
            tsa_issuer = self.provider_name
            try:
                from asn1crypto import tsp, cms
                # Try TimeStampResp first, then ContentInfo
                signed_data = None
                try:
                    ts_resp = tsp.TimeStampResp.load(tsr_token)
                    signed_data = ts_resp['time_stamp_token']['content']
                except (ValueError, KeyError, TypeError):
                    try:
                        ci = cms.ContentInfo.load(tsr_token)
                        if ci['content_type'].native == 'signed_data':
                            signed_data = ci['content']
                    except (ValueError, KeyError, TypeError):
                        pass

                if signed_data:
                    tst_info = signed_data['encap_content_info']['content'].parsed
                    gen_time = tst_info['gen_time'].native
                    if gen_time:
                        tsa_timestamp = gen_time.isoformat()

                    # Extract issuer CN from signer info
                    try:
                        signer_infos = signed_data['signer_infos']
                        if signer_infos:
                            sid = signer_infos[0]['sid']
                            if sid.name == 'issuer_and_serial_number':
                                for rdn in sid.chosen['issuer'].chosen:
                                    for attr in rdn:
                                        if attr['type'].dotted == '2.5.4.3':
                                            tsa_issuer = attr['value'].native
                                            break
                    except (KeyError, IndexError, ValueError):
                        pass
            except ImportError:
                pass  # asn1crypto not available, use fallback values

            return {
                'success': True,
                'timestamp': tsa_timestamp,
                'hash': hash_value,
                'issuer': tsa_issuer,
                'tsr_path': str(output_path),
                'algorithm': self.hashname.upper(),
                'message': f"时间戳生成成功,TSR 文件: {output_path.name}"
            }

        except Exception as e:
            return {
                'success': False,
                'message': f"时间戳生成失败: {str(e)}"
            }

    def verify_tsr(
        self,
        tsr_path: Path,
        data: Optional[bytes] = None,
        digest: Optional[bytes] = None,
    ) -> Dict:
        """
        验证 TSR 时间戳令牌

        Args:
            tsr_path: TSR 文件路径
            data: 原始数据(可选,库内部计算哈希后比对)
            digest: 预计算的哈希值(可选,直接与 TSR 中记录的哈希比对)
                    data 和 digest 二选一;都不传则仅验证 TSR 结构

        Returns:
            验证结果字典,包含 'valid', 'message', 'issuer' 等字段。
            当 data/digest 未提供时,结果包含 'partial_verification': True 标志。
        """
        tsr_path = Path(tsr_path)

        if not tsr_path.exists():
            return {
                'valid': False,
                'message': f"TSR 文件不存在: {tsr_path}"
            }

        try:
            with open(tsr_path, 'rb') as f:
                tsr_token = f.read()

            if digest is not None:
                # 直接用预计算的哈希比对(用于 Merkle Root 等场景)
                verified = rfc3161ng.check_timestamp(tsr_token, digest=digest)
                if verified:
                    return {
                        'valid': True,
                        'message': '时间戳验证通过(数据完整性已确认)',
                        'issuer': self._extract_issuer_from_tsr(tsr_token),
                    }
                else:
                    return {
                        'valid': False,
                        'message': '时间戳验证失败:哈希值不匹配'
                    }
            elif data is not None:
                # 传入原始数据,由库内部计算哈希后比对
                verified = rfc3161ng.check_timestamp(tsr_token, data=data)
                if verified:
                    return {
                        'valid': True,
                        'message': '时间戳验证通过(数据完整性已确认)',
                        'issuer': self._extract_issuer_from_tsr(tsr_token),
                    }
                else:
                    return {
                        'valid': False,
                        'message': '时间戳验证失败:哈希值不匹配'
                    }
            else:
                # 结构验证:无原始数据时,验证 TSR 文件结构有效性
                return self._verify_tsr_structure(tsr_token)

        except Exception as e:
            return {
                'valid': False,
                'message': f'时间戳验证失败: {str(e)}'
            }

    def _extract_issuer_from_tsr(self, tsr_token: bytes) -> str:
        """Extract the actual issuer CN from a TSR token using asn1crypto.

        Falls back to self.provider_name if parsing fails or asn1crypto
        is unavailable.  This ensures verify_tsr() reports the real issuer
        even after failover to a different TSA provider.
        """
        try:
            from asn1crypto import tsp, cms

            signed_data = None
            try:
                ts_resp = tsp.TimeStampResp.load(tsr_token)
                signed_data = ts_resp['time_stamp_token']['content']
            except (ValueError, KeyError, TypeError):
                try:
                    ci = cms.ContentInfo.load(tsr_token)
                    if ci['content_type'].native == 'signed_data':
                        signed_data = ci['content']
                except (ValueError, KeyError, TypeError):
                    pass

            if signed_data:
                signer_infos = signed_data['signer_infos']
                if signer_infos:
                    sid = signer_infos[0]['sid']
                    if sid.name == 'issuer_and_serial_number':
                        for rdn in sid.chosen['issuer'].chosen:
                            for attr in rdn:
                                if attr['type'].dotted == '2.5.4.3':  # CN
                                    return attr['value'].native
        except (ImportError, Exception):
            pass

        return self.provider_name

    def _verify_tsr_structure(self, tsr_token: bytes) -> Dict:
        """验证 TSR 令牌的 ASN.1 结构(不验证数据完整性)"""
        if len(tsr_token) < 20:
            return {'valid': False, 'message': 'TSR 文件过小,可能已损坏'}

        if tsr_token[0] != 0x30:
            return {'valid': False, 'message': 'TSR 文件不是有效的 ASN.1 DER 格式'}

        # 尝试 asn1crypto 深度结构验证
        try:
            from asn1crypto import tsp, cms

            # 格式 1: TimeStampResp
            try:
                ts_resp = tsp.TimeStampResp.load(tsr_token)
                status = ts_resp['status']['status'].native
                if status not in ('granted', 'granted_with_mods'):
                    return {
                        'valid': False,
                        'message': f'TSR 状态异常: {status}'
                    }
                return {
                    'valid': True,
                    'message': '时间戳结构验证通过(未验证数据完整性)',
                    'issuer': self.provider_name,
                    'partial_verification': True
                }
            except (ValueError, KeyError, TypeError):
                pass

            # 格式 2: ContentInfo/SignedData (DigiCert 等)
            try:
                content_info = cms.ContentInfo.load(tsr_token)
                if content_info['content_type'].native == 'signed_data':
                    return {
                        'valid': True,
                        'message': '时间戳结构验证通过(未验证数据完整性)',
                        'issuer': self.provider_name,
                        'partial_verification': True
                    }
            except (ValueError, KeyError, TypeError):
                pass

            return {'valid': False, 'message': '无法解析 TSR 结构'}

        except ImportError:
            # asn1crypto 不可用,基础结构检查已通过(size + 0x30 tag)
            return {
                'valid': True,
                'message': '时间戳基础结构检查通过(安装 asn1crypto 可获得深度验证)',
                'issuer': self.provider_name,
                'partial_verification': True
            }

    @classmethod
    def get_provider_info(cls, provider: str) -> Optional[Dict]:
        """获取预定义服务提供商的信息"""
        return cls.PROVIDERS.get(provider)

    @classmethod
    def list_providers(cls) -> Dict[str, Dict]:
        """列出所有预定义的服务提供商"""
        return cls.PROVIDERS.copy()
tools/rights/tsa_client.py:TSAClient

Audit highlights:

  • Default constructor: provider='freetsa'
  • The UI calls batch_protect_works(tsa_provider='digicert'), so the user's actual default is DigiCert
  • Failover order: ['digicert', 'freetsa', 'identrust']
  • Exponential backoff: sleep(2 ** attempt), i.e. 1s, 2s, 4s
  • Up to 3 retries per provider
  • The initial fallback value of tsa_timestamp in timestamp_hash is datetime.now().isoformat() (local clock); it is only replaced with the TSA-asserted time after asn1crypto parses it successfully

2.4 Batch Timestamping Main Flow

rights/logic.py · L660-L810
GitHub
def batch_protect_works(
    file_paths: List[Path],
    author_name: str,
    inspiration: Optional[str] = None,
    output_dir: Optional[Path] = None,
    password: Optional[str] = None,
    progress_callback=None,
    tsa_provider: str = 'digicert',
    tsa_timeout: int = 30,
    cert_mode: str = 'simple',
) -> Dict:
    """
    批量保护作品(数字存证核心流程)
    
    流程:
    1. 计算所有文件哈希
    2. 构建 Merkle Tree
    3. 生成 manifest.json
    4. 生成缩略图拼贴
    5. 调用 TSA 获取时间戳(使用根哈希)
    6. 生成 PDF 报告
    7. 打包为 .nep 文件
    
    Args:
        file_paths: 文件路径列表
        author_name: 作者名称
        inspiration: 创作灵感(可选)
        output_dir: 输出目录
        password: .nep 文件密码(可选)
        progress_callback: 进度回调 (current, total, message)
    
    Returns:
        包含处理结果的字典
    """
    from .utils import build_merkle_tree_from_files
    from .rights_packer import RightsPacker
    from .pdf_generator import PDFGenerator
    
    if not file_paths:
        raise RightsError("文件列表为空")
    
    if output_dir is None:
        output_dir = Path.cwd() / "digital_evidence"
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    
    try:
        # 步骤 1: 计算文件哈希并构建 Merkle Tree
        if progress_callback:
            progress_callback(0, len(file_paths), "正在计算文件哈希...")
        
        def hash_progress(current, total):
            if progress_callback:
                progress_callback(current, total, f"计算哈希: {current}/{total}")
        
        tree = build_merkle_tree_from_files(file_paths, progress_callback=hash_progress)
        root_hash = tree.root_hash

        image_extensions = {'.jpg', '.jpeg', '.png', '.webp', '.bmp'}

        # 步骤 2: 收集文件哈希和作品信息 + 计算感知哈希
        # Reuse hashes already computed by Merkle Tree (avoid double I/O)
        file_hashes = dict(tree.file_hashes)
        works = []
        fingerprints = []  # Perceptual hashes for image files

        for file_path in file_paths:
            file_hash = file_hashes.get(str(file_path), calculate_file_hash(file_path))

            # 获取文件创建时间
            try:
                creation_time = datetime.fromtimestamp(file_path.stat().st_mtime)
            except (OSError, ValueError, OverflowError):
                creation_time = datetime.now()

            works.append({
                'title': file_path.stem,
                'creation_date': creation_time.isoformat(),
                'file_path': str(file_path),
                'file_hash': file_hash
            })

            # Compute perceptual hash for image files (non-blocking, best-effort)
            if file_path.suffix.lower() in image_extensions:
                try:
                    from .fingerprint import compute_fingerprint
                    fp = compute_fingerprint(file_path, file_sha256=file_hash)
                    fingerprints.append(fp)
                except Exception as e:
                    import logging as _logging
                    _logging.getLogger(__name__).debug(
                        "Skipping perceptual hash for %s: %s", file_path.name, e
                    )
        
        # 步骤 3: 生成 manifest.json
        if progress_callback:
            progress_callback(len(file_paths), len(file_paths), "正在生成清单...")
        
        packer = RightsPacker(output_dir / "evidence.nep", password=password)
        manifest_data = packer.create_manifest(
            author_name=author_name,
            inspiration=inspiration,
            works=works,
            file_hashes=file_hashes
        )
        
        # 步骤 4: 生成缩略图(仅图片文件)
        image_paths = [p for p in file_paths if p.suffix.lower() in image_extensions]
        
        thumbnail_path = None
        if image_paths:
            if progress_callback:
                progress_callback(len(file_paths), len(file_paths), "正在生成缩略图...")
            
            thumbnail_path = output_dir / "thumbnail.jpg"
            packer.generate_thumbnail(image_paths, thumbnail_path)
        
        # 步骤 5: 调用 DigiCert 获取真实 RFC 3161 时间戳
        if progress_callback:
            progress_callback(len(file_paths), len(file_paths), "正在获取 DigiCert 时间戳...")

        tsa_binary_path = output_dir / "proof.tsa"    # RFC 3161 二进制
        local_json_path = output_dir / "proof.json"   # 本地降级 JSON
        timestamp_file = tsa_binary_path              # 最终实际写入的文件

        try:
            from .tsa_client import TSAClient

            # 初始化 TSA 客户端(可配置提供商)
            tsa_client = TSAClient(provider=tsa_provider, timeout=tsa_timeout)

            # 用 Merkle Root Hash 获取时间戳
            tsa_result = tsa_client.timestamp_hash(root_hash, tsa_binary_path)

            if tsa_result['success']:
                timestamp_file = tsa_binary_path
                timestamp_info = {
                    'timestamp': tsa_result['timestamp'],
                    'hash': root_hash,
                    'issuer': tsa_result['issuer'],
                    'algorithm': tsa_result['algorithm'],
                    'valid': True,
                    'tsr_path': str(tsa_binary_path)
                }
            else:
                # TSA 失败,使用本地时间戳(写 .json,不混淆 .tsa)
                timestamp_file = local_json_path
                timestamp_info = {
                    'timestamp': datetime.now().isoformat(),
                    'hash': root_hash,
                    'issuer': 'Nephele Workshop (本地)',
tools/rights/logic.py:batch_protect_works() (core snippet)

Key behavior:

  • cert_mode defaults to "simple" and does not automatically detect source-file upgrades
  • The UI layer (PipelineWindow.qml) explicitly passes "full" based on certifySourceFiles.length > 0
  • rfc3161ng not installed → forced downgrade to a local timestamp, without blocking the flow
  • When computing manifest_sha256, its own key is excluded to prevent a circular dependency

2.5 .nep Packaging Implementation

rights/rights_packer.py · L196-L232
GitHub
    def _write_zip_contents(zipf, manifest_data, thumbnail_path, timestamp_file,
                            pdf_report, additional_files, source_files):
        """将内容写入 ZIP 文件(供 pack() 内部使用)"""
        # 1. 添加 manifest.json
        manifest_json = json.dumps(manifest_data, indent=2, ensure_ascii=False)
        zipf.writestr('manifest.json', manifest_json.encode('utf-8'))

        # 2. 添加缩略图
        if thumbnail_path and thumbnail_path.exists():
            zipf.write(thumbnail_path, 'thumbnail.jpg')

        # 3. 添加时间戳文件(保持原始扩展名:.tsa=RFC 3161 二进制,.json=本地降级)
        if timestamp_file and timestamp_file.exists():
            archive_name = 'proof.tsa' if timestamp_file.suffix == '.tsa' else 'proof.json'
            zipf.write(timestamp_file, archive_name)

        # 4. 添加 PDF 报告
        if pdf_report and pdf_report.exists():
            zipf.write(pdf_report, 'VerificationReport.pdf')

        # 5. 原始作品文件 → works/ 目录
        # Use indexed filenames (e.g. works/000_photo.jpg) to avoid collisions
        # when multiple source files share the same basename.  Must match the
        # indexed keys in manifest_data['works_map'].
        if source_files:
            for idx, file_path in enumerate(source_files):
                if isinstance(file_path, str):
                    file_path = Path(file_path)
                if file_path.exists() and file_path.is_file():
                    zipf.write(file_path, f'works/{idx:03d}_{file_path.name}')

        # 6. 添加额外文件
        if additional_files:
            for file_path in additional_files:
                if file_path.exists():
                    zipf.write(file_path, f'additional/{file_path.name}')
tools/rights/rights_packer.py:RightsPacker._write_zip_contents()

Password-protection logic (the pack() method):

  • If pyzipper is available and a password is set: use AESZipFile + WZ_AES (AES-256)
  • If pyzipper is unavailable but a password is set: fall back to standard ZIP and write a _warning into the manifest
  • No password: standard zipfile.ZipFile

2.6 Standalone Verifier (verify.arisfusion.com)

The .nep standalone verifier is publicly deployed at verify.arisfusion.com.

Deployment form:

  • A single HTML file (about 2,200 lines), with no build tools, no bundling pipeline, and no external dependencies
  • Pure client-side computation (SubtleCrypto + a pure-JS ASN.1 parser)
  • "View Page Source" in the browser is enough to audit the complete logic

Verification chain:

  1. The user uploads a .nep → the browser unpacks it locally
  2. SHA-256 is recomputed over works/ → a Merkle Tree is built in lexicographic order of file names
  3. proof.tsa (the RFC 3161 TSR ASN.1 structure) is parsed → messageImprint.hashedMessage is extracted
  4. The local Merkle Root is compared against the digest embedded in the TSR
  5. The genTime and tsa fields in the TSR are parsed to display the issuing authority and the time

Trust boundary:

  • The verifier itself does not issue any timestamp; it only reads the proof.tsa already present inside the .nep
  • In the current version, cryptographic verification of the TSA signature (certificate chain + public-key chain) is a structural comparison + TSA public-key fingerprint match; full CA-chain verification is recommended via openssl ts -verify or rfc3161ng for cross-confirmation
  • The source code is MIT-licensed, so anyone can stand up their own mirror or use it offline (simply save the HTML file)

2.7 Deep Verification Implementation

rights/logic.py · L553-L660
GitHub
def verify_evidence_package(
    tsa_path: Path,
    file_paths: List[Path],
) -> Tuple[bool, Dict]:
    """
    深度验证存证包:重算文件哈希 → 重建 Merkle Tree → 与 TSR 中签名的哈希比对。

    验证链路:
      文件列表 → SHA-256 → Merkle Tree → Root Hash → 与 TSR 内 messageImprint 比对

    Args:
        tsa_path: .tsa/.tsr 时间戳文件路径
        file_paths: 被存证的原始文件路径列表(顺序必须与存证时一致)

    Returns:
        (是否通过, 验证结果字典)
    """
    from .utils import build_merkle_tree_from_files

    if not tsa_path.exists():
        return False, {'valid': False, 'message': f"时间戳文件不存在: {tsa_path}"}

    missing = [str(p) for p in file_paths if not p.exists()]
    if missing:
        return False, {'valid': False, 'message': f"原始文件缺失: {', '.join(missing)}"}

    try:
        # Step 1: 重算文件哈希,重建 Merkle Tree
        tree = build_merkle_tree_from_files(file_paths)
        computed_root = tree.root_hash

        # Step 2: 检查是否为本地时间戳(无第三方签名,不具备证明力)
        suffix = tsa_path.suffix.lower()
        if suffix == '.json':
            tsr_result = parse_tsr(tsa_path)
            tsr_hash = tsr_result.get('hash', tsr_result.get('work_identity', ''))
            if computed_root.lower() == tsr_hash.lower():
                return False, {
                    'valid': False,
                    'message': "本地时间戳无第三方签名,不具备密码学证明力。文件哈希一致但无法证明时间。",
                    'root_hash': computed_root,
                    'local_only': True,
                    'file_count': len(file_paths),
                }
            else:
                return False, {
                    'valid': False,
                    'message': "验证失败:Merkle Root 与本地时间戳记录不匹配",
                    'computed_root': computed_root,
                    'file_count': len(file_paths),
                }

        # Step 3: RFC 3161 TSR — 用 rfc3161ng 做真正的密码学签名验证
        computed_digest = bytes.fromhex(computed_root)
        try:
            from .tsa_client import TSAClient
            tsa_client = TSAClient()
            verify_result = tsa_client.verify_tsr(tsa_path, digest=computed_digest)

            if verify_result.get('valid'):
                # 同时解析 TSR 获取时间戳详情(签发时间、签发方)
                tsr_result = parse_tsr(tsa_path)
                return True, {
                    'valid': True,
                    'message': f"深度验证通过:{len(file_paths)} 个文件的 Merkle Root 与 TSA 签名匹配(密码学验证)",
                    'timestamp': tsr_result.get('timestamp'),
                    'issuer': tsr_result.get('issuer'),
                    'root_hash': computed_root,
                    'file_count': len(file_paths),
                }
            else:
                return False, {
                    'valid': False,
                    'message': f"TSA 签名验证失败:{verify_result.get('message', 'unknown')}",
                    'root_hash': computed_root,
                    'file_count': len(file_paths),
                }

        except ImportError:
            # rfc3161ng 未安装,降级为结构性比对(明确标注)
            tsr_result = parse_tsr(tsa_path)
            tsr_hash_raw = tsr_result.get('hash', '')
            tsr_hash = tsr_hash_raw.split(':', 1)[1] if ':' in tsr_hash_raw else tsr_hash_raw

            if computed_root.lower() == tsr_hash.lower():
                return True, {
                    'valid': True,
                    'message': f"结构验证通过(安装 rfc3161ng 可启用密码学签名验证)",
                    'timestamp': tsr_result.get('timestamp'),
                    'issuer': tsr_result.get('issuer'),
                    'root_hash': computed_root,
                    'file_count': len(file_paths),
                    'partial_verification': True,
                }
            else:
                return False, {
                    'valid': False,
                    'message': "验证失败:Merkle Root 与 TSR 记录的哈希不匹配",
                    'computed_root': computed_root,
                    'tsr_hash': tsr_hash,
                    'file_count': len(file_paths),
                }

    except Exception as e:
        return False, {'valid': False, 'message': f"深度验证失败: {e}"}


def batch_protect_works(
tools/rights/logic.py:verify_evidence_package()

3. Infringement Evidence Capture Core Implementation

3.1 Browser Screenshot

browser/session.py · L1057-L1071
GitHub
    def screenshot(self, path: Optional[str] = None) -> dict:
        """截图保存"""
        if not self._page:
            return {"success": False, "message": "浏览器未打开"}
        try:
            return self._run(self._async_screenshot(path))
        except Exception as e:
            return {"success": False, "message": f"截图失败: {e}"}

    async def _async_screenshot(self, path: Optional[str]) -> dict:
        import tempfile
        if not path:
            path = os.path.join(tempfile.gettempdir(), "nephele_screenshot.png")
        await self._page.screenshot(path=path, full_page=False)
        return {"success": True, "message": f"截图已保存: {path}", "output_path": path}
core/browser/session.py:BrowserManager.screenshot / _async_screenshot

Key fact: full_page=False. This is a viewport screenshot, not a scrolling full-page screenshot. Content below the fold on very long pages is not visually captured.


3.2 Infringement Evidence Capture Main Flow

rights/url_evidence.py · L669-L803
GitHub
    def capture(
        self,
        url: str,
        progress_callback=None,
    ) -> Dict:
        """
        Execute the full URL evidence capture pipeline.

        Args:
            url: Target URL to capture
            progress_callback: Optional (step, total, message) callback

        Returns:
            {
                "success": bool,
                "evidence_id": str,
                "output_dir": str,
                "manifest": dict,
                "timestamp_info": dict,
                "message": str,
            }
        """
        total_phases = 5
        self._record(f"Starting evidence capture for: {url}")

        try:
            # Phase 1: Environment + DNS + TLS certificate
            if progress_callback:
                progress_callback(1, total_phases, "Collecting environment info...")
            environment = self.collect_environment()
            dns_info = self.resolve_dns(url)
            tls_info = self.capture_tls_certificate(url)

            # Phase 2: Browser capture (navigate + screenshot + HTML + images)
            if progress_callback:
                progress_callback(2, total_phases, "Capturing page...")
            artifacts = self.capture_page(url, progress_callback=None)

            # Phase 3: Hash all artifacts (including TLS cert files)
            if progress_callback:
                progress_callback(3, total_phases, "Computing hashes...")
            file_hashes = self.hash_artifacts(artifacts)

            # Also hash TLS cert + response headers files
            for extra_key in ("der_path", "pem_path"):
                p = Path(tls_info.get(extra_key, ""))
                if p.exists():
                    file_hashes[p.name] = {
                        "sha256": self._sha256_file(p),
                        "size": p.stat().st_size,
                        "type": "tls_certificate",
                    }
            resp_headers_path = artifacts.get("response_headers_path")
            if resp_headers_path and Path(resp_headers_path).exists():
                p = Path(resp_headers_path)
                file_hashes[p.name] = {
                    "sha256": self._sha256_file(p),
                    "size": p.stat().st_size,
                    "type": "response_headers",
                }

            # Phase 4: Generate manifest (save log first so it's included)
            log_path = Path(self.save_log())
            self._log_committed = True  # Log content is now frozen for hashing
            file_hashes[log_path.name] = {
                "sha256": self._sha256_file(log_path),
                "size": log_path.stat().st_size,
                "type": "operation_log",
            }

            if progress_callback:
                progress_callback(4, total_phases, "Generating manifest...")
            manifest = self.generate_manifest(
                target_url=url,
                environment=environment,
                dns_info=dns_info,
                artifacts=artifacts,
                file_hashes=file_hashes,
                tls_info=tls_info,
            )

            # Phase 5: Timestamp
            if progress_callback:
                progress_callback(5, total_phases, "Requesting timestamp...")
            ts_info = self.timestamp_manifest(manifest)

            # DO NOT re-save operation log — the version already hashed in manifest
            # is the authoritative one. Any further _record() calls only live in memory.

            image_count = len(artifacts.get("images", []))

            # Phase 6: Package as .nep (tamper-proof archive)
            # Uses files on disk (which match manifest hashes)
            nep_path = self._package_nep()

            return {
                "success": True,
                "evidence_id": self._evidence_id,
                "output_dir": str(self._output_dir),
                "nep_path": str(nep_path),
                "manifest": manifest,
                "timestamp_info": ts_info,
                "page_title": artifacts.get("page_title", ""),
                "image_count": image_count,
                "file_count": len(file_hashes),
                "message": (
                    f"URL evidence captured: {len(file_hashes)} files, "
                    f"{image_count} images, timestamp by {ts_info.get('issuer', 'N/A')}"
                ),
            }

        except URLEvidenceError as e:
            self._record(f"FATAL: {e}")
            if not self._log_committed:
                self.save_log()
            return {
                "success": False,
                "evidence_id": self._evidence_id,
                "output_dir": str(self._output_dir),
                "message": str(e),
            }
        except Exception as e:
            self._record(f"UNEXPECTED ERROR: {e}")
            if not self._log_committed:
                self.save_log()
            logger.exception("URL evidence capture failed")
            return {
                "success": False,
                "evidence_id": self._evidence_id,
                "output_dir": str(self._output_dir),
                "message": f"Unexpected error: {e}",
            }

    # ===== Helpers =====
tools/rights/url_evidence.py:URLEvidenceCapture.capture()

Log immutability guarantee:

  • save_log() is called before the manifest is generated
  • After writing, self._log_committed = True is set
  • Thereafter _record() only appends to an in-memory list and no longer writes to disk
  • The file_hashes in the manifest include the SHA-256 of the log file
  • Therefore the manifest hash anchors the state of the log after it has been "frozen"

3.3 TLS Certificate Capture

rights/url_evidence.py · L98-L172
GitHub
    def capture_tls_certificate(self, url: str) -> Dict:
        """
        Capture the server's TLS certificate chain.
        This proves the connection was made to the authentic server —
        you can't forge a CA-signed certificate.
        """
        import ssl
        from urllib.parse import urlparse

        self._record("Capturing TLS server certificate")
        parsed = urlparse(url if "://" in url else f"https://{url}")
        hostname = parsed.hostname or ""
        port = parsed.port or 443

        if not hostname:
            return {"error": "Invalid hostname"}

        try:
            ctx = ssl.create_default_context()
            with ctx.wrap_socket(
                socket.socket(socket.AF_INET, socket.SOCK_STREAM),
                server_hostname=hostname,
            ) as sock:
                sock.settimeout(10)
                sock.connect((hostname, port))
                cert = sock.getpeercert()
                cert_der = sock.getpeercert(binary_form=True)

            # Save DER certificate to file
            cert_path = self._output_dir / "server_certificate.der"
            cert_path.write_bytes(cert_der)

            # Also save human-readable PEM
            import base64
            pem_data = (
                "-----BEGIN CERTIFICATE-----\n"
                + base64.encodebytes(cert_der).decode()
                + "-----END CERTIFICATE-----\n"
            )
            pem_path = self._output_dir / "server_certificate.pem"
            pem_path.write_text(pem_data, encoding="utf-8")

            # Extract key fields
            subject = dict(x[0] for x in cert.get("subject", ()))
            issuer = dict(x[0] for x in cert.get("issuer", ()))
            cert_info = {
                "subject_cn": subject.get("commonName", ""),
                "issuer_cn": issuer.get("commonName", ""),
                "issuer_org": issuer.get("organizationName", ""),
                "not_before": cert.get("notBefore", ""),
                "not_after": cert.get("notAfter", ""),
                "serial_number": cert.get("serialNumber", ""),
                "san": [
                    entry[1]
                    for entry in cert.get("subjectAltName", ())
                    if entry[0] == "DNS"
                ],
                "der_path": str(cert_path),
                "pem_path": str(pem_path),
                "der_sha256": hashlib.sha256(cert_der).hexdigest(),
            }

            self._record(
                f"TLS cert captured: {cert_info['subject_cn']} "
                f"(issuer: {cert_info['issuer_org']}, "
                f"serial: {cert_info['serial_number'][:16]}...)"
            )
            return cert_info

        except Exception as e:
            self._record(f"TLS certificate capture failed: {e}")
            return {"error": str(e)}

    # ===== Step 2: DNS resolution =====
tools/rights/url_evidence.py:URLEvidenceCapture.capture_tls_certificate()

Failure scenarios: self-signed certificate, ssl.SSLError, connection timeout → returns {"error": ...}, non-fatal.


3.4 CAPTCHA Detection and Handling

rights/url_evidence.py · L195-L208
GitHub
    _CAPTCHA_KEYWORDS = (
        "验证码", "验证", "captcha", "verify", "challenge",
        "human verification", "robot", "机器人",
    )

    def _is_captcha_page(self, title: str, url: str) -> bool:
        """Detect if the loaded page is a CAPTCHA or anti-bot challenge."""
        text = (title or "").lower()
        url_lower = (url or "").lower()
        for kw in self._CAPTCHA_KEYWORDS:
            if kw in text or kw in url_lower:
                return True
        return False
tools/rights/url_evidence.py:URLEvidenceCapture._CAPTCHA_KEYWORDS / _is_captcha_page

Handling flow:

  1. Navigate headless
  2. Detect title keywords → determine CAPTCHA
  3. Close the headless browser and open a visible browser
  4. Re-navigate
  5. Poll with time.sleep(2), up to 120 seconds
  6. After timeout, capture the current state

Blocking risk: during polling, time.sleep(2) blocks the current thread.


4. Invisible Watermark Technical Audit

4.1 Architecture Overview

The invisible watermark module is split into three layers:

LayerFileResponsibility
Underlying libraryblind_watermark (PyPI)DWT+DCT+SVD core embedding/extraction algorithm
Engine layertools/packer/watermark_protection.pyWrapper layer: fixed-length encoding, round-trip verification, alpha preservation, exception fallback
Business layertools/packer/logic.py / agent_api.pyPackaging-parameter orchestration, overlay of visible and invisible watermarks
Worker layercore/workers/watermark_worker.pyBackground-thread extraction to avoid blocking the UI

Everything runs locally, with zero network dependency.


4.2 Underlying Library Algorithm (blind_watermark)

The underlying library Nephele uses is blind_watermark (github.com/guofei9987/blind_watermark), which adopts a three-stage hybrid-domain embedding strategy: DWT (Discrete Wavelet Transform) → DCT (Discrete Cosine Transform) → SVD (Singular Value Decomposition).

4.2.1 The WaterMark Wrapper Class

blind_watermark/blind_watermark.py

python
class WaterMark:
    def __init__(self, password_wm=1, password_img=1, block_shape=(4, 4), mode='common', processes=None):
        self.bwm_core = WaterMarkCore(password_img=password_img, mode=mode, processes=processes)
        self.password_wm = password_wm
        self.wm_bit = None
        self.wm_size = 0
 
    def read_img(self, filename=None, img=None):
        if img is None:
            img = cv2.imread(filename, flags=cv2.IMREAD_UNCHANGED)
        self.bwm_core.read_img_arr(img=img)
        return img
 
    def read_wm(self, wm_content, mode='img'):
        if mode == 'bit':
            self.wm_bit = np.array(wm_content)
        # ... img / str 模式省略 ...
 
        self.wm_size = self.wm_bit.size
        # 水印加密:用 password_wm 作为种子对 bit 序列做伪随机置乱
        np.random.RandomState(self.password_wm).shuffle(self.wm_bit)
        self.bwm_core.read_wm(self.wm_bit)
 
    def embed(self, filename=None, compression_ratio=None):
        embed_img = self.bwm_core.embed()
        if filename is not None:
            cv2.imwrite(filename=filename, img=embed_img)
        return embed_img
 
    def extract_decrypt(self, wm_avg):
        # 逆置乱:根据相同的 seed 生成相同的 shuffle index,再逆序还原
        wm_index = np.arange(self.wm_size)
        np.random.RandomState(self.password_wm).shuffle(wm_index)
        wm_avg[wm_index] = wm_avg.copy()
        return wm_avg
 
    def extract(self, filename=None, embed_img=None, wm_shape=None, mode='img'):
        if filename is not None:
            embed_img = cv2.imread(filename, flags=cv2.IMREAD_COLOR)
        self.wm_size = np.array(wm_shape).prod()
 
        if mode in ('str', 'bit'):
            wm_avg = self.bwm_core.extract_with_kmeans(img=embed_img, wm_shape=wm_shape)
        else:
            wm_avg = self.bwm_core.extract(img=embed_img, wm_shape=wm_shape)
 
        wm = self.extract_decrypt(wm_avg=wm_avg)
        return wm

Key facts:

  • password_wm is used for the pseudo-random scrambling of the watermark bit sequence (np.random.RandomState.shuffle)
  • password_img is passed to WaterMarkCore and is used for the selection-scrambling of image blocks
  • The encryption is essentially a "deterministic shuffle based on a known seed", not modern cryptographic encryption

4.2.2 The WaterMarkCore Core Engine

blind_watermark/bwm_core.py

python
class WaterMarkCore:
    def __init__(self, password_img=1, mode='common', processes=None):
        self.block_shape = np.array([4, 4])
        self.password_img = password_img
        self.d1, self.d2 = 36, 20  # 量化步长:越大鲁棒性越强,但失真越大
        self.pool = AutoPool(mode=mode, processes=processes)

Image preprocessing (read_img_arr):

python
    def read_img_arr(self, img):
        # 处理透明图
        self.alpha = None
        if img.shape[2] == 4:
            if img[:, :, 3].min() < 255:
                self.alpha = img[:, :, 3]
                img = img[:, :, :3]
 
        # BGR -> YUV,补白边使像素变偶数(DWT 要求)
        self.img = img.astype(np.float32)
        self.img_shape = self.img.shape[:2]
        self.img_YUV = cv2.copyMakeBorder(
            cv2.cvtColor(self.img, cv2.COLOR_BGR2YUV),
            0, self.img.shape[0] % 2, 0, self.img.shape[1] % 2,
            cv2.BORDER_CONSTANT, value=(0, 0, 0)
        )
 
        # 对 Y/U/V 三个通道分别做 1 级 Haar DWT
        self.ca_shape = [(i + 1) // 2 for i in self.img_shape]
        self.ca_block_shape = (
            self.ca_shape[0] // self.block_shape[0],
            self.ca_shape[1] // self.block_shape[1],
            self.block_shape[0], self.block_shape[1]
        )
 
        for channel in range(3):
            self.ca[channel], self.hvd[channel] = dwt2(
                self.img_YUV[:, :, channel], 'haar'
            )
            # 将 CA(近似系数)转为 4D 分块数组
            self.ca_block[channel] = np.lib.stride_tricks.as_strided(
                self.ca[channel].astype(np.float32),
                self.ca_block_shape,
                strides=4 * np.array([
                    self.ca_shape[1] * self.block_shape[0],
                    self.block_shape[1], self.ca_shape[1], 1
                ])
            )

Audit points:

  • Color space: BGR → YUV; the watermark is embedded in the DWT approximation sub-band of the Y (luminance) channel
  • DWT levels: only 1-level Haar, not a multi-level decomposition
  • Block size: fixed at 4×4, cutting the CA sub-band into non-overlapping small blocks

4.2.3 Block-Level Embedding Flow (block_add_wm_slow)

python
    def block_add_wm_slow(self, arg):
        block, shuffler, i = arg
        wm_1 = self.wm_bit[i % self.wm_size]
 
        # Step 1: 对 4x4 块做 DCT
        block_dct = dct(block)
 
        # Step 2: flatten 后按 shuffler 打乱顺序(块内置乱)
        block_dct_shuffled = block_dct.flatten()[shuffler].reshape(self.block_shape)
 
        # Step 3: SVD 分解
        u, s, v = svd(block_dct_shuffled)
 
        # Step 4: 在奇异值上嵌入 1 bit 水印
        # 量化公式:把 s[0] 量化到 d1 的整数倍,再根据 wm_1 偏移 1/4 个步长
        s[0] = (s[0] // self.d1 + 1/4 + 1/2 * wm_1) * self.d1
        if self.d2:
            s[1] = (s[1] // self.d2 + 1/4 + 1/2 * wm_1) * self.d2
 
        # Step 5: 逆 SVD
        block_dct_flatten = np.dot(u, np.dot(np.diag(s), v)).flatten()
 
        # Step 6: 逆置乱
        block_dct_flatten[shuffler] = block_dct_flatten.copy()
 
        # Step 7: 逆 DCT
        return idct(block_dct_flatten.reshape(self.block_shape))

Mathematical principle:

The embedding formula (using s[0] as an example):

text
s'[0] = (floor(s[0] / d1) + 1/4 + 1/2 * w) * d1

where w ∈ {0, 1} is the watermark bit. During extraction:

text
w = 1  if (s[0] mod d1) > (d1 / 2)  else 0

d1=36 means the quantization interval per bit is 36, and the maximum perturbation to a coefficient is about 0.75 × d1 = 27.


4.2.4 Block-Level Extraction Flow (block_get_wm_slow)

python
    def block_get_wm_slow(self, args):
        block, shuffler = args
        block_dct_shuffled = dct(block).flatten()[shuffler].reshape(self.block_shape)
        u, s, v = svd(block_dct_shuffled)
 
        # 从 s[0] 提取 bit
        wm = (s[0] % self.d1 > self.d1 / 2) * 1
        if self.d2:
            # s[1] 作为辅助,加权平均
            tmp = (s[1] % self.d2 > self.d2 / 2) * 1
            wm = (wm * 3 + tmp * 1) / 4
        return wm

Audit points:

  • d2 (default 20) is the auxiliary quantization step; s[0] has weight 3 and s[1] has weight 1
  • When d2=0, it degenerates to single-singular-value extraction

4.2.5 Global Embedding Flow (embed)

python
    def embed(self):
        self.init_block_index()
        embed_ca = copy.deepcopy(self.ca)
 
        # 生成块选择置乱序列(跨块置乱)
        self.idx_shuffle = random_strategy1(
            self.password_img, self.block_num,
            self.block_shape[0] * self.block_shape[1]
        )
 
        for channel in range(3):
            # 对每个块并行执行 block_add_wm
            tmp = self.pool.map(self.block_add_wm, [
                (self.ca_block[channel][self.block_index[i]], self.idx_shuffle[i], i)
                for i in range(self.block_num)
            ])
 
            # 写回 4D 数组
            for i in range(self.block_num):
                self.ca_block[channel][self.block_index[i]] = tmp[i]
 
            # 4D -> 2D,拼接回 CA 子带
            self.ca_part[channel] = np.concatenate(np.concatenate(self.ca_block[channel], 1), 1)
            embed_ca[channel][:self.part_shape[0], :self.part_shape[1]] = self.ca_part[channel]
 
            # 逆 DWT
            embed_YUV[channel] = idwt2((embed_ca[channel], self.hvd[channel]), "haar")
 
        # 合并三通道,YUV -> BGR,裁剪回原始尺寸
        embed_img_YUV = np.stack(embed_YUV, axis=2)
        embed_img_YUV = embed_img_YUV[:self.img_shape[0], :self.img_shape[1]]
        embed_img = cv2.cvtColor(embed_img_YUV, cv2.COLOR_YUV2BGR)
        embed_img = np.clip(embed_img, a_min=0, a_max=255)
 
        if self.alpha is not None:
            embed_img = cv2.merge([embed_img.astype(np.uint8), self.alpha])
        return embed_img

Key facts:

  • Cyclic embedding: the watermark bit sequence is embedded repeatedly in a cycle across the block_num blocks (wm_bit[i % wm_size])
  • Three independent channels: each of the Y/U/V channels embeds a full copy of the watermark; extraction averages them
  • In-block scrambling (idx_shuffle[i]): the order of the 16 DCT coefficients inside each 4×4 block is shuffled
  • Cross-block order (block_index): the block traversal order is a fixed row-column scan and is not scrambled

4.2.6 K-Means Binarization (one_dim_kmeans)

python
def one_dim_kmeans(inputs):
    threshold = 0
    e_tol = 10 ** (-6)
    center = [inputs.min(), inputs.max()]
    for i in range(300):
        threshold = (center[0] + center[1]) / 2
        is_class01 = inputs > threshold
        center = [inputs[~is_class01].mean(), inputs[is_class01].mean()]
        if np.abs((center[0] + center[1]) / 2 - threshold) < e_tol:
            threshold = (center[0] + center[1]) / 2
            break
    is_class01 = inputs > threshold
    return is_class01

Used in the extract_with_kmeans mode (Nephele's mode="bit" does not pass through this path; it directly returns wm_avg).


4.2.7 Random Scrambling Strategy

python
def random_strategy1(seed, size, block_shape):
    return np.random.RandomState(seed) \
        .random(size=(size, block_shape)) \
        .argsort(axis=1)

Generates a size × block_shape random matrix and applies argsort per row to obtain each row's scramble index. For password_img, size = block_num and block_shape = 16.


4.3 Engine Wrapper Layer Source

On top of the underlying library, Nephele adds fixed-length encoding, round-trip verification, alpha-channel preservation, and exception fallback.

4.3.1 Constants and Utility Functions

packer/watermark_protection.py · L1-L67
GitHub
"""
Nephele Workshop - Watermark Protection Module (local)

Embeds/extracts invisible blind watermarks using blind_watermark locally.
No network dependency — works offline.

Uses file-based I/O + bit mode for reliable embedding/extraction
(numpy array mode has known issues with blind_watermark library).

Public API unchanged:
    protect_image(image, level, copyright_info) -> Image
    extract_watermark(image) -> str | None
    save_with_watermark(image, output_path) -> bool

Developer: ArisFusion Studio
"""

import logging
import tempfile
from enum import Enum
from pathlib import Path
from typing import Optional

import numpy as np
from PIL import Image

logger = logging.getLogger(__name__)

# Watermark payload: fixed 32 bytes (256 bits)
# Enough for 10 Chinese chars (UTF-8, 3 bytes each) or 32 ASCII chars
WATERMARK_BYTES = 32
WATERMARK_BITS = WATERMARK_BYTES * 8
_WM_PASSWORD_IMG = 2024
_WM_PASSWORD_WM = 1314


class ProtectionLevel(Enum):
    NONE = "none"
    INVISIBLE = "invisible"


LEVEL_ALIASES = {"maximum": "invisible"}


def _text_to_bits(text: str) -> list[int]:
    """Convert text to fixed-length bit array via UTF-8."""
    raw = text.encode("utf-8")[:WATERMARK_BYTES]
    padded = raw.ljust(WATERMARK_BYTES, b"\x00")
    bits = []
    for byte in padded:
        for i in range(7, -1, -1):
            bits.append((byte >> i) & 1)
    return bits


def _bits_to_text(bits: list) -> str:
    """Convert bit array back to text via UTF-8."""
    raw = bytearray()
    for i in range(0, len(bits), 8):
        chunk = bits[i:i+8]
        if len(chunk) < 8:
            break
        val = 0
        for b in chunk:
            val = (val << 1) | (1 if b > 0.5 else 0)
        raw.append(val)
    return raw.rstrip(b"\x00").decode("utf-8", errors="replace")
tools/packer/watermark_protection.py: constants & bit helpers

Audit points:

  • UTF-8 fixed-length truncation: overly long text is silently truncated to 32 bytes
  • Threshold decision: during extraction b > 0.5 is treated as 1, giving some tolerance to noise

4.3.2 Embedding Engine

packer/watermark_protection.py · L70-L122
GitHub
class WatermarkEngine:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def embed(self, image: Image.Image, text: str) -> Image.Image:
        """Embed invisible watermark using file-based blind_watermark."""
        try:
            from blind_watermark import WaterMark

            rgb = image.convert("RGB")
            alpha = image.split()[3] if image.mode == "RGBA" else None

            with tempfile.TemporaryDirectory() as tmpdir:
                orig_path = str(Path(tmpdir) / "orig.png")
                wm_path = str(Path(tmpdir) / "watermarked.png")

                rgb.save(orig_path, format="PNG")

                bits = _text_to_bits(text)

                bwm = WaterMark(password_img=_WM_PASSWORD_IMG, password_wm=_WM_PASSWORD_WM)
                bwm.read_img(orig_path)
                bwm.read_wm(np.array(bits), mode="bit")
                bwm.embed(wm_path)

                result_img = Image.open(wm_path).convert("RGB")

                # Verify extraction round-trip
                extracted_bits = WaterMark(
                    password_img=_WM_PASSWORD_IMG, password_wm=_WM_PASSWORD_WM
                ).extract(wm_path, wm_shape=WATERMARK_BITS, mode="bit")
                extracted_text = _bits_to_text(extracted_bits)

                if extracted_text == text[:WATERMARK_BYTES]:
                    logger.info("[Watermark] Verified: '%s'", extracted_text)
                else:
                    logger.warning("[Watermark] Verify mismatch: '%s' -> '%s'",
                                   text[:WATERMARK_BYTES], extracted_text)

            if alpha:
                result_img = result_img.convert("RGBA")
                result_img.putalpha(alpha)

            return result_img

        except Exception as e:
            logger.error("Embed failed: %s", e, exc_info=True)
            return image
tools/packer/watermark_protection.py:WatermarkEngine.__new__ / embed

Audit points:

  • Singleton pattern: WatermarkEngine is a singleton, but a new blind_watermark.WaterMark instance is created each time
  • File-level I/O: it works via a TemporaryDirectory + a temporary PNG file, sidestepping the dtype/shape compatibility bugs of the numpy-array mode
  • A verification failure only logs a warning and still returns the watermarked image
  • Alpha channel: RGBA input is first converted to RGB for embedding, then alpha is restored afterward
  • Exception fallback: any exception returns the original image, with the caller unaware of the failure

4.3.3 Extraction Engine

packer/watermark_protection.py · L123-L145
GitHub
    def extract(self, image: Image.Image) -> Optional[str]:
        """Extract invisible watermark using file-based blind_watermark."""
        try:
            from blind_watermark import WaterMark

            rgb = image.convert("RGB")
            bit_len = WATERMARK_BITS

            with tempfile.TemporaryDirectory() as tmpdir:
                img_path = str(Path(tmpdir) / "check.png")
                rgb.save(img_path, format="PNG")

                extracted_bits = WaterMark(
                    password_img=_WM_PASSWORD_IMG, password_wm=_WM_PASSWORD_WM
                ).extract(img_path, wm_shape=bit_len, mode="bit")

            text = _bits_to_text(extracted_bits)
            return text.strip() if text.strip() else None
        except Exception as e:
            logger.warning("Extract failed: %s", e)
            return None

tools/packer/watermark_protection.py:WatermarkEngine.extract

Audit points:

  • A failed extraction returns None, making it impossible to distinguish "the image has no watermark" from "an error occurred during extraction"
  • An empty string (all-zero padding) likewise returns None after strip()

4.3.4 Public API

packer/watermark_protection.py · L146-L167
GitHub
def protect_image(
    image: Image.Image,
    level: str = "none",
    copyright_info: str = "ARIS"
) -> Image.Image:
    level = LEVEL_ALIASES.get(level, level)
    if level == "invisible":
        return WatermarkEngine().embed(image, copyright_info)
    return image


def extract_watermark(image: Image.Image) -> Optional[str]:
    return WatermarkEngine().extract(image)


def save_with_watermark(image: Image.Image, output_path: str) -> bool:
    try:
        image.save(output_path, format='PNG')
        return True
    except Exception as e:
        logger.warning("Save failed: %s", e)
        return False
tools/packer/watermark_protection.py:protect_image / extract_watermark / save_with_watermark

4.4 Business-Layer Invocation

packer/agent_api.py · L19-L76
GitHub
def pack_image(
    input_path: str,
    watermark_path: Optional[str] = None,
    output_dir: Optional[str] = None,
    watermark_mode: str = "center",
    watermark_opacity: float = 0.3,
    preview_max_size: int = 1920,
    thumbnail_max_size: int = 500,
    protection_level: str = "none",
    copyright_info: str = "© ArisFusion Studio",
    output_folder_name: str = "Delivery_Pack",
) -> dict:
    """One-click image packing: HD + preview + thumbnail."""
    try:
        src = Path(input_path)
        if not src.exists():
            return api_err(f"文件不存在: {input_path}")

        mode_map = {"center": WatermarkMode.CENTER, "tile": WatermarkMode.TILE}
        wm_mode = mode_map.get(watermark_mode, WatermarkMode.CENTER)

        wm_path = Path(watermark_path) if watermark_path else None
        if wm_path and not wm_path.exists():
            return api_err(f"水印文件不存在: {watermark_path}")

        out_dir = Path(output_dir) if output_dir else None

        packer = DeliveryPacker(
            preview_max_size=preview_max_size,
            thumbnail_max_size=thumbnail_max_size,
            watermark_opacity=watermark_opacity,
            protection_level=protection_level,
            copyright_info=copyright_info,
            output_folder_name=output_folder_name,
        )

        result_dir, results = packer.process_image(
            input_path=src,
            watermark_path=wm_path,
            output_dir=out_dir,
            watermark_mode=wm_mode,
        )

        file_info = {k: str(v) for k, v in results.items()}
        return api_ok(
            f"打包完成,共生成 {len(results)} 个文件",
            output_path=str(result_dir),
            data={"files": file_info},
        )

    except PackerError as e:
        logger.error("打包失败: %s", e)
        return api_err(str(e))
    except Exception as e:
        logger.exception("打包时发生意外错误")
        return api_err(f"意外错误: {e}")

tools/packer/agent_api.py:pack_image()

Key facts:

  • protection_level defaults to "none", i.e. the invisible watermark is disabled by default
  • copyright_info is at most 32 bytes (silently truncated if longer)

4.5 Worker-Layer Implementation

workers/watermark_worker.py · L1-L47
GitHub
"""Watermark Extraction Worker"""
from pathlib import Path
from PySide6.QtCore import QThread, Signal, QCoreApplication

_tr = QCoreApplication.translate

from .._utils import ensure_src_path


class WatermarkExtractWorker(QThread):
    """Worker thread for watermark extraction (rivaGan model loading is slow)."""
    finished = Signal(str)  # watermark result or empty string
    logMessage = Signal(str, str)  # (message, level)

    def __init__(self, image_path: str):
        super().__init__()
        self.image_path = image_path

    def run(self):
        """Extract watermark in background thread."""
        try:
            ensure_src_path()

            from PIL import Image
            from tools.packer.watermark_protection import extract_watermark

            img_path = Path(self.image_path)
            if not img_path.exists():
                self.logMessage.emit(_tr("WatermarkExtractWorker", "文件不存在: %s") % self.image_path, "error")
                self.finished.emit("")
                return

            self.logMessage.emit(_tr("WatermarkExtractWorker", "正在提取水印: %s") % img_path.name, "info")

            image = Image.open(img_path)
            watermark = extract_watermark(image)

            if watermark:
                self.logMessage.emit(_tr("WatermarkExtractWorker", "提取成功: %s") % watermark, "success")
                self.finished.emit(watermark)
            else:
                self.logMessage.emit("未检测到隐形水印", "warning")
                self.finished.emit("")

        except Exception as e:
            self.logMessage.emit(f"提取失败: {str(e)}", "error")
            self.finished.emit("")
core/workers/watermark_worker.py (full file)

4.6 Capacity and Encoding

EncodingBytes per characterMax characters
ASCII132
CJK (UTF-8)310
MixedDepends on the specific characters

5. White-Box Source Audit of AI Metadata Detection

This section audits Nephele Workshop's AI metadata / C2PA credential detection feature. This feature reads machine-readable evidence already present in image files, including C2PA content credentials, generation-tool metadata, platform declarations, and export traces. It does not use a visual style classification model, and it does not interpret "not detected" as "not AI".

5.1 Architecture Overview

AI metadata detection is split into four layers:

LayerFileResponsibility
Rule layertools/validator/logic.py:MetaDataDetectorMetadata reading, rule matching, evidence grading, final status output
C2PA layertools/validator/c2pa_verifier.pyOfficial C2PA SDK adapter, manifest reading, signature-chain and trust-state parsing
Worker layercore/workers/ai_detector_worker.pyBatch detection thread, error isolation, result signals
UI layergui/qml/views/AIValidatorView.qmlMaps raw evidence into user-friendly evidence labels

Data flow:

text
用户选择图片
  -> core/workers/ai_detector_worker.py
  -> tools/validator/logic.py:MetaDataDetector.detect()
       ├── Pillow 读取 PNG/JPEG/WebP/TIFF 元数据(PNG info / EXIF)
       ├── tools/validator/c2pa_verifier.py:verify_c2pa_file()(官方 SDK)
       └── 原始字节扫描(JUMBF / APP11 fallback)
  -> 返回 {status, reason, tool, evidence}
  -> UI 映射证据标签

Detection result structure:

python
{
    "status": "ai" | "unknown" | "human" | "error",
    "reason": str,
    "tool": str,
    "evidence": str,
}

注意

status="human" is a legacy field name. The UI should not present it as "definitively a human work", but rather as "no credentials found" or "insufficient credentials".


5.2 The MetaDataDetector Rule Layer

tools/validator/logic.py:MetaDataDetector is the main rule-matching class. It holds no state of its own, and __init__ is empty.

5.2.1 Rule Constants

validator/logic.py · L20-L105
GitHub
class MetaDataDetector:
    """
    Detects AI generation metadata from image files using heuristic analysis
    of EXIF, PNG info chunks, and generation parameters.
    """

    # 1. 明确的软件签名 (强特征,优先匹配专有字符串)
    # 顺序:Midjourney 先于 Gemini,以避免误判
    AI_SOFTWARE_SIGNATURES = {
        "Midjourney": [
            re.compile(r"job id:\s*[a-f0-9\-]+", re.IGNORECASE),  # 专有 Job ID
            re.compile(r"--ar\s*\d+:\d+", re.IGNORECASE),         # --ar 参数
            re.compile(r"--v\s*\d+", re.IGNORECASE),              # --v 参数
            re.compile(r"--stylize\s*\d+", re.IGNORECASE),        # --stylize
            re.compile(r"midjourney", re.IGNORECASE),             # 显式名称
            re.compile(r"mj v", re.IGNORECASE),
            re.compile(r"mj_", re.IGNORECASE),
        ],
        "ComfyUI": [
            re.compile(r"comfyui", re.IGNORECASE),
            re.compile(r"comfyland", re.IGNORECASE),
            # workflow/prompt JSON 在 detect 中单独处理
        ],
        "Gemini (Google)": [
            re.compile(r"gemini", re.IGNORECASE),
            re.compile(r"google deepmind", re.IGNORECASE),
            re.compile(r"generated by google", re.IGNORECASE),
            re.compile(r"google imagen", re.IGNORECASE),          # 必须有 "google"
            re.compile(r"imagen by google", re.IGNORECASE),
            re.compile(r"synthid", re.IGNORECASE),                # Google SynthID 水印
            re.compile(r"nano banana", re.IGNORECASE),            # 你的工具标识
            re.compile(r"nanobanana", re.IGNORECASE),
        ],
        "DALL-E": [re.compile(r"dall-e", re.IGNORECASE), re.compile(r"dalle", re.IGNORECASE)],
        "NovelAI": [re.compile(r"novelai", re.IGNORECASE), re.compile(r"nai-diffusion", re.IGNORECASE)],
        "InvokeAI": [re.compile(r"invokeai", re.IGNORECASE), re.compile(r"invoke ai", re.IGNORECASE)],
        "Fooocus": [re.compile(r"fooocus", re.IGNORECASE)],
        "Stable Diffusion": [re.compile(r"stable diffusion", re.IGNORECASE), re.compile(r"sd\.?next", re.IGNORECASE), re.compile(r"forge", re.IGNORECASE)],
        "Leonardo.ai": [re.compile(r"leonardo.ai", re.IGNORECASE)],
        "Adobe Firefly": [re.compile(r"adobe firefly", re.IGNORECASE)],
        "Bing Image Creator": [re.compile(r"bing image creator", re.IGNORECASE)],
    }

    # 2. 生成参数指纹 (次优先,当没有软件名时)
    # 移除 Gemini 专有,将 trainedAlgorithmicMedia 作为通用 AI 标记
    GENERATION_PARAM_FINGERPRINTS = [
        (r"Steps:\s*\d+", "Stable Diffusion (Parameters)"),
        (r"CFG scale:\s*[\d\.]+", "Stable Diffusion (Parameters)"),
        (r"Sampler:\s*\w+", "Stable Diffusion (Parameters)"),
        (r"Seed:\s*\d+", "Stable Diffusion (Parameters)"),
        (r"Model hash:\s*[a-f0-9]+", "Stable Diffusion (Parameters)"),
        (r"Model:\s*[^,\n]+", "Stable Diffusion (Parameters)"),
        (r"Negative prompt:", "Stable Diffusion (Parameters)"),
        (r"Size:\s*\d+x\d+", "Stable Diffusion (Parameters)"),
        (r"Clip skip:\s*\d+", "Stable Diffusion (Parameters)"),
        (r"Schedule type:\s*[^,\n]+", "Stable Diffusion (Parameters)"),
        (r"Denoising strength:\s*[\d\.]+", "Stable Diffusion (Parameters)"),
        (r"Hires upscale:\s*[\d\.]+", "Stable Diffusion (Parameters)"),
        # 更新 regex 以匹配 IPTC/XMP 变体(包括 URL)
        (r"DigitalSourceType\s*[:=]\s*(?:http://cv\.iptc\.org/newscodes/digitalsourcetype/)?trainedAlgorithmicMedia", "Generative AI (IPTC/XMP Standard)"),
    ]

    RAW_METADATA_SCAN_LIMIT = 128 * 1024 * 1024

    C2PA_CONTAINER_MARKERS = [
        b"c2pa",
        b"jumbf",
        b"content credentials",
        b"contentcredentials",
        b"contentauth",
    ]

    C2PA_AI_MARKERS = [
        b"trainedalgorithmicmedia",
        b"compositewithtrainedalgorithmicmedia",
        b"algorithmicmedia",
        b"generated by ai",
        b"ai generated",
        b"adobe firefly",
        b"google imagen",
        b"synthid",
        b"dall-e",
        b"dalle",
        b"midjourney",
        b"stable diffusion",
    ]
tools/validator/logic.py:MetaDataDetector class header & rule constants

Audit points:

  • Rule hierarchy: software signatures (strong) → parameter fingerprints (secondary) → C2PA container + AI marker (strong) → weak features (file name)
  • Order-sensitive: AI_SOFTWARE_SIGNATURES is a dict, and Python 3.7+ preserves insertion order. Midjourney is listed before Gemini (Google), to prevent a Midjourney image that references a Google tool internally from being misclassified as Gemini.
  • SynthID ambiguity: the synthid string is treated as strong evidence for Gemini, but this is only a string match — this module does not decode the SynthID pixel watermark.
  • Generic IPTC marker: trainedAlgorithmicMedia is not exclusive to Google; it is attributed to Gemini only when Google evidence also appears in the same text, otherwise it is classified as Generative AI (Unknown).
  • Raw-scan upper limit: 128 MB; beyond that, the byte-level fallback is skipped (relying only on Pillow and the C2PA SDK).

5.2.2 The detect() Main Flow

validator/logic.py · L444-L657
GitHub
    def detect(self, image_path: str) -> Dict[str, str]:
        """
        检测图像文件中的 AI 生成元数据
        
        Args:
            image_path: 图像文件路径
            
        Returns:
            检测结果字典,包含:
            - status: "ai" | "human" | "error"
            - reason: 检测原因描述
            - tool: 检测到的 AI 工具名称(如果为 AI)
            - evidence: 证据描述
        """
        img_path = Path(image_path)
        if not img_path.exists():
            return {"status": "error", "reason": "文件不存在", "tool": "", "evidence": ""}

        evidence_found = []
        weak_evidence_found = []
        context_evidence_found = []
        detected_tool = None
        extension_mismatch = False
        minimal_web_jpeg = False
        
        try:
            with Image.open(img_path) as img:
                img.load()  # Ensure header is loaded
                actual_format = (img.format or "").upper()
                suffix = img_path.suffix.lower()
                if actual_format == "JPEG" and suffix not in {".jpg", ".jpeg", ".jpe"}:
                    extension_mismatch = True
                    context_evidence_found.append(f"File extension mismatch: {suffix} file contains JPEG data")
                elif actual_format == "PNG" and suffix != ".png":
                    extension_mismatch = True
                    context_evidence_found.append(f"File extension mismatch: {suffix} file contains PNG data")

                minimal_jpeg_keys = {"jfif", "jfif_density", "jfif_unit", "jfif_version", "progression", "progressive"}
                if actual_format == "JPEG" and set(img.info.keys()).issubset(minimal_jpeg_keys):
                    minimal_web_jpeg = True
                    context_evidence_found.append("Minimal JPEG metadata only")

                # --- 1. Check PNG Info / tEXt chunks ---
                if hasattr(img, 'info') and img.info:
                    software_value = img.info.get("Software") or img.info.get("software")
                    if isinstance(software_value, bytes):
                        software_text = software_value.decode("utf-8", errors="ignore")
                    else:
                        software_text = str(software_value or "")
                    software_lower = software_text.lower()
                    if "celsys" in software_lower or "clip studio" in software_lower:
                        context_evidence_found.append("Edited/exported by CELSYS/Clip Studio Paint")

                    if not detected_tool:
                        for key, val in img.info.items():
                            if not isinstance(val, (str, bytes)):
                                continue
                            structured_res = self._detect_structured_generator_metadata(key, val)
                            if structured_res:
                                detected_tool, marker = structured_res
                                evidence_found.append(marker)
                                break

                    # Case A: A1111 / SD
                    if 'parameters' in img.info:
                        val = img.info['parameters']
                        if isinstance(val, str) and ("Steps:" in val or "Prompt" in val):
                            detected_tool = "Stable Diffusion (A1111)"
                            evidence_found.append("Stable Diffusion parameters chunk")
                            res = self._analyze_text(val)
                            if res: 
                                detected_tool, marker = res
                                evidence_found.append(f"Parameters: {marker}")

                    # Case B: ComfyUI (专有检查)
                    if not detected_tool and ('workflow' in img.info or 'prompt' in img.info):
                        # 验证是否是 JSON
                        try:
                            if 'workflow' in img.info:
                                json.loads(img.info['workflow'])
                                evidence_found.append("Valid 'workflow' JSON")
                            if 'prompt' in img.info:
                                json.loads(img.info['prompt'])
                                evidence_found.append("Valid 'prompt' JSON")
                            detected_tool = "ComfyUI"
                        except json.JSONDecodeError:
                            pass  # 非 JSON,忽略

                    # Case C: Generic Scan (其他 info)
                    if not detected_tool:
                        for key, val in img.info.items():
                            if isinstance(val, (str, bytes)):
                                val_str = self._metadata_value_to_text(val)
                                res = self._analyze_text(val_str)
                                if res:
                                    detected_tool, marker = res
                                    evidence_found.append(f"PNG Info '{key}': {marker}")
                                    break

                    for key, val in img.info.items():
                        if not isinstance(val, (str, bytes)):
                            continue
                        val_str = val.decode("utf-8", errors="ignore") if isinstance(val, bytes) else str(val)
                        if "DigitalSourceType" in val_str and "trainedAlgorithmicMedia" in val_str:
                            marker = f"PNG Info '{key}': IPTC/XMP trainedAlgorithmicMedia"
                            if marker not in evidence_found:
                                evidence_found.append(marker)

                # --- 2. Check EXIF / XMP Data ---
                if not detected_tool:
                    exif = img.getexif()
                    if exif:
                        for tag_id, value in exif.items():
                            tag_name = ExifTags.TAGS.get(tag_id, str(tag_id))
                            
                            # Handle UserComment or other bytes
                            if isinstance(value, bytes):
                                try:
                                    value_str = value.decode('utf-8', errors='ignore')
                                except:
                                    continue
                            else:
                                value_str = str(value)

                            res = self._analyze_text(value_str)
                            if res:
                                detected_tool, marker = res
                                evidence_found.append(f"EXIF {tag_name}: {marker}")
                                break

        except Exception as e:
            return {"status": "error", "reason": f"读取错误: {str(e)}", "tool": "", "evidence": ""}

        # --- 3. Official C2PA manifest/signature verification ---
        c2pa_available = False
        c2pa_has_manifest = False
        c2pa_claim_generator = ""
        try:
            from tools.validator.c2pa_verifier import verify_c2pa_file

            c2pa_result = verify_c2pa_file(img_path)
            c2pa_available = c2pa_result.available
            if c2pa_result.has_manifest:
                c2pa_has_manifest = True
                c2pa_claim_generator = c2pa_result.claim_generator or ""
                evidence_found.append(c2pa_result.evidence_summary())
                if c2pa_result.ai_generated:
                    detected_tool = "Generative AI (C2PA Content Credentials)"
            elif c2pa_result.available and c2pa_result.error:
                evidence_found.append(f"C2PA verification error: {c2pa_result.error}")
            elif c2pa_result.available:
                evidence_found.append("No C2PA manifest found")
        except Exception as e:
            evidence_found.append(f"C2PA verification error: {e}")

        # --- 4. Raw metadata fallback: C2PA/JUMBF/XMP payloads ---
        if not detected_tool and not c2pa_has_manifest:
            res, has_c2pa_container = self._scan_raw_metadata(img_path)
            if res:
                detected_tool, marker = res
                prefix = "Raw metadata"
                if not c2pa_available and has_c2pa_container:
                    prefix = "Raw metadata (official C2PA verifier unavailable)"
                evidence_found.append(f"{prefix}: {marker}")
            elif has_c2pa_container:
                if c2pa_available:
                    evidence_found.append("C2PA Content Credentials found, no AI generation marker")
                else:
                    evidence_found.append("C2PA Content Credentials found, official verifier unavailable")

        # --- 5. Visible Google/Gemini watermark ---
        google_context = "google" in c2pa_claim_generator.lower()
        filename_lower = img_path.name.lower()
        filename_gemini_hint = "gemini" in filename_lower or "google" in filename_lower
        if not detected_tool and (google_context or filename_gemini_hint):
            if self._detect_google_visible_watermark(img_path):
                detected_tool = "Gemini (Google Visible Watermark)"
                evidence_found.append("Visible watermark: Google/Gemini sparkle mark")

        # --- 6. Final Fallback: Filename Check ---
        if not detected_tool:
            res = self._analyze_text(filename_lower, include_weak_markers=False)
            if res:
                weak_tool, marker = res
                if c2pa_has_manifest:
                    weak_evidence_found.append(f"Filename suggests {weak_tool}: {marker}")
                else:
                    weak_evidence_found.append(f"Filename suggests {weak_tool}: {marker}")

        if extension_mismatch and minimal_web_jpeg:
            context_evidence_found.append("Downloaded file appears re-encoded or metadata-stripped")

        # --- Result Construction ---
        if detected_tool:
            return {
                "status": "ai",
                "reason": f"检测到 {detected_tool} 元数据",
                "tool": detected_tool,
                "evidence": "; ".join(evidence_found + context_evidence_found)
            }
        else:
            if weak_evidence_found:
                return {
                    "status": "unknown",
                    "reason": "AI indicators found, but C2PA credentials do not declare AI generation",
                    "tool": "",
                    "evidence": "; ".join(evidence_found + context_evidence_found + weak_evidence_found)
                }
            reason = "C2PA Content Credentials do not declare AI generation" if c2pa_has_manifest else "No known AI generation metadata detected"
            return {
                "status": "human",
                "reason": reason,
                "tool": "",
                "evidence": "; ".join(evidence_found + context_evidence_found) if evidence_found or context_evidence_found else "No metadata signatures found"
tools/validator/logic.py:MetaDataDetector.detect()

Audit points (execution order):

  1. Open a Pillow handle and identify contextual evidence such as format/suffix mismatch, minimal JPEG metadata, and CELSYS export (these do not upgrade the status)
  2. PNG info: parameters → A1111; workflow / prompt JSON → ComfyUI; other keys are identified via _detect_structured_generator_metadata() as NovelAI / InvokeAI / Fooocus; finally a generic _analyze_text() pass is run
  3. EXIF: run _analyze_text() on each tag
  4. Official C2PA SDK (verify_c2pa_file): if a manifest exists, grab claim_generator and AI markers
  5. Raw-byte fallback: executed only when the first two steps both miss; it reads the entire file header (≤ 128 MB) and searches for c2pa / jumbf / contentauth containers + AI-marker strings
  6. Google visible watermark: enabled only when "there is a Google claim_generator" or "the file name contains gemini / google", to avoid a visual scan of every image
  7. Weak file-name hints: never trigger ai, at most produce unknown

Status convergence rules:

Evidence combinationstatus
detected_tool is assigned (any strong evidence hit)ai
No strong evidence, but weak_evidence_found (a file-name hint)unknown
No evidence at all + a C2PA manifest exists but does not declare AIhuman (reason: "C2PA Content Credentials do not declare AI generation")
No evidence at all + no manifesthuman (reason: "No known AI generation metadata detected")
Pillow raises an exceptionerror

注意

Both "no metadata" and "a C2PA manifest exists but declares non-AI" map to human in terms of status; they can only be distinguished via reason. The UI must read reason, otherwise it will mistakenly present an image whose "metadata has been stripped" as "confirmed non-AI".


5.2.3 Metadata Helper Parsing

validator/logic.py · L110-L171
GitHub
    def _decode_metadata_bytes(self, data: bytes) -> str:
        """Best-effort decoding for embedded XMP/C2PA text inside binary assets."""
        if not data:
            return ""
        text = data.decode("utf-8", errors="ignore")
        if len(text.strip()) < 8:
            text = data.decode("latin-1", errors="ignore")
        return text

    def _metadata_value_to_text(self, value: Any) -> str:
        if isinstance(value, bytes):
            return value.decode("utf-8", errors="ignore")
        return str(value)

    def _json_loads(self, value: str) -> Any:
        try:
            return json.loads(value)
        except Exception:
            return None

    def _json_has_keys(self, value: Any, keys: set[str]) -> bool:
        if isinstance(value, dict):
            lowered = {str(key).lower() for key in value.keys()}
            if lowered.intersection(keys):
                return True
            return any(self._json_has_keys(item, keys) for item in value.values())
        if isinstance(value, list):
            return any(self._json_has_keys(item, keys) for item in value)
        return False

    def _detect_structured_generator_metadata(self, key: str, value: Any) -> Optional[Tuple[str, str]]:
        text = self._metadata_value_to_text(value)
        lowered_key = key.lower()
        lowered_text = text.lower()
        parsed = self._json_loads(text)

        if "invoke" in lowered_key or "invokeai" in lowered_text or "invoke ai" in lowered_text:
            return "InvokeAI", f"PNG Info '{key}': InvokeAI metadata"

        if "fooocus" in lowered_key or "fooocus" in lowered_text:
            return "Fooocus", f"PNG Info '{key}': Fooocus metadata"

        if "novelai" in lowered_text or "nai-diffusion" in lowered_text:
            return "NovelAI", f"PNG Info '{key}': NovelAI metadata"

        if isinstance(parsed, dict):
            has_generation_keys = self._json_has_keys(
                parsed,
                {"sampler", "sampler_name", "steps", "scale", "cfg_scale", "seed", "model", "model_hash", "uc"},
            )
            has_novelai_shape = self._json_has_keys(parsed, {"uc"}) and self._json_has_keys(parsed, {"sampler", "steps", "scale"})
            if has_novelai_shape:
                return "NovelAI", f"PNG Info '{key}': NovelAI generation JSON"

            if "invoke" in lowered_key and has_generation_keys:
                return "InvokeAI", f"PNG Info '{key}': InvokeAI generation JSON"

            if lowered_key in {"sd-metadata", "sd_metadata", "generation_data", "generation_data_formatted"} and has_generation_keys:
                return "Stable Diffusion", f"PNG Info '{key}': Stable Diffusion generation JSON"

        return None
tools/validator/logic.py:MetaDataDetector helpers

Audit points:

  • NovelAI criterion: a JSON containing both uc and one of (sampler / steps / scale) → NovelAI generation JSON. This shape-based criterion allows NovelAI to remain identifiable even after explicit names have been stripped.
  • InvokeAI / SD JSON: both "key name + generation field" must hit simultaneously, to prevent arbitrary JSON from being treated as generation metadata.

5.2.4 Raw-Byte Scan (C2PA/JUMBF fallback)

validator/logic.py · L172-L317
GitHub
    def _analyze_c2pa_bytes(self, data: bytes) -> Optional[Tuple[str, str]]:
        """Detect AI signals embedded in C2PA/JUMBF Content Credentials payloads."""
        if not data:
            return None

        lowered = data.lower()
        has_c2pa_container = any(marker in lowered for marker in self.C2PA_CONTAINER_MARKERS)
        if not has_c2pa_container:
            return None

        for marker in self.C2PA_AI_MARKERS:
            if marker in lowered:
                return "Generative AI (C2PA Content Credentials)", marker.decode("ascii", errors="ignore")

        text = self._decode_metadata_bytes(data)
        res = self._analyze_text(text, include_weak_markers=False)
        if res:
            tool, marker = res
            return tool, f"C2PA payload: {marker}"

        return None

    # PNG ancillary chunks that may carry metadata text. Skip IDAT/PLTE/etc.
    PNG_METADATA_CHUNKS = {b"tEXt", b"iTXt", b"zTXt", b"eXIf", b"iCCP", b"caBX", b"jumb"}
    # WebP RIFF chunks that may carry metadata.
    WEBP_METADATA_CHUNKS = {b"EXIF", b"XMP ", b"ICCP", b"JUMB"}

    def _extract_png_metadata(self, data: bytes) -> bytes:
        out = bytearray()
        pos = 8  # skip signature
        n = len(data)
        iend_end = n
        while pos + 12 <= n:
            try:
                length = int.from_bytes(data[pos:pos + 4], "big")
                chunk_type = data[pos + 4:pos + 8]
            except Exception:
                break
            data_end = pos + 8 + length
            if length < 0 or data_end + 4 > n:
                break
            if chunk_type in self.PNG_METADATA_CHUNKS:
                out.extend(data[pos + 8:data_end])
                out.append(0)
            if chunk_type == b"IEND":
                iend_end = data_end + 4
                break
            pos = data_end + 4  # skip CRC
        # Some pipelines append C2PA/JUMBF payloads after IEND. Include any
        # trailing bytes verbatim — they cannot be pixel data.
        if iend_end < n:
            out.extend(data[iend_end:])
        return bytes(out)

    def _extract_jpeg_metadata(self, data: bytes) -> bytes:
        out = bytearray()
        pos = 2  # skip SOI (FFD8)
        n = len(data)
        while pos + 4 <= n:
            if data[pos] != 0xFF:
                break
            # Skip fill bytes
            while pos < n and data[pos] == 0xFF:
                pos += 1
            if pos >= n:
                break
            marker = data[pos]
            pos += 1
            # Standalone markers without length payload
            if marker == 0xD9:  # EOI
                break
            if marker == 0xDA:  # SOS — compressed image data starts here
                break
            if marker == 0x00 or marker == 0x01 or 0xD0 <= marker <= 0xD8:
                continue
            if pos + 2 > n:
                break
            seg_len = int.from_bytes(data[pos:pos + 2], "big")
            if seg_len < 2 or pos + seg_len > n:
                break
            seg_data = data[pos + 2:pos + seg_len]
            # APP0..APP15 (E0..EF) and COM (FE) carry text-style metadata
            if 0xE0 <= marker <= 0xEF or marker == 0xFE:
                out.extend(seg_data)
                out.append(0)
            pos += seg_len
        return bytes(out)

    def _extract_webp_metadata(self, data: bytes) -> bytes:
        out = bytearray()
        n = len(data)
        if n < 12:
            return b""
        pos = 12  # skip RIFF + size + WEBP
        while pos + 8 <= n:
            fourcc = data[pos:pos + 4]
            size = int.from_bytes(data[pos + 4:pos + 8], "little")
            if size < 0 or pos + 8 + size > n:
                break
            if fourcc in self.WEBP_METADATA_CHUNKS:
                out.extend(data[pos + 8:pos + 8 + size])
                out.append(0)
            pos += 8 + size + (size & 1)  # chunks are padded to even size
        return bytes(out)

    def _extract_metadata_blocks(self, data: bytes) -> bytes:
        """
        Return bytes from text/metadata-bearing containers only, skipping
        pixel/compressed payloads. This prevents random regex hits in raw
        IDAT / SOS data from being mistaken for AI markers.
        """
        if data.startswith(b"\x89PNG\r\n\x1a\n"):
            return self._extract_png_metadata(data)
        if data[:2] == b"\xff\xd8":
            return self._extract_jpeg_metadata(data)
        if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
            return self._extract_webp_metadata(data)
        # Unknown format: keep legacy behavior (whole file).
        return data

    def _scan_raw_metadata(self, image_path: Path) -> Tuple[Optional[Tuple[str, str]], bool]:
        """
        Scan only metadata-bearing chunks/segments of an image, not pixel data.

        C2PA manifests live in PNG `caBX`/`jumb` chunks or JPEG APP11 segments.
        Pillow may not surface them through Image.info, so we parse the
        container ourselves and feed only those bytes to the text analyzer.
        """
        try:
            size = image_path.stat().st_size
            if size > self.RAW_METADATA_SCAN_LIMIT:
                return None, False
            data = image_path.read_bytes()
        except OSError:
            return None, False

        metadata_bytes = self._extract_metadata_blocks(data)
        lowered = metadata_bytes.lower()
        has_c2pa_container = any(marker in lowered for marker in self.C2PA_CONTAINER_MARKERS)
        c2pa_res = self._analyze_c2pa_bytes(metadata_bytes)
        if c2pa_res:
            return c2pa_res, has_c2pa_container

        text = self._decode_metadata_bytes(metadata_bytes)
        return self._analyze_text(text, include_weak_markers=False), has_c2pa_container
tools/validator/logic.py:_analyze_c2pa_bytes / metadata extractors / _scan_raw_metadata

Audit points:

  • Executed only when "the official C2PA SDK could not read a manifest" and "PNG info/EXIF produced no hit", to avoid a full-file scan of every image
  • It scans the entire file's bytes; performance cost depends on file size; files > 128 MB are skipped directly
  • If a container is found but there is no AI marker → has_c2pa_container=True, and detect() attaches it as contextual information in the evidence without upgrading the status
  • This path has no defense against maliciously forged C2PA strings (see §5.3 — the signature verification done by the official SDK is the trustworthy path)

5.2.5 Text Rule Analysis

validator/logic.py · L394-L443
GitHub
    def _analyze_text(self, text: str, include_weak_markers: bool = True) -> Optional[Tuple[str, str]]:
        """
        Analyze a string for AI markers.
        Returns: (tool_name, found_marker) or None
        
        检测优先级:
        1. 先检查明确的软件签名(使用 regex 更严格匹配)
        2. 再检查生成参数指纹(如果 IPTC 标记,检查是否有 Google 证据,否则通用)
        """
        if not text:
            return None
        
        text_lower = text.lower()

        # 1. Check Explicit Software Names (按字典顺序,Midjourney 先)
        weak_patterns = {r"mj_"}
        for tool, patterns in self.AI_SOFTWARE_SIGNATURES.items():
            for pattern in patterns:
                if not include_weak_markers and pattern.pattern in weak_patterns:
                    continue
                if pattern.search(text_lower):
                    return tool, pattern.pattern

        # 2. Check Generation Parameter Fingerprints
        match_count = 0
        evidence = []
        detected_tool = None
        weak_evidence_found = []
        for pattern, tool_name in self.GENERATION_PARAM_FINGERPRINTS:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                if "IPTC/XMP Standard" in tool_name:
                    # 如果是通用 IPTC,检查是否有 Google 证据
                    if any(re.search(p, text_lower) for p in self.AI_SOFTWARE_SIGNATURES["Gemini (Google)"]):
                        return "Gemini (Google)", "IPTC/XMP with Google Evidence"
                    else:
                        detected_tool = "Generative AI (Unknown)"
                        evidence.append("IPTC/XMP Signature")
                else:
                    match_count += 1
                    evidence.append(pattern)
        
        if match_count >= 1:
            return "Stable Diffusion WebUI", "Generation Parameters Detected"

        if detected_tool:
            return detected_tool, "; ".join(evidence)

        return None
tools/validator/logic.py:MetaDataDetector._analyze_text

Audit points:

  • include_weak_markers=False is the file-name scanning mode: a two-character prefix like mj_ is far too easy to false-match (for example, mj_portrait.jpg), so it is suppressed in the file-name context
  • A parameter-fingerprint match of ≥ 1 upgrades to SD: a single Steps: or Sampler: is enough to determine SD — lenient, but it may produce false positives on non-AI images "where the user copied SD parameters into a comment"
  • IPTC attribution branch: trainedAlgorithmicMedia is attributed to Gemini when it co-occurs with a Google keyword, otherwise marked Generative AI (Unknown). It is never attributed to Midjourney / DALL-E

5.2.6 Visible Google/Gemini Watermark

validator/logic.py · L318-L393
GitHub
    def _detect_google_visible_watermark(self, image_path: Path) -> bool:
        """
        Detect the visible Google/Gemini sparkle mark often placed near the
        lower-right area of generated images. This is visual evidence, not a
        SynthID decoder.
        """
        try:
            with Image.open(image_path) as img:
                img = img.convert("RGB")
                scale = 512 / max(img.size)
                if scale < 1:
                    img = img.resize((round(img.width * scale), round(img.height * scale)))

                width, height = img.size
                pix = img.load()
                mask = set()
                for y in range(height // 2, height):
                    for x in range(width // 2, width):
                        r, g, b = pix[x, y]
                        saturation = max(r, g, b) - min(r, g, b)
                        luminance = (r * 299 + g * 587 + b * 114) // 1000
                        if 110 <= luminance <= 245 and saturation < 28:
                            if not (luminance > 235 and saturation < 8):
                                mask.add((x, y))

                seen = set()
                for pt in list(mask):
                    if pt in seen:
                        continue
                    stack = [pt]
                    seen.add(pt)
                    xs = []
                    ys = []
                    while stack:
                        x, y = stack.pop()
                        xs.append(x)
                        ys.append(y)
                        for nx in (x - 1, x, x + 1):
                            for ny in (y - 1, y, y + 1):
                                npt = (nx, ny)
                                if npt in mask and npt not in seen:
                                    seen.add(npt)
                                    stack.append(npt)

                    area = len(xs)
                    min_x, max_x = min(xs), max(xs)
                    min_y, max_y = min(ys), max(ys)
                    comp_w = max_x - min_x + 1
                    comp_h = max_y - min_y + 1
                    center_x = (min_x + max_x) / 2
                    center_y = (min_y + max_y) / 2
                    density = area / max(1, comp_w * comp_h)

                    if not (80 <= area <= 900):
                        continue
                    if not (14 <= comp_w <= 60 and 14 <= comp_h <= 80):
                        continue
                    if not (0.12 <= density <= 0.70):
                        continue
                    if center_x < width * 0.62 or center_y < height * 0.55:
                        continue

                    # A sparkle mark has a sparse center-heavy diamond shape.
                    mid_x = (min_x + max_x) / 2
                    mid_y = (min_y + max_y) / 2
                    near_center = sum(
                        1 for x, y in zip(xs, ys)
                        if abs(x - mid_x) <= comp_w * 0.25 and abs(y - mid_y) <= comp_h * 0.25
                    )
                    if near_center / area >= 0.18:
                        return True
        except Exception:
            return False

        return False
tools/validator/logic.py:MetaDataDetector._detect_google_visible_watermark

Audit points:

  • Not a SynthID decoder, a purely morphological discriminator: low saturation, medium-to-high brightness, lower-right quadrant, area 80–900 px, aspect-ratio constraint, dense center
  • The long edge is scaled to 512 px to normalize the criteria
  • Enabled only in a Google context (see step 5 of the §5.2.2 flow), to avoid running an O(W·H) scan on every image
  • Miss scenarios: a white-background image, the lower-right corner cropped out, or heavy recompression that fragments the sparkle's connected component
  • False-positive scenarios: the lower-right corner originally contains a low-saturation decorative element (moon, star, logo, etc.)

5.3 Official C2PA SDK Adapter

tools/validator/c2pa_verifier.py isolates the optional c2pa-python dependency into a separate module. When installed, it reads the manifest via the official SDK and requires the SDK to verify the manifest / signature chain.

5.3.1 C2PAVerificationResult

validator/c2pa_verifier.py · L17-L87
GitHub
AI_DIGITAL_SOURCE_MARKERS = {
    "trainedalgorithmicmedia",
    "compositewithtrainedalgorithmicmedia",
    "algorithmicmedia",
    "generated by ai",
    "ai generated",
    "adobe firefly",
    "google imagen",
    "synthid",
    "dall-e",
    "dalle",
    "midjourney",
    "stable diffusion",
}


@dataclass
class C2PAVerificationResult:
    available: bool
    has_manifest: bool = False
    verified: bool | None = None
    trusted: bool | None = None
    validation_state: str = ""
    validation_results: dict[str, Any] | None = None
    manifest_store: dict[str, Any] | None = None
    active_manifest: dict[str, Any] | None = None
    sdk_version: str = ""
    embedded: bool | None = None
    remote_url: str | None = None
    ai_markers: list[str] = field(default_factory=list)
    claim_generator: str = ""
    error: str = ""
    asset_format: str = ""
    extension_mismatch: bool = False
    validation_issues: list[str] = field(default_factory=list)

    @property
    def ai_generated(self) -> bool:
        return bool(self.ai_markers)

    def evidence_summary(self) -> str:
        if not self.has_manifest:
            if self.available:
                parts = ["No C2PA manifest found"]
                if self.asset_format:
                    parts.append(f"asset_format={self.asset_format}")
                if self.extension_mismatch:
                    parts.append("extension_mismatch=true")
                return "; ".join(parts)
            return self.error or "C2PA verifier unavailable"

        parts = ["C2PA manifest found"]
        if self.validation_state:
            parts.append(f"validation_state={self.validation_state}")
        if self.verified is not None:
            parts.append(f"signature_chain={'verified' if self.verified else 'failed'}")
        if self.trusted is not None:
            parts.append(f"trust={'trusted' if self.trusted else 'untrusted'}")
        if self.claim_generator:
            parts.append(f"claim_generator={self.claim_generator}")
        if self.ai_markers:
            parts.append("ai_markers=" + ",".join(self.ai_markers[:4]))
        if self.validation_issues:
            parts.append("validation_issues=" + ",".join(self.validation_issues[:4]))
        if self.remote_url:
            parts.append(f"remote_manifest={self.remote_url}")
        if self.sdk_version:
            parts.append(f"c2pa_sdk={self.sdk_version}")
        return "; ".join(parts)

tools/validator/c2pa_verifier.py:AI_DIGITAL_SOURCE_MARKERS / C2PAVerificationResult

Audit points:

  • verified / trusted use the three-valued bool | None: None = undeterminable, not a failure
  • ai_generated is a derived property: it is True whenever AI_DIGITAL_SOURCE_MARKERS matches any string at any depth in the manifest store, without requiring a signature / trust
  • evidence_summary() is a UI summary, listing at most 4 AI markers and 4 validation issues

5.3.2 The verify_c2pa_file Entry Point and SDK Configuration

validator/c2pa_verifier.py · L285-L388
GitHub
def _read_json(reader: Any) -> dict[str, Any]:
    raw = reader.json()
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8", errors="replace")
    return json.loads(raw)


def _make_context(c2pa_module: Any) -> Any:
    settings = {
        "verify": {
            "verify_after_reading": True,
            "verify_trust": True,
            "verify_timestamp_trust": True,
            "ocsp_fetch": True,
            "remote_manifest_fetch": True,
        },
        "trust": {
            "verify_trust_list": True,
        },
    }
    context_cls = getattr(c2pa_module, "Context", None)
    if context_cls and hasattr(context_cls, "from_dict"):
        return context_cls.from_dict(settings)
    if hasattr(c2pa_module, "load_settings"):
        c2pa_module.load_settings(settings)
    return None


def _detect_asset_format(path: Path) -> tuple[str, bool]:
    suffix = path.suffix.lower()
    try:
        header = path.read_bytes()[:16]
    except OSError:
        return "", False

    asset_format = ""
    if header.startswith(b"\x89PNG\r\n\x1a\n"):
        asset_format = "png"
    elif header.startswith(b"\xff\xd8\xff"):
        asset_format = "jpeg"
    elif header[:4] == b"RIFF" and header[8:12] == b"WEBP":
        asset_format = "webp"

    expected_suffixes = {
        "png": {".png"},
        "jpeg": {".jpg", ".jpeg", ".jpe"},
        "webp": {".webp"},
    }
    mismatch = bool(asset_format and suffix and suffix not in expected_suffixes.get(asset_format, set()))
    return asset_format, mismatch


def verify_c2pa_file(image_path: str | Path) -> C2PAVerificationResult:
    path = Path(image_path)

    try:
        import c2pa
    except Exception as exc:
        return C2PAVerificationResult(
            available=False,
            error=f"c2pa-python unavailable: {exc}",
        )

    result = C2PAVerificationResult(available=True)
    try:
        result.asset_format, result.extension_mismatch = _detect_asset_format(path)
        result.sdk_version = str(c2pa.sdk_version()) if hasattr(c2pa, "sdk_version") else ""
        context = _make_context(c2pa)
        if result.extension_mismatch and result.asset_format:
            with path.open("rb") as stream:
                try:
                    reader = c2pa.Reader.try_create(result.asset_format, stream, None, context)
                except TypeError:
                    reader = c2pa.Reader.try_create(result.asset_format, stream)
        else:
            try:
                reader = c2pa.Reader.try_create(str(path), None, None, context)
            except TypeError:
                reader = c2pa.Reader.try_create(str(path))

        if reader is None:
            return result

        with reader:
            result.has_manifest = True
            result.manifest_store = _read_json(reader)
            result.validation_state = str(reader.get_validation_state() or "")
            result.validation_results = reader.get_validation_results() or None
            result.active_manifest = reader.get_active_manifest() or None
            result.embedded = bool(reader.is_embedded())
            result.remote_url = reader.get_remote_url() or None

        result.verified, result.trusted = _verification_flags(
            result.validation_state,
            result.validation_results,
        )
        result.validation_issues = _collect_validation_issues(result.validation_results)
        result.ai_markers = _find_ai_markers(result.manifest_store or {})
        result.claim_generator = _extract_claim_generator(result.active_manifest)
        return result

    except Exception as exc:
        result.error = str(exc)
        return result
tools/validator/c2pa_verifier.py:_make_context / _detect_asset_format / verify_c2pa_file

Audit points:

  • SDK fallback on extension mismatch: if the file header indicates JPEG but the suffix is .png, it switches to the Reader.try_create(format, stream, ...) streaming interface to tell the SDK the real format. Passing the path directly would make the SDK judge by the suffix and fail.
  • SDK configuration enabled: trust list, timestamp trust, OCSP revocation checking, and remote-manifest fetching are all turned on
  • SDK version compatibility: both try_create signatures are attempted (with / without context), to be compatible with different versions of c2pa-python
  • An empty reader is not an error: has_manifest stays False, available=True, and the error field is empty

5.3.3 Separation of Signature Chain and Trust State

validator/c2pa_verifier.py · L141-L284
GitHub
def _contains_failure(value: Any) -> bool:
    if isinstance(value, dict):
        for key, item in value.items():
            key_lower = str(key).lower()
            if key_lower in {"failure", "failures", "error", "errors"} and item:
                return True
            if _contains_failure(item):
                return True
    elif isinstance(value, list):
        for item in value:
            if _contains_failure(item):
                return True
    elif isinstance(value, str):
        lowered = value.lower()
        return any(token in lowered for token in ("invalid", "failure", "error", "untrusted"))
    return False


def _contains_trust_failure(value: Any) -> bool:
    if isinstance(value, dict):
        for key, item in value.items():
            key_lower = str(key).lower()
            if "trust" in key_lower and _contains_failure(item):
                return True
            if _contains_trust_failure(item):
                return True
    elif isinstance(value, list):
        for item in value:
            if _contains_trust_failure(item):
                return True
    elif isinstance(value, str):
        lowered = value.lower()
        return "trust" in lowered and any(token in lowered for token in ("invalid", "failure", "error", "untrusted"))
    return False


def _contains_non_trust_failure(value: Any) -> bool:
    if isinstance(value, dict):
        for key, item in value.items():
            key_lower = str(key).lower()
            if key_lower in {"failure", "failures", "error", "errors"} and item:
                if not _contains_only_trust_related(item):
                    return True
            if _contains_non_trust_failure(item):
                return True
    elif isinstance(value, list):
        for item in value:
            if _contains_non_trust_failure(item):
                return True
    elif isinstance(value, str):
        lowered = value.lower()
        if any(token in lowered for token in ("invalid", "failure", "error")):
            return "trust" not in lowered and "untrusted" not in lowered
    return False


def _contains_only_trust_related(value: Any) -> bool:
    strings = list(_iter_strings(value))
    if not strings:
        return False
    for text in strings:
        lowered = text.lower()
        if any(token in lowered for token in ("invalid", "failure", "error")):
            if "trust" not in lowered and "untrusted" not in lowered:
                return False
    return any("trust" in text.lower() or "untrusted" in text.lower() for text in strings)


def _contains_trust_signal(value: Any) -> bool:
    if isinstance(value, dict):
        for key, item in value.items():
            if "trust" in str(key).lower():
                return True
            if _contains_trust_signal(item):
                return True
    elif isinstance(value, list):
        for item in value:
            if _contains_trust_signal(item):
                return True
    elif isinstance(value, str):
        return "trust" in value.lower()
    return False


def _active_manifest_results(validation_results: dict[str, Any] | None) -> dict[str, Any] | None:
    if not isinstance(validation_results, dict):
        return None
    active = validation_results.get("activeManifest")
    return active if isinstance(active, dict) else None


def _has_validation_code(value: Any, code_fragment: str) -> bool:
    if isinstance(value, dict):
        code = value.get("code")
        if isinstance(code, str) and code_fragment in code:
            return True
        return any(_has_validation_code(item, code_fragment) for item in value.values())
    if isinstance(value, list):
        return any(_has_validation_code(item, code_fragment) for item in value)
    return False


def _collect_validation_issues(validation_results: dict[str, Any] | None) -> list[str]:
    issues: list[str] = []
    if not validation_results:
        return issues
    if _has_validation_code(validation_results, "ingredient.malformed"):
        issues.append("ingredient_malformed")
    if _has_validation_code(validation_results, "timeStamp.untrusted"):
        issues.append("timestamp_untrusted")
    return issues


def _verification_flags(validation_state: str, validation_results: dict[str, Any] | None) -> tuple[bool | None, bool | None]:
    state = (validation_state or "").lower()
    active_results = _active_manifest_results(validation_results) or validation_results
    has_failure = _contains_failure(active_results) if active_results else False
    has_non_trust_failure = _contains_non_trust_failure(active_results) if active_results else False
    has_trust_failure = _contains_trust_failure(active_results) if active_results else False
    has_trust_signal = _contains_trust_signal(active_results) if active_results else False
    active_signature_valid = _has_validation_code(active_results, "claimSignature.validated")
    active_data_hash_valid = _has_validation_code(active_results, "assertion.dataHash.match")

    verified: bool | None
    if active_signature_valid and active_data_hash_valid and not has_non_trust_failure:
        verified = True
    elif "valid" in state and "invalid" not in state and not has_non_trust_failure:
        verified = True
    elif "invalid" in state or (has_failure and has_non_trust_failure):
        verified = False
    else:
        verified = None

    trusted: bool | None
    if has_trust_failure:
        trusted = False
    elif verified is True and has_trust_signal:
        trusted = True
    else:
        trusted = None

    return verified, trusted

tools/validator/c2pa_verifier.py: failure / trust / verification helpers

Audit points:

  • Active manifest first: _active_manifest_results() first takes validation_results["activeManifest"], falling back to the full tree only if that is unavailable
  • "Signature valid + data hash valid" and "no non-trust-class failures"verified=True. Even if the validation_state field itself contains "invalid" (possibly from an issue in the ingredient chain), this does not lower the signature conclusion of the active manifest
  • Trust evaluated independently: a trust failure does not turn verified into False. The UI can obtain a combination like signature_chain=verified; trust=untrusted, indicating that the signature itself is verifiable but the signing certificate is not in the current trust list (common for Google and OpenAI, which are not yet in the default trust anchors)

5.3.4 AI Marker and claim_generator Extraction

validator/c2pa_verifier.py · L88-L140
GitHub
def _iter_strings(value: Any) -> Iterable[str]:
    if isinstance(value, str):
        yield value
    elif isinstance(value, dict):
        for key, item in value.items():
            yield str(key)
            yield from _iter_strings(item)
    elif isinstance(value, list):
        for item in value:
            yield from _iter_strings(item)


def _find_ai_markers(manifest_store: dict[str, Any]) -> list[str]:
    found: list[str] = []
    seen = set()
    for text in _iter_strings(manifest_store):
        lowered = text.lower()
        for marker in AI_DIGITAL_SOURCE_MARKERS:
            if marker in lowered and marker not in seen:
                seen.add(marker)
                found.append(marker)
    return found


def _extract_claim_generator(active_manifest: dict[str, Any] | None) -> str:
    if not active_manifest:
        return ""

    claim_generator = active_manifest.get("claim_generator")
    if isinstance(claim_generator, str):
        return claim_generator
    if isinstance(claim_generator, dict):
        name = claim_generator.get("name") or claim_generator.get("identifier")
        version = claim_generator.get("version")
        if name and version:
            return f"{name} {version}"
        if name:
            return str(name)

    infos = active_manifest.get("claim_generator_info")
    if isinstance(infos, list) and infos:
        first = infos[0]
        if isinstance(first, dict):
            name = first.get("name") or first.get("identifier")
            version = first.get("version")
            if name and version:
                return f"{name} {version}"
            if name:
                return str(name)

    return ""

tools/validator/c2pa_verifier.py:_iter_strings / _find_ai_markers / _extract_claim_generator

Audit points:

  • Recursively traverses all strings in the manifest store (including keys), matching case-insensitively
  • C2PA / IPTC semantics such as trainedalgorithmicmedia / algorithmicmedia / synthid are treated as strong evidence
  • Does not decode the invisible SynthID watermark; the presence of synthid in the manifest store represents a C2PA declaration that "the image contains SynthID", not that local SynthID decoding was performed
  • claim_generator is compatible with three forms: a string, a dict ({name, version} / {identifier, version}), and a claim_generator_info array

5.3.5 Validation Issues

validator/c2pa_verifier.py · L232-L253
GitHub
def _has_validation_code(value: Any, code_fragment: str) -> bool:
    if isinstance(value, dict):
        code = value.get("code")
        if isinstance(code, str) and code_fragment in code:
            return True
        return any(_has_validation_code(item, code_fragment) for item in value.values())
    if isinstance(value, list):
        return any(_has_validation_code(item, code_fragment) for item in value)
    return False


def _collect_validation_issues(validation_results: dict[str, Any] | None) -> list[str]:
    issues: list[str] = []
    if not validation_results:
        return issues
    if _has_validation_code(validation_results, "ingredient.malformed"):
        issues.append("ingredient_malformed")
    if _has_validation_code(validation_results, "timeStamp.untrusted"):
        issues.append("timestamp_untrusted")
    return issues

tools/validator/c2pa_verifier.py:_has_validation_code / _collect_validation_issues

A typical evidence-summary output:

text
C2PA manifest found;
validation_state=Invalid;
signature_chain=verified;
trust=untrusted;
ai_markers=algorithmicmedia,trainedalgorithmicmedia,synthid;
validation_issues=ingredient_malformed,timestamp_untrusted

Audit points:

  • validation_state=Invalid does not necessarily mean the current image data was tampered with. The active manifest's signature and data hash can be valid while the ingredient chain has an issue
  • The UI should present this as "signature chain valid / certificate not in the current trust list / link issue present", not as "signature failed"

5.4 Worker Layer

workers/ai_detector_worker.py · L1-L63
GitHub
"""AI Metadata Detection Worker"""
import logging
from pathlib import Path
from PySide6.QtCore import QThread, Signal, QCoreApplication

_tr = QCoreApplication.translate

logger = logging.getLogger(__name__)


class AIDetectorWorker(QThread):
    """Worker thread for AI metadata detection."""

    progress = Signal(int, int, str)  # (current, total, filename)
    item_finished = Signal(str, str, str, str, str)  # (path, status, reason, tool, evidence)
    all_finished = Signal()
    model_status = Signal(str)  # Status for DynamicIsland

    def __init__(self, file_paths: list):
        super().__init__()
        self.file_paths = file_paths

    def run(self):
        """Execute metadata detection in background thread."""
        try:
            from .._utils import ensure_src_path
            ensure_src_path()
            
            from tools.validator.logic import MetaDataDetector
            detector = MetaDataDetector()

            self.model_status.emit(_tr("AIDetectorWorker", "扫描元数据..."))

            total = len(self.file_paths)
            logger.info("开始检测 %d 个文件", total)

            for i, path in enumerate(self.file_paths):
                if self.isInterruptionRequested():
                    break

                filename = Path(path).name
                self.progress.emit(i + 1, total, filename)

                try:
                    res = detector.detect(path)
                    self.item_finished.emit(
                        path,
                        res["status"],
                        res["reason"],
                        res["tool"] or "",
                        res["evidence"] or ""
                    )
                except Exception as e:
                    logger.error("[MetaDetector] 检测文件出错 %s: %s", path, e)
                    self.item_finished.emit(path, "error", _tr("AIDetectorWorker", "检测出错: %s") % str(e), "", "")

            logger.info("[MetaDetector] 检测完成,共 %d 个文件", total)

        except Exception as e:
            logger.error("[MetaDetector] Worker error: %s", e, exc_info=True)
        finally:
            self.all_finished.emit()
core/workers/ai_detector_worker.py (full file)

Audit points:

  • Per-file error isolation: an exception on a single file is caught and converted into a status="error" signal, without affecting subsequent files
  • Interruptible: isInterruptionRequested() allows the UI to cancel a batch task
  • Detector instance reuse: the whole batch shares one MetaDataDetector, but the class itself is stateless (__init__ is empty), so there is no risk of cross-file contamination
  • Signal payload: item_finished is emitted one image at a time, to avoid backlog

5.5 Evidence Grading Table

EvidenceLevelTriggers ai?
C2PA ai_markers (trainedAlgorithmicMedia / synthid / ...)StrongYes
ComfyUI workflow / prompt JSONStrongYes
Stable Diffusion parameters chunkStrongYes
NovelAI generation JSON (uc + sampler/steps/scale)StrongYes
InvokeAI / Fooocus metadataStrongYes
Midjourney Job ID / --ar / --v / --stylizeStrongYes
IPTC/XMP trainedAlgorithmicMediaStrongYes
Google visible sparkle watermark (under Google context)MediumYes
File name contains a platform wordWeakNo (at most unknown)
Extension mismatchContextualNo
Minimal JPEG metadataContextualNo
CELSYS / Clip Studio export markerContextualNo
No metadata at allNo evidenceNo

5.6 Test Coverage

tests/test_validator_c2pa.py currently covers:

  • C2PA raw payload AI marker
  • C2PA manifest with no AI marker does not false-positive
  • Official C2PA verifier mock-driven detection
  • Suppression of raw false positives when a Google C2PA image has no AI marker
  • Google / Gemini visible-watermark contextual detection
  • Midjourney Job ID and IPTC AI source
  • A file name alone for Gemini outputs only unknown
  • Platform recompression / a .png-suffixed JPEG does not upgrade to AI
  • A1111 Stable Diffusion parameters
  • NovelAI generation JSON
  • InvokeAI metadata
  • Fooocus metadata
  • The separated explanation of "active C2PA signature valid but the ingredient chain has an issue"

It is recommended to continuously add a real-sample regression set: downloaded images from platforms such as OpenAI, Gemini, Midjourney, ComfyUI, A1111, Forge, Fooocus, NovelAI, InvokeAI, Adobe Firefly, Tusi / Liblib / TensorArt, etc.


6. Threat Model Overview

6.1 Digital Timestamping

ThreatMitigationResidual Risk
User tampers with the original fileMerkle Tree root-hash verificationNone (tampering is always detected)
TSA private key leakedMulti-provider failoverA single TSA leak does not affect historical verification
Local JSON modifiedOptional AES-256 encryptionModifiable when unencrypted, but file-hash verification still exposes it
Merkle second-preimageNo domain-separation prefixDoes not resist adversarial collision construction
Author identity forgeryUser self-declarationauthor_name has no third-party verification

6.2 Infringement Evidence Capture

ThreatMitigationResidual Risk
Target page deletedImmediate capture + RFC 3161If deleted before capture, it cannot be recovered
Locally forged web pageTLS certificate captureOnly verifies the domain certificate, not content authenticity
Screenshot edited in Photoshopmanifest SHA-256A screenshot itself cannot prove "no Photoshopping"
Browser identified as a botstealth + visible fallbackSome platforms may still block
Missing HAR / certificateMulti-source collectionA single-point failure does not invalidate the whole package
Very long pageViewport screenshotfull_page=False; content below the fold is not captured

6.3 Invisible Watermark

ThreatMitigationResidual Risk
Hardcoded passwordCompile-time constantAll user instances share the same password pair; once reverse-engineered it can be extracted in bulk
Unauthenticated watermarkDual-password systemCannot prove "I embedded this watermark", only that "the image contains this text"
Forged watermarkPassword secrecyOnce the password is known, arbitrary text can be embedded and claimed to come from Nephele
Watermark removalQuantized embedding (d1=36)Heavy compression, rotation, and large-area cropping (> 50%) can destroy it
Silent truncationFixed-length 32-byte encodingOverly long text is silently truncated; the user may mistakenly believe it was fully embedded
Output despite verification failureround-trip checkOn mismatch only a warning is logged; the image is still output

注意

Within Nephele, the invisible watermark is positioned as an auxiliary provenance tool, not a cryptographic digital signature. Its core value lies in "increasing the cost for an image thief to remove the watermark", not in "providing unforgeable proof of ownership". If you need legal-grade proof of ownership, please use the digital timestamping feature.

6.4 AI Metadata Detection

Threat / ScenarioResult
Original ComfyUI PNGworkflow / prompt can be detected
Original A1111 PNGparameters can be detected
OpenAI / Google C2PA imageThe manifest, AI markers, signature chain, and trust state can be read
Midjourney retaining Job ID / XMPStrong evidence can be detected
Platform-recompressed image (downloaded from Weibo / Twitter / Xiaohongshu)Can only flag insufficient credentials, status=human (distinguished by reason)
ScreenshotOriginal metadata is usually lost, undeterminable
Maliciously cleaned metadataDeleted evidence cannot be recovered
Maliciously forged non-C2PA text metadataNo cryptographic authenticity guarantee; may false-positive
Only the visual style looks like AINot judged
Non-AI image where the user copied SD parameters into a commentMay false-positive as ai
Non-AI image with a low-saturation decoration in the lower-right corner (under Google context)May trigger a sparkle false positive

注意

The audit conclusion for this feature is: it is suitable as an AI-generation-credential and metadata screening tool, and should not be promoted as a general-purpose AI image authenticity detector. "Not detected" is not equivalent to "not AI-generated".


7. Dependency List and Degradation Behavior

LibraryPurposeBehavior When Missing
rfc3161ngTSA communicationTSA fully unavailable; forced downgrade to a local .json
asn1cryptoTSR parsingFalls back to the local clock and provider_name
pyzipper.nep AES-256The password has no effect; standard ZIP
PillowThumbnail / image I/O / metadata readingTimestamping flow blocked / packaging blocked / AI detection blocked
reportlabPDF reportTimestamping flow blocked
qrcodePDF QR codeFalls back to a plain-text URL
playwrightBrowser evidence captureFeature entirely unavailable
blind_watermarkInvisible-watermark DWT embedding / extractionInvisible-watermark feature entirely unavailable; returns the original image
numpyInvisible-watermark bit-array conversion / image processingInvisible-watermark feature entirely unavailable
pywtWavelet transform (transitive dependency of blind_watermark)Invisible-watermark feature entirely unavailable
c2pa-pythonOfficial C2PA SDKFalls back to a byte-level scan (no signature verification); evidence notes "official C2PA verifier unavailable"

8. Privacy and Network Behavior

8.1 Default Network Paths

FeatureNetwork ActionCan It Be Disabled?
Digital timestamping (TSA)Sends a SHA-256 digest request to DigiCert / FreeTSA / IdenTrustCan switch back to a purely local timestamp (degraded)
Infringement evidence captureVia Playwright, makes HTTPS requests, a TLS handshake, and DNS resolution to the target siteNo (the feature is networked evidence capture by nature)
AI metadata detection (C2PA SDK)ocsp_fetch=True + remote_manifest_fetch=TrueCurrently hardcoded on via SDK settings, with no UI toggle
Invisible watermarkNone

8.2 Offline Notes

  • Digital timestamping: only a hash value is sent to the TSA — the original file content is not sent.

  • Infringement evidence capture: by design it makes a full request to the target URL; that is the evidence capture itself.

  • AI metadata detection: regular PNG info / EXIF / byte scanning is done locally. However, while verifying the signature chain, the C2PA SDK may:

    • Fetch a remote manifest (remote_manifest_fetch)
    • Check certificate revocation (ocsp_fetch)
    • Verify timestamp trust (verify_timestamp_trust)

    Therefore product copy should not blanketly claim that "C2PA verification never goes online". If a user needs a strict offline mode, a toggle to disable remote manifest / OCSP should be provided (not yet implemented).

  • Invisible watermark: entirely local; neither embedding nor extraction touches the network.

Last updated Jun 21, 2026·Applies to v0.5.2-beta