import logging import os import time from pathlib import Path from typing import Optional from openai import OpenAI logger = logging.getLogger(__name__) # Resolve secrets file: priority 1) env OPENCLAW_SECRETS, # 2) workspace-document-analyzer/config/ (relative to skills dir), # 3) .openclaw/config/ _SECRETS_FILE = None for _candidate in ( os.environ.get("OPENCLAW_SECRETS", ""), Path(__file__).resolve().parents[3] / "config" / "secrets.yaml", Path(__file__).resolve().parents[5] / ".openclaw" / "config" / "secrets.yaml", ): if _candidate and Path(_candidate).exists(): _SECRETS_FILE = Path(_candidate) break if _SECRETS_FILE is None: _SECRETS_FILE = Path("") # empty fallback def _load_secrets() -> dict: """Load API keys from secrets.yaml, with env-var overrides.""" secrets = {} if _SECRETS_FILE.exists(): try: import yaml with open(_SECRETS_FILE, "r", encoding="utf-8") as f: data = yaml.safe_load(f) or {} for provider in ("deepseek", "dashscope"): if provider in data and isinstance(data[provider], dict): secrets[provider] = data[provider] except ImportError: logger.warning("pyyaml not installed, cannot read %s", _SECRETS_FILE) except Exception as e: logger.warning("Failed to load %s: %s", _SECRETS_FILE, e) # Env overrides dk_env = os.environ.get("DEEPSEEK_API_KEY", "") ds_env = os.environ.get("DASHSCOPE_API_KEY", "") if dk_env: secrets.setdefault("deepseek", {})["apiKey"] = dk_env if ds_env: secrets.setdefault("dashscope", {})["apiKey"] = ds_env return secrets class LLMClient: """Multi-provider LLM client with retry and token tracking. Routes text models to DeepSeek, vision models to DashScope (Bailian). Reads API keys from openclaw config/secrets.yaml, with env-var overrides. Usage:: llm = LLMClient() content = llm.chat("deepseek-v4-pro", [{"role": "user", "content": "Hello"}]) print(llm.usage) """ IMAGE_MODEL = "qwen3.6-flash" TEXT_MODEL = "deepseek-v4-flash" DASHSCOPE_BASE = "https://dashscope.aliyuncs.com/compatible-mode/v1" DEEPSEEK_BASE = "https://api.deepseek.com/v1" TIMEOUT = 120 MAX_RETRIES = 3 _VISION_KEYWORDS = ("vl", "vision", "qwen-vl", "qwen3-vl", "qwen3.6") def __init__( self, *, timeout: int | None = None, ): secrets = _load_secrets() ds_cfg = secrets.get("dashscope", {}) dk_cfg = secrets.get("deepseek", {}) dashscope_key = ds_cfg.get("apiKey", "") dashscope_url = ds_cfg.get("baseUrl", self.DASHSCOPE_BASE) deepseek_key = dk_cfg.get("apiKey", "") deepseek_url = dk_cfg.get("baseUrl", self.DEEPSEEK_BASE) self._ds_client = OpenAI(api_key=dashscope_key, base_url=dashscope_url) if dashscope_key else None self._dk_client = OpenAI(api_key=deepseek_key, base_url=deepseek_url) if deepseek_key else None self._timeout = timeout or self.TIMEOUT self._prompt_tokens = 0 self._completion_tokens = 0 @property def usage(self) -> dict: """Return accumulated token counts as ``{prompt, completion, total}``.""" return { "prompt_tokens": self._prompt_tokens, "completion_tokens": self._completion_tokens, "total_tokens": self._prompt_tokens + self._completion_tokens, } @staticmethod def estimate_tokens(text: str) -> int: """Quick token estimate. CJK ≈1.7/token, others ≈3.0/token.""" cjk = sum(1 for c in text if '\u4e00' <= c <= '\u9fff' or '\u3000' <= c <= '\u303f') other = len(text) - cjk return max(1, int(cjk / 1.7 + other / 3.0)) @staticmethod def estimate_image_tokens() -> int: """Fixed estimate for one vision-model image (~500 tokens).""" return 500 @staticmethod def _is_vision_model(model: str) -> bool: return any(kw in model.lower() for kw in LLMClient._VISION_KEYWORDS) def _get_client(self, model: str) -> OpenAI: if self._is_vision_model(model): if self._ds_client is None: raise ValueError("DASHSCOPE_API_KEY not set but required for vision model") return self._ds_client else: if self._dk_client is None: raise ValueError("DEEPSEEK_API_KEY not set but required for text model") return self._dk_client def chat( self, model: str, messages: list[dict], *, timeout: int | None = None, response_format: dict | None = None, ) -> str: """Send a chat completion request and return the response content. Automatically retries on failure and accumulates token usage. Routes to DeepSeek for text, DashScope for vision. """ label = f"chat({model})" client = self._get_client(model) def _call(): t0 = time.time() kwargs = dict(model=model, messages=messages, timeout=timeout or self._timeout) if response_format is not None: kwargs["response_format"] = response_format kwargs["temperature"] = 0 resp = client.chat.completions.create(**kwargs) content = resp.choices[0].message.content usg = resp.usage if usg: self._prompt_tokens += usg.prompt_tokens self._completion_tokens += usg.completion_tokens elapsed = time.time() - t0 logger.info("%s: %d chars in %.1fs", label, len(content) if content else 0, elapsed) if not content: raise RuntimeError("Empty response from LLM") return content return self._retry(_call, label) def _retry(self, fn, label: str) -> str: """Call *fn()* with exponential-backoff retry.""" last_error: Optional[Exception] = None for attempt in range(self.MAX_RETRIES): try: return fn() except Exception as e: last_error = e logger.warning( "%s error (attempt %d/%d): %s", label, attempt + 1, self.MAX_RETRIES, e, ) if attempt < self.MAX_RETRIES - 1: time.sleep(2 ** attempt) raise RuntimeError(f"{label}: all retries exhausted") from last_error