learn-claude-code/s03_permission at main · shareAI-lab/learn-claude-code

其实之前的代码中已经有一些安全机制了

比如safe_path,限制只能在指定的目录下工作

防止模型路径逃逸

但如果模型搞一个rm -rf出来,仍然是非常危险的

安全绝对不能依靠概率模型,要靠代码在工具使用之前判断

Permission

Block 1

设定了一个硬拒绝列表,在列表内的操作直接拒绝

例如:sudorm -rf /、……

Block 2

规则匹配

  • 需要询问用户的情况
  • 直接放行的情况

Block 3

用户审核

code

  1#!/usr/bin/env python3
  2"""
  3s02: Tool Use — 在 s01 基础上新增 4 个工具 + 分发映射。
  4
  5运行: python s02_tool_use/code.py
  6需要: pip install anthropic python-dotenv + .env 中配置 ANTHROPIC_API_KEY
  7
  8本文件 = s01 的全部代码 + 以下新增:
  9  + run_read / run_write / run_edit / run_glob 四个工具实现
 10  + TOOL_HANDLERS 分发映射(替代 s01 中硬编码的 run_bash 调用)
 11  + safe_path 路径安全校验
 12
 13循环本身(agent_loop)与 s01 完全一致。
 14"""
 15
 16import os, subprocess
 17from pathlib import Path
 18
 19try:
 20    import readline
 21    readline.parse_and_bind('set bind-tty-special-chars off')
 22    readline.parse_and_bind('set input-meta on')
 23    readline.parse_and_bind('set output-meta on')
 24    readline.parse_and_bind('set convert-meta off')
 25except ImportError:
 26    pass
 27
 28from anthropic import Anthropic
 29from dotenv import load_dotenv
 30
 31load_dotenv(override=True)
 32if os.getenv("ANTHROPIC_BASE_URL"):
 33    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
 34
 35WORKDIR = Path.cwd()
 36client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
 37MODEL = os.environ["MODEL_ID"]
 38
 39SYSTEM = f"You are a coding agent at {WORKDIR}. Use tools to solve tasks. Act, don't explain."
 40
 41
 42# ═══════════════════════════════════════════════════════════
 43#  FROM s01 (unchanged)
 44# ═══════════════════════════════════════════════════════════
 45
 46def run_bash(command: str) -> str:
 47    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
 48    if any(d in command for d in dangerous):
 49        return "Error: Dangerous command blocked"
 50    try:
 51        r = subprocess.run(command, shell=True, cwd=WORKDIR,
 52                           capture_output=True, text=True,
 53                           encoding="utf-8", errors="replace", timeout=120)
 54        out = (r.stdout + r.stderr).strip()
 55        return out[:50000] if out else "(no output)"
 56    except subprocess.TimeoutExpired:
 57        return "Error: Timeout (120s)"
 58    except (FileNotFoundError, OSError) as e:
 59        return f"Error: {e}"
 60
 61
 62# ═══════════════════════════════════════════════════════════
 63#  NEW in s02: 4 个新工具
 64# ═══════════════════════════════════════════════════════════
 65
 66def safe_path(p: str) -> Path:
 67    path = (WORKDIR / p).resolve()
 68    if not path.is_relative_to(WORKDIR):
 69        raise ValueError(f"Path escapes workspace: {p}")
 70    return path
 71
 72
 73def run_read(path: str, limit: int | None = None) -> str:
 74    try:
 75        lines = safe_path(path).read_text().splitlines()
 76        if limit and limit < len(lines):
 77            lines = lines[:limit] + [f"... ({len(lines) - limit} more lines)"]
 78        return "\n".join(lines)
 79    except Exception as e:
 80        return f"Error: {e}"
 81
 82
 83def run_write(path: str, content: str) -> str:
 84    try:
 85        file_path = safe_path(path)
 86        file_path.parent.mkdir(parents=True, exist_ok=True)
 87        file_path.write_text(content)
 88        return f"Wrote {len(content)} bytes to {path}"
 89    except Exception as e:
 90        return f"Error: {e}"
 91
 92
 93def run_edit(path: str, old_text: str, new_text: str) -> str:
 94    try:
 95        file_path = safe_path(path)
 96        text = file_path.read_text()
 97        if old_text not in text:
 98            return f"Error: text not found in {path}"
 99        file_path.write_text(text.replace(old_text, new_text, 1))
