learn-claude-code/s03_permission at main · shareAI-lab/learn-claude-code
其实之前的代码中已经有一些安全机制了
比如safe_path,限制只能在指定的目录下工作
防止模型路径逃逸
但如果模型搞一个rm -rf出来,仍然是非常危险的
安全绝对不能依靠概率模型,要靠代码在工具使用之前判断

Permission
Block 1
设定了一个硬拒绝列表,在列表内的操作直接拒绝
例如:sudo、rm -rf /、……
Block 2
规则匹配
- 需要询问用户的情况
- 直接放行的情况
Block 3
用户审核
code
1#!/usr/bin/env python3
2"""
3s02: Tool Use — 在 s01 基础上新增 4 个工具 + 分发映射。
4
5运行: python s02_tool_use/code.py
6需要: pip install anthropic python-dotenv + .env 中配置 ANTHROPIC_API_KEY
7
8本文件 = s01 的全部代码 + 以下新增:
9 + run_read / run_write / run_edit / run_glob 四个工具实现
10 + TOOL_HANDLERS 分发映射(替代 s01 中硬编码的 run_bash 调用)
11 + safe_path 路径安全校验
12
13循环本身(agent_loop)与 s01 完全一致。
14"""
15
16import os, subprocess
17from pathlib import Path
18
19try:
20 import readline
21 readline.parse_and_bind('set bind-tty-special-chars off')
22 readline.parse_and_bind('set input-meta on')
23 readline.parse_and_bind('set output-meta on')
24 readline.parse_and_bind('set convert-meta off')
25except ImportError:
26 pass
27
28from anthropic import Anthropic
29from dotenv import load_dotenv
30
31load_dotenv(override=True)
32if os.getenv("ANTHROPIC_BASE_URL"):
33 os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
34
35WORKDIR = Path.cwd()
36client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
37MODEL = os.environ["MODEL_ID"]
38
39SYSTEM = f"You are a coding agent at {WORKDIR}. Use tools to solve tasks. Act, don't explain."
40
41
42# ═══════════════════════════════════════════════════════════
43# FROM s01 (unchanged)
44# ═══════════════════════════════════════════════════════════
45
46def run_bash(command: str) -> str:
47 dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
48 if any(d in command for d in dangerous):
49 return "Error: Dangerous command blocked"
50 try:
51 r = subprocess.run(command, shell=True, cwd=WORKDIR,
52 capture_output=True, text=True,
53 encoding="utf-8", errors="replace", timeout=120)
54 out = (r.stdout + r.stderr).strip()
55 return out[:50000] if out else "(no output)"
56 except subprocess.TimeoutExpired:
57 return "Error: Timeout (120s)"
58 except (FileNotFoundError, OSError) as e:
59 return f"Error: {e}"
60
61
62# ═══════════════════════════════════════════════════════════
63# NEW in s02: 4 个新工具
64# ═══════════════════════════════════════════════════════════
65
66def safe_path(p: str) -> Path:
67 path = (WORKDIR / p).resolve()
68 if not path.is_relative_to(WORKDIR):
69 raise ValueError(f"Path escapes workspace: {p}")
70 return path
71
72
73def run_read(path: str, limit: int | None = None) -> str:
74 try:
75 lines = safe_path(path).read_text().splitlines()
76 if limit and limit < len(lines):
77 lines = lines[:limit] + [f"... ({len(lines) - limit} more lines)"]
78 return "\n".join(lines)
79 except Exception as e:
80 return f"Error: {e}"
81
82
83def run_write(path: str, content: str) -> str:
84 try:
85 file_path = safe_path(path)
86 file_path.parent.mkdir(parents=True, exist_ok=True)
87 file_path.write_text(content)
88 return f"Wrote {len(content)} bytes to {path}"
89 except Exception as e:
90 return f"Error: {e}"
91
92
93def run_edit(path: str, old_text: str, new_text: str) -> str:
94 try:
95 file_path = safe_path(path)
96 text = file_path.read_text()
97 if old_text not in text:
98 return f"Error: text not found in {path}"
99 file_path.write_text(text.replace(old_text, new_text, 1))
100 return f"Edited {path}"
101 except Exception as e:
102 return f"Error: {e}"
103
104
105def run_glob(pattern: str) -> str:
106 import glob as g
107 try:
108 results = []
109 for match in g.glob(pattern, root_dir=WORKDIR):
110 if (WORKDIR / match).resolve().is_relative_to(WORKDIR):
111 results.append(match)
112 return "\n".join(results) if results else "(no matches)"
113 except Exception as e:
114 return f"Error: {e}"
115
116
117# ═══════════════════════════════════════════════════════════
118# NEW in s02: 工具定义(s01 只有一个 bash,现在扩展到 5 个)
119# ═══════════════════════════════════════════════════════════
120
121TOOLS = [
122 {"name": "bash", "description": "Run a shell command.",
123 "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
124 {"name": "read_file", "description": "Read file contents.",
125 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
126 {"name": "write_file", "description": "Write content to a file.",
127 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
128 {"name": "edit_file", "description": "Replace exact text in a file once.",
129 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
130 {"name": "glob", "description": "Find files matching a glob pattern.",
131 "input_schema": {"type": "object", "properties": {"pattern": {"type": "string"}}, "required": ["pattern"]}},
132]
133
134# ═══════════════════════════════════════════════════════════
135# NEW in s02: 工具分发映射(s01 是硬编码 run_bash,现在改为查表)
136# ═══════════════════════════════════════════════════════════
137
138TOOL_HANDLERS = {
139 "bash": run_bash, "read_file": run_read, "write_file": run_write,
140 "edit_file": run_edit, "glob": run_glob,
141}
142
143# my code in s03
144# -----------------------------------------------------------
145
146# Gate 1: Hard deny list — always forbidden
147DENY_LIST = ["rm -rf /", "sudo", "shutdown", "reboot", "mkfs", "dd if=", "> /dev/sda"]
148def check_deny_list(command: str) -> bool:
149 if any(d in command for d in DENY_LIST):
150 return "Error: Dangerous command blocked"
151 return None
152
153# Gate 2: Rule matching — context-dependent checks
154PERMISSION_RULES = [
155 {
156 "tools": ["write_file", "edit_file"], # 针对的工具
157 "check": lambda args: not (WORKDIR / args.get("path", "")).resolve().is_relative_to(WORKDIR), # lambda方法
158 "message": "Writing outside workspace" # 提供给用户的核验消息
159 },
160 {
161 "tools": ["bash"],
162 "check": lambda args: any(kw in args.get("command", "") for kw in ["rm ", "> /etc/", "chmod 777"]),
163 "message": "Potentially destructive command"
164 },
165]
166
167def check_rules(tool_name: str, args: dict) -> str | None:
168 for rule in PERMISSION_RULES:
169 # 如果在检测名单中 并且触发了危险信号 提交人工审核
170 if tool_name in rule["tools"] and rule["check"](args):
171 return f"Warning: {rule['message']}"
172 return None
173
174# Gate 3: User approval — wait for confirmation after rule match
175def ask_user(tool_name: str, args: dict, reason: str) -> str:
176 print(f"\n\033[33m⚠ {reason}\033[0m")
177 print(f" Tool: {tool_name}({args})")
178 choice = input(" Allow? [y/N] ").strip().lower()
179 return "allow" if choice in ("y", "yes") else "deny"
180
181# pipeline
182def check_permission_pipeline(block) -> str:
183 if block.name == 'bash':
184 reason = check_deny_list(block.input.get("command", ""))
185 if reason:
186 print(f"\n\033[31m⛔ {reason}\033[0m")
187 return False
188 reason = check_rules(block.name, block.input)
189 if reason:
190 decision = ask_user(block.name, block.input, reason)
191 if decision == "deny":
192 print(f"\n\033[31m⛔ User denied permission for {block.name}\033[0m")
193 return False
194 return True
195
196
197# -----------------------------------------------------------
198# ═══════════════════════════════════════════════════════════
199# agent_loop — 与 s01 结构完全一致,只改了工具执行那部分
200# s01: output = run_bash(block.input["command"])
201# s02: output = TOOL_HANDLERS[block.name](**block.input)
202# ═══════════════════════════════════════════════════════════
203
204def agent_loop(messages: list):
205 while True:
206 response = client.messages.create(
207 model=MODEL, system=SYSTEM, messages=messages,
208 tools=TOOLS, max_tokens=8000,
209 )
210 messages.append({"role": "assistant", "content": response.content})
211
212 if response.stop_reason != "tool_use":
213 return
214
215 results = []
216 for block in response.content:
217 if block.type == "tool_use":
218
219 print(f"\033[33m> {block.name}({block.input})\033[0m")
220 # 权限核验
221 if not check_permission_pipeline(block):
222 results.append({"type": "tool_result", "tool_use_id": block.id, "content": "Permission denied"})
223 print(f"\033[31m⛔ Permission denied for {block.name}\033[0m")
224 continue
225
226 print(f"\033[33m> {block.name}\033[0m")
227 handler = TOOL_HANDLERS.get(block.name)
228 output = handler(**block.input) if handler else f"Unknown: {block.name}"
229 print(str(output)[:200])
230 results.append({"type": "tool_result", "tool_use_id": block.id, "content": output})
231
232 messages.append({"role": "user", "content": results})
233
234
235if __name__ == "__main__":
236 print("s02: Tool Use — 在 s01 基础上加了 4 个工具")
237 print("输入问题,回车发送。输入 q 退出。\n")
238
239 history = []
240 while True:
241 try:
242 query = input("\033[36ms02 >> \033[0m")
243 except (EOFError, KeyboardInterrupt):
244 break
245 if query.strip().lower() in ("q", "exit", ""):
246 break
247 history.append({"role": "user", "content": query})
248 agent_loop(history)
249 for block in history[-1]["content"]:
250 if getattr(block, "type", None) == "text":
251 print(block.text)
252 print()
总体来说还是比较粗糙
有一个明显的bug,如果一直permission,llm可能会反复重试(当然是多种方案)
Claude Code
Permission
CC分为四个档
allow:直接允许ask:询问deny:直接拒绝passthrough:CC独有,暂时搁置,对当前状态暂不进行拒绝,留到之后的工具、pipeline进行判定
Source
CC允许在不同层级(8个来源)中设置权限规则
例如
1{ toolName: "Bash", ruleBehavior: "deny", ruleContent: "npm publish:*" }
当规则发生冲突时,按照优先级进行处理
当前会话优先,最后是全局用户选项
flowchart TB
CLI["CLI Args / Command / Session<br/>会话 / 命令行参数"]
Policy["Policy<br/>企业策略"]
Flags["Flags<br/>特性开关"]
Local["Local<br/>本地覆盖"]
Project["Project<br/>项目"]
User["User<br/>用户全局"]
CLI -->|覆盖| Policy -->|覆盖| Flags -->|覆盖| Local -->|覆盖| Project -->|覆盖| User
本地allow的行为可能会被企业Policy直接deny
Pipeline
Agent发起工具请求时,必须经过一个checkPermissionsAndCallTool()主导的流水线
- 静态验证
- Zod Schema 验证:首先检查 Agent 传过来的参数类型对不对(比如数字是不是写成了字符串)
validateInput():工具级别的语义验证。工具自己会检查参数是否合规backfillObservableInput():向下兼容,补全遗留或缺失的属性字段
- 动态策略(Hooks + 核心管线 )
4. PreToolUse Hooks:挂载第三方插件或自定义逻辑,钩子可以直接返回allow/deny/ask
5.resolveHookPermissionDecision():协调钩子+管线决策
6. 核心规则检查- 绝对禁用:命中高优先级Source的
deny - ASK:命中高优先级Source的
ask - 自定义检查:调用工具自身的
tool.checkPermissions() - 安全越界:触发系统级安全检查违规(转化为ask)
- 全局放行:命中
allow规则 - 兜底处理:若全未命中(
passthrough) ➔ 最终保守退化为ask
- 绝对禁用:命中高优先级Source的
isDestructive
在源码中会看到一个 isDestructive 属性,教学版往往容易误把它当成防御闸门。但实际上:
- 它纯粹是 UI 展示标签。
- 它的唯一作用是在终端列表里标红显示
[destructive]以警示用户。 - 它不参与任何权限逻辑的决策,真正的安全机制完全由上述的 Pipeline 保证。
YoloClassifier
Auto模式下启用LLM进行判断放行
- 把工具调用 + 对话上下文发给一个分类器 LLM 判断是否安全
- 先尝试 acceptEdits 模式模拟,如果 acceptEdits 允许 → 直接批准
- 查安全工具白名单
- 最后:分类器,分类器连续拒绝太多次 → 回退到人工审批
Bubble
子 Agent(通过 AgentTool fork 出来的)的 permissionMode 设为 'bubble'
权限弹窗冒泡到父 Agent 的终端