-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathseccomp_sandbox.py
More file actions
188 lines (161 loc) · 5.52 KB
/
seccomp_sandbox.py
File metadata and controls
188 lines (161 loc) · 5.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import sys
import asyncio
from dataclasses import dataclass
from enum import Enum
class ExecutionStatus(str, Enum):
    """Outcome category of one sandboxed run.

    Members are also plain ``str`` values, so they compare equal to (and
    serialize as) their lowercase names.
    """

    # The child exited with return code 0.
    SUCCESS = "success"
    # The run exceeded its time budget.
    TIMEOUT = "timeout"
    # The run failed with an out-of-memory style error.
    MEMORY_LIMIT = "memory_limit"
    # The child was killed for a system call the seccomp filter rejects.
    FORBIDDEN_SYSCALL = "forbidden_syscall"
    # The submitted code failed with a SyntaxError.
    SYNTAX_ERROR = "syntax_error"
    # Any other non-zero exit.
    OTHER_ERROR = "other_error"
@dataclass
class ExecutionResult:
    """Everything captured from a single sandboxed run."""

    # Decoded standard output of the child process.
    stdout: str
    # Decoded standard error of the child process.
    stderr: str
    # Exit code of the child process.
    return_code: int
    # Classification of how the run ended.
    status: ExecutionStatus
    # Full source of the wrapper script that was actually executed.
    wrapped_code: str
# Return code of a child killed by the seccomp KILL action: the kernel delivers
# SIGSYS, which is signal 31 on Linux x86/ARM, and asyncio reports death by
# signal N as return code -N.
_SECCOMP_KILL_RETURN_CODE = -31

# Return codes of a child terminated by the kernel for exceeding RLIMIT_CPU:
# SIGXCPU (24) at the soft limit, SIGKILL (9) at the hard limit. The wrapper
# sets both limits to the same value, so either may be observed.
_CPU_LIMIT_RETURN_CODES = (-24, -9)

# Extra wall-clock seconds granted on top of the CPU budget, so that a purely
# CPU-bound child is normally stopped by the kernel limit before the parent's
# wall-clock timeout fires.
_WALL_CLOCK_GRACE_SECONDS = 1


def _classify_status(return_code: int, stderr_text: str) -> ExecutionStatus:
    """Map a finished child's return code and stderr text to a status."""
    if return_code == 0:
        return ExecutionStatus.SUCCESS
    if return_code == _SECCOMP_KILL_RETURN_CODE:
        return ExecutionStatus.FORBIDDEN_SYSCALL
    if "MemoryError" in stderr_text or "Killed" in stderr_text:
        return ExecutionStatus.MEMORY_LIMIT
    if "SyntaxError" in stderr_text:
        return ExecutionStatus.SYNTAX_ERROR
    # Checked last so that every classification above is unaffected; SIGKILL
    # may also come from elsewhere (e.g. the OOM killer), hence the memory
    # check takes precedence.
    if return_code in _CPU_LIMIT_RETURN_CODES:
        return ExecutionStatus.TIMEOUT
    return ExecutionStatus.OTHER_ERROR


