d73da7cda9
- 新增 _req_safe():API 错误返回 None 而非 sys.exit(1) - blocked_check / _unblock_issues_blocked_by / _get_blocking_refs 改用 _req_safe - API 失败时保守处理:保持 blocked 状态(不误解除) - 验证:#18 正确识别被 #57 阻塞 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
454 lines
17 KiB
Python
454 lines
17 KiB
Python
"""Helper for QE/Dev agents to interact with Gitea issues and PRs.
|
|
|
|
Usage:
|
|
python scripts/agent_poller.py --action list
|
|
python scripts/agent_poller.py --action list --labels test-code
|
|
python scripts/agent_poller.py --action get --issue 1
|
|
python scripts/agent_poller.py --action comment --issue 1 --body "Working on this"
|
|
python scripts/agent_poller.py --action create-issue --title "My issue" --labels test-code --body "..."
|
|
python scripts/agent_poller.py --action create-pr --issue 1 --branch test/issue-1
|
|
python scripts/agent_poller.py --action pr-status --pr 4
|
|
python scripts/agent_poller.py --action merge-pr --pr 4
|
|
python scripts/agent_poller.py --action close-issue --issue 2 --body "Done"
|
|
python scripts/agent_poller.py --action lifecycle --issue 2
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import urllib.request
|
|
import urllib.error
|
|
|
|
GITEA_URL = os.environ.get("GITEA_URL", "http://localhost:3000")
|
|
GITEA_REPO = os.environ.get("GITEA_REPO", "pzhang_zywl/document_analyzer")
|
|
GITEA_TOKEN = os.environ.get("GITEA_API_TOKEN", "")
|
|
DEV_AGENT_ID = os.environ.get("DEV_AGENT_ID", "da-01")
|
|
QE_AGENT_ID = os.environ.get("QE_AGENT_ID", "")
|
|
|
|
# Signature appended to all comments / PR bodies
|
|
if QE_AGENT_ID:
|
|
AGENT_ID = QE_AGENT_ID
|
|
AGENT_SIG = f"\n\n---\n[qe-agent: {QE_AGENT_ID}]"
|
|
else:
|
|
AGENT_ID = DEV_AGENT_ID
|
|
AGENT_SIG = f"\n\n---\n[{DEV_AGENT_ID}]"
|
|
|
|
BASE = f"{GITEA_URL}/api/v1/repos/{GITEA_REPO}"
|
|
|
|
|
|
def _req(method, path, data=None):
|
|
url = f"{BASE}{path}"
|
|
payload = json.dumps(data).encode("utf-8") if data else None
|
|
req = urllib.request.Request(url, data=payload, method=method)
|
|
req.add_header("Authorization", f"token {GITEA_TOKEN}")
|
|
req.add_header("Content-Type", "application/json")
|
|
try:
|
|
with urllib.request.urlopen(req) as resp:
|
|
raw = resp.read()
|
|
if not raw:
|
|
return {} # Gitea merge returns 200 with empty body
|
|
return json.loads(raw)
|
|
except urllib.error.HTTPError as e:
|
|
body = e.read().decode()
|
|
print(f"API Error {e.code}: {body}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
def _req_safe(method, path, data=None):
|
|
"""Like _req but returns None on HTTPError instead of crashing.
|
|
Used for probing issue/PR existence where the caller can handle absence.
|
|
"""
|
|
url = f"{BASE}{path}"
|
|
payload = json.dumps(data).encode("utf-8") if data else None
|
|
req = urllib.request.Request(url, data=payload, method=method)
|
|
req.add_header("Authorization", f"token {GITEA_TOKEN}")
|
|
req.add_header("Content-Type", "application/json")
|
|
try:
|
|
with urllib.request.urlopen(req) as resp:
|
|
raw = resp.read()
|
|
if not raw:
|
|
return {}
|
|
return json.loads(raw)
|
|
except urllib.error.HTTPError as e:
|
|
body = e.read().decode()
|
|
print(f"API Error {e.code}: {body}", file=sys.stderr)
|
|
return None
|
|
|
|
|
|
# ── Issue operations ─────────────────────────────────────────────────────────
|
|
|
|
def list_issues(labels: list[str] | None = None):
|
|
url = "/issues?state=open"
|
|
if labels:
|
|
for lb in labels:
|
|
url += f"&labels={lb}"
|
|
issues = _req("GET", url)
|
|
if not issues:
|
|
label_hint = f" (filtered by {labels})" if labels else ""
|
|
print(f"No open issues found{label_hint}.")
|
|
return []
|
|
for i in issues:
|
|
issue_labels = [l["name"] for l in i.get("labels", [])]
|
|
print(f"#{i['number']} [{', '.join(issue_labels) if issue_labels else 'no label'}] {i['title']}")
|
|
return issues
|
|
|
|
|
|
def _get_blocking_refs(issue_num: int) -> set[int]:
|
|
"""Extract all issue references from an issue body + comments.
|
|
|
|
Scans both the issue body and all comments for #N patterns,
|
|
returning a set of referenced issue numbers.
|
|
"""
|
|
refs: set[int] = set()
|
|
# Body
|
|
issue = _req_safe("GET", f"/issues/{issue_num}")
|
|
if issue is None:
|
|
return refs # API error → return empty set, keep blocked
|
|
body = issue.get("body", "") or ""
|
|
refs.update(int(m.group(1)) for m in re.finditer(r'#(\d+)', body))
|
|
# Comments
|
|
comments = _req_safe("GET", f"/issues/{issue_num}/comments")
|
|
if comments:
|
|
for c in comments:
|
|
cbody = c.get("body", "") or ""
|
|
refs.update(int(m.group(1)) for m in re.finditer(r'#(\d+)', cbody))
|
|
return refs
|
|
|
|
|
|
def blocked_check():
|
|
"""Check all blocked issues: if blocking issues are now closed, unblock.
|
|
|
|
Scans issue body + comments for blocking references.
|
|
If no references found or all referenced issues are closed,
|
|
removes the 'blocked' label.
|
|
"""
|
|
try:
|
|
all_blocked = _req("GET", "/issues?state=open&labels=blocked")
|
|
except SystemExit:
|
|
print("No blocked issues found.")
|
|
return
|
|
|
|
if not all_blocked:
|
|
print("No blocked issues found.")
|
|
return
|
|
|
|
unblocked_count = 0
|
|
for issue in all_blocked:
|
|
blocking_nums = _get_blocking_refs(issue["number"])
|
|
|
|
all_resolved = True
|
|
for blk in blocking_nums:
|
|
blk_issue = _req_safe("GET", f"/issues/{blk}")
|
|
if blk_issue is None:
|
|
all_resolved = False # API error → keep blocked
|
|
break
|
|
if blk_issue.get("state") != "closed":
|
|
all_resolved = False
|
|
break
|
|
|
|
if all_resolved:
|
|
current_label_names = [l["name"] for l in issue.get("labels", [])]
|
|
new_label_names = [l for l in current_label_names if l != "blocked"]
|
|
new_label_ids = _label_names_to_ids(new_label_names)
|
|
_req("PUT", f"/issues/{issue['number']}/labels", {"labels": new_label_ids})
|
|
reason = "所有阻塞 Issue 均已关闭" if blocking_nums else "无阻塞引用,移除残留 blocked 标签"
|
|
print(f"Unblocked #{issue['number']}: {issue['title']}")
|
|
comment_issue(issue["number"], f"阻塞已解除:{reason}。")
|
|
unblocked_count += 1
|
|
|
|
if unblocked_count == 0:
|
|
print(f"Checked {len(all_blocked)} blocked issue(s): still blocked.")
|
|
|
|
|
|
def get_issue(num):
|
|
i = _req("GET", f"/issues/{num}")
|
|
print(f"## #{i['number']}: {i['title']}")
|
|
print(f"State: {i['state']}")
|
|
labels = [l["name"] for l in i.get("labels", [])]
|
|
print(f"Labels: {', '.join(labels) if labels else 'none'}")
|
|
print()
|
|
print(i.get("body", "(no description)"))
|
|
return i
|
|
|
|
|
|
def comment_issue(num, body):
|
|
i = _req("POST", f"/issues/{num}/comments", {"body": body + AGENT_SIG})
|
|
print(f"Comment added to #{num}")
|
|
return i
|
|
|
|
|
|
def close_issue(num, body=None):
|
|
"""Close an issue, optionally with a final comment (signature auto-appended).
|
|
|
|
After closing, automatically unblocks any issues that were blocked by this one
|
|
if no other blocking issues remain open.
|
|
"""
|
|
if body:
|
|
comment_issue(num, body) # comment_issue already appends AGENT_SIG
|
|
i = _req("PATCH", f"/issues/{num}", {"state": "closed"})
|
|
print(f"Issue #{num} closed")
|
|
_unblock_issues_blocked_by(num)
|
|
return i
|
|
|
|
|
|
def _unblock_issues_blocked_by(closed_num):
|
|
"""Check issues blocked by *closed_num* and unblock if all blockers resolved.
|
|
|
|
Scans both body and comments for #N references. If *closed_num* appears
|
|
in any blocked issue and all referenced issues are now closed,
|
|
removes the 'blocked' label and comments on the unblocked issue.
|
|
"""
|
|
all_blocked = _req_safe("GET", "/issues?state=open&labels=blocked")
|
|
if not all_blocked:
|
|
return
|
|
|
|
for issue in all_blocked:
|
|
blocking_nums = _get_blocking_refs(issue["number"])
|
|
if closed_num not in blocking_nums:
|
|
continue
|
|
|
|
# Check all referenced issues — are they all closed?
|
|
all_resolved = True
|
|
for blk in blocking_nums:
|
|
if blk == closed_num:
|
|
continue
|
|
blk_issue = _req_safe("GET", f"/issues/{blk}")
|
|
if blk_issue is None:
|
|
all_resolved = False # API error → keep blocked
|
|
break
|
|
if blk_issue.get("state") != "closed":
|
|
all_resolved = False
|
|
break
|
|
|
|
if all_resolved:
|
|
current_label_names = [l["name"] for l in issue.get("labels", [])]
|
|
new_label_names = [l for l in current_label_names if l != "blocked"]
|
|
new_label_ids = _label_names_to_ids(new_label_names)
|
|
_req("PUT", f"/issues/{issue['number']}/labels", {"labels": new_label_ids})
|
|
print(f" -> Unblocked #{issue['number']}: all blocking issues resolved")
|
|
comment_issue(issue["number"],
|
|
f"阻塞已解除:#{closed_num} 及其他阻塞 Issue 均已关闭。")
|
|
|
|
|
|
def create_issue(title, body=None, labels=None):
|
|
"""Create a new Gitea issue.
|
|
|
|
Labels convention (per project rules):
|
|
- Product/feature issues → product-code
|
|
- Test code issues → test-code
|
|
"""
|
|
payload = {"title": title}
|
|
if body:
|
|
payload["body"] = body + AGENT_SIG
|
|
if labels:
|
|
label_names = [l.strip() for l in labels.split(",") if l.strip()]
|
|
# Gitea 1.22 expects label IDs (int64). Resolve names → IDs.
|
|
label_ids = _label_names_to_ids(label_names)
|
|
if label_ids:
|
|
payload["labels"] = label_ids
|
|
i = _req("POST", "/issues", payload)
|
|
issue_labels = [l["name"] for l in i.get("labels", [])]
|
|
print(f"Issue #{i['number']} created: {i['title']}")
|
|
if issue_labels:
|
|
print(f"Labels: {', '.join(issue_labels)}")
|
|
print(f"URL: {i.get('html_url', i.get('url', ''))}")
|
|
return i
|
|
|
|
|
|
def _label_names_to_ids(names: list[str]) -> list[int]:
|
|
"""Resolve label names to Gitea label IDs. Returns empty list on failure."""
|
|
try:
|
|
all_labels = _req("GET", "/labels")
|
|
name_to_id = {l["name"]: l["id"] for l in all_labels}
|
|
ids = []
|
|
for name in names:
|
|
if name in name_to_id:
|
|
ids.append(name_to_id[name])
|
|
else:
|
|
print(f"Warning: label '{name}' not found, skipping", file=sys.stderr)
|
|
return ids
|
|
except SystemExit:
|
|
return []
|
|
|
|
|
|
# ── PR operations ────────────────────────────────────────────────────────────
|
|
|
|
def create_pr(issue_num, branch, body=None):
|
|
"""Create a PR for the given issue and branch."""
|
|
issue = _req("GET", f"/issues/{issue_num}")
|
|
title = f"fix: {issue['title']} - Closes #{issue_num}"
|
|
if body is None:
|
|
body = f"Closes #{issue_num}\n\n{issue.get('body', '')}"
|
|
body += AGENT_SIG
|
|
pr = _req("POST", "/pulls", {
|
|
"title": title,
|
|
"head": branch,
|
|
"base": "main",
|
|
"body": body,
|
|
})
|
|
pr_url = pr.get('html_url', pr.get('url', ''))
|
|
print(f"PR created: {pr_url}")
|
|
return pr
|
|
|
|
|
|
def pr_status(pr_num):
|
|
"""Check PR state and CI status."""
|
|
pr = _req("GET", f"/pulls/{pr_num}")
|
|
print(f"PR #{pr['number']}: {pr['title']}")
|
|
print(f"State: {pr['state']}")
|
|
print(f"Merged: {pr.get('merged', False)}")
|
|
print(f"Mergeable: {pr.get('mergeable', 'unknown')}")
|
|
print(f"URL: {pr['html_url']}")
|
|
|
|
# CI status
|
|
sha = pr.get("head", {}).get("sha", "")
|
|
if sha:
|
|
try:
|
|
status = _req("GET", f"/commits/{sha}/status")
|
|
print(f"CI Status: {status.get('state', 'pending')}")
|
|
for s in status.get('statuses', []):
|
|
desc = s.get('description', '')
|
|
print(f" - {s.get('context')}: {s.get('state')} ({desc})")
|
|
except SystemExit:
|
|
print("CI Status: no statuses found")
|
|
return pr
|
|
|
|
|
|
def merge_pr(pr_num):
|
|
"""Merge a PR. Fails if CI hasn't passed or PR is not mergeable."""
|
|
pr = _req("GET", f"/pulls/{pr_num}")
|
|
|
|
if pr.get("state") == "closed":
|
|
if pr.get("merged"):
|
|
print(f"PR #{pr_num} already merged")
|
|
return pr
|
|
else:
|
|
print(f"PR #{pr_num} is closed (not merged)", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# Check CI
|
|
sha = pr.get("head", {}).get("sha", "")
|
|
ci_passed = True
|
|
if sha:
|
|
try:
|
|
status = _req("GET", f"/commits/{sha}/status")
|
|
ci_state = status.get("state", "pending")
|
|
if ci_state in ("failure", "error"):
|
|
ci_passed = False
|
|
print(f"CI status: {ci_state} — cannot merge", file=sys.stderr)
|
|
for s in status.get('statuses', []):
|
|
print(f" {s.get('context')}: {s.get('state')}", file=sys.stderr)
|
|
except SystemExit:
|
|
pass # No statuses, proceed
|
|
|
|
if not ci_passed:
|
|
print("Merge blocked: CI has not passed", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
result = _req("POST", f"/pulls/{pr_num}/merge", {"Do": "merge"})
|
|
# Verify merge success by re-checking PR state
|
|
pr_after = _req("GET", f"/pulls/{pr_num}")
|
|
if pr_after.get("merged"):
|
|
print(f"PR #{pr_num} merged successfully")
|
|
elif result.get("merged"):
|
|
print(f"PR #{pr_num} merged successfully")
|
|
else:
|
|
print(f"Merge result: {result.get('message', 'unknown')}")
|
|
return result
|
|
|
|
|
|
# ── Lifecycle management ─────────────────────────────────────────────────────
|
|
|
|
def lifecycle(issue_num):
|
|
"""Print the full lifecycle status for an issue: branch, PR, CI, merge."""
|
|
print(f"=== Issue #{issue_num} Lifecycle ===\n")
|
|
|
|
issue = _req("GET", f"/issues/{issue_num}")
|
|
print(f"Issue: {issue['title']}")
|
|
print(f"State: {issue['state']}\n")
|
|
|
|
# Find associated PRs
|
|
prs = _req("GET", "/pulls?state=all")
|
|
related = [p for p in prs if f"Closes #{issue_num}" in p.get("body", "")
|
|
or f"#{issue_num}" in p.get("title", "")]
|
|
if related:
|
|
for pr in related:
|
|
print(f"PR #{pr['number']}: {pr['state']} (merged={pr.get('merged', False)})")
|
|
pr_status(pr["number"])
|
|
else:
|
|
print("No associated PR found.")
|
|
|
|
|
|
# ── CLI ──────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Dev agent Gitea helper")
|
|
parser.add_argument("--action", required=True,
|
|
choices=["list", "get", "comment", "close-issue",
|
|
"create-issue", "create-pr", "pr-status", "merge-pr", "lifecycle",
|
|
"blocked-check"])
|
|
parser.add_argument("--issue", type=int)
|
|
parser.add_argument("--pr", type=int)
|
|
parser.add_argument("--title", help="Issue title (for 'create-issue' action)")
|
|
parser.add_argument("--branch")
|
|
parser.add_argument("--body")
|
|
parser.add_argument("--labels", help="Comma-separated labels (filter for 'list', assign for 'create-issue')")
|
|
args = parser.parse_args()
|
|
|
|
if not GITEA_TOKEN:
|
|
print("Error: GITEA_API_TOKEN environment variable is not set.", file=sys.stderr)
|
|
print("Set it via: export GITEA_API_TOKEN=your-token", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
if args.action == "list":
|
|
label_filter = [l.strip() for l in args.labels.split(",") if l.strip()] if args.labels else None
|
|
list_issues(label_filter)
|
|
elif args.action == "get":
|
|
if not args.issue:
|
|
print("--issue is required for 'get' action", file=sys.stderr)
|
|
sys.exit(1)
|
|
get_issue(args.issue)
|
|
elif args.action == "comment":
|
|
if not args.issue or not args.body:
|
|
print("--issue and --body are required for 'comment' action", file=sys.stderr)
|
|
sys.exit(1)
|
|
comment_issue(args.issue, args.body)
|
|
elif args.action == "close-issue":
|
|
if not args.issue:
|
|
print("--issue is required for 'close-issue' action", file=sys.stderr)
|
|
sys.exit(1)
|
|
close_issue(args.issue, args.body)
|
|
elif args.action == "create-issue":
|
|
if not args.title:
|
|
print("--title is required for 'create-issue' action", file=sys.stderr)
|
|
sys.exit(1)
|
|
create_issue(args.title, args.body, args.labels)
|
|
elif args.action == "create-pr":
|
|
if not args.issue or not args.branch:
|
|
print("--issue and --branch are required for 'create-pr' action", file=sys.stderr)
|
|
sys.exit(1)
|
|
create_pr(args.issue, args.branch, args.body)
|
|
elif args.action == "pr-status":
|
|
if not args.pr:
|
|
print("--pr is required for 'pr-status' action", file=sys.stderr)
|
|
sys.exit(1)
|
|
pr_status(args.pr)
|
|
elif args.action == "merge-pr":
|
|
if not args.pr:
|
|
print("--pr is required for 'merge-pr' action", file=sys.stderr)
|
|
sys.exit(1)
|
|
merge_pr(args.pr)
|
|
elif args.action == "blocked-check":
|
|
blocked_check()
|
|
elif args.action == "lifecycle":
|
|
if not args.issue:
|
|
print("--issue is required for 'lifecycle' action", file=sys.stderr)
|
|
sys.exit(1)
|
|
lifecycle(args.issue)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|