Files
document_analyzer/skills/conflict_detection_skill/scripts/LLM.py
pzhang_zywl fec4c09ee0
CI / test (push) Successful in 8s
sync: update all skills from latest workspace code
doc_parser_skill:
- New: verify_flowchart.py (flowchart validation)
- Updated: LLM.py (multi-provider: DeepSeek + DashScope)
- Updated: image_parser.py (logic tree support, external prompts)
- Updated: SKILL.md, prompts/image_prompt.md

conflict_detection_skill:
- Updated: LLM.py (multi-provider sync)
- Updated: detect_conflicts.py (logic tree text conversion)

ir_generation_skill:
- Replaced old scripts/LLM.py + ir_generator.py with standalone project
- New: main.py, config.py, step1-3_*.py, ensemble_merge.py
- New: prompts/, tests/ subdirectories

tests:
- New: acceptance/ test suite with schema validation
- Fixed: conftest no longer globally skips non-acceptance tests
- Updated: test_sample.py for new ir_generation structure

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-30 22:45:08 +08:00

181 lines
6.4 KiB
Python

import logging
import os
import time
from pathlib import Path
from typing import Optional
from openai import OpenAI
logger = logging.getLogger(__name__)
# Resolve secrets file: priority 1) env OPENCLAW_SECRETS,
# 2) workspace-document-analyzer/config/ (relative to skills dir),
# 3) .openclaw/config/
_SECRETS_FILE = None
for _candidate in (
os.environ.get("OPENCLAW_SECRETS", ""),
Path(__file__).resolve().parents[3] / "config" / "secrets.yaml",
Path(__file__).resolve().parents[5] / ".openclaw" / "config" / "secrets.yaml",
):
if _candidate and Path(_candidate).exists():
_SECRETS_FILE = Path(_candidate)
break
if _SECRETS_FILE is None:
_SECRETS_FILE = Path("") # empty fallback
def _load_secrets() -> dict:
"""Load API keys from secrets.yaml, with env-var overrides."""
secrets = {}
if _SECRETS_FILE.exists():
try:
import yaml
with open(_SECRETS_FILE, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
for provider in ("deepseek", "dashscope"):
if provider in data and isinstance(data[provider], dict):
secrets[provider] = data[provider]
except ImportError:
logger.warning("pyyaml not installed, cannot read %s", _SECRETS_FILE)
except Exception as e:
logger.warning("Failed to load %s: %s", _SECRETS_FILE, e)
# Env overrides
dk_env = os.environ.get("DEEPSEEK_API_KEY", "")
ds_env = os.environ.get("DASHSCOPE_API_KEY", "")
if dk_env:
secrets.setdefault("deepseek", {})["apiKey"] = dk_env
if ds_env:
secrets.setdefault("dashscope", {})["apiKey"] = ds_env
return secrets
class LLMClient:
"""Multi-provider LLM client with retry and token tracking.
Routes text models to DeepSeek, vision models to DashScope (Bailian).
Reads API keys from openclaw config/secrets.yaml, with env-var overrides.
Usage::
llm = LLMClient()
content = llm.chat("deepseek-v4-pro", [{"role": "user", "content": "Hello"}])
print(llm.usage)
"""
IMAGE_MODEL = "qwen3-vl-plus"
TEXT_MODEL = "deepseek-v4-flash"
DASHSCOPE_BASE = "https://dashscope.aliyuncs.com/compatible-mode/v1"
DEEPSEEK_BASE = "https://api.deepseek.com/v1"
TIMEOUT = 120
MAX_RETRIES = 3
_VISION_KEYWORDS = ("vl", "vision", "qwen-vl", "qwen3-vl")
def __init__(
self,
*,
timeout: int | None = None,
):
secrets = _load_secrets()
ds_cfg = secrets.get("dashscope", {})
dk_cfg = secrets.get("deepseek", {})
dashscope_key = ds_cfg.get("apiKey", "")
dashscope_url = ds_cfg.get("baseUrl", self.DASHSCOPE_BASE)
deepseek_key = dk_cfg.get("apiKey", "")
deepseek_url = dk_cfg.get("baseUrl", self.DEEPSEEK_BASE)
self._ds_client = OpenAI(api_key=dashscope_key, base_url=dashscope_url) if dashscope_key else None
self._dk_client = OpenAI(api_key=deepseek_key, base_url=deepseek_url) if deepseek_key else None
self._timeout = timeout or self.TIMEOUT
self._prompt_tokens = 0
self._completion_tokens = 0
@property
def usage(self) -> dict:
"""Return accumulated token counts as ``{prompt, completion, total}``."""
return {
"prompt_tokens": self._prompt_tokens,
"completion_tokens": self._completion_tokens,
"total_tokens": self._prompt_tokens + self._completion_tokens,
}
@staticmethod
def estimate_tokens(text: str) -> int:
"""Quick token estimate. CJK ≈1.7/token, others ≈3.0/token."""
cjk = sum(1 for c in text if '\u4e00' <= c <= '\u9fff' or '\u3000' <= c <= '\u303f')
other = len(text) - cjk
return max(1, int(cjk / 1.7 + other / 3.0))
@staticmethod
def estimate_image_tokens() -> int:
"""Fixed estimate for one vision-model image (~500 tokens)."""
return 500
@staticmethod
def _is_vision_model(model: str) -> bool:
return any(kw in model.lower() for kw in LLMClient._VISION_KEYWORDS)
def _get_client(self, model: str) -> OpenAI:
if self._is_vision_model(model):
if self._ds_client is None:
raise ValueError("DASHSCOPE_API_KEY not set but required for vision model")
return self._ds_client
else:
if self._dk_client is None:
raise ValueError("DEEPSEEK_API_KEY not set but required for text model")
return self._dk_client
def chat(
self, model: str, messages: list[dict], *, timeout: int | None = None,
response_format: dict | None = None,
) -> str:
"""Send a chat completion request and return the response content.
Automatically retries on failure and accumulates token usage.
Routes to DeepSeek for text, DashScope for vision.
"""
label = f"chat({model})"
client = self._get_client(model)
def _call():
t0 = time.time()
kwargs = dict(model=model, messages=messages, timeout=timeout or self._timeout)
if response_format is not None:
kwargs["response_format"] = response_format
kwargs["temperature"] = 0
resp = client.chat.completions.create(**kwargs)
content = resp.choices[0].message.content
usg = resp.usage
if usg:
self._prompt_tokens += usg.prompt_tokens
self._completion_tokens += usg.completion_tokens
elapsed = time.time() - t0
logger.info("%s: %d chars in %.1fs", label, len(content) if content else 0, elapsed)
if not content:
raise RuntimeError("Empty response from LLM")
return content
return self._retry(_call, label)
def _retry(self, fn, label: str) -> str:
"""Call *fn()* with exponential-backoff retry."""
last_error: Optional[Exception] = None
for attempt in range(self.MAX_RETRIES):
try:
return fn()
except Exception as e:
last_error = e
logger.warning(
"%s error (attempt %d/%d): %s",
label, attempt + 1, self.MAX_RETRIES, e,
)
if attempt < self.MAX_RETRIES - 1:
time.sleep(2 ** attempt)
raise RuntimeError(f"{label}: all retries exhausted") from last_error