async def run_sandboxed(
    code: str,
    input: str,
    args: "list[str] | None" = None,
    timeout_seconds: int = 2,
    memory_limit_mb: int = 512,
    use_seccomp: bool = True,
) -> ExecutionResult:
    """
    Run Python code in a sandboxed child interpreter.

    WARNING: This is a simple, incomplete sandboxing mechanism, mostly used to
    prevent LLMs from doing unexpected things. In the future we will use docker.

    The code is embedded in a wrapper script that first applies resource limits
    (process count, address space, CPU time), then optionally installs a
    seccomp filter, and finally runs ``code`` at module level. The wrapper is
    executed with ``sys.executable -c``.

    Args:
        code: Python code to execute.
        input: Text passed to the child on stdin.
        args: Extra command-line arguments for the child (its ``sys.argv[1:]``).
            ``None`` (the default) means no extra arguments.
        timeout_seconds: CPU-time limit in seconds enforced with RLIMIT_CPU.
            The parent additionally stops waiting and kills the child after
            ``timeout_seconds`` plus a short grace period of wall-clock time.
        memory_limit_mb: Address-space limit in MB enforced with RLIMIT_AS.
        use_seccomp: Install the pyseccomp allow-list filter in the child.

    Returns:
        ExecutionResult containing stdout, stderr, the return code, the status
        classification and the full wrapper script that was run.

    Raises:
        ImportError: If ``use_seccomp`` is true and pyseccomp cannot be imported.
    """
    if use_seccomp:
        try:
            # Availability check only: the child performs the real import, so
            # fail early here with an actionable message.
            import pyseccomp  # noqa: F401
        except ImportError as err:
            raise ImportError("pyseccomp is not installed. We cannot apply seccomp filters in the sandbox. Please install it with `pip install pyseccomp`.") from err
        # NOTE: this text is executed verbatim at module level in the child, so
        # its lines must stay at column 0.
        seccomp_filters = """
import pyseccomp
# Set up seccomp filter
f = pyseccomp.SyscallFilter(defaction=pyseccomp.KILL)
# allow bare minimum syscalls for python execution
# allow `write`ing to two already-opened files stdout and stderr
f.add_rule(
pyseccomp.ALLOW,
"write",
pyseccomp.Arg(0, pyseccomp.EQ, sys.stdout.fileno()),
)
f.add_rule(
pyseccomp.ALLOW,
"write",
pyseccomp.Arg(0, pyseccomp.EQ, sys.stderr.fileno()),
)
# Needed to read from stdin
f.add_rule(
pyseccomp.ALLOW, "read", pyseccomp.Arg(0, pyseccomp.EQ, sys.stdin.fileno())
)
# Needed for cleanup of file descriptors
f.add_rule(pyseccomp.ALLOW, "close")
# Needed to terminate the process normally
f.add_rule(pyseccomp.ALLOW, "exit")
# Similar to exit, needed for process termination
f.add_rule(pyseccomp.ALLOW, "exit_group")
# Needed for heap memory management
f.add_rule(pyseccomp.ALLOW, "brk")
# Needed for memory allocation and Python's memory management
f.add_rule(pyseccomp.ALLOW, "mmap")
# Needed to free memory mapped regions
f.add_rule(pyseccomp.ALLOW, "munmap")
# Used by Python to check stdin/stdout/stderr status
f.add_rule(pyseccomp.ALLOW, "fstat")
# Used for stdin/stdout/stderr positioning
f.add_rule(pyseccomp.ALLOW, "lseek")
# Python runtime needs these
f.add_rule(pyseccomp.ALLOW, "futex") # For threading/synchronization
f.add_rule(pyseccomp.ALLOW, "rt_sigreturn") # For signal handling
f.add_rule(pyseccomp.ALLOW, "rt_sigaction") # For signal handling setup
f.add_rule(pyseccomp.ALLOW, "ioctl") # For terminal/stream operations
f.add_rule(pyseccomp.ALLOW, "fcntl") # For file descriptor operations
f.add_rule(pyseccomp.ALLOW, "poll") # For I/O multiplexing
f.load()
"""
    else:
        seccomp_filters = ""

    memory_limit_bytes = memory_limit_mb * 1024 * 1024
    # The limits are applied before the seccomp filter is loaded because the
    # filter's allow-list does not include the syscall setrlimit needs.
    sandboxed_code = f"""
import sys
import resource
# Set up resource limits first
resource.setrlimit(resource.RLIMIT_NPROC, (1, 1))
resource.setrlimit(resource.RLIMIT_AS, ({memory_limit_bytes}, {memory_limit_bytes}))
resource.setrlimit(resource.RLIMIT_CPU, ({timeout_seconds}, {timeout_seconds}))
{seccomp_filters}
# Now run the actual code
{code}
"""
    proc = await asyncio.create_subprocess_exec(
        sys.executable,
        "-c",
        sandboxed_code,
        *(args or []),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        # RLIMIT_CPU only counts CPU time, so a child that blocks without
        # burning CPU would otherwise never finish; bound the wait by
        # wall-clock time as well.
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(input=input.encode()),
            timeout=timeout_seconds + _WALL_CLOCK_GRACE_SECONDS,
        )
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            # The child exited between the timeout firing and the kill.
            pass
        await proc.wait()
        return ExecutionResult(
            stdout="",
            stderr="Execution timed out",
            return_code=-1,
            status=ExecutionStatus.TIMEOUT,
            wrapped_code=sandboxed_code,
        )

    stderr_str = stderr.decode(errors="replace")
    return_code = proc.returncode or 0
    return ExecutionResult(
        stdout=stdout.decode(errors="replace"),
        stderr=stderr_str,
        return_code=return_code,
        status=_classify_status(return_code, stderr_str),
        wrapped_code=sandboxed_code,
    )
def main():
    """Smoke-test the sandbox: run one snippet and verify its captured output."""
    # Reads an integer from stdin, echoes it on stdout and writes to stderr.
    # `sys` is available because the sandbox wrapper imports it before the
    # snippet runs.
    snippet = """
n = int(input())
print(f'Got number: {n}')
print('Hello stderr!', file=sys.stderr)
"""

    async def _check_normal_execution():
        # Test 1: normal execution with stdin/stdout/stderr round-trip.
        print("\n=== Test 1: Normal execution ===")
        outcome = await run_sandboxed(snippet, input="123")
        print(f"Return code: {outcome.return_code}")
        print(f"Stdout: {outcome.stdout}")
        print(f"Stderr: {outcome.stderr}")
        assert outcome.return_code == 0
        assert "Got number: 123" in outcome.stdout
        assert "Hello stderr!" in outcome.stderr

    asyncio.run(_check_normal_execution())
# Run the smoke test only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()