100        return f"Edited {path}"
101    except Exception as e:
102        return f"Error: {e}"
103
104
105def run_glob(pattern: str) -> str:
106    import glob as g
107    try:
108        results = []
109        for match in g.glob(pattern, root_dir=WORKDIR):
110            if (WORKDIR / match).resolve().is_relative_to(WORKDIR):
111                results.append(match)
112        return "\n".join(results) if results else "(no matches)"
113    except Exception as e:
114        return f"Error: {e}"
115
116
117# ═══════════════════════════════════════════════════════════
118#  NEW in s02: 工具定义(s01 只有一个 bash,现在扩展到 5 个)
119# ═══════════════════════════════════════════════════════════
120
121TOOLS = [
122    {"name": "bash", "description": "Run a shell command.",
123     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
124    {"name": "read_file", "description": "Read file contents.",
125     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
126    {"name": "write_file", "description": "Write content to a file.",
127     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
128    {"name": "edit_file", "description": "Replace exact text in a file once.",
129     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
130    {"name": "glob", "description": "Find files matching a glob pattern.",
131     "input_schema": {"type": "object", "properties": {"pattern": {"type": "string"}}, "required": ["pattern"]}},
132]
133
134# ═══════════════════════════════════════════════════════════
135#  NEW in s02: 工具分发映射(s01 是硬编码 run_bash,现在改为查表)
136# ═══════════════════════════════════════════════════════════
137
138TOOL_HANDLERS = {
139    "bash": run_bash, "read_file": run_read, "write_file": run_write,
140    "edit_file": run_edit, "glob": run_glob,
141}
142
143# my code in s03
144# -----------------------------------------------------------
145
146# Gate 1: Hard deny list — always forbidden
147DENY_LIST = ["rm -rf /", "sudo", "shutdown", "reboot", "mkfs", "dd if=", "> /dev/sda"]
148def check_deny_list(command: str) -> bool:
149    if any(d in command for d in DENY_LIST):
150        return "Error: Dangerous command blocked"
151    return None
152
153# Gate 2: Rule matching — context-dependent checks
154PERMISSION_RULES = [
155    {
156        "tools": ["write_file", "edit_file"], # 针对的工具
157        "check": lambda args: not (WORKDIR / args.get("path", "")).resolve().is_relative_to(WORKDIR), # lambda方法
158        "message": "Writing outside workspace" # 提供给用户的核验消息
159    },
160    {
161        "tools": ["bash"],
162        "check": lambda args: any(kw in args.get("command", "") for kw in ["rm ", "> /etc/", "chmod 777"]),
163        "message": "Potentially destructive command"
164    },
165]
166
167def check_rules(tool_name: str, args: dict) -> str | None:
168    for rule in PERMISSION_RULES:
169        # 如果在检测名单中 并且触发了危险信号 提交人工审核
170        if tool_name in rule["tools"] and rule["check"](args):
171            return f"Warning: {rule['message']}"
172    return None
173
174# Gate 3: User approval — wait for confirmation after rule match
175def ask_user(tool_name: str, args: dict, reason: str) -> str:
176    print(f"\n\033[33m⚠  {reason}\033[0m")
177    print(f"   Tool: {tool_name}({args})")
178    choice = input("   Allow? [y/N] ").strip().lower()
179    return "allow" if choice in ("y", "yes") else "deny"
180
181# pipeline
182def check_permission_pipeline(block) -> str:
183    if block.name == 'bash':
184        reason = check_deny_list(block.input.get("command", ""))
185        if reason:
186            print(f"\n\033[31m⛔ {reason}\033[0m")
187            return False
188    reason = check_rules(block.name, block.input)
189    if reason:
190        decision = ask_user(block.name, block.input, reason)
191        if decision == "deny":
192            print(f"\n\033[31m⛔ User denied permission for {block.name}\033[0m")
193            return False
194    return True
195
196
197# -----------------------------------------------------------
198# ═══════════════════════════════════════════════════════════
199#  agent_loop — 与 s01 结构完全一致,只改了工具执行那部分
200#  s01: output = run_bash(block.input["command"])
201#  s02: output = TOOL_HANDLERS[block.name](**block.input)
202# ═══════════════════════════════════════════════════════════
203
204def agent_loop(messages: list):
205    while True:
206        response = client.messages.create(
207            model=MODEL, system=SYSTEM, messages=messages,
208            tools=TOOLS, max_tokens=8000,
209        )
210        messages.append({"role": "assistant", "content": response.content})
211
212        if response.stop_reason != "tool_use":
213            return
214
215        results = []
216        for block in response.content:
217            if block.type == "tool_use":
218                
219                print(f"\033[33m> {block.name}({block.input})\033[0m")
220                # 权限核验
221                if not check_permission_pipeline(block):
222                    results.append({"type": "tool_result", "tool_use_id": block.id, "content": "Permission denied"})
223                    print(f"\033[31m⛔ Permission denied for {block.name}\033[0m")
224                    continue
225
226                print(f"\033[33m> {block.name}\033[0m")
227                handler = TOOL_HANDLERS.get(block.name)
228                output = handler(**block.input) if handler else f"Unknown: {block.name}"
229                print(str(output)[:200])
230                results.append({"type": "tool_result", "tool_use_id": block.id, "content": output})
231
232        messages.append({"role": "user", "content": results})
233
234
235if __name__ == "__main__":
236    print("s02: Tool Use — 在 s01 基础上加了 4 个工具")
237    print("输入问题,回车发送。输入 q 退出。\n")
238
239    history = []
240    while True:
241        try:
242            query = input("\033[36ms02 >> \033[0m")
243        except (EOFError, KeyboardInterrupt):
244            break
245        if query.strip().lower() in ("q", "exit", ""):
246            break
247        history.append({"role": "user", "content": query})
248        agent_loop(history)
249        for block in history[-1]["content"]:
250            if getattr(block, "type", None) == "text":
251                print(block.text)
252        print()

总体来说还是比较粗糙

有一个明显的bug,如果一直permission,llm可能会反复重试(当然是多种方案)

Claude Code

Permission

CC分为四个档

  • allow:直接允许
  • ask:询问
  • deny:直接拒绝
  • passthrough:CC独有,暂时搁置,对当前状态暂不进行拒绝,留到之后的工具、pipeline进行判定

Source

CC允许在不同层级(8个来源)中设置权限规则

例如

1{ toolName: "Bash", ruleBehavior: "deny", ruleContent: "npm publish:*" }

当规则发生冲突时,按照优先级进行处理

当前会话优先,最后是全局用户选项

flowchart TB CLI["CLI Args / Command / Session<br/>会话 / 命令行参数"] Policy["Policy<br/>企业策略"] Flags["Flags<br/>特性开关"] Local["Local<br/>本地覆盖"] Project["Project<br/>项目"] User["User<br/>用户全局"] CLI -->|覆盖| Policy -->|覆盖| Flags -->|覆盖| Local -->|覆盖| Project -->|覆盖| User

本地allow的行为可能会被企业Policy直接deny

Pipeline

Agent发起工具请求时,必须经过一个checkPermissionsAndCallTool()主导的流水线

  • 静态验证
    1. Zod Schema 验证:首先检查 Agent 传过来的参数类型对不对(比如数字是不是写成了字符串)
    2. validateInput():工具级别的语义验证。工具自己会检查参数是否合规
    3. backfillObservableInput():向下兼容,补全遗留或缺失的属性字段
  • 动态策略(Hooks + 核心管线 )
    4. PreToolUse Hooks:挂载第三方插件或自定义逻辑,钩子可以直接返回 allow / deny / ask
    5. resolveHookPermissionDecision():协调钩子+管线决策
    6. 核心规则检查
    • 绝对禁用:命中高优先级Source的 deny
    • ASK:命中高优先级Source的 ask
    • 自定义检查:调用工具自身的 tool.checkPermissions()
    • 安全越界:触发系统级安全检查违规(转化为ask)
    • 全局放行:命中 allow 规则
    • 兜底处理:若全未命中(passthrough) ➔ 最终保守退化为 ask

isDestructive

在源码中会看到一个 isDestructive 属性,教学版往往容易误把它当成防御闸门。但实际上:

  • 纯粹是 UI 展示标签
  • 它的唯一作用是在终端列表里标红显示 [destructive] 以警示用户。
  • 不参与任何权限逻辑的决策,真正的安全机制完全由上述的 Pipeline 保证。

YoloClassifier

Auto模式下启用LLM进行判断放行

  • 把工具调用 + 对话上下文发给一个分类器 LLM 判断是否安全
    • 先尝试 acceptEdits 模式模拟,如果 acceptEdits 允许 → 直接批准
    • 查安全工具白名单
    • 最后:分类器,分类器连续拒绝太多次 → 回退到人工审批

Bubble

子 Agent(通过 AgentTool fork 出来的)的 permissionMode 设为 'bubble'

权限弹窗冒泡到父 Agent 的终端