forked from 1jehuang/jcode
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcheck_panic_budget.py
More file actions
200 lines (166 loc) · 6.72 KB
/
check_panic_budget.py
File metadata and controls
200 lines (166 loc) · 6.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
#!/usr/bin/env python3
"""Enforce a ratcheting production panic-prone usage budget.
Counts production Rust occurrences of:
- `.unwrap(`
- `.expect(`
- `panic!`, `todo!`, `unimplemented!`
Policy:
- Existing files may not increase their count.
- New production files may not introduce panic-prone usage.
- Total count may not increase.
- `--update` refreshes the baseline after intentional cleanup.
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any
REPO_ROOT = Path(__file__).resolve().parent.parent
BASELINE_FILE = REPO_ROOT / "scripts" / "panic_budget.json"
SCAN_ROOTS = (REPO_ROOT / "src", REPO_ROOT / "crates")
PATTERN = re.compile(r"\.unwrap\(|\.expect\(|\b(?:panic!|todo!|unimplemented!)")
CFG_TEST_RE = re.compile(r"^\s*#\s*\[\s*cfg\s*\(\s*test\s*\)\s*\]")
ITEM_START_RE = re.compile(r"^\s*(?:pub(?:\([^)]*\))?\s+)?(?:mod|fn)\b")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--update", action="store_true", help="refresh the baseline")
return parser.parse_args()
def is_test_rust_file(path: Path) -> bool:
rel = path.relative_to(REPO_ROOT).as_posix()
if path.suffix != ".rs":
return False
parts = rel.split("/")
if parts[0] == "tests" or any(
part == "tests" or part.endswith("_tests") or part.endswith("_test") or part.startswith("tests_")
for part in parts
):
return True
name = path.name
return (
name == "tests.rs"
or name.endswith("_tests.rs")
or name.endswith("_test.rs")
or name.startswith("tests_")
)
def production_rust_files() -> list[Path]:
files: list[Path] = []
for root in SCAN_ROOTS:
if not root.exists():
continue
for path in sorted(root.rglob("*.rs")):
if path.suffix == ".rs" and not is_test_rust_file(path):
files.append(path)
return files
def brace_delta(line: str) -> int:
"""Approximate Rust block nesting for budget classification.
The panic budget is a ratchet, not a parser. This intentionally simple scan
ignores comments and strings, which is acceptable for excluding normal
`#[cfg(test)] mod tests { ... }` blocks from production counts.
"""
return line.count("{") - line.count("}")
def production_lines(path: Path) -> list[str]:
lines = path.read_text(encoding="utf-8", errors="ignore").splitlines()
output: list[str] = []
skip_stack: list[int] = []
pending_cfg_test = False
for line in lines:
stripped = line.strip()
current_depth = sum(skip_stack)
if current_depth == 0:
if pending_cfg_test and ITEM_START_RE.match(line):
delta = brace_delta(line)
if delta > 0:
skip_stack.append(delta)
pending_cfg_test = False
continue
if pending_cfg_test and stripped and not stripped.startswith("#"):
pending_cfg_test = False
if CFG_TEST_RE.match(line):
pending_cfg_test = True
continue
output.append(line)
else:
skip_stack[-1] += brace_delta(line)
if skip_stack[-1] <= 0:
skip_stack.pop()
return output
def current_counts() -> dict[str, int]:
counts: dict[str, int] = {}
for path in production_rust_files():
count = sum(1 for line in production_lines(path) if PATTERN.search(line))
if count:
counts[path.relative_to(REPO_ROOT).as_posix()] = count
return counts
def load_baseline() -> dict[str, Any]:
if not BASELINE_FILE.exists():
return {"version": 1, "total": 0, "tracked_files": {}}
data = json.loads(BASELINE_FILE.read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise SystemExit(f"error: invalid baseline file format: {BASELINE_FILE}")
total = data.get("total")
tracked = data.get("tracked_files")
if not isinstance(total, int) or total < 0:
raise SystemExit(f"error: invalid total in {BASELINE_FILE}")
if not isinstance(tracked, dict) or any(
not isinstance(k, str) or not isinstance(v, int) or v <= 0 for k, v in tracked.items()
):
raise SystemExit(f"error: invalid tracked_files in {BASELINE_FILE}")
return data
def write_baseline(counts: dict[str, int]) -> None:
BASELINE_FILE.write_text(
json.dumps(
{"version": 1, "total": sum(counts.values()), "tracked_files": counts},
indent=2,
sort_keys=True,
)
+ "\n",
encoding="utf-8",
)
def main() -> int:
args = parse_args()
baseline = load_baseline()
current = current_counts()
current_total = sum(current.values())
if args.update:
write_baseline(current)
print(
"Updated panic-prone baseline: "
f"total={baseline['total']} -> {current_total}, files={len(baseline['tracked_files'])} -> {len(current)}"
)
return 0
tracked: dict[str, int] = baseline["tracked_files"]
regressions: list[str] = []
improvements: list[str] = []
if current_total > baseline["total"]:
regressions.append(f"total panic-prone count grew: {baseline['total']} -> {current_total}")
elif current_total < baseline["total"]:
improvements.append(f"total panic-prone count shrank: {baseline['total']} -> {current_total}")
for path, count in sorted(current.items()):
old_count = tracked.get(path)
if old_count is None:
regressions.append(f"new production panic-prone usage: {path} ({count})")
elif count > old_count:
regressions.append(f"production panic-prone usage grew: {path} ({old_count} -> {count})")
elif count < old_count:
improvements.append(f"production panic-prone usage shrank: {path} ({old_count} -> {count})")
for path, old_count in sorted(tracked.items()):
if path not in current:
improvements.append(f"production panic-prone usage removed: {path} ({old_count} -> 0)")
if regressions:
print("Panic-prone usage budget exceeded:", file=sys.stderr)
for entry in regressions:
print(f" - {entry}", file=sys.stderr)
print("Run scripts/check_panic_budget.py --update only after intentional cleanup.", file=sys.stderr)
return 1
if improvements:
print("Panic-prone usage budget improved:")
for entry in improvements:
print(f" - {entry}")
print("Consider running: scripts/check_panic_budget.py --update")
else:
print(f"Panic-prone budget OK: total={current_total} files={len(current)}")
return 0
if __name__ == "__main__":
raise SystemExit(main())