"""Helper for QE/Dev agents to interact with Gitea issues and PRs. Usage: python scripts/agent_poller.py --action list python scripts/agent_poller.py --action list --labels test-code python scripts/agent_poller.py --action get --issue 1 python scripts/agent_poller.py --action comment --issue 1 --body "Working on this" python scripts/agent_poller.py --action create-issue --title "My issue" --labels test-code --body "..." python scripts/agent_poller.py --action create-pr --issue 1 --branch test/issue-1 python scripts/agent_poller.py --action pr-status --pr 4 python scripts/agent_poller.py --action merge-pr --pr 4 python scripts/agent_poller.py --action close-issue --issue 2 --body "Done" python scripts/agent_poller.py --action lifecycle --issue 2 """ import argparse import json import os import re import sys import urllib.request import urllib.error GITEA_URL = os.environ.get("GITEA_URL", "https://git.zywl.me") GITEA_REPO = os.environ.get("GITEA_REPO", "zeekrAI/document_analyzer") GITEA_TOKEN = os.environ.get("GITEA_API_TOKEN", "") DEV_AGENT_ID = os.environ.get("DEV_AGENT_ID", "da-01") QE_AGENT_ID = os.environ.get("QE_AGENT_ID", "") # Signature appended to all comments / PR bodies if QE_AGENT_ID: AGENT_ID = QE_AGENT_ID AGENT_SIG = f"\n\n---\n[qe-agent: {QE_AGENT_ID}]" else: AGENT_ID = DEV_AGENT_ID AGENT_SIG = f"\n\n---\n[{DEV_AGENT_ID}]" BASE = f"{GITEA_URL}/api/v1/repos/{GITEA_REPO}" def _req(method, path, data=None): url = f"{BASE}{path}" payload = json.dumps(data).encode("utf-8") if data else None req = urllib.request.Request(url, data=payload, method=method) req.add_header("Authorization", f"token {GITEA_TOKEN}") req.add_header("Content-Type", "application/json") try: with urllib.request.urlopen(req) as resp: raw = resp.read() if not raw: return {} # Gitea merge returns 200 with empty body return json.loads(raw) except urllib.error.HTTPError as e: body = e.read().decode() print(f"API Error {e.code}: {body}", file=sys.stderr) sys.exit(1) def _req_safe(method, path, data=None): """Like _req but returns None on HTTPError instead of crashing. Used for probing issue/PR existence where the caller can handle absence. """ url = f"{BASE}{path}" payload = json.dumps(data).encode("utf-8") if data else None req = urllib.request.Request(url, data=payload, method=method) req.add_header("Authorization", f"token {GITEA_TOKEN}") req.add_header("Content-Type", "application/json") try: with urllib.request.urlopen(req) as resp: raw = resp.read() if not raw: return {} return json.loads(raw) except urllib.error.HTTPError as e: body = e.read().decode() print(f"API Error {e.code}: {body}", file=sys.stderr) return None # ── Issue operations ───────────────────────────────────────────────────────── def list_issues(labels: list[str] | None = None): url = "/issues?state=open" if labels: for lb in labels: url += f"&labels={lb}" issues = _req("GET", url) if not issues: label_hint = f" (filtered by {labels})" if labels else "" print(f"No open issues found{label_hint}.") return [] for i in issues: issue_labels = [l["name"] for l in i.get("labels", [])] print(f"#{i['number']} [{', '.join(issue_labels) if issue_labels else 'no label'}] {i['title']}") return issues def _get_blocking_refs(issue_num: int) -> set[int]: """Extract all issue references from an issue body + comments. Scans both the issue body and all comments for #N patterns, returning a set of referenced issue numbers. """ refs: set[int] = set() # Body issue = _req_safe("GET", f"/issues/{issue_num}") if issue is None: return refs # API error → return empty set, keep blocked body = issue.get("body", "") or "" refs.update(int(m.group(1)) for m in re.finditer(r'#(\d+)', body)) # Comments comments = _req_safe("GET", f"/issues/{issue_num}/comments") if comments: for c in comments: cbody = c.get("body", "") or "" refs.update(int(m.group(1)) for m in re.finditer(r'#(\d+)', cbody)) return refs def blocked_check(): """Check all blocked issues: if blocking issues are now closed, unblock. Scans issue body + comments for blocking references. If no references found or all referenced issues are closed, removes the 'blocked' label. """ all_blocked = _req_safe("GET", "/issues?state=open&labels=blocked") if not all_blocked: print("No blocked issues found.") return unblocked_count = 0 for issue in all_blocked: blocking_nums = _get_blocking_refs(issue["number"]) all_resolved = True for blk in blocking_nums: blk_issue = _req_safe("GET", f"/issues/{blk}") if blk_issue is None: all_resolved = False # API error → keep blocked break if blk_issue.get("state") != "closed": all_resolved = False break if all_resolved: current_label_names = [l["name"] for l in issue.get("labels", [])] new_label_names = [l for l in current_label_names if l != "blocked"] new_label_ids = _label_names_to_ids(new_label_names) _req("PUT", f"/issues/{issue['number']}/labels", {"labels": new_label_ids}) reason = "所有阻塞 Issue 均已关闭" if blocking_nums else "无阻塞引用,移除残留 blocked 标签" print(f"Unblocked #{issue['number']}: {issue['title']}") comment_issue(issue["number"], f"阻塞已解除:{reason}。") unblocked_count += 1 if unblocked_count == 0: print(f"Checked {len(all_blocked)} blocked issue(s): still blocked.") def get_issue(num): i = _req("GET", f"/issues/{num}") print(f"## #{i['number']}: {i['title']}") print(f"State: {i['state']}") labels = [l["name"] for l in i.get("labels", [])] print(f"Labels: {', '.join(labels) if labels else 'none'}") print() print(i.get("body", "(no description)")) return i def comment_issue(num, body): i = _req("POST", f"/issues/{num}/comments", {"body": body + AGENT_SIG}) print(f"Comment added to #{num}") return i def close_issue(num, body=None): """Close an issue, optionally with a final comment (signature auto-appended). After closing, automatically unblocks any issues that were blocked by this one if no other blocking issues remain open. """ if body: comment_issue(num, body) # comment_issue already appends AGENT_SIG i = _req("PATCH", f"/issues/{num}", {"state": "closed"}) print(f"Issue #{num} closed") _unblock_issues_blocked_by(num) return i def reopen_issue(num, body=None): """Reopen a closed issue, optionally with a reason comment.""" if body: comment_issue(num, f"## REOPEN\n\n{body}") i = _req("PATCH", f"/issues/{num}", {"state": "open"}) print(f"Issue #{num} reopened") return i def _unblock_issues_blocked_by(closed_num): """Check issues blocked by *closed_num* and unblock if all blockers resolved. Scans both body and comments for #N references. If *closed_num* appears in any blocked issue and all referenced issues are now closed, removes the 'blocked' label and comments on the unblocked issue. """ all_blocked = _req_safe("GET", "/issues?state=open&labels=blocked") if not all_blocked: return for issue in all_blocked: blocking_nums = _get_blocking_refs(issue["number"]) if closed_num not in blocking_nums: continue # Check all referenced issues — are they all closed? all_resolved = True for blk in blocking_nums: if blk == closed_num: continue blk_issue = _req_safe("GET", f"/issues/{blk}") if blk_issue is None: all_resolved = False # API error → keep blocked break if blk_issue.get("state") != "closed": all_resolved = False break if all_resolved: current_label_names = [l["name"] for l in issue.get("labels", [])] new_label_names = [l for l in current_label_names if l != "blocked"] new_label_ids = _label_names_to_ids(new_label_names) _req("PUT", f"/issues/{issue['number']}/labels", {"labels": new_label_ids}) print(f" -> Unblocked #{issue['number']}: all blocking issues resolved") comment_issue(issue["number"], f"阻塞已解除:#{closed_num} 及其他阻塞 Issue 均已关闭。") def create_issue(title, body=None, labels=None): """Create a new Gitea issue. Labels convention (per project rules): - Product/feature issues → product-code - Test code issues → test-code """ payload = {"title": title} if body: payload["body"] = body + AGENT_SIG if labels: label_names = [l.strip() for l in labels.split(",") if l.strip()] # Gitea 1.22 expects label IDs (int64). Resolve names → IDs. label_ids = _label_names_to_ids(label_names) if label_ids: payload["labels"] = label_ids i = _req("POST", "/issues", payload) issue_labels = [l["name"] for l in i.get("labels", [])] print(f"Issue #{i['number']} created: {i['title']}") if issue_labels: print(f"Labels: {', '.join(issue_labels)}") print(f"URL: {i.get('html_url', i.get('url', ''))}") return i def _label_names_to_ids(names: list[str]) -> list[int]: """Resolve label names to Gitea label IDs. Returns empty list on failure.""" try: all_labels = _req("GET", "/labels") name_to_id = {l["name"]: l["id"] for l in all_labels} ids = [] for name in names: if name in name_to_id: ids.append(name_to_id[name]) else: print(f"Warning: label '{name}' not found, skipping", file=sys.stderr) return ids except SystemExit: return [] # ── PR operations ──────────────────────────────────────────────────────────── def create_pr(issue_num, branch, body=None): """Create a PR for the given issue and branch.""" issue = _req("GET", f"/issues/{issue_num}") title = f"fix: {issue['title']} - Closes #{issue_num}" if body is None: body = f"Closes #{issue_num}\n\n{issue.get('body', '')}" body += AGENT_SIG pr = _req("POST", "/pulls", { "title": title, "head": branch, "base": "main", "body": body, }) pr_url = pr.get('html_url', pr.get('url', '')) print(f"PR created: {pr_url}") return pr def pr_status(pr_num): """Check PR state and CI status.""" pr = _req("GET", f"/pulls/{pr_num}") print(f"PR #{pr['number']}: {pr['title']}") print(f"State: {pr['state']}") print(f"Merged: {pr.get('merged', False)}") print(f"Mergeable: {pr.get('mergeable', 'unknown')}") print(f"URL: {pr['html_url']}") # CI status sha = pr.get("head", {}).get("sha", "") if sha: try: status = _req("GET", f"/commits/{sha}/status") print(f"CI Status: {status.get('state', 'pending')}") for s in status.get('statuses', []): desc = s.get('description', '') print(f" - {s.get('context')}: {s.get('state')} ({desc})") except SystemExit: print("CI Status: no statuses found") return pr def merge_pr(pr_num): """Merge a PR. Fails if CI hasn't passed or PR is not mergeable.""" pr = _req("GET", f"/pulls/{pr_num}") if pr.get("state") == "closed": if pr.get("merged"): print(f"PR #{pr_num} already merged") return pr else: print(f"PR #{pr_num} is closed (not merged)", file=sys.stderr) sys.exit(1) # Check CI sha = pr.get("head", {}).get("sha", "") ci_passed = True if sha: try: status = _req("GET", f"/commits/{sha}/status") ci_state = status.get("state", "pending") if ci_state in ("failure", "error"): ci_passed = False print(f"CI status: {ci_state} — cannot merge", file=sys.stderr) for s in status.get('statuses', []): print(f" {s.get('context')}: {s.get('state')}", file=sys.stderr) except SystemExit: pass # No statuses, proceed if not ci_passed: print("Merge blocked: CI has not passed", file=sys.stderr) sys.exit(1) result = _req("POST", f"/pulls/{pr_num}/merge", {"Do": "merge"}) # Verify merge success by re-checking PR state pr_after = _req("GET", f"/pulls/{pr_num}") if pr_after.get("merged"): print(f"PR #{pr_num} merged successfully") elif result.get("merged"): print(f"PR #{pr_num} merged successfully") else: print(f"Merge result: {result.get('message', 'unknown')}") return result # ── Lifecycle management ───────────────────────────────────────────────────── def lifecycle(issue_num): """Print the full lifecycle status for an issue: branch, PR, CI, merge.""" print(f"=== Issue #{issue_num} Lifecycle ===\n") issue = _req("GET", f"/issues/{issue_num}") print(f"Issue: {issue['title']}") print(f"State: {issue['state']}\n") # Find associated PRs prs = _req("GET", "/pulls?state=all") related = [p for p in prs if f"Closes #{issue_num}" in p.get("body", "") or f"#{issue_num}" in p.get("title", "")] if related: for pr in related: print(f"PR #{pr['number']}: {pr['state']} (merged={pr.get('merged', False)})") pr_status(pr["number"]) else: print("No associated PR found.") # ── CLI ────────────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser(description="Dev agent Gitea helper") parser.add_argument("--action", required=True, choices=["list", "get", "comment", "close-issue", "create-issue", "reopen-issue", "create-pr", "pr-status", "merge-pr", "lifecycle", "blocked-check"]) parser.add_argument("--issue", type=int) parser.add_argument("--pr", type=int) parser.add_argument("--title", help="Issue title (for 'create-issue' action)") parser.add_argument("--branch") parser.add_argument("--body") parser.add_argument("--labels", help="Comma-separated labels (filter for 'list', assign for 'create-issue')") args = parser.parse_args() if not GITEA_TOKEN: print("Error: GITEA_API_TOKEN environment variable is not set.", file=sys.stderr) print("Set it via: export GITEA_API_TOKEN=your-token", file=sys.stderr) sys.exit(1) if args.action == "list": label_filter = [l.strip() for l in args.labels.split(",") if l.strip()] if args.labels else None list_issues(label_filter) elif args.action == "get": if not args.issue: print("--issue is required for 'get' action", file=sys.stderr) sys.exit(1) get_issue(args.issue) elif args.action == "comment": if not args.issue or not args.body: print("--issue and --body are required for 'comment' action", file=sys.stderr) sys.exit(1) comment_issue(args.issue, args.body) elif args.action == "close-issue": if not args.issue: print("--issue is required for 'close-issue' action", file=sys.stderr) sys.exit(1) close_issue(args.issue, args.body) elif args.action == "create-issue": if not args.title: print("--title is required for 'create-issue' action", file=sys.stderr) sys.exit(1) create_issue(args.title, args.body, args.labels) elif args.action == "reopen-issue": if not args.issue: print("--issue is required for 'reopen-issue' action", file=sys.stderr) sys.exit(1) reopen_issue(args.issue, args.body) elif args.action == "create-pr": if not args.issue or not args.branch: print("--issue and --branch are required for 'create-pr' action", file=sys.stderr) sys.exit(1) create_pr(args.issue, args.branch, args.body) elif args.action == "pr-status": if not args.pr: print("--pr is required for 'pr-status' action", file=sys.stderr) sys.exit(1) pr_status(args.pr) elif args.action == "merge-pr": if not args.pr: print("--pr is required for 'merge-pr' action", file=sys.stderr) sys.exit(1) merge_pr(args.pr) elif args.action == "blocked-check": blocked_check() elif args.action == "lifecycle": if not args.issue: print("--issue is required for 'lifecycle' action", file=sys.stderr) sys.exit(1) lifecycle(args.issue) if __name__ == "__main__": main()