From 60dff8f01db7aba73cb007e9727c52ce411cf93e Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Wed, 27 Jul 2022 17:16:50 +0100 Subject: [PATCH 01/21] Add SDK and other utilities to Docker container --- Dockerfile | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/Dockerfile b/Dockerfile index 59c2f6e7..e214ac7c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,6 +26,25 @@ RUN apt-get update && \ apt-get install -y pt-web-vnc && \ apt-get clean +# Install pitop SDK prerequisites +RUN apt-get update && \ + apt-get install -y pkg-config libsystemd0 libsystemd-dev && \ + pip3 install cmake && \ + apt-get clean + +# Install pitop SDK +# using pip for onnxruntime as there is only armhf debian build +RUN pip3 install pitop==0.26.3.post1 + +# Install useful extras from pt-os +RUN apt-get update && \ + # not installable apt-get install -y pt-os-ui-mods && \ + apt-get install -y chromium && \ + apt-get install -y vim && \ + apt-get install -y python3-matplotlib && \ + # no audio DEBIAN_FRONTEND='noninteractive' apt-get install -y sonic-pi python3-sonic && \ + apt-get clean + WORKDIR /further-link COPY pyproject.toml setup.py setup.cfg MANIFEST.in ./ From 8f479100a0d07866b1c8ca0d617eb217c82025c8 Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Fri, 7 Oct 2022 13:49:58 +0100 Subject: [PATCH 02/21] Correct exposed novnc port range --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index e214ac7c..cacebfa8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -59,5 +59,5 @@ ENV PYTHONUNBUFFERED=1 ENV PYTHON_PACKAGE_VERSION=$PYTHON_PACKAGE_VERSION EXPOSE 8028 -EXPOSE 60100-60999 +EXPOSE 61100-61999 CMD [ "further-link" ] From 5c4f88ccf80294dc6d768fe4220fde1b48069a6e Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Wed, 12 Oct 2022 18:08:45 +0100 Subject: [PATCH 03/21] Add FURTHER_LINK_MAX_PROCESSES env var --- Dockerfile | 3 ++- further_link/runner/process_handler.py | 12 +++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index cacebfa8..ee723f11 100644 --- a/Dockerfile +++ b/Dockerfile @@ -55,9 +55,10 @@ COPY further_link further_link RUN pip3 install . ENV FURTHER_LINK_NOSSL=true +ENV FURTHER_LINK_MAX_PROCESSES=4 ENV PYTHONUNBUFFERED=1 ENV PYTHON_PACKAGE_VERSION=$PYTHON_PACKAGE_VERSION EXPOSE 8028 -EXPOSE 61100-61999 +EXPOSE 61100-61103 CMD [ "further-link" ] diff --git a/further_link/runner/process_handler.py b/further_link/runner/process_handler.py index 0ab3e6d3..6590797a 100644 --- a/further_link/runner/process_handler.py +++ b/further_link/runner/process_handler.py @@ -40,7 +40,17 @@ class InvalidOperation(Exception): # each run will need a unique id # one use of the id is for the pt-web-vnc virtual display id and port numbers # so we must use +ve int < 1000, with 0-99 reserved for other uses -id_generator = IdGenerator(min_value=100, max_value=999) +# envvar FURTHER_LINK_MAX_PROCESSES can be used to limit the range +var_max_processes = os.environ.get("FURTHER_LINK_MAX_PROCESSES") +MAX = 900 +if isinstance(var_max_processes, str) and var_max_processes.isdigit(): + max_processes = int(var_max_processes) +else: + max_processes = MAX +if 1 > max_processes > MAX: + max_processes = MAX + +id_generator = IdGenerator(min_value=100, max_value=99 + max_processes) class ProcessHandler: From e6c063913c74ef65b7c83c67b4cbcfd683bc3812 Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Thu, 13 Oct 2022 10:53:56 +0100 Subject: [PATCH 04/21] Set PITOP_VIRTUAL_HARDWARE in dockerfile --- Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Dockerfile b/Dockerfile index ee723f11..9241c81a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -56,6 +56,7 @@ RUN pip3 install . ENV FURTHER_LINK_NOSSL=true ENV FURTHER_LINK_MAX_PROCESSES=4 +ENV PITOP_VIRTUAL_HARDWARE=1 ENV PYTHONUNBUFFERED=1 ENV PYTHON_PACKAGE_VERSION=$PYTHON_PACKAGE_VERSION From 4e47f44319425042707853bb1eee2f1dd95fd87d Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Thu, 27 Oct 2022 17:16:53 +0100 Subject: [PATCH 05/21] Fix blocking io in upload utils --- further_link/endpoint/upload.py | 4 +- further_link/util/upload.py | 69 +++++++++++++++++++++++---------- 2 files changed, 51 insertions(+), 22 deletions(-) diff --git a/further_link/endpoint/upload.py b/further_link/endpoint/upload.py index 5c187506..9a9a98b2 100644 --- a/further_link/endpoint/upload.py +++ b/further_link/endpoint/upload.py @@ -1,4 +1,5 @@ import json +import logging from aiohttp import web @@ -23,7 +24,8 @@ async def upload(request): except (web.HTTPBadRequest, json.decoder.JSONDecodeError): raise web.HTTPBadRequest() - except BadUpload: + except BadUpload as e: + logging.exception(f"Upload error: {e}") raise web.HTTPInternalServerError() return web.Response(text="OK") diff --git a/further_link/util/upload.py b/further_link/util/upload.py index 0bd362d5..e6e7b7c9 100644 --- a/further_link/util/upload.py +++ b/further_link/util/upload.py @@ -1,4 +1,6 @@ +import asyncio import os +from functools import partial from shutil import rmtree import aiofiles @@ -14,6 +16,18 @@ class BadUpload(Exception): pass +async def exists(path): + await asyncio.to_thread(partial(os.path.exists, path)) + + +async def chown(path, uid, gid): + await asyncio.to_thread(partial(os.chown, path, uid, gid)) + + +async def symlink(src, dst): + await asyncio.to_thread(partial(os.symlink, src, dst)) + + def file_is_valid(file): return ( "type" in file @@ -52,6 +66,25 @@ def create_directory(directory_path: str, user: str = None): os.chown(subpath, uid=get_uid(user), gid=get_gid(user)) +def clear_directory(directory_path: str): + # clear the upload directory every time + for filename in os.listdir(directory_path): + file_path = os.path.join(directory_path, filename) + try: + if os.path.isfile(file_path) or os.path.islink(file_path): + os.unlink(file_path) + elif os.path.isdir(file_path): + rmtree(file_path) + except Exception: + pass + + +def create_empty_directory(directory_path: str, user: str = None): + create_directory(directory_path, user) + # if the directory already existed, ensure it is empty + clear_directory(directory_path) + + def valid_url_content(content): return ( "url" in content @@ -121,6 +154,12 @@ def is_sub_directory(sub_dir, from_dir): return os.path.realpath(sub_dir).startswith(os.path.realpath(from_dir)) +def create_alias_subdirs(alias_path, user): + alias_dir = os.path.dirname(alias_path) + if not os.path.exists(alias_dir): + create_directory(alias_dir, user) + + async def do_upload(directory, work_dir, user=None): try: if user is None: @@ -129,26 +168,13 @@ async def do_upload(directory, work_dir, user=None): directory_name = directory["name"] directory_path = get_directory_path(work_dir, directory_name) - create_directory(directory_path, user) - - # clear the upload directory every time - for filename in os.listdir(directory_path): - file_path = os.path.join(directory_path, filename) - try: - if os.path.isfile(file_path) or os.path.islink(file_path): - os.unlink(file_path) - elif os.path.isdir(file_path): - rmtree(file_path) - except Exception: - pass + await asyncio.to_thread(partial(create_empty_directory, directory_path, user)) for alias_name, file_info in directory["files"].items(): alias_path = get_alias_path(directory_path, alias_name) # support for creating subdirs in alias name - alias_dir = os.path.dirname(alias_path) - if not os.path.exists(alias_dir): - create_directory(alias_dir, user) + await asyncio.to_thread(partial(create_alias_subdirs, alias_path, user)) if file_info["type"] == "url": content = file_info["content"] @@ -162,19 +188,20 @@ async def do_upload(directory, work_dir, user=None): # url type files have a cache dir to prevent repeat download bucket_cache_path = get_bucket_cache_path(work_dir, bucket_name) - if not os.path.exists(bucket_cache_path): - create_directory(bucket_cache_path, user) + await asyncio.to_thread( + partial(create_directory, bucket_cache_path, user) + ) cache_file_path = get_cache_file_path(bucket_cache_path, file_name) # only download the file if it's not in the cache - if not os.path.exists(cache_file_path): + if not (await exists(cache_file_path)): await download_file(url, cache_file_path) # set ownership of file to the correct user if user: - os.chown(cache_file_path, uid=get_uid(user), gid=get_gid(user)) + await chown(cache_file_path, uid=get_uid(user), gid=get_gid(user)) # create a symlink pointing to the cached downloaded file - os.symlink(cache_file_path, alias_path) + await symlink(cache_file_path, alias_path) elif file_info["type"] == "text": content = file_info["content"] @@ -187,7 +214,7 @@ async def do_upload(directory, work_dir, user=None): # set ownership of file to the correct user if user: - os.chown(alias_path, uid=get_uid(user), gid=get_gid(user)) + await chown(alias_path, uid=get_uid(user), gid=get_gid(user)) except Exception as e: raise BadUpload(e) From 18782489a3f8ee5cdbcff1187013cd37b3a79eec Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Thu, 27 Oct 2022 17:23:54 +0100 Subject: [PATCH 06/21] Drop support for run-py API and debian buster --- .github/workflows/test.yml | 2 +- docs/README.md | 118 +---- further_link/__main__.py | 4 - further_link/endpoint/run_py.py | 137 ------ further_link/runner/run_py_process_handler.py | 237 ---------- tests/e2e/__init__.py | 1 - tests/e2e/conftest.py | 20 +- tests/e2e/test_run_py.py | 433 ------------------ tests/e2e/test_run_py_upload.py | 134 ------ 9 files changed, 3 insertions(+), 1083 deletions(-) delete mode 100644 further_link/endpoint/run_py.py delete mode 100644 further_link/runner/run_py_process_handler.py delete mode 100644 tests/e2e/test_run_py.py delete mode 100644 tests/e2e/test_run_py_upload.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index dad0eeee..6ddd859f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -10,7 +10,7 @@ jobs: strategy: fail-fast: false matrix: - debian: [buster, bullseye] + debian: [bullseye] steps: - name: Checkout code uses: actions/checkout@v2.3.4 diff --git a/docs/README.md b/docs/README.md index b74a1057..a05649f0 100644 --- a/docs/README.md +++ b/docs/README.md @@ -46,7 +46,7 @@ directory with the names `cert.pem` and `key.pem`. ### Working directory The default working directory where files are uploaded and executed from is `~/further`. This can be overridden by setting env var FURTHER_LINK_WORK_DIR. -For more information related to the `run-py` API see the options of the +For more information related to the `run` API see the options of the `upload` and `start` messages in the documentation below. ## API @@ -57,122 +57,6 @@ For more information related to the `run-py` API see the options of the ([example](../tests/test_data/upload_data.py)). Responds after creating files is complete with a simple 200 OK. -### Websocket Endpoint /run-py -Each websocket client connected on `/run-py` can manage a single python process -at a time. - -#### Example usage -- Connect websocket on `/run-py` (using [websocat](https://github.com/vi/websocat)): -``` -websocat ws://localhost:8028/run-py -``` -- Send `start` command with `sourceScript`: -``` -{ "type": "start", "data": { "sourceScript": "print('hi')" } } -``` -- Receive `stdout` response: -``` -{ "type": "stdout", "data": { "output": "hi\n" } } -``` -- Receive `stopped` response: -``` -{ "type": "stopped", "data": { "exitCode": 0 } } -``` - -#### Spec -##### Options -This connection has some options which can be selected with query -parameters: - -``` -/run-py?user=root -``` -The `user` parameter is used to select the Linux user which the code is -executed as. By default the `pi` user is selected if it exists, otherwise -the user executing the server is used. - -``` -/run-py?pty=1 -``` -The pty parameter, if set to 1 or true, will create a pseudoterminal interface -for the python process IO streams, in order to provide terminal behaviours such -as 'cooked mode'. This is useful to produce identical behaviour of programs to -that on the command line and to easily interface with terminal emulators such -as [xterm.js](https://github.com/xtermjs/xterm.js/). - -##### Message Types -Websocket messages sent between client and server must be in JSON with two top -level properties: required string `type` and optional object `data`. - -Message types accepted by the server are: -``` -{ - "type":"[ping|start|stop|stdin|upload|keyevent]", - "data": {...} -} -``` - -Message types sent from the server are: -``` -{ - "type":"[pong|error|started|stopped|stdout|stderr|uploaded|keylisten]", - "data": {...} -} -``` - -Message and response details: -- `ping` command from the client will be met with a `pong` response from - the server. This can be used to keep the socket active to prevent automatic - closures. -- `pong` response is sent by the server immediately after a `ping` is received -
- -- `error` response is sent for bad commands or server errors e.g. `data: { message: "something went wrong and it's not your python code" }` -
- -- `start` command will start a new python process. The code to run can be - specified in data as either a `souceScript` or `sourcePath`. For - `sourceScript` an additional `directoryName` can be passed to specify an - (uploaded) directory to run the script in, within the work dir, otherwise - `/tmp` is used. If `sourcePath` is not absolute, it is assumed to be - relative to the work dir. - e.g. `data: {sourceScript:"print('hi')", directoryName: "myproject"}` - or `data: {sourcePath: "myproject/run.py"}` -- `started` response is sent after a successful process `start`, has no data. -
- -- `stdin` command is used to send data to process stdin e.g. `data: { input: "this can be read by python\n" }`. -- `stdout` response is sent when process prints to stdout. e.g. `data: { output: "this was printed by python" }` -- `stderr` response is sent when process prints to stderr e.g. `data: { output: "Traceback bleh bleh" }` -
- -- `stop` command is used to stop a running process early, has no data. -- `stopped` response is sent when a process finished and has the exit code in e.g. `data: { exitCode: 0 }` -
- -- `upload` command is used to create a directory of files in the work dir. - Files are provided in the data as `text` type, with the text content, or - `url` type, with a url for the file to be downloaded from. - [Example](..tests/test_data/upload_data.py) -- `uploaded` response is sent after a successful upload , has no data. -
- -- `video` response is sent by the server with data.output containing a base64 - encoded mjpeg frame for the client to render as a video feed. -
- -- `keylisten` message is sent by the server to indicate it would like to - receive keyboard events from the client for a specific key. This would be - initiated by user code using the further_link.KeyboardButton python module. - Keys are specified as web [KeyboardEvent.key](https://developer.mozilla.org/en-US/docs/Web/API/KeyboardEvent/key) - strings e.g. `data: { key: "ArrowUp" }` -- `keyevent` message is sent by the client to provide keyboard events to the - server so that they can be forwarded to user code using - further_link.KeyboardButton. The data includes a key string matching those - used in `keylisten` and an event string which is either "keydown" or - "keyup" e.g. `data: { key: "ArrowUp", event: "keydown" }` -
- ### Websocket Endpoint /run Each websocket client connected on `/run` can manage multuiple processes of different types, addressing them by a unique id. diff --git a/further_link/__main__.py b/further_link/__main__.py index 6d19591f..196c2358 100755 --- a/further_link/__main__.py +++ b/further_link/__main__.py @@ -9,7 +9,6 @@ from further_link.endpoint.apt_version import apt_version from further_link.endpoint.run import run as run_handler -from further_link.endpoint.run_py import run_py from further_link.endpoint.upload import upload from further_link.util import vnc from further_link.util.ssl_context import ssl_context @@ -56,9 +55,6 @@ async def version(_): status_resource = cors.add(app.router.add_resource("/upload")) cors.add(status_resource.add_route("POST", upload)) - exec_resource = cors.add(app.router.add_resource("/run-py")) - cors.add(exec_resource.add_route("GET", run_py)) - exec_resource = cors.add(app.router.add_resource("/run")) cors.add(exec_resource.add_route("GET", run_handler)) diff --git a/further_link/endpoint/run_py.py b/further_link/endpoint/run_py.py deleted file mode 100644 index bbd697f3..00000000 --- a/further_link/endpoint/run_py.py +++ /dev/null @@ -1,137 +0,0 @@ -import asyncio -import logging - -from aiohttp import web - -from ..runner.run_py_process_handler import InvalidOperation, RunPyProcessHandler -from ..util.message import BadMessage, create_message, parse_message -from ..util.upload import BadUpload, directory_is_valid, do_upload - - -async def handle_message(message, process_handler, socket): - m_type, m_data, m_process = parse_message(message) - - if ( - m_type == "start" - and "sourceScript" in m_data - and isinstance(m_data.get("sourceScript"), str) - ): - path = ( - m_data.get("directoryName") - if ( - "directoryName" in m_data - and isinstance(m_data.get("directoryName"), str) - and len(m_data.get("directoryName")) > 0 - ) - else None - ) - await process_handler.start( - script=m_data["sourceScript"], - path=path, - ) - - elif ( - m_type == "start" - and "sourcePath" in m_data - and isinstance(m_data.get("sourcePath"), str) - and len(m_data.get("sourcePath")) > 0 - ): - await process_handler.start(path=m_data["sourcePath"]) - - elif ( - m_type == "upload" - and "directory" in m_data - and directory_is_valid(m_data.get("directory")) - ): - await do_upload( - m_data.get("directory"), process_handler.work_dir, m_data.get("user") - ) - await socket.send_str(create_message("uploaded")) - - elif ( - m_type == "stdin" and "input" in m_data and isinstance(m_data.get("input"), str) - ): - await process_handler.send_input(m_data["input"]) - - elif m_type == "ping": - await socket.send_str(create_message("pong")) - - elif m_type == "stop": - process_handler.stop() - - elif ( - m_type == "keyevent" - and "key" in m_data - and isinstance(m_data.get("key"), str) - and "event" in m_data - and isinstance(m_data.get("event"), str) - ): - await process_handler.send_key_event(m_data["key"], m_data["event"]) - - else: - raise BadMessage() - - -async def run_py(request): - query_params = request.query - user = query_params.get("user", None) - pty = query_params.get("pty", "").lower() in ["1", "true"] - - socket = web.WebSocketResponse() - await socket.prepare(request) - - async def on_start(): - await socket.send_str(create_message("started")) - logging.info(f"{process_handler.id} Started") - - async def on_stop(exit_code): - try: - await socket.send_str(create_message("stopped", {"exitCode": exit_code})) - except ConnectionResetError: - pass # already disconnected - logging.info(f"{process_handler.id} Stopped") - - async def on_output(channel, output): - logging.debug(f"{process_handler.id} Sending Output {channel} {output}") - await socket.send_str(create_message(channel, {"output": output})) - - process_handler = RunPyProcessHandler(user=user, pty=pty) - process_handler.on_start = on_start - process_handler.on_stop = on_stop - process_handler.on_output = on_output - logging.info(f"{process_handler.id} New connection") - - try: - async for message in socket: - logging.debug(f"{process_handler.id} Received Message {message.data}") - try: - await handle_message(message.data, process_handler, socket) - - except BadUpload: - logging.exception(f"{process_handler.id} Bad Upload") - await socket.send_str( - create_message("error", {"message": "Bad upload"}) - ) - - except (BadMessage, InvalidOperation): - logging.exception(f"{process_handler.id} Bad Message") - await socket.send_str( - create_message("error", {"message": "Bad message"}) - ) - - except Exception: - logging.exception(f"{process_handler.id} Message Exception") - await socket.send_str( - create_message("error", {"message": "Message Exception"}) - ) - - except asyncio.CancelledError: - pass - finally: - await socket.close() - - logging.info(f"{process_handler.id} Closed connection") - if process_handler.is_running(): - process_handler.stop() - - return socket diff --git a/further_link/runner/run_py_process_handler.py b/further_link/runner/run_py_process_handler.py deleted file mode 100644 index 24e9a125..00000000 --- a/further_link/runner/run_py_process_handler.py +++ /dev/null @@ -1,237 +0,0 @@ -import asyncio -import os -import pathlib -import pty -import signal -from functools import partial - -import aiofiles - -from ..util.async_helpers import ringbuf_read -from ..util.ipc import async_ipc_send, async_start_ipc_server, ipc_cleanup -from ..util.sdk import get_first_display -from ..util.upload import create_directory -from ..util.user_config import ( - default_user, - get_current_user, - get_gid, - get_temp_dir, - get_uid, - get_working_directory, - user_exists, -) - -SERVER_IPC_CHANNELS = [ - "video", - "keylisten", -] - -dirname = pathlib.Path(__file__).parent.absolute() - - -class InvalidOperation(Exception): - pass - - -class RunPyProcessHandler: - def __init__(self, user=None, pty=False): - self.user = default_user() if user is None else user - self.pty = pty - self.work_dir = get_working_directory(user) - self.temp_dir = get_temp_dir() - - self.id = str(id(self)) - self.process = None - self.pgid = None - - def __del__(self): - if self.is_running(): - self.stop() - - async def start(self, script=None, path=None): - if self.is_running(): - raise InvalidOperation() - - entrypoint = await self._get_entrypoint(script, path) - self._remove_entrypoint = entrypoint if script is not None else None - - stdio = asyncio.subprocess.PIPE - - if self.pty: - # communicate through a pty for terminal 'cooked mode' behaviour - master, slave = pty.openpty() - self.pty_master = await aiofiles.open(master, "w+b", 0) - self.pty_slave = await aiofiles.open(slave, "r+b", 0) - - stdio = self.pty_slave - - cmd = "python3 -u " + entrypoint - if self.user != get_current_user() and user_exists(self.user): - cmd = f"sudo -u {self.user} {cmd}" - - process_env = os.environ.copy() - - # Ensure that DISPLAY is set, so that user can open GUI windows - display = get_first_display() - if display is not None: - process_env["DISPLAY"] = display - - self.process = await asyncio.create_subprocess_exec( - *cmd.split(), - stdin=stdio, - stdout=stdio, - stderr=stdio, - env=process_env, - cwd=os.path.dirname(entrypoint), - preexec_fn=os.setsid, - ) # make a process group for this and children - - self.pgid = os.getpgid(self.process.pid) # retain for cleanup - - asyncio.create_task(self._ipc_communicate()) # after exec as uses pgid - asyncio.create_task(self._process_communicate()) - - if self.on_start: - await self.on_start() - - def is_running(self): - return hasattr(self, "process") and self.process is not None - - def stop(self): - if not self.is_running(): - raise InvalidOperation() - # send TERM to process group in case we have child processes - try: - os.killpg(self.pgid, signal.SIGTERM) - except ProcessLookupError: - pass - - async def send_input(self, content): - if not self.is_running() or not isinstance(content, str): - raise InvalidOperation() - - content_bytes = content.encode("utf-8") - - if self.pty: - await self.pty_master.write(content_bytes) - else: - self.process.stdin.write(content_bytes) - await self.process.stdin.drain() - - async def send_key_event(self, key, event): - if ( - not self.is_running() - or not isinstance(key, str) - or not isinstance(event, str) - ): - raise InvalidOperation() - - content_bytes = f"{key} {event}".encode("utf-8") - await async_ipc_send("keyevent", content_bytes, pgid=self.pgid) - - async def _get_entrypoint(self, script=None, path=None): - if isinstance(path, str): - # path is absolute or relative to work_dir - first_char = path[0] - if first_char != "/": - path = os.path.join(self.work_dir, path) - - path_dirs = path if isinstance(script, str) else os.path.dirname(path) - - # if there's a script to create, create path dirs for it to go in - if not os.path.exists(path_dirs) and isinstance(script, str): - create_directory(path_dirs, self.user) - - if isinstance(script, str): - # write script to file, at path if given, otherwise temp - entrypoint = self._get_script_filename(path) - async with aiofiles.open(entrypoint, "w+") as file: - await file.write(script) - - os.chown(entrypoint, uid=get_uid(self.user), gid=get_gid(self.user)) - - return entrypoint - - if path is not None: - return path - - raise InvalidOperation() - - def _get_script_filename(self, path=None): - dir = path if isinstance(path, str) else self.temp_dir - return dir + "/" + self.id + ".py" - - async def _ipc_communicate(self): - self.ipc_tasks = [] - for channel in SERVER_IPC_CHANNELS: - self.ipc_tasks.append( - asyncio.create_task( - async_start_ipc_server( - channel, partial(self.on_output, channel), pgid=self.pgid - ) - ) - ) - - async def _process_communicate(self): - output_tasks = [] - if self.pty: - output_tasks.append( - asyncio.create_task(self._handle_output(self.pty_master, "stdout")) - ) - else: - output_tasks.append( - asyncio.create_task(self._handle_output(self.process.stdout, "stdout")) - ) - output_tasks.append( - asyncio.create_task(self._handle_output(self.process.stderr, "stderr")) - ) - - # wait for process to exit - exit_code = await self.process.wait() - - # stop ongoing io tasks - await asyncio.sleep(0.1) - for task in output_tasks: - task.cancel() - await asyncio.wait(output_tasks) - - for task in self.ipc_tasks: - task.cancel() - await asyncio.wait(self.ipc_tasks) - - await self._clean_up() - self.process = None - - if self.on_stop: - await self.on_stop(exit_code) - - async def _handle_output(self, stream, channel): - await ringbuf_read( - stream, - output_callback=partial(self.on_output, channel), - buffer_time=0.1, - max_chunks=50, - chunk_size=256, - done_condition=self.process.wait, - ) - - async def _clean_up(self): - # aiofiles.os.remove not released to debian buster - # os.remove should not block significantly, just fires a single syscall - try: - if self._remove_entrypoint is not None: - os.remove(self._remove_entrypoint) - except Exception: - pass - - try: - if self.pty: - self.pty_master.close() - self.pty_slave.close() - os.remove(self.pty_master) - os.remove(self.pty_slave) - except Exception: - pass - - for channel in SERVER_IPC_CHANNELS: - ipc_cleanup(channel, pgid=self.pgid) diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py index e274019b..73270660 100644 --- a/tests/e2e/__init__.py +++ b/tests/e2e/__init__.py @@ -6,4 +6,3 @@ VERSION_PATH = "/version" UPLOAD_PATH = "/upload" RUN_PATH = "/run" -RUN_PY_PATH = "/run-py" diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 681ef5e4..86c12a9c 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -5,7 +5,7 @@ from further_link.__main__ import create_app -from . import RUN_PATH, RUN_PY_PATH +from . import RUN_PATH @pytest.fixture @@ -19,24 +19,6 @@ async def http_client(aiohttp_client): yield client -@pytest.fixture() -async def run_py_ws_client(aiohttp_client): - client = await aiohttp_client(create_app()) - async with client.ws_connect(RUN_PY_PATH, receive_timeout=0.1) as client: - yield client - - -run_py_ws_client2 = run_py_ws_client - - -@pytest.fixture() -async def run_py_ws_client_query(aiohttp_client, query_params): - url = RUN_PY_PATH + "?" + urllib.parse.urlencode(query_params) - client = await aiohttp_client(create_app()) - async with client.ws_connect(url, receive_timeout=0.1) as client: - yield client - - @pytest.fixture() async def run_ws_client(aiohttp_client): client = await aiohttp_client(create_app()) diff --git a/tests/e2e/test_run_py.py b/tests/e2e/test_run_py.py deleted file mode 100644 index 5a3b6572..00000000 --- a/tests/e2e/test_run_py.py +++ /dev/null @@ -1,433 +0,0 @@ -import asyncio -import os -from datetime import datetime -from shutil import copy - -import pytest - -from further_link import __version__ -from further_link.util.message import create_message, parse_message - -from ..dirs import WORKING_DIRECTORY -from . import E2E_PATH -from .helpers import receive_data, wait_for_data - - -@pytest.mark.asyncio -async def test_bad_message(run_py_ws_client): - start_cmd = create_message("start") - await run_py_ws_client.send_str(start_cmd) - - await wait_for_data(run_py_ws_client, "error", "message", "Bad message") - - -@pytest.mark.asyncio -async def test_run_code_script(run_py_ws_client): - code = """\ -from datetime import datetime -print(datetime.now().strftime("%A")) -""" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - day = datetime.now().strftime("%A") - await wait_for_data(run_py_ws_client, "stdout", "output", day + "\n") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_run_code_script_with_directory(run_py_ws_client): - code = """\ -from datetime import datetime -print(datetime.now().strftime("%A")) -""" - start_cmd = create_message( - "start", {"sourceScript": code, "directoryName": "my-dirname"} - ) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - day = datetime.now().strftime("%A") - await wait_for_data(run_py_ws_client, "stdout", "output", day + "\n") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_run_code_relative_path(run_py_ws_client): - copy("{}/test_data/print_date.py".format(E2E_PATH), WORKING_DIRECTORY) - - start_cmd = create_message("start", {"sourcePath": "print_date.py"}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - day = datetime.now().strftime("%A") - await wait_for_data(run_py_ws_client, "stdout", "output", day + "\n") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_run_code_absolute_path(run_py_ws_client): - start_cmd = create_message( - "start", {"sourcePath": "{}/test_data/print_date.py".format(E2E_PATH)} - ) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - day = datetime.now().strftime("%A") - await wait_for_data(run_py_ws_client, "stdout", "output", day + "\n") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -@pytest.mark.parametrize("query_params", [{"user": "root"}]) -@pytest.mark.skip(reason="Won't work in CI due to old sudo version") -async def test_run_as_user(run_py_ws_client_query): - # This test assumes non-root user with nopasswd sudo access... - code = "import getpass\nprint(getpass.getuser())" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client_query.send_str(start_cmd) - - await receive_data(run_py_ws_client_query, "started") - - await wait_for_data(run_py_ws_client_query, "stdout", "output", "root\n") - - await wait_for_data(run_py_ws_client_query, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_stop_early(run_py_ws_client): - code = "while True: pass" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - stop_cmd = create_message("stop") - await run_py_ws_client.send_str(stop_cmd) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", -15) - - -@pytest.mark.asyncio -async def test_bad_code(run_py_ws_client): - code = "i'm not valid python" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - await asyncio.sleep(0.1) # wait for data - message = await run_py_ws_client.receive() - m_type, m_data, m_process = parse_message(message.data) - assert m_type == "stderr" - lines = m_data["output"].split("\n") - assert lines[0].startswith(" File") - assert lines[1] == " i'm not valid python" - assert lines[2][-1] == "^" - assert lines[3] == "SyntaxError: EOL while scanning string literal" - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 1) - - -@pytest.mark.asyncio -async def test_input(run_py_ws_client): - code = """s = input() -while "BYE" != s: - print(["HUH?! SPEAK UP, SONNY!","NO, NOT SINCE 1930"][s.isupper()]) - s = input()""" - - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - user_input = create_message("stdin", {"input": "hello\n"}) - await run_py_ws_client.send_str(user_input) - - await wait_for_data( - run_py_ws_client, "stdout", "output", "HUH?! SPEAK UP, SONNY!\n" - ) - - user_input = create_message("stdin", {"input": "HEY GRANDMA\n"}) - await run_py_ws_client.send_str(user_input) - - await wait_for_data(run_py_ws_client, "stdout", "output", "NO, NOT SINCE 1930\n") - - user_input = create_message("stdin", {"input": "BYE\n"}) - await run_py_ws_client.send_str(user_input) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -@pytest.mark.parametrize("query_params", [{"pty": "1"}]) -async def test_input_pty(run_py_ws_client_query): - code = """s = input() -while "BYE" != s: - print(["HUH?! SPEAK UP, SONNY!","NO, NOT SINCE 1930"][s.isupper()]) - s = input()""" - - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client_query.send_str(start_cmd) - - await receive_data(run_py_ws_client_query, "started") - - user_input = create_message("stdin", {"input": "hello\r"}) - await run_py_ws_client_query.send_str(user_input) - - await wait_for_data( - run_py_ws_client_query, - "stdout", - "output", - "hello\r\nHUH?! SPEAK UP, SONNY!\r\n", - ) - - user_input = create_message("stdin", {"input": "HEY GRANDMA\r"}) - await run_py_ws_client_query.send_str(user_input) - - await wait_for_data( - run_py_ws_client_query, - "stdout", - "output", - "HEY GRANDMA\r\nNO, NOT SINCE 1930\r\n", - ) - - user_input = create_message("stdin", {"input": "BYE\r"}) - await run_py_ws_client_query.send_str(user_input) - - await wait_for_data(run_py_ws_client_query, "stdout", "output", "BYE\r\n") - - await wait_for_data(run_py_ws_client_query, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_two_clients(run_py_ws_client, run_py_ws_client2): - code = "while True: pass" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - await run_py_ws_client2.send_str(start_cmd) - - await receive_data(run_py_ws_client2, "started") - - stop_cmd = create_message("stop") - await run_py_ws_client.send_str(stop_cmd) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", -15, 100) - - stop_cmd = create_message("stop") - await run_py_ws_client2.send_str(stop_cmd) - - await wait_for_data(run_py_ws_client2, "stopped", "exitCode", -15, 100) - - -@pytest.mark.asyncio -async def test_out_of_order_commands(run_py_ws_client): - # send input - user_input = create_message("stdin", {"input": "hello\n"}) - await run_py_ws_client.send_str(user_input) - - # bad message - await receive_data(run_py_ws_client, "error", "message", "Bad message") - - # send stop - stop_cmd = create_message("stop") - await run_py_ws_client.send_str(stop_cmd) - - # bad message - await receive_data(run_py_ws_client, "error", "message", "Bad message") - - # send start - code = "while True: pass" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - # started - await receive_data(run_py_ws_client, "started") - - # send start - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - # bad message - await receive_data(run_py_ws_client, "error", "message", "Bad message") - - # send stop - stop_cmd = create_message("stop") - await run_py_ws_client.send_str(stop_cmd) - - # stopped - await wait_for_data(run_py_ws_client, "stopped", "exitCode", -15) - - # send stop - stop_cmd = create_message("stop") - await run_py_ws_client.send_str(stop_cmd) - - # bad message - await receive_data(run_py_ws_client, "error", "message", "Bad message") - - -@pytest.mark.asyncio -async def test_discard_old_input(run_py_ws_client): - code = 'print("hello world")' - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - unterminated_input = create_message("stdin", {"input": "unterminated input"}) - await run_py_ws_client.send_str(unterminated_input) - - await wait_for_data(run_py_ws_client, "stdout", "output", "hello world\n", 100) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - code = "print(input())" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - user_input = create_message("stdin", {"input": "hello\n"}) - await run_py_ws_client.send_str(user_input) - - await wait_for_data(run_py_ws_client, "stdout", "output", "hello\n") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_use_lib(run_py_ws_client): - code = """\ -from further_link import __version__ -print(__version__) -""" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - await wait_for_data(run_py_ws_client, "stdout", "output", f"{__version__}\n") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -@pytest.mark.skipif("DISPLAY" not in os.environ, reason="requires UI") -async def test_use_display(run_py_ws_client): - code = """\ -from turtle import color -color('red') -""" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0, 5000) - - -@pytest.mark.asyncio -async def test_keyevent(run_py_ws_client): - code = """\ -from further_link import KeyboardButton -from signal import pause -a = KeyboardButton('a') -b = KeyboardButton('b') -a.when_pressed = lambda: print('a pressed') -b.when_released = lambda: print('b released') -pause() -""" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await wait_for_data(run_py_ws_client, "started") - await wait_for_data(run_py_ws_client, "keylisten", "output", "a") - await wait_for_data(run_py_ws_client, "keylisten", "output", "b") - - # keyevent ipc server can take a moment to start up - await asyncio.sleep(0.1) - - await run_py_ws_client.send_str( - create_message("keyevent", {"key": "a", "event": "keydown"}) - ) - - await wait_for_data(run_py_ws_client, "stdout", "output", "a pressed\n") - - await run_py_ws_client.send_str( - create_message("keyevent", {"key": "b", "event": "keyup"}) - ) - - await wait_for_data(run_py_ws_client, "stdout", "output", "b released\n") - - await run_py_ws_client.send_str(create_message("stop")) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", -15) - - -jpeg_pixel_b64 = "/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAAgGBgcGBQgHBwcJCQgKDBQNDAsLDBkSEw8UHRofHh0aHBwgJC4nICIsIxwcKDcpLDAxNDQ0Hyc5PTgyPC4zNDL/wAALCAABAAEBAREA/8QAFAABAAAAAAAAAAAAAAAAAAAAAP/EABQQAQAAAAAAAAAAAAAAAAAAAAD/2gAIAQEAAD8AP//Z" # noqa: E501 - - -@pytest.mark.asyncio -async def test_send_image_pil(run_py_ws_client): - code = """\ -from further_link import send_image -from PIL.Image import effect_noise -send_image(effect_noise((1, 1), 0)) -""" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await wait_for_data(run_py_ws_client, "started") - - await wait_for_data(run_py_ws_client, "video", "output", jpeg_pixel_b64) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_send_image_opencv(run_py_ws_client): - code = """\ -from numpy import array -from further_link import send_image -from PIL.Image import effect_noise -send_image(array(effect_noise((1, 1), 0))) -""" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await wait_for_data(run_py_ws_client, "started") - - await wait_for_data(run_py_ws_client, "video", "output", jpeg_pixel_b64) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_send_image_with_directory(run_py_ws_client): - code = """\ -from further_link import send_image -from PIL.Image import effect_noise -send_image(effect_noise((1, 1), 0)) -""" - start_cmd = create_message( - "start", {"sourceScript": code, "directoryName": "my-dirname"} - ) - await run_py_ws_client.send_str(start_cmd) - - await wait_for_data(run_py_ws_client, "started") - - await wait_for_data(run_py_ws_client, "video", "output", jpeg_pixel_b64) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) diff --git a/tests/e2e/test_run_py_upload.py b/tests/e2e/test_run_py_upload.py deleted file mode 100644 index 1b63a00e..00000000 --- a/tests/e2e/test_run_py_upload.py +++ /dev/null @@ -1,134 +0,0 @@ -import asyncio -import os - -import aiofiles -import pytest - -from further_link.util.message import create_message, parse_message -from further_link.util.upload import get_bucket_cache_path, get_directory_path - -from ..dirs import WORKING_DIRECTORY -from .helpers import receive_data, wait_for_data -from .test_data.upload_data import directory - - -@pytest.mark.asyncio -async def test_upload(run_py_ws_client): - upload_cmd = create_message("upload", {"directory": directory}) - await run_py_ws_client.send_str(upload_cmd) - - await wait_for_data(run_py_ws_client, "uploaded") - - directory_path = get_directory_path(WORKING_DIRECTORY, directory["name"]) - - for alias_name, file_info in directory["files"].items(): - alias_path = os.path.join(directory_path, alias_name) - - assert os.path.isfile(alias_path) - - if file_info["type"] == "url": - content = file_info["content"] - bucket_name = content["bucketName"] - file_name = content["fileName"] - bucket_cache_path = get_bucket_cache_path(WORKING_DIRECTORY, bucket_name) - file_path = os.path.join(bucket_cache_path, file_name) - assert os.path.isfile(file_path) - - elif file_info["type"] == "url": - async with aiofiles.open(file_path) as file: - content = await file.read() - assert content == file_info["content"]["text"] - - -@pytest.mark.asyncio -async def test_upload_read_file(run_py_ws_client): - upload_cmd = create_message("upload", {"directory": directory}) - await run_py_ws_client.send_str(upload_cmd) - - await wait_for_data(run_py_ws_client, "uploaded") - - code = """\ -import os -with open(os.path.dirname(__file__) + '/cereal.csv', 'r') as f: - print(f.read(1000)) -""" - start_cmd = create_message( - "start", {"sourceScript": code, "directoryName": directory["name"]} - ) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - await asyncio.sleep(0.2) # wait for data - message = await run_py_ws_client.receive() - m_type, m_data, m_process = parse_message(message.data) - assert m_type == "stdout" - assert m_data["output"][788:796] == "Cheerios" - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_upload_import_script(run_py_ws_client): - upload_cmd = create_message("upload", {"directory": directory}) - await run_py_ws_client.send_str(upload_cmd) - - await wait_for_data(run_py_ws_client, "uploaded") - - code = """\ -from some_lib import call_some_lib -print(call_some_lib()) -""" - start_cmd = create_message( - "start", {"sourceScript": code, "directoryName": directory["name"]} - ) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - await wait_for_data(run_py_ws_client, "stdout", "output", "some lib called\n") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_upload_bad_file(run_py_ws_client, aioresponses): - aioresponses.add("https://placekitten.com/50/50", status=500) - - upload_cmd = create_message("upload", {"directory": directory}) - await run_py_ws_client.send_str(upload_cmd) - - await receive_data(run_py_ws_client, "error", "message", "Bad upload") - - -@pytest.mark.asyncio -async def test_upload_existing_directory(run_py_ws_client): - existing_directory = directory.copy() - existing_directory["name"] = "existing_directory" - - os.mkdir("{}/existing_directory".format(WORKING_DIRECTORY)) - - upload_cmd = create_message("upload", {"directory": existing_directory}) - await run_py_ws_client.send_str(upload_cmd) - - await wait_for_data(run_py_ws_client, "uploaded") - - -@pytest.mark.asyncio -async def test_upload_restricted_directory(run_py_ws_client): - # name directory something that tries to escape from working dir - restricted_directory = directory.copy() - restricted_directory["name"] = "../injected" - - upload_cmd = create_message("upload", {"directory": restricted_directory}) - await run_py_ws_client.send_str(upload_cmd) - - await receive_data(run_py_ws_client, "error", "message", "Bad upload") - - -@pytest.mark.asyncio -async def test_upload_empty_directory(run_py_ws_client): - upload_cmd = create_message("upload", {"directory": {}}) - await run_py_ws_client.send_str(upload_cmd) - - await receive_data(run_py_ws_client, "error", "message", "Bad message") From 12e5f85a45bbd13571a4fa4a82d3ce9b49e08ee7 Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Thu, 27 Oct 2022 17:56:04 +0100 Subject: [PATCH 07/21] Avoid aiofiles for upload --- further_link/util/upload.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/further_link/util/upload.py b/further_link/util/upload.py index e6e7b7c9..885481a2 100644 --- a/further_link/util/upload.py +++ b/further_link/util/upload.py @@ -3,7 +3,6 @@ from functools import partial from shutil import rmtree -import aiofiles from aiohttp import ClientSession from ..util.user_config import default_user, get_gid, get_uid @@ -28,6 +27,15 @@ async def symlink(src, dst): await asyncio.to_thread(partial(os.symlink, src, dst)) +async def write_file(path, content, mode="w+"): + # aiofiles.open would sometimes just hang + def sync_write_file(p, c): + with open(p, mode) as file: + file.write(c) + + await asyncio.to_thread(partial(sync_write_file, path, content)) + + def file_is_valid(file): return ( "type" in file @@ -105,8 +113,7 @@ async def download_file(url, file_path): async with session.get(url) as response: assert response.status == 200 - async with aiofiles.open(file_path, "wb") as file: - await file.write(await response.read()) + await write_file(file_path, await response.read(), "wb") def get_directory_path(work_dir, directory_name): @@ -209,8 +216,7 @@ async def do_upload(directory, work_dir, user=None): if not valid_text_content(content): raise Exception("Invalid text content") - async with aiofiles.open(alias_path, "w+") as file: - await file.write(content["text"]) + await write_file(alias_path, content["text"]) # set ownership of file to the correct user if user: From ee932ac2e01aa9385ef4341d1a28b701caf220a2 Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Wed, 2 Nov 2022 11:29:32 +0000 Subject: [PATCH 08/21] Add more logging --- further_link/endpoint/run.py | 27 +++++++++++++++++----- further_link/runner/process_handler.py | 31 +++++++++++++++++++++++++- further_link/util/upload.py | 3 +++ 3 files changed, 55 insertions(+), 6 deletions(-) diff --git a/further_link/endpoint/run.py b/further_link/endpoint/run.py index 27c5b373..5801c8e1 100644 --- a/further_link/endpoint/run.py +++ b/further_link/endpoint/run.py @@ -27,18 +27,19 @@ def __init__(self, socket, user=None, pty=False): } async def stop(self): + logging.debug(f"{self.id} Stopping processes: {self.process_handlers}") # the dictionary will be mutated so use list to make a copy for p in list(self.process_handlers.values()): try: await p.stop() except InvalidOperation: - pass + logging.debug(f"{self.id} Invalid operation stopping process: {p}") async def send(self, type, data=None, process_id=None): try: await self.socket.send_str(create_message(type, data, process_id)) except ConnectionResetError: - pass # already disconnected + logging.debug(f"{self.id} Send failed - client disconnected") async def handle_message(self, message): try: @@ -46,6 +47,10 @@ async def handle_message(self, message): process_handler = self.process_handlers.get(m_process) + logging.debug( + f"{self.id} Handling {m_type} message for handler {process_handler}" + ) + if m_type == "ping": await self.send("pong") @@ -99,7 +104,7 @@ async def handle_message(self, message): await process_handler.send_key_event(m_data["key"], m_data["event"]) else: - raise BadMessage() + raise BadMessage(f"Bad {m_type} message for handler {process_handler}") except (BadMessage, InvalidOperation): logging.exception(f"{self.id} Bad Message") @@ -117,7 +122,7 @@ async def add_handler(self, process_id, runner, path, code, novncOptions): async def on_start(): await self.send("started", None, process_id) - logging.info(f"{self.id} Started {process_id}") + logging.info(f"{self.id} Sent started {process_id}") async def on_stop(exit_code): # process_id may be reused with other runners so clean up handler @@ -141,11 +146,18 @@ async def on_display_activity(connection_details: VncConnectionDetails): ) handler = handler_class(self.user, self.pty) + logging.debug( + f"{self.id} Created {runner} handler for {process_id}, ID {handler.id}" + ) + handler.on_start = on_start handler.on_stop = on_stop handler.on_display_activity = on_display_activity handler.on_output = on_output + + logging.debug(f"{self.id} Starting handler {handler.id}") await handler.start(path, code, novncOptions=novncOptions) + logging.debug(f"{self.id} Started handler {handler.id}") self.process_handlers[process_id] = handler @@ -165,11 +177,16 @@ async def run(request): async for message in socket: logging.debug(f"{run_manager.id} Received Message {message.data}") await run_manager.handle_message(message.data) + logging.debug(f"{run_manager.id} Handled Message {message.data}") except asyncio.CancelledError: - pass + logging.debug(f"{run_manager.id} Run cancelled error") + + except Exception as e: + logging.exception(f"{run_manager.id} Run error: {e}") finally: + logging.debug(f"{run_manager.id} Closing connection") await socket.close() logging.info(f"{run_manager.id} Closed connection") await run_manager.stop() diff --git a/further_link/runner/process_handler.py b/further_link/runner/process_handler.py index 6590797a..622979c8 100644 --- a/further_link/runner/process_handler.py +++ b/further_link/runner/process_handler.py @@ -75,6 +75,7 @@ async def _start(self, command, work_dir=None, env={}, novncOptions={}): stdio = asyncio.subprocess.PIPE if self.pty: + logging.debug(f"{self.id} Starting PTY") # communicate through a pty for terminal 'cooked mode' behaviour master, slave = openpty() @@ -90,6 +91,8 @@ async def _start(self, command, work_dir=None, env={}, novncOptions={}): stdio = self.pty_slave + logging.debug(f"{self.id} Setup pty") + process_env = {**os.environ.copy(), **env} process_env["TERM"] = "xterm-256color" # perhaps should be param @@ -105,6 +108,7 @@ async def _start(self, command, work_dir=None, env={}, novncOptions={}): # set $DISPLAY so that user can open GUI windows if self.novnc: + logging.debug(f"{self.id} Starting NOVNC") process_env["DISPLAY"] = f":{self.id}" await async_start( display_id=self.id, @@ -114,6 +118,7 @@ async def _start(self, command, work_dir=None, env={}, novncOptions={}): height=novncOptions.get("height"), width=novncOptions.get("width"), ) + logging.debug(f"{self.id} Setup novnc") else: default_display = get_first_display() if default_display: @@ -136,6 +141,7 @@ def preexec(): # as allowing a shell process to be a 'controlling terminal' os.setsid() + logging.debug(f"{self.id} Starting process") self.process = await asyncio.create_subprocess_exec( *split(command), stdin=stdio, @@ -145,6 +151,7 @@ def preexec(): cwd=self.work_dir, preexec_fn=preexec, ) + logging.debug(f"{self.id} Started process") if self.on_start: await self.on_start() @@ -154,9 +161,12 @@ def preexec(): self.pgid = os.getpgid(self.process.pid) # retain for cleanup asyncio.create_task(self._ipc_communicate()) # after as uses pgid except ProcessLookupError: + logging.debug(f"{self.id} Process lookup failed - running without IPC") # the process is done faster than we can look up gpid! self.pgid = None + logging.debug(f"{self.id} Start complete") + def is_running(self): return hasattr(self, "process") and self.process is not None @@ -165,27 +175,37 @@ async def stop(self): raise InvalidOperation() # send signal to process group in case we have child processes try: + logging.debug(f"{self.id} Stopping with SIGTERM") os.killpg(self.pgid, signal.SIGTERM) stopped = asyncio.create_task(self.process.wait()) done = await timeout(stopped, 0.1) # if SIGTERM didn't stop the process already, send SIGKILL if stopped not in done: + logging.debug(f"{self.id} Stopping with SIGKILL") os.killpg(self.pgid, signal.SIGKILL) + + logging.debug(f"{self.id} Stop complete") except ProcessLookupError: - pass + logging.debug(f"{self.id} Could not stop - ProcessLookupError") async def send_input(self, content): + logging.debug(f"{self.id} Receiving input {content}") + if not self.is_running() or not isinstance(content, str): + logging.debug(f"{self.id} Not receiving input as not running") raise InvalidOperation() content_bytes = content.encode("utf-8") if self.pty: + logging.debug(f"{self.id} Receiving input via pty") await self.pty_master.write(content_bytes) else: + logging.debug(f"{self.id} Receiving input via stdin") self.process.stdin.write(content_bytes) await self.process.stdin.drain() + logging.debug(f"{self.id} Received input") async def resize_pty(self, rows, cols): if not self.is_running() or not self.pty: @@ -229,9 +249,13 @@ async def _process_communicate(self): asyncio.create_task(self._handle_output(self.process.stderr, "stderr")) ) + logging.debug(f"{self.id} Monitoring output") + # wait for process to exit await self.process.wait() + logging.debug(f"{self.id} Process exited") + # wait a little for the io tasks to complete to let them send # output produced right before the process stopped # but cancel them after a timeout if they don't stop themselves @@ -260,7 +284,9 @@ async def _handle_output(self, stream, channel): ) async def _clean_up(self): + logging.debug(f"{self.id} Starting cleanup") if getattr(self, "pty", None): + logging.debug(f"{self.id} Cleaning up PTY") try: if getattr(self, "pty_master", None): await self.pty_master.close() @@ -270,12 +296,14 @@ async def _clean_up(self): logging.exception(f"{self.id} PTY Cleanup error: {e}") if getattr(self, "novnc", None): + logging.debug(f"{self.id} Cleaning up NOVNC") try: await async_stop(self.id) except Exception as e: logging.exception(f"{self.id} NOVNC Cleanup error: {e}") if getattr(self, "ipc_tasks", None): + logging.debug(f"{self.id} Cleaning up IPC") try: for channel in SERVER_IPC_CHANNELS: ipc_cleanup(channel, pgid=self.pgid) @@ -285,5 +313,6 @@ async def _clean_up(self): except Exception as e: logging.exception(f"{self.id} IPC Cleanup error: {e}") + logging.debug(f"{self.id} Releasing ID") id_generator.free(self.id) logging.debug(f"{self.id} Cleanup complete") diff --git a/further_link/util/upload.py b/further_link/util/upload.py index 885481a2..6807fca3 100644 --- a/further_link/util/upload.py +++ b/further_link/util/upload.py @@ -30,8 +30,11 @@ async def symlink(src, dst): async def write_file(path, content, mode="w+"): # aiofiles.open would sometimes just hang def sync_write_file(p, c): + print("opening file") with open(p, mode) as file: + print("file open") file.write(c) + print("file written") await asyncio.to_thread(partial(sync_write_file, path, content)) From 651d8398e45c3651f750b3a049f8a1c37b34e5c4 Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Wed, 2 Nov 2022 12:26:07 +0000 Subject: [PATCH 09/21] Replace more blocking file operations --- further_link/runner/exec_process_handler.py | 13 +-- further_link/runner/py_process_handler.py | 11 +- further_link/runner/shell_process_handler.py | 2 +- further_link/util/async_files.py | 95 ++++++++++++++++ further_link/util/upload.py | 109 ++++--------------- 5 files changed, 125 insertions(+), 105 deletions(-) create mode 100644 further_link/util/async_files.py diff --git a/further_link/runner/exec_process_handler.py b/further_link/runner/exec_process_handler.py index b1e31c91..9ef2b0ee 100644 --- a/further_link/runner/exec_process_handler.py +++ b/further_link/runner/exec_process_handler.py @@ -2,9 +2,7 @@ import os import pathlib -import aiofiles - -from ..util.upload import create_directory +from ..util.async_files import chmod, chown, create_directory, write_file from ..util.user_config import ( get_absolute_path, get_gid, @@ -22,18 +20,17 @@ async def _start(self, path, code=None, novncOptions={}): path = get_absolute_path(path, get_working_directory(self.user)) # create path directories if they don't already exist - create_directory(os.path.dirname(path), self.user) + await create_directory(os.path.dirname(path), self.user) entrypoint = path if code is None else os.path.join(path, f"exec-{self.id}") # create a temporary file to execute if code is provided if code is not None: - async with aiofiles.open(entrypoint, "w+") as file: - await file.write(code) + await write_file(entrypoint, code) self._remove_entrypoint = entrypoint - os.chown(entrypoint, uid=get_uid(self.user), gid=get_gid(self.user)) - os.chmod(entrypoint, 0o777) # make it executable + await chown(entrypoint, uid=get_uid(self.user), gid=get_gid(self.user)) + await chmod(entrypoint, 0o777) # make it executable command = entrypoint diff --git a/further_link/runner/py_process_handler.py b/further_link/runner/py_process_handler.py index 92cf305c..3e2b2db4 100644 --- a/further_link/runner/py_process_handler.py +++ b/further_link/runner/py_process_handler.py @@ -2,9 +2,7 @@ import os import pathlib -import aiofiles - -from ..util.upload import create_directory +from ..util.async_files import chown, create_directory, write_file from ..util.user_config import ( get_absolute_path, get_gid, @@ -21,19 +19,18 @@ async def _start(self, path, code=None, novncOptions={}): path = get_absolute_path(path, get_working_directory(self.user)) # create path directories if they don't already exist - create_directory(os.path.dirname(path), self.user) + await create_directory(os.path.dirname(path), self.user) entrypoint = path if code is None else os.path.join(path, f"{self.id}.py") # create a temporary file to execute if code is provided if code is not None: - async with aiofiles.open(entrypoint, "w+") as file: - await file.write(code) + await write_file(entrypoint, code) self._remove_entrypoint = entrypoint command = "python3 -u " + entrypoint - os.chown(entrypoint, uid=get_uid(self.user), gid=get_gid(self.user)) + await chown(entrypoint, uid=get_uid(self.user), gid=get_gid(self.user)) work_dir = os.path.dirname(entrypoint) diff --git a/further_link/runner/shell_process_handler.py b/further_link/runner/shell_process_handler.py index 6b6a132c..598b2c07 100644 --- a/further_link/runner/shell_process_handler.py +++ b/further_link/runner/shell_process_handler.py @@ -8,6 +8,6 @@ async def _start(self, path, code=None, novncOptions={}): work_dir = get_absolute_path(path, get_working_directory(self.user)) # create work dir if it doesn't already exist - create_directory(work_dir, self.user) + await create_directory(work_dir, self.user) await super()._start(get_shell(self.user), work_dir, novncOptions=novncOptions) diff --git a/further_link/util/async_files.py b/further_link/util/async_files.py new file mode 100644 index 00000000..87567654 --- /dev/null +++ b/further_link/util/async_files.py @@ -0,0 +1,95 @@ +import asyncio +import os +from functools import partial +from shutil import rmtree + +from aiohttp import ClientSession + +from .user_config import default_user, get_gid, get_uid + +# async file helpers. used instead of aiofiles mainly avoid +# packaging the new more complete versions for debian. aiofiles can still be used if preferred + +# some sync methods are acceptable, such as os.path methods that only do string +# manipulation or os.remove which blocks no more than starting a thread would + + +async def exists(path): + return await asyncio.to_thread(partial(os.path.exists, path)) + + +async def chmod(fd, mode): + await asyncio.to_thread(partial(os.chmod, fd, mode)) + + +async def chown(fd, uid, gid): + await asyncio.to_thread(partial(os.chown, fd, uid, gid)) + + +async def symlink(src, dst): + await asyncio.to_thread(partial(os.symlink, src, dst)) + + +async def write_file(path, content, mode="w+"): + def sync_write_file(p, c): + print("opening file") + with open(p, mode) as file: + print("file open") + file.write(c) + print("file written") + + await asyncio.to_thread(partial(sync_write_file, path, content)) + + +async def download_file(url, file_path): + async with ClientSession() as session: + async with session.get(url) as response: + assert response.status == 200 + + await write_file(file_path, await response.read(), "wb") + + +async def create_directory(directory_path: str, user: str = None): + def _create_directory(directory_path: str, user: str = None): + """ + Create the directories from the provided path level by level, making sure the folders have the correct owner + """ + if user is None: + user = default_user() + + splitted_path = directory_path.split("/") + subpaths = [ + "/".join(splitted_path[:i]) for i in range(2, len(splitted_path) + 1) + ] + + for subpath in subpaths: + if not os.path.isdir(subpath): + os.mkdir(subpath) + + # Update directory owner if on user home + if user and subpath.startswith(os.path.expanduser(f"~{user}")): + os.chown(subpath, uid=get_uid(user), gid=get_gid(user)) + + await asyncio.to_thread(partial(_create_directory, directory_path, user)) + + +async def clear_directory(directory_path: str): + def _clear_directory(directory_path: str): + # clear the upload directory every time + for filename in os.listdir(directory_path): + file_path = os.path.join(directory_path, filename) + try: + if os.path.isfile(file_path) or os.path.islink(file_path): + os.unlink(file_path) + elif os.path.isdir(file_path): + rmtree(file_path) + except Exception: + pass + + await asyncio.to_thread(partial(_clear_directory, directory_path)) + + +async def create_empty_directory(directory_path: str, user: str = None): + await create_directory(directory_path, user) + # in case the directory already existed, ensure it is empty + await clear_directory(directory_path) diff --git a/further_link/util/upload.py b/further_link/util/upload.py index 6807fca3..03051db9 100644 --- a/further_link/util/upload.py +++ b/further_link/util/upload.py @@ -1,12 +1,15 @@ -import asyncio import os -from functools import partial -from shutil import rmtree -from aiohttp import ClientSession - -from ..util.user_config import default_user, get_gid, get_uid -from .user_config import CACHE_DIR_NAME +from .async_files import ( + chown, + create_directory, + create_empty_directory, + download_file, + exists, + symlink, + write_file, +) +from .user_config import CACHE_DIR_NAME, default_user, get_gid, get_uid file_types = ["url", "text"] @@ -15,30 +18,6 @@ class BadUpload(Exception): pass -async def exists(path): - await asyncio.to_thread(partial(os.path.exists, path)) - - -async def chown(path, uid, gid): - await asyncio.to_thread(partial(os.chown, path, uid, gid)) - - -async def symlink(src, dst): - await asyncio.to_thread(partial(os.symlink, src, dst)) - - -async def write_file(path, content, mode="w+"): - # aiofiles.open would sometimes just hang - def sync_write_file(p, c): - print("opening file") - with open(p, mode) as file: - print("file open") - file.write(c) - print("file written") - - await asyncio.to_thread(partial(sync_write_file, path, content)) - - def file_is_valid(file): return ( "type" in file @@ -58,44 +37,6 @@ def directory_is_valid(directory): ) -def create_directory(directory_path: str, user: str = None): - """ - Create the directories from the provided path level by level, making sure the folders have the correct owner - """ - if user is None: - user = default_user() - - splitted_path = directory_path.split("/") - subpaths = ["/".join(splitted_path[:i]) for i in range(2, len(splitted_path) + 1)] - - for subpath in subpaths: - if not os.path.isdir(subpath): - os.mkdir(subpath) - - # Update directory owner if on user home - if user and subpath.startswith(os.path.expanduser(f"~{user}")): - os.chown(subpath, uid=get_uid(user), gid=get_gid(user)) - - -def clear_directory(directory_path: str): - # clear the upload directory every time - for filename in os.listdir(directory_path): - file_path = os.path.join(directory_path, filename) - try: - if os.path.isfile(file_path) or os.path.islink(file_path): - os.unlink(file_path) - elif os.path.isdir(file_path): - rmtree(file_path) - except Exception: - pass - - -def create_empty_directory(directory_path: str, user: str = None): - create_directory(directory_path, user) - # if the directory already existed, ensure it is empty - clear_directory(directory_path) - - def valid_url_content(content): return ( "url" in content @@ -111,12 +52,14 @@ def valid_text_content(content): return "text" in content and isinstance(content["text"], str) -async def download_file(url, file_path): - async with ClientSession() as session: - async with session.get(url) as response: - assert response.status == 200 +def is_sub_directory(sub_dir, from_dir): + return os.path.realpath(sub_dir).startswith(os.path.realpath(from_dir)) - await write_file(file_path, await response.read(), "wb") + +async def create_alias_subdirs(alias_path, user): + alias_dir = os.path.dirname(alias_path) + if not await exists(alias_dir): + await create_directory(alias_dir, user) def get_directory_path(work_dir, directory_name): @@ -160,16 +103,6 @@ def get_cache_file_path(bucket_cache_path, file_name): return cache_file_path -def is_sub_directory(sub_dir, from_dir): - return os.path.realpath(sub_dir).startswith(os.path.realpath(from_dir)) - - -def create_alias_subdirs(alias_path, user): - alias_dir = os.path.dirname(alias_path) - if not os.path.exists(alias_dir): - create_directory(alias_dir, user) - - async def do_upload(directory, work_dir, user=None): try: if user is None: @@ -178,13 +111,13 @@ async def do_upload(directory, work_dir, user=None): directory_name = directory["name"] directory_path = get_directory_path(work_dir, directory_name) - await asyncio.to_thread(partial(create_empty_directory, directory_path, user)) + await create_empty_directory(directory_path, user) for alias_name, file_info in directory["files"].items(): alias_path = get_alias_path(directory_path, alias_name) # support for creating subdirs in alias name - await asyncio.to_thread(partial(create_alias_subdirs, alias_path, user)) + await create_alias_subdirs(alias_path, user) if file_info["type"] == "url": content = file_info["content"] @@ -198,9 +131,7 @@ async def do_upload(directory, work_dir, user=None): # url type files have a cache dir to prevent repeat download bucket_cache_path = get_bucket_cache_path(work_dir, bucket_name) - await asyncio.to_thread( - partial(create_directory, bucket_cache_path, user) - ) + await create_directory(bucket_cache_path, user) cache_file_path = get_cache_file_path(bucket_cache_path, file_name) # only download the file if it's not in the cache if not (await exists(cache_file_path)): From 7ed41cd9fc7ca2ed495f72186c048c07651eb245 Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Wed, 2 Nov 2022 18:16:42 +0000 Subject: [PATCH 10/21] Wrap writing stdin in timeout incase it is blocked --- further_link/runner/process_handler.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/further_link/runner/process_handler.py b/further_link/runner/process_handler.py index 622979c8..4600cfc2 100644 --- a/further_link/runner/process_handler.py +++ b/further_link/runner/process_handler.py @@ -200,12 +200,21 @@ async def send_input(self, content): if self.pty: logging.debug(f"{self.id} Receiving input via pty") - await self.pty_master.write(content_bytes) + write_task = asyncio.create_task(self.pty_master.write(content_bytes)) else: logging.debug(f"{self.id} Receiving input via stdin") - self.process.stdin.write(content_bytes) - await self.process.stdin.drain() - logging.debug(f"{self.id} Received input") + + async def write(): + self.process.stdin.write(content_bytes) + await self.process.stdin.drain() + + write_task = asyncio.create_task(write()) + + done = await timeout(write_task, 0.1) + if write_task not in done: + logging.debug(f"{self.id} Receiving input timed out") + else: + logging.debug(f"{self.id} Received input") async def resize_pty(self, rows, cols): if not self.is_running() or not self.pty: From bec5b81224995ece84927b44d023cb86af722519 Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Thu, 3 Nov 2022 11:10:19 +0000 Subject: [PATCH 11/21] Prevent double cleanup --- further_link/runner/process_handler.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/further_link/runner/process_handler.py b/further_link/runner/process_handler.py index 4600cfc2..dee8b803 100644 --- a/further_link/runner/process_handler.py +++ b/further_link/runner/process_handler.py @@ -276,8 +276,9 @@ async def _process_communicate(self): async def _handle_process_end(self): exit_code = await self.process.wait() - await self._clean_up() - self.process = None + + if self.process: + await self._clean_up() if self.on_stop: await self.on_stop(exit_code) @@ -324,4 +325,6 @@ async def _clean_up(self): logging.debug(f"{self.id} Releasing ID") id_generator.free(self.id) + + self.process = None logging.debug(f"{self.id} Cleanup complete") From 173c1587115bbfc71b4b1dd3e0f6f9c78ea2baf2 Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Thu, 3 Nov 2022 11:18:45 +0000 Subject: [PATCH 12/21] More logging --- further_link/runner/exec_process_handler.py | 2 ++ further_link/runner/process_handler.py | 4 ++++ further_link/runner/py_process_handler.py | 2 ++ further_link/runner/shell_process_handler.py | 3 +++ further_link/util/async_files.py | 3 --- 5 files changed, 11 insertions(+), 3 deletions(-) diff --git a/further_link/runner/exec_process_handler.py b/further_link/runner/exec_process_handler.py index 9ef2b0ee..7dae0066 100644 --- a/further_link/runner/exec_process_handler.py +++ b/further_link/runner/exec_process_handler.py @@ -20,12 +20,14 @@ async def _start(self, path, code=None, novncOptions={}): path = get_absolute_path(path, get_working_directory(self.user)) # create path directories if they don't already exist + logging.debug(f"{self.id} Creating working directory for exec") await create_directory(os.path.dirname(path), self.user) entrypoint = path if code is None else os.path.join(path, f"exec-{self.id}") # create a temporary file to execute if code is provided if code is not None: + logging.debug(f"{self.id} Creating entrypoint for exec") await write_file(entrypoint, code) self._remove_entrypoint = entrypoint diff --git a/further_link/runner/process_handler.py b/further_link/runner/process_handler.py index dee8b803..24227af7 100644 --- a/further_link/runner/process_handler.py +++ b/further_link/runner/process_handler.py @@ -299,9 +299,13 @@ async def _clean_up(self): logging.debug(f"{self.id} Cleaning up PTY") try: if getattr(self, "pty_master", None): + logging.debug(f"{self.id} Closing PTY master") await self.pty_master.close() + logging.debug(f"{self.id} Closed PTY master") if getattr(self, "pty_slave", None): + logging.debug(f"{self.id} Closing PTY slave") await self.pty_slave.close() + logging.debug(f"{self.id} Closed PTY slave") except Exception as e: logging.exception(f"{self.id} PTY Cleanup error: {e}") diff --git a/further_link/runner/py_process_handler.py b/further_link/runner/py_process_handler.py index 3e2b2db4..19cd1c22 100644 --- a/further_link/runner/py_process_handler.py +++ b/further_link/runner/py_process_handler.py @@ -19,12 +19,14 @@ async def _start(self, path, code=None, novncOptions={}): path = get_absolute_path(path, get_working_directory(self.user)) # create path directories if they don't already exist + logging.debug(f"{self.id} Creating working directory for python3") await create_directory(os.path.dirname(path), self.user) entrypoint = path if code is None else os.path.join(path, f"{self.id}.py") # create a temporary file to execute if code is provided if code is not None: + logging.debug(f"{self.id} Creating entrypoint for python3") await write_file(entrypoint, code) self._remove_entrypoint = entrypoint diff --git a/further_link/runner/shell_process_handler.py b/further_link/runner/shell_process_handler.py index 598b2c07..497c525f 100644 --- a/further_link/runner/shell_process_handler.py +++ b/further_link/runner/shell_process_handler.py @@ -1,3 +1,5 @@ +import logging + from ..util.upload import create_directory from ..util.user_config import get_absolute_path, get_shell, get_working_directory from .process_handler import ProcessHandler @@ -8,6 +10,7 @@ async def _start(self, path, code=None, novncOptions={}): work_dir = get_absolute_path(path, get_working_directory(self.user)) # create work dir if it doesn't already exist + logging.debug(f"{self.id} Creating working directory for shell") await create_directory(work_dir, self.user) await super()._start(get_shell(self.user), work_dir, novncOptions=novncOptions) diff --git a/further_link/util/async_files.py b/further_link/util/async_files.py index 87567654..c9388a2a 100644 --- a/further_link/util/async_files.py +++ b/further_link/util/async_files.py @@ -32,11 +32,8 @@ async def symlink(src, dst): async def write_file(path, content, mode="w+"): def sync_write_file(p, c): - print("opening file") with open(p, mode) as file: - print("file open") file.write(c) - print("file written") await asyncio.to_thread(partial(sync_write_file, path, content)) From d7f8ff5faa6a13cd4d850b0a3cd9362d10f1f6f0 Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Thu, 3 Nov 2022 11:54:46 +0000 Subject: [PATCH 13/21] Extract pty class and improve cleanup --- further_link/runner/process_handler.py | 52 ++++++--------------- further_link/util/async_files.py | 4 ++ further_link/util/pty.py | 65 ++++++++++++++++++++++++++ tests/unit/test_process_handler.py | 3 -- 4 files changed, 83 insertions(+), 41 deletions(-) create mode 100644 further_link/util/pty.py diff --git a/further_link/runner/process_handler.py b/further_link/runner/process_handler.py index 24227af7..75e51e49 100644 --- a/further_link/runner/process_handler.py +++ b/further_link/runner/process_handler.py @@ -3,17 +3,15 @@ import os import signal from functools import partial -from pty import openpty from shlex import split -import aiofiles from pt_web_vnc.vnc import async_start, async_stop from ..util.async_helpers import ringbuf_read, timeout from ..util.id_generator import IdGenerator from ..util.ipc import async_ipc_send, async_start_ipc_server, ipc_cleanup +from ..util.pty import Pty from ..util.sdk import get_first_display -from ..util.terminal import set_winsize from ..util.user_config import ( get_current_user, get_gid, @@ -55,7 +53,7 @@ class InvalidOperation(Exception): class ProcessHandler: def __init__(self, user, pty=False): - self.pty = pty + self.use_pty = pty self.id = id_generator.create() assert user_exists(user) self.user = user @@ -74,22 +72,15 @@ async def _start(self, command, work_dir=None, env={}, novncOptions={}): stdio = asyncio.subprocess.PIPE - if self.pty: + if self.use_pty: logging.debug(f"{self.id} Starting PTY") # communicate through a pty for terminal 'cooked mode' behaviour - master, slave = openpty() - - # on some distros process user must own slave, otherwise you get: - # cannot set terminal process group (-1): Inappropriate ioctl for device - os.chown(slave, get_uid(self.user), get_gid(self.user)) - - self.pty_master = await aiofiles.open(master, "w+b", 0) - self.pty_slave = await aiofiles.open(slave, "r+b", 0) + self.pty = await Pty.create(self.user) # set terminal size to a minimum that we display in Further - set_winsize(slave, 4, 60) + self.pty.set_winsize(4, 60) - stdio = self.pty_slave + stdio = self.pty.back logging.debug(f"{self.id} Setup pty") @@ -190,19 +181,15 @@ async def stop(self): logging.debug(f"{self.id} Could not stop - ProcessLookupError") async def send_input(self, content): - logging.debug(f"{self.id} Receiving input {content}") - if not self.is_running() or not isinstance(content, str): logging.debug(f"{self.id} Not receiving input as not running") raise InvalidOperation() content_bytes = content.encode("utf-8") - if self.pty: - logging.debug(f"{self.id} Receiving input via pty") - write_task = asyncio.create_task(self.pty_master.write(content_bytes)) + if self.use_pty: + write_task = asyncio.create_task(self.pty.front.write(content_bytes)) else: - logging.debug(f"{self.id} Receiving input via stdin") async def write(): self.process.stdin.write(content_bytes) @@ -213,14 +200,12 @@ async def write(): done = await timeout(write_task, 0.1) if write_task not in done: logging.debug(f"{self.id} Receiving input timed out") - else: - logging.debug(f"{self.id} Received input") async def resize_pty(self, rows, cols): - if not self.is_running() or not self.pty: + if not self.is_running() or not self.use_pty: raise InvalidOperation() - set_winsize(self.pty_slave.fileno(), rows, cols) + self.pty.set_winsize(rows, cols) async def send_key_event(self, key, event): if ( @@ -246,9 +231,9 @@ async def _ipc_communicate(self): async def _process_communicate(self): output_tasks = [] - if self.pty: + if self.use_pty: output_tasks.append( - asyncio.create_task(self._handle_output(self.pty_master, "stdout")) + asyncio.create_task(self._handle_output(self.pty.front, "stdout")) ) else: output_tasks.append( @@ -297,17 +282,8 @@ async def _clean_up(self): logging.debug(f"{self.id} Starting cleanup") if getattr(self, "pty", None): logging.debug(f"{self.id} Cleaning up PTY") - try: - if getattr(self, "pty_master", None): - logging.debug(f"{self.id} Closing PTY master") - await self.pty_master.close() - logging.debug(f"{self.id} Closed PTY master") - if getattr(self, "pty_slave", None): - logging.debug(f"{self.id} Closing PTY slave") - await self.pty_slave.close() - logging.debug(f"{self.id} Closed PTY slave") - except Exception as e: - logging.exception(f"{self.id} PTY Cleanup error: {e}") + await self.pty.clean_up() + self.pty = None if getattr(self, "novnc", None): logging.debug(f"{self.id} Cleaning up NOVNC") diff --git a/further_link/util/async_files.py b/further_link/util/async_files.py index c9388a2a..91e15f87 100644 --- a/further_link/util/async_files.py +++ b/further_link/util/async_files.py @@ -14,6 +14,10 @@ # manipulation or os.remove which blocks no more than starting a thread would +async def close(fd): + return await asyncio.to_thread(partial(os.close, fd)) + + async def exists(path): return await asyncio.to_thread(partial(os.path.exists, path)) diff --git a/further_link/util/pty.py b/further_link/util/pty.py new file mode 100644 index 00000000..5ace1d9d --- /dev/null +++ b/further_link/util/pty.py @@ -0,0 +1,65 @@ +import asyncio +import logging +from pty import openpty + +import aiofiles + +from ..util.async_files import chown, close +from ..util.async_helpers import timeout +from ..util.terminal import set_winsize +from ..util.user_config import get_gid, get_uid + + +class Pty: + @classmethod + async def create(cls, user): + self = cls() + self.front_fd, self.back_fd = openpty() + + # on some distros process user must own 'back', otherwise you get: + # cannot set terminal process group (-1): Inappropriate ioctl for device + await chown(self.back_fd, get_uid(user), get_gid(user)) + + self.front = await aiofiles.open(self.front_fd, "w+b", 0) + self.back = await aiofiles.open(self.back_fd, "r+b", 0) + + return self + + def set_winsize(self, rows, cols): + set_winsize(self.back_fd, 4, 60) + + async def write(self, content_bytes): + await self.front.write(content_bytes) + + async def clean_up(self): + logging.debug("PTY Closing master") + await self._clean_up_end(self.front, self.front_fd) + logging.debug("PTY Closing slave") + await self._clean_up_end(self.back, self.back_fd) + logging.debug("PTY Cleanup complete") + + async def _clean_up_end(self, file, fd): + # closing sometimes hangs so use a timeout and try closing fd also + async def close_file(): + try: + logging.debug("PTY Closing file") + await file.close() + except Exception as e: + logging.debug(f"PTY Close file error: {e}") + + close_file_task = asyncio.create_task(close_file()) + done = await timeout(close_file_task, 0.1) + if close_file_task not in done: + logging.debug("PTY Close file timed out") + + async def close_fd(): + try: + logging.debug("PTY Closing fd") + await close(fd) + except Exception as e: + logging.debug(f"PTY Close fd error: {e}") + + close_fd_task = asyncio.create_task(close_fd()) + done = await timeout(close_fd_task, 0.1) + if close_fd_task not in done: + logging.debug("PTY Close fd timed out") diff --git a/tests/unit/test_process_handler.py b/tests/unit/test_process_handler.py index 1ed4c13d..5cb6d40d 100644 --- a/tests/unit/test_process_handler.py +++ b/tests/unit/test_process_handler.py @@ -7,7 +7,6 @@ from unittest.mock import patch import pytest -from aiofiles.threadpool.binary import AsyncFileIO from mock import AsyncMock from further_link.runner.process_handler import ProcessHandler @@ -75,8 +74,6 @@ async def test_pty(): await p.start('python3 -u -c "print(input())"') assert type(p.process) == Process assert p.pty - assert type(p.pty_master) == AsyncFileIO - assert type(p.pty_slave) == AsyncFileIO p.on_start.assert_called() From 605a9c9ecb5360b063aa213ee8a953d3c714fe68 Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Thu, 3 Nov 2022 16:22:33 +0000 Subject: [PATCH 14/21] Add more debug around pty input --- further_link/runner/process_handler.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/further_link/runner/process_handler.py b/further_link/runner/process_handler.py index 75e51e49..8c440230 100644 --- a/further_link/runner/process_handler.py +++ b/further_link/runner/process_handler.py @@ -188,6 +188,16 @@ async def send_input(self, content): content_bytes = content.encode("utf-8") if self.use_pty: + logging.debug( + f"Debug PTY front: {self.pty.front}; closed: {self.pty.front.closed}" + ) + fd = self.pty.front.fileno() + stat = None + try: + stat = os.fstat(fd) + except Exception as e: + logging.debug(f"fd exception: {fd}; e: {e}") + logging.debug(f"Debug fd: {fd}; stat: {stat}") write_task = asyncio.create_task(self.pty.front.write(content_bytes)) else: From e1077964e410acd378df11b0fcbe0d70871ce462 Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Thu, 3 Nov 2022 17:02:08 +0000 Subject: [PATCH 15/21] Instead of timeout on pty write, don't write if it's closed --- further_link/runner/process_handler.py | 27 +++++++------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/further_link/runner/process_handler.py b/further_link/runner/process_handler.py index 8c440230..dc059b66 100644 --- a/further_link/runner/process_handler.py +++ b/further_link/runner/process_handler.py @@ -188,28 +188,15 @@ async def send_input(self, content): content_bytes = content.encode("utf-8") if self.use_pty: - logging.debug( - f"Debug PTY front: {self.pty.front}; closed: {self.pty.front.closed}" - ) - fd = self.pty.front.fileno() - stat = None + # if fstat errors file is already closed and writing will hang try: - stat = os.fstat(fd) - except Exception as e: - logging.debug(f"fd exception: {fd}; e: {e}") - logging.debug(f"Debug fd: {fd}; stat: {stat}") - write_task = asyncio.create_task(self.pty.front.write(content_bytes)) + os.fstat(self.pty.front.fileno()) + await self.pty.front.write(content_bytes) + except OSError: + logging.debug(f"{self.id} Not receiving input as pty closed") else: - - async def write(): - self.process.stdin.write(content_bytes) - await self.process.stdin.drain() - - write_task = asyncio.create_task(write()) - - done = await timeout(write_task, 0.1) - if write_task not in done: - logging.debug(f"{self.id} Receiving input timed out") + self.process.stdin.write(content_bytes) + await self.process.stdin.drain() async def resize_pty(self, rows, cols): if not self.is_running() or not self.use_pty: From 0777d49adfb807462b959e43a5bbb29138c75e3f Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Thu, 3 Nov 2022 18:09:55 +0000 Subject: [PATCH 16/21] Revert "Instead of timeout on pty write, don't write if it's closed" This reverts commit e1077964e410acd378df11b0fcbe0d70871ce462. --- further_link/runner/process_handler.py | 27 +++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/further_link/runner/process_handler.py b/further_link/runner/process_handler.py index dc059b66..8c440230 100644 --- a/further_link/runner/process_handler.py +++ b/further_link/runner/process_handler.py @@ -188,15 +188,28 @@ async def send_input(self, content): content_bytes = content.encode("utf-8") if self.use_pty: - # if fstat errors file is already closed and writing will hang + logging.debug( + f"Debug PTY front: {self.pty.front}; closed: {self.pty.front.closed}" + ) + fd = self.pty.front.fileno() + stat = None try: - os.fstat(self.pty.front.fileno()) - await self.pty.front.write(content_bytes) - except OSError: - logging.debug(f"{self.id} Not receiving input as pty closed") + stat = os.fstat(fd) + except Exception as e: + logging.debug(f"fd exception: {fd}; e: {e}") + logging.debug(f"Debug fd: {fd}; stat: {stat}") + write_task = asyncio.create_task(self.pty.front.write(content_bytes)) else: - self.process.stdin.write(content_bytes) - await self.process.stdin.drain() + + async def write(): + self.process.stdin.write(content_bytes) + await self.process.stdin.drain() + + write_task = asyncio.create_task(write()) + + done = await timeout(write_task, 0.1) + if write_task not in done: + logging.debug(f"{self.id} Receiving input timed out") async def resize_pty(self, rows, cols): if not self.is_running() or not self.use_pty: From 7f13c61fba163b965f43dad592f0a6fcb44304c6 Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Thu, 3 Nov 2022 18:10:13 +0000 Subject: [PATCH 17/21] Use websocket autoclose and add logs on message --- further_link/endpoint/run.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/further_link/endpoint/run.py b/further_link/endpoint/run.py index 5801c8e1..2b930d5f 100644 --- a/further_link/endpoint/run.py +++ b/further_link/endpoint/run.py @@ -167,7 +167,7 @@ async def run(request): user = query_params.get("user", None) pty = query_params.get("pty", "").lower() in ["1", "true"] - socket = web.WebSocketResponse() + socket = web.WebSocketResponse(autoclose=True) await socket.prepare(request) run_manager = RunManager(socket, user=user, pty=pty) @@ -176,6 +176,9 @@ async def run(request): try: async for message in socket: logging.debug(f"{run_manager.id} Received Message {message.data}") + logging.debug(f"{run_manager.id} message: {message.type} {message}") + logging.debug(f"{run_manager.id} socket closing: {socket._closing}") + logging.debug(f"{run_manager.id} socket closed: {socket._closed}") await run_manager.handle_message(message.data) logging.debug(f"{run_manager.id} Handled Message {message.data}") From a14b1057df07d7def25d9d3a70a41af86fd4aa4f Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Fri, 4 Nov 2022 11:39:23 +0000 Subject: [PATCH 18/21] Add /live health endpoint. Refactor to use singleton of id_generator. --- further_link/__main__.py | 18 ++++++--------- further_link/endpoint/status.py | 31 ++++++++++++++++++++++++++ further_link/runner/process_handler.py | 18 +-------------- further_link/util/id_generator.py | 24 +++++++++++++++++++- 4 files changed, 62 insertions(+), 29 deletions(-) create mode 100644 further_link/endpoint/status.py diff --git a/further_link/__main__.py b/further_link/__main__.py index 196c2358..f0694eb2 100755 --- a/further_link/__main__.py +++ b/further_link/__main__.py @@ -1,7 +1,6 @@ import logging import os import sys -from json import dumps import aiohttp_cors import click @@ -9,10 +8,10 @@ from further_link.endpoint.apt_version import apt_version from further_link.endpoint.run import run as run_handler +from further_link.endpoint.status import live, status, version from further_link.endpoint.upload import upload from further_link.util import vnc from further_link.util.ssl_context import ssl_context -from further_link.version import __version__ logging.basicConfig( stream=sys.stdout, @@ -37,21 +36,18 @@ def create_app(): }, ) - async def status(_): - return web.Response(text="OK") - - async def version(_): - return web.Response(text=dumps({"version": __version__})) - status_resource = cors.add(app.router.add_resource("/status")) cors.add(status_resource.add_route("GET", status)) - status_resource = cors.add(app.router.add_resource("/version/apt/{pkg}")) - cors.add(status_resource.add_route("GET", apt_version)) - status_resource = cors.add(app.router.add_resource("/version")) cors.add(status_resource.add_route("GET", version)) + status_resource = cors.add(app.router.add_resource("/live")) + cors.add(status_resource.add_route("GET", live)) + + status_resource = cors.add(app.router.add_resource("/version/apt/{pkg}")) + cors.add(status_resource.add_route("GET", apt_version)) + status_resource = cors.add(app.router.add_resource("/upload")) cors.add(status_resource.add_route("POST", upload)) diff --git a/further_link/endpoint/status.py b/further_link/endpoint/status.py new file mode 100644 index 00000000..c82efa31 --- /dev/null +++ b/further_link/endpoint/status.py @@ -0,0 +1,31 @@ +import asyncio +from functools import partial +from json import dumps +from time import sleep + +from aiohttp import web + +from ..util.async_helpers import timeout +from ..util.id_generator import id_generator +from ..version import __version__ + + +async def status(_): + return web.Response(text="OK") + + +async def version(_): + return web.Response(text=dumps({"version": __version__})) + + +async def live(_): + if not id_generator.has_ids(): + raise Exception("All ids are in use.") + + sleep_thread = asyncio.to_thread(partial(sleep, 0.1)) + async_thread_task = asyncio.create_task(sleep_thread) + done = await timeout(async_thread_task, 0.2) + if async_thread_task not in done: + raise Exception("No available async thread executors.") + + return web.Response(text="OK") diff --git a/further_link/runner/process_handler.py b/further_link/runner/process_handler.py index 8c440230..0cc6803d 100644 --- a/further_link/runner/process_handler.py +++ b/further_link/runner/process_handler.py @@ -8,7 +8,7 @@ from pt_web_vnc.vnc import async_start, async_stop from ..util.async_helpers import ringbuf_read, timeout -from ..util.id_generator import IdGenerator +from ..util.id_generator import id_generator from ..util.ipc import async_ipc_send, async_start_ipc_server, ipc_cleanup from ..util.pty import Pty from ..util.sdk import get_first_display @@ -35,22 +35,6 @@ class InvalidOperation(Exception): pass -# each run will need a unique id -# one use of the id is for the pt-web-vnc virtual display id and port numbers -# so we must use +ve int < 1000, with 0-99 reserved for other uses -# envvar FURTHER_LINK_MAX_PROCESSES can be used to limit the range -var_max_processes = os.environ.get("FURTHER_LINK_MAX_PROCESSES") -MAX = 900 -if isinstance(var_max_processes, str) and var_max_processes.isdigit(): - max_processes = int(var_max_processes) -else: - max_processes = MAX -if 1 > max_processes > MAX: - max_processes = MAX - -id_generator = IdGenerator(min_value=100, max_value=99 + max_processes) - - class ProcessHandler: def __init__(self, user, pty=False): self.use_pty = pty diff --git a/further_link/util/id_generator.py b/further_link/util/id_generator.py index 922cfc0e..4186f7e5 100644 --- a/further_link/util/id_generator.py +++ b/further_link/util/id_generator.py @@ -1,3 +1,4 @@ +import os from random import randint from typing import List @@ -13,8 +14,11 @@ def __init__(self, max_value: int, min_value: int = 1) -> None: self.MIN_VALUE = min_value self.MAX_VALUE = max_value + def has_ids(self) -> bool: + return len(self.used_ids) < self.MAX_VALUE - self.MIN_VALUE + 1 + def create(self) -> int: - if len(self.used_ids) == self.MAX_VALUE - self.MIN_VALUE + 1: + if not self.has_ids(): raise Exception("All ids are in use.") candidate = randint(self.MIN_VALUE, self.MAX_VALUE) @@ -26,3 +30,21 @@ def create(self) -> int: def free(self, id) -> None: if id in self.used_ids: self.used_ids.remove(id) + + +# each run will need a unique id +# one use of the id is for the pt-web-vnc virtual display id and port numbers +# so we must use +ve int < 1000, with 0-99 reserved for other uses +# envvar FURTHER_LINK_MAX_PROCESSES can be used to limit the range +var_max_processes = os.environ.get("FURTHER_LINK_MAX_PROCESSES") +MAX = 900 +if isinstance(var_max_processes, str) and var_max_processes.isdigit(): + max_processes = int(var_max_processes) +else: + max_processes = MAX +if 1 > max_processes > MAX: + max_processes = MAX + + +# global "singleton" +id_generator = IdGenerator(min_value=100, max_value=99 + max_processes) From 4e734d9335a1b88cf7091fcfc2fc5bf59fcc56ef Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Fri, 4 Nov 2022 11:42:32 +0000 Subject: [PATCH 19/21] Add create_directory logging --- further_link/util/async_files.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/further_link/util/async_files.py b/further_link/util/async_files.py index 91e15f87..ecb11449 100644 --- a/further_link/util/async_files.py +++ b/further_link/util/async_files.py @@ -1,4 +1,5 @@ import asyncio +import logging import os from functools import partial from shutil import rmtree @@ -65,7 +66,9 @@ def _create_directory(directory_path: str, user: str = None): for subpath in subpaths: if not os.path.isdir(subpath): + logging.debug(f"Creating directory {subpath}") os.mkdir(subpath) + logging.debug(f"Created directory {subpath}") # Update directory owner if on user home if user and subpath.startswith(os.path.expanduser(f"~{user}")): From c103dfa98954bc6dc489e8e043e52d50b78b8e4a Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Fri, 4 Nov 2022 13:52:06 +0000 Subject: [PATCH 20/21] Update live check --- further_link/endpoint/status.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/further_link/endpoint/status.py b/further_link/endpoint/status.py index c82efa31..b5a24273 100644 --- a/further_link/endpoint/status.py +++ b/further_link/endpoint/status.py @@ -22,10 +22,15 @@ async def live(_): if not id_generator.has_ids(): raise Exception("All ids are in use.") + async_task = asyncio.create_task(asyncio.sleep(0.1)) + done = await timeout(async_task, 0.2) + if async_task not in done: + raise Exception("Async coroutines blocked.") + sleep_thread = asyncio.to_thread(partial(sleep, 0.1)) async_thread_task = asyncio.create_task(sleep_thread) done = await timeout(async_thread_task, 0.2) if async_thread_task not in done: - raise Exception("No available async thread executors.") + raise Exception("Async threads blocked.") return web.Response(text="OK") From ff617428436e8f6e48f010b3c45d838fc6b2cc96 Mon Sep 17 00:00:00 2001 From: Angus Whitehead Date: Fri, 16 Dec 2022 14:27:23 +0000 Subject: [PATCH 21/21] Update Docker installed sdk to 0.30 --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 9241c81a..39cdda81 100644 --- a/Dockerfile +++ b/Dockerfile @@ -34,7 +34,7 @@ RUN apt-get update && \ # Install pitop SDK # using pip for onnxruntime as there is only armhf debian build -RUN pip3 install pitop==0.26.3.post1 +RUN pip3 install pitop==0.30.0.post1 # Install useful extras from pt-os RUN apt-get update && \