diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index dad0eeee..6ddd859f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -10,7 +10,7 @@ jobs: strategy: fail-fast: false matrix: - debian: [buster, bullseye] + debian: [bullseye] steps: - name: Checkout code uses: actions/checkout@v2.3.4 diff --git a/Dockerfile b/Dockerfile index 59c2f6e7..39cdda81 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,6 +26,25 @@ RUN apt-get update && \ apt-get install -y pt-web-vnc && \ apt-get clean +# Install pitop SDK prerequisites +RUN apt-get update && \ + apt-get install -y pkg-config libsystemd0 libsystemd-dev && \ + pip3 install cmake && \ + apt-get clean + +# Install pitop SDK +# using pip for onnxruntime as there is only armhf debian build +RUN pip3 install pitop==0.30.0.post1 + +# Install useful extras from pt-os +RUN apt-get update && \ + # not installable apt-get install -y pt-os-ui-mods && \ + apt-get install -y chromium && \ + apt-get install -y vim && \ + apt-get install -y python3-matplotlib && \ + # no audio DEBIAN_FRONTEND='noninteractive' apt-get install -y sonic-pi python3-sonic && \ + apt-get clean + WORKDIR /further-link COPY pyproject.toml setup.py setup.cfg MANIFEST.in ./ @@ -36,9 +55,11 @@ COPY further_link further_link RUN pip3 install . ENV FURTHER_LINK_NOSSL=true +ENV FURTHER_LINK_MAX_PROCESSES=4 +ENV PITOP_VIRTUAL_HARDWARE=1 ENV PYTHONUNBUFFERED=1 ENV PYTHON_PACKAGE_VERSION=$PYTHON_PACKAGE_VERSION EXPOSE 8028 -EXPOSE 60100-60999 +EXPOSE 61100-61103 CMD [ "further-link" ] diff --git a/docs/README.md b/docs/README.md index b74a1057..a05649f0 100644 --- a/docs/README.md +++ b/docs/README.md @@ -46,7 +46,7 @@ directory with the names `cert.pem` and `key.pem`. ### Working directory The default working directory where files are uploaded and executed from is `~/further`. This can be overridden by setting env var FURTHER_LINK_WORK_DIR. -For more information related to the `run-py` API see the options of the +For more information related to the `run` API see the options of the `upload` and `start` messages in the documentation below. ## API @@ -57,122 +57,6 @@ For more information related to the `run-py` API see the options of the ([example](../tests/test_data/upload_data.py)). Responds after creating files is complete with a simple 200 OK. -### Websocket Endpoint /run-py -Each websocket client connected on `/run-py` can manage a single python process -at a time. - -#### Example usage -- Connect websocket on `/run-py` (using [websocat](https://github.com/vi/websocat)): -``` -websocat ws://localhost:8028/run-py -``` -- Send `start` command with `sourceScript`: -``` -{ "type": "start", "data": { "sourceScript": "print('hi')" } } -``` -- Receive `stdout` response: -``` -{ "type": "stdout", "data": { "output": "hi\n" } } -``` -- Receive `stopped` response: -``` -{ "type": "stopped", "data": { "exitCode": 0 } } -``` - -#### Spec -##### Options -This connection has some options which can be selected with query -parameters: - -``` -/run-py?user=root -``` -The `user` parameter is used to select the Linux user which the code is -executed as. By default the `pi` user is selected if it exists, otherwise -the user executing the server is used. - -``` -/run-py?pty=1 -``` -The pty parameter, if set to 1 or true, will create a pseudoterminal interface -for the python process IO streams, in order to provide terminal behaviours such -as 'cooked mode'. This is useful to produce identical behaviour of programs to -that on the command line and to easily interface with terminal emulators such -as [xterm.js](https://github.com/xtermjs/xterm.js/). - -##### Message Types -Websocket messages sent between client and server must be in JSON with two top -level properties: required string `type` and optional object `data`. - -Message types accepted by the server are: -``` -{ - "type":"[ping|start|stop|stdin|upload|keyevent]", - "data": {...} -} -``` - -Message types sent from the server are: -``` -{ - "type":"[pong|error|started|stopped|stdout|stderr|uploaded|keylisten]", - "data": {...} -} -``` - -Message and response details: -- `ping` command from the client will be met with a `pong` response from - the server. This can be used to keep the socket active to prevent automatic - closures. -- `pong` response is sent by the server immediately after a `ping` is received -
- -- `error` response is sent for bad commands or server errors e.g. `data: { message: "something went wrong and it's not your python code" }` -
- -- `start` command will start a new python process. The code to run can be - specified in data as either a `souceScript` or `sourcePath`. For - `sourceScript` an additional `directoryName` can be passed to specify an - (uploaded) directory to run the script in, within the work dir, otherwise - `/tmp` is used. If `sourcePath` is not absolute, it is assumed to be - relative to the work dir. - e.g. `data: {sourceScript:"print('hi')", directoryName: "myproject"}` - or `data: {sourcePath: "myproject/run.py"}` -- `started` response is sent after a successful process `start`, has no data. -
- -- `stdin` command is used to send data to process stdin e.g. `data: { input: "this can be read by python\n" }`. -- `stdout` response is sent when process prints to stdout. e.g. `data: { output: "this was printed by python" }` -- `stderr` response is sent when process prints to stderr e.g. `data: { output: "Traceback bleh bleh" }` -
- -- `stop` command is used to stop a running process early, has no data. -- `stopped` response is sent when a process finished and has the exit code in e.g. `data: { exitCode: 0 }` -
- -- `upload` command is used to create a directory of files in the work dir. - Files are provided in the data as `text` type, with the text content, or - `url` type, with a url for the file to be downloaded from. - [Example](..tests/test_data/upload_data.py) -- `uploaded` response is sent after a successful upload , has no data. -
- -- `video` response is sent by the server with data.output containing a base64 - encoded mjpeg frame for the client to render as a video feed. -
- -- `keylisten` message is sent by the server to indicate it would like to - receive keyboard events from the client for a specific key. This would be - initiated by user code using the further_link.KeyboardButton python module. - Keys are specified as web [KeyboardEvent.key](https://developer.mozilla.org/en-US/docs/Web/API/KeyboardEvent/key) - strings e.g. `data: { key: "ArrowUp" }` -- `keyevent` message is sent by the client to provide keyboard events to the - server so that they can be forwarded to user code using - further_link.KeyboardButton. The data includes a key string matching those - used in `keylisten` and an event string which is either "keydown" or - "keyup" e.g. `data: { key: "ArrowUp", event: "keydown" }` -
- ### Websocket Endpoint /run Each websocket client connected on `/run` can manage multuiple processes of different types, addressing them by a unique id. diff --git a/further_link/__main__.py b/further_link/__main__.py index 6d19591f..f0694eb2 100755 --- a/further_link/__main__.py +++ b/further_link/__main__.py @@ -1,7 +1,6 @@ import logging import os import sys -from json import dumps import aiohttp_cors import click @@ -9,11 +8,10 @@ from further_link.endpoint.apt_version import apt_version from further_link.endpoint.run import run as run_handler -from further_link.endpoint.run_py import run_py +from further_link.endpoint.status import live, status, version from further_link.endpoint.upload import upload from further_link.util import vnc from further_link.util.ssl_context import ssl_context -from further_link.version import __version__ logging.basicConfig( stream=sys.stdout, @@ -38,27 +36,21 @@ def create_app(): }, ) - async def status(_): - return web.Response(text="OK") - - async def version(_): - return web.Response(text=dumps({"version": __version__})) - status_resource = cors.add(app.router.add_resource("/status")) cors.add(status_resource.add_route("GET", status)) - status_resource = cors.add(app.router.add_resource("/version/apt/{pkg}")) - cors.add(status_resource.add_route("GET", apt_version)) - status_resource = cors.add(app.router.add_resource("/version")) cors.add(status_resource.add_route("GET", version)) + status_resource = cors.add(app.router.add_resource("/live")) + cors.add(status_resource.add_route("GET", live)) + + status_resource = cors.add(app.router.add_resource("/version/apt/{pkg}")) + cors.add(status_resource.add_route("GET", apt_version)) + status_resource = cors.add(app.router.add_resource("/upload")) cors.add(status_resource.add_route("POST", upload)) - exec_resource = cors.add(app.router.add_resource("/run-py")) - cors.add(exec_resource.add_route("GET", run_py)) - exec_resource = cors.add(app.router.add_resource("/run")) cors.add(exec_resource.add_route("GET", run_handler)) diff --git a/further_link/endpoint/run.py b/further_link/endpoint/run.py index 27c5b373..2b930d5f 100644 --- a/further_link/endpoint/run.py +++ b/further_link/endpoint/run.py @@ -27,18 +27,19 @@ def __init__(self, socket, user=None, pty=False): } async def stop(self): + logging.debug(f"{self.id} Stopping processes: {self.process_handlers}") # the dictionary will be mutated so use list to make a copy for p in list(self.process_handlers.values()): try: await p.stop() except InvalidOperation: - pass + logging.debug(f"{self.id} Invalid operation stopping process: {p}") async def send(self, type, data=None, process_id=None): try: await self.socket.send_str(create_message(type, data, process_id)) except ConnectionResetError: - pass # already disconnected + logging.debug(f"{self.id} Send failed - client disconnected") async def handle_message(self, message): try: @@ -46,6 +47,10 @@ async def handle_message(self, message): process_handler = self.process_handlers.get(m_process) + logging.debug( + f"{self.id} Handling {m_type} message for handler {process_handler}" + ) + if m_type == "ping": await self.send("pong") @@ -99,7 +104,7 @@ async def handle_message(self, message): await process_handler.send_key_event(m_data["key"], m_data["event"]) else: - raise BadMessage() + raise BadMessage(f"Bad {m_type} message for handler {process_handler}") except (BadMessage, InvalidOperation): logging.exception(f"{self.id} Bad Message") @@ -117,7 +122,7 @@ async def add_handler(self, process_id, runner, path, code, novncOptions): async def on_start(): await self.send("started", None, process_id) - logging.info(f"{self.id} Started {process_id}") + logging.info(f"{self.id} Sent started {process_id}") async def on_stop(exit_code): # process_id may be reused with other runners so clean up handler @@ -141,11 +146,18 @@ async def on_display_activity(connection_details: VncConnectionDetails): ) handler = handler_class(self.user, self.pty) + logging.debug( + f"{self.id} Created {runner} handler for {process_id}, ID {handler.id}" + ) + handler.on_start = on_start handler.on_stop = on_stop handler.on_display_activity = on_display_activity handler.on_output = on_output + + logging.debug(f"{self.id} Starting handler {handler.id}") await handler.start(path, code, novncOptions=novncOptions) + logging.debug(f"{self.id} Started handler {handler.id}") self.process_handlers[process_id] = handler @@ -155,7 +167,7 @@ async def run(request): user = query_params.get("user", None) pty = query_params.get("pty", "").lower() in ["1", "true"] - socket = web.WebSocketResponse() + socket = web.WebSocketResponse(autoclose=True) await socket.prepare(request) run_manager = RunManager(socket, user=user, pty=pty) @@ -164,12 +176,20 @@ async def run(request): try: async for message in socket: logging.debug(f"{run_manager.id} Received Message {message.data}") + logging.debug(f"{run_manager.id} message: {message.type} {message}") + logging.debug(f"{run_manager.id} socket closing: {socket._closing}") + logging.debug(f"{run_manager.id} socket closed: {socket._closed}") await run_manager.handle_message(message.data) + logging.debug(f"{run_manager.id} Handled Message {message.data}") except asyncio.CancelledError: - pass + logging.debug(f"{run_manager.id} Run cancelled error") + + except Exception as e: + logging.exception(f"{run_manager.id} Run error: {e}") finally: + logging.debug(f"{run_manager.id} Closing connection") await socket.close() logging.info(f"{run_manager.id} Closed connection") await run_manager.stop() diff --git a/further_link/endpoint/run_py.py b/further_link/endpoint/run_py.py deleted file mode 100644 index bbd697f3..00000000 --- a/further_link/endpoint/run_py.py +++ /dev/null @@ -1,137 +0,0 @@ -import asyncio -import logging - -from aiohttp import web - -from ..runner.run_py_process_handler import InvalidOperation, RunPyProcessHandler -from ..util.message import BadMessage, create_message, parse_message -from ..util.upload import BadUpload, directory_is_valid, do_upload - - -async def handle_message(message, process_handler, socket): - m_type, m_data, m_process = parse_message(message) - - if ( - m_type == "start" - and "sourceScript" in m_data - and isinstance(m_data.get("sourceScript"), str) - ): - path = ( - m_data.get("directoryName") - if ( - "directoryName" in m_data - and isinstance(m_data.get("directoryName"), str) - and len(m_data.get("directoryName")) > 0 - ) - else None - ) - await process_handler.start( - script=m_data["sourceScript"], - path=path, - ) - - elif ( - m_type == "start" - and "sourcePath" in m_data - and isinstance(m_data.get("sourcePath"), str) - and len(m_data.get("sourcePath")) > 0 - ): - await process_handler.start(path=m_data["sourcePath"]) - - elif ( - m_type == "upload" - and "directory" in m_data - and directory_is_valid(m_data.get("directory")) - ): - await do_upload( - m_data.get("directory"), process_handler.work_dir, m_data.get("user") - ) - await socket.send_str(create_message("uploaded")) - - elif ( - m_type == "stdin" and "input" in m_data and isinstance(m_data.get("input"), str) - ): - await process_handler.send_input(m_data["input"]) - - elif m_type == "ping": - await socket.send_str(create_message("pong")) - - elif m_type == "stop": - process_handler.stop() - - elif ( - m_type == "keyevent" - and "key" in m_data - and isinstance(m_data.get("key"), str) - and "event" in m_data - and isinstance(m_data.get("event"), str) - ): - await process_handler.send_key_event(m_data["key"], m_data["event"]) - - else: - raise BadMessage() - - -async def run_py(request): - query_params = request.query - user = query_params.get("user", None) - pty = query_params.get("pty", "").lower() in ["1", "true"] - - socket = web.WebSocketResponse() - await socket.prepare(request) - - async def on_start(): - await socket.send_str(create_message("started")) - logging.info(f"{process_handler.id} Started") - - async def on_stop(exit_code): - try: - await socket.send_str(create_message("stopped", {"exitCode": exit_code})) - except ConnectionResetError: - pass # already disconnected - logging.info(f"{process_handler.id} Stopped") - - async def on_output(channel, output): - logging.debug(f"{process_handler.id} Sending Output {channel} {output}") - await socket.send_str(create_message(channel, {"output": output})) - - process_handler = RunPyProcessHandler(user=user, pty=pty) - process_handler.on_start = on_start - process_handler.on_stop = on_stop - process_handler.on_output = on_output - logging.info(f"{process_handler.id} New connection") - - try: - async for message in socket: - logging.debug(f"{process_handler.id} Received Message {message.data}") - try: - await handle_message(message.data, process_handler, socket) - - except BadUpload: - logging.exception(f"{process_handler.id} Bad Upload") - await socket.send_str( - create_message("error", {"message": "Bad upload"}) - ) - - except (BadMessage, InvalidOperation): - logging.exception(f"{process_handler.id} Bad Message") - await socket.send_str( - create_message("error", {"message": "Bad message"}) - ) - - except Exception: - logging.exception(f"{process_handler.id} Message Exception") - await socket.send_str( - create_message("error", {"message": "Message Exception"}) - ) - - except asyncio.CancelledError: - pass - finally: - await socket.close() - - logging.info(f"{process_handler.id} Closed connection") - if process_handler.is_running(): - process_handler.stop() - - return socket diff --git a/further_link/endpoint/status.py b/further_link/endpoint/status.py new file mode 100644 index 00000000..b5a24273 --- /dev/null +++ b/further_link/endpoint/status.py @@ -0,0 +1,36 @@ +import asyncio +from functools import partial +from json import dumps +from time import sleep + +from aiohttp import web + +from ..util.async_helpers import timeout +from ..util.id_generator import id_generator +from ..version import __version__ + + +async def status(_): + return web.Response(text="OK") + + +async def version(_): + return web.Response(text=dumps({"version": __version__})) + + +async def live(_): + if not id_generator.has_ids(): + raise Exception("All ids are in use.") + + async_task = asyncio.create_task(asyncio.sleep(0.1)) + done = await timeout(async_task, 0.2) + if async_task not in done: + raise Exception("Async coroutines blocked.") + + sleep_thread = asyncio.to_thread(partial(sleep, 0.1)) + async_thread_task = asyncio.create_task(sleep_thread) + done = await timeout(async_thread_task, 0.2) + if async_thread_task not in done: + raise Exception("Async threads blocked.") + + return web.Response(text="OK") diff --git a/further_link/endpoint/upload.py b/further_link/endpoint/upload.py index 5c187506..9a9a98b2 100644 --- a/further_link/endpoint/upload.py +++ b/further_link/endpoint/upload.py @@ -1,4 +1,5 @@ import json +import logging from aiohttp import web @@ -23,7 +24,8 @@ async def upload(request): except (web.HTTPBadRequest, json.decoder.JSONDecodeError): raise web.HTTPBadRequest() - except BadUpload: + except BadUpload as e: + logging.exception(f"Upload error: {e}") raise web.HTTPInternalServerError() return web.Response(text="OK") diff --git a/further_link/runner/exec_process_handler.py b/further_link/runner/exec_process_handler.py index b1e31c91..7dae0066 100644 --- a/further_link/runner/exec_process_handler.py +++ b/further_link/runner/exec_process_handler.py @@ -2,9 +2,7 @@ import os import pathlib -import aiofiles - -from ..util.upload import create_directory +from ..util.async_files import chmod, chown, create_directory, write_file from ..util.user_config import ( get_absolute_path, get_gid, @@ -22,18 +20,19 @@ async def _start(self, path, code=None, novncOptions={}): path = get_absolute_path(path, get_working_directory(self.user)) # create path directories if they don't already exist - create_directory(os.path.dirname(path), self.user) + logging.debug(f"{self.id} Creating working directory for exec") + await create_directory(os.path.dirname(path), self.user) entrypoint = path if code is None else os.path.join(path, f"exec-{self.id}") # create a temporary file to execute if code is provided if code is not None: - async with aiofiles.open(entrypoint, "w+") as file: - await file.write(code) + logging.debug(f"{self.id} Creating entrypoint for exec") + await write_file(entrypoint, code) self._remove_entrypoint = entrypoint - os.chown(entrypoint, uid=get_uid(self.user), gid=get_gid(self.user)) - os.chmod(entrypoint, 0o777) # make it executable + await chown(entrypoint, uid=get_uid(self.user), gid=get_gid(self.user)) + await chmod(entrypoint, 0o777) # make it executable command = entrypoint diff --git a/further_link/runner/process_handler.py b/further_link/runner/process_handler.py index 0ab3e6d3..0cc6803d 100644 --- a/further_link/runner/process_handler.py +++ b/further_link/runner/process_handler.py @@ -3,17 +3,15 @@ import os import signal from functools import partial -from pty import openpty from shlex import split -import aiofiles from pt_web_vnc.vnc import async_start, async_stop from ..util.async_helpers import ringbuf_read, timeout -from ..util.id_generator import IdGenerator +from ..util.id_generator import id_generator from ..util.ipc import async_ipc_send, async_start_ipc_server, ipc_cleanup +from ..util.pty import Pty from ..util.sdk import get_first_display -from ..util.terminal import set_winsize from ..util.user_config import ( get_current_user, get_gid, @@ -37,15 +35,9 @@ class InvalidOperation(Exception): pass -# each run will need a unique id -# one use of the id is for the pt-web-vnc virtual display id and port numbers -# so we must use +ve int < 1000, with 0-99 reserved for other uses -id_generator = IdGenerator(min_value=100, max_value=999) - - class ProcessHandler: def __init__(self, user, pty=False): - self.pty = pty + self.use_pty = pty self.id = id_generator.create() assert user_exists(user) self.user = user @@ -64,21 +56,17 @@ async def _start(self, command, work_dir=None, env={}, novncOptions={}): stdio = asyncio.subprocess.PIPE - if self.pty: + if self.use_pty: + logging.debug(f"{self.id} Starting PTY") # communicate through a pty for terminal 'cooked mode' behaviour - master, slave = openpty() - - # on some distros process user must own slave, otherwise you get: - # cannot set terminal process group (-1): Inappropriate ioctl for device - os.chown(slave, get_uid(self.user), get_gid(self.user)) - - self.pty_master = await aiofiles.open(master, "w+b", 0) - self.pty_slave = await aiofiles.open(slave, "r+b", 0) + self.pty = await Pty.create(self.user) # set terminal size to a minimum that we display in Further - set_winsize(slave, 4, 60) + self.pty.set_winsize(4, 60) + + stdio = self.pty.back - stdio = self.pty_slave + logging.debug(f"{self.id} Setup pty") process_env = {**os.environ.copy(), **env} process_env["TERM"] = "xterm-256color" # perhaps should be param @@ -95,6 +83,7 @@ async def _start(self, command, work_dir=None, env={}, novncOptions={}): # set $DISPLAY so that user can open GUI windows if self.novnc: + logging.debug(f"{self.id} Starting NOVNC") process_env["DISPLAY"] = f":{self.id}" await async_start( display_id=self.id, @@ -104,6 +93,7 @@ async def _start(self, command, work_dir=None, env={}, novncOptions={}): height=novncOptions.get("height"), width=novncOptions.get("width"), ) + logging.debug(f"{self.id} Setup novnc") else: default_display = get_first_display() if default_display: @@ -126,6 +116,7 @@ def preexec(): # as allowing a shell process to be a 'controlling terminal' os.setsid() + logging.debug(f"{self.id} Starting process") self.process = await asyncio.create_subprocess_exec( *split(command), stdin=stdio, @@ -135,6 +126,7 @@ def preexec(): cwd=self.work_dir, preexec_fn=preexec, ) + logging.debug(f"{self.id} Started process") if self.on_start: await self.on_start() @@ -144,9 +136,12 @@ def preexec(): self.pgid = os.getpgid(self.process.pid) # retain for cleanup asyncio.create_task(self._ipc_communicate()) # after as uses pgid except ProcessLookupError: + logging.debug(f"{self.id} Process lookup failed - running without IPC") # the process is done faster than we can look up gpid! self.pgid = None + logging.debug(f"{self.id} Start complete") + def is_running(self): return hasattr(self, "process") and self.process is not None @@ -155,33 +150,56 @@ async def stop(self): raise InvalidOperation() # send signal to process group in case we have child processes try: + logging.debug(f"{self.id} Stopping with SIGTERM") os.killpg(self.pgid, signal.SIGTERM) stopped = asyncio.create_task(self.process.wait()) done = await timeout(stopped, 0.1) # if SIGTERM didn't stop the process already, send SIGKILL if stopped not in done: + logging.debug(f"{self.id} Stopping with SIGKILL") os.killpg(self.pgid, signal.SIGKILL) + + logging.debug(f"{self.id} Stop complete") except ProcessLookupError: - pass + logging.debug(f"{self.id} Could not stop - ProcessLookupError") async def send_input(self, content): if not self.is_running() or not isinstance(content, str): + logging.debug(f"{self.id} Not receiving input as not running") raise InvalidOperation() content_bytes = content.encode("utf-8") - if self.pty: - await self.pty_master.write(content_bytes) + if self.use_pty: + logging.debug( + f"Debug PTY front: {self.pty.front}; closed: {self.pty.front.closed}" + ) + fd = self.pty.front.fileno() + stat = None + try: + stat = os.fstat(fd) + except Exception as e: + logging.debug(f"fd exception: {fd}; e: {e}") + logging.debug(f"Debug fd: {fd}; stat: {stat}") + write_task = asyncio.create_task(self.pty.front.write(content_bytes)) else: - self.process.stdin.write(content_bytes) - await self.process.stdin.drain() + + async def write(): + self.process.stdin.write(content_bytes) + await self.process.stdin.drain() + + write_task = asyncio.create_task(write()) + + done = await timeout(write_task, 0.1) + if write_task not in done: + logging.debug(f"{self.id} Receiving input timed out") async def resize_pty(self, rows, cols): - if not self.is_running() or not self.pty: + if not self.is_running() or not self.use_pty: raise InvalidOperation() - set_winsize(self.pty_slave.fileno(), rows, cols) + self.pty.set_winsize(rows, cols) async def send_key_event(self, key, event): if ( @@ -207,9 +225,9 @@ async def _ipc_communicate(self): async def _process_communicate(self): output_tasks = [] - if self.pty: + if self.use_pty: output_tasks.append( - asyncio.create_task(self._handle_output(self.pty_master, "stdout")) + asyncio.create_task(self._handle_output(self.pty.front, "stdout")) ) else: output_tasks.append( @@ -219,9 +237,13 @@ async def _process_communicate(self): asyncio.create_task(self._handle_output(self.process.stderr, "stderr")) ) + logging.debug(f"{self.id} Monitoring output") + # wait for process to exit await self.process.wait() + logging.debug(f"{self.id} Process exited") + # wait a little for the io tasks to complete to let them send # output produced right before the process stopped # but cancel them after a timeout if they don't stop themselves @@ -233,8 +255,9 @@ async def _process_communicate(self): async def _handle_process_end(self): exit_code = await self.process.wait() - await self._clean_up() - self.process = None + + if self.process: + await self._clean_up() if self.on_stop: await self.on_stop(exit_code) @@ -250,22 +273,21 @@ async def _handle_output(self, stream, channel): ) async def _clean_up(self): + logging.debug(f"{self.id} Starting cleanup") if getattr(self, "pty", None): - try: - if getattr(self, "pty_master", None): - await self.pty_master.close() - if getattr(self, "pty_slave", None): - await self.pty_slave.close() - except Exception as e: - logging.exception(f"{self.id} PTY Cleanup error: {e}") + logging.debug(f"{self.id} Cleaning up PTY") + await self.pty.clean_up() + self.pty = None if getattr(self, "novnc", None): + logging.debug(f"{self.id} Cleaning up NOVNC") try: await async_stop(self.id) except Exception as e: logging.exception(f"{self.id} NOVNC Cleanup error: {e}") if getattr(self, "ipc_tasks", None): + logging.debug(f"{self.id} Cleaning up IPC") try: for channel in SERVER_IPC_CHANNELS: ipc_cleanup(channel, pgid=self.pgid) @@ -275,5 +297,8 @@ async def _clean_up(self): except Exception as e: logging.exception(f"{self.id} IPC Cleanup error: {e}") + logging.debug(f"{self.id} Releasing ID") id_generator.free(self.id) + + self.process = None logging.debug(f"{self.id} Cleanup complete") diff --git a/further_link/runner/py_process_handler.py b/further_link/runner/py_process_handler.py index 92cf305c..19cd1c22 100644 --- a/further_link/runner/py_process_handler.py +++ b/further_link/runner/py_process_handler.py @@ -2,9 +2,7 @@ import os import pathlib -import aiofiles - -from ..util.upload import create_directory +from ..util.async_files import chown, create_directory, write_file from ..util.user_config import ( get_absolute_path, get_gid, @@ -21,19 +19,20 @@ async def _start(self, path, code=None, novncOptions={}): path = get_absolute_path(path, get_working_directory(self.user)) # create path directories if they don't already exist - create_directory(os.path.dirname(path), self.user) + logging.debug(f"{self.id} Creating working directory for python3") + await create_directory(os.path.dirname(path), self.user) entrypoint = path if code is None else os.path.join(path, f"{self.id}.py") # create a temporary file to execute if code is provided if code is not None: - async with aiofiles.open(entrypoint, "w+") as file: - await file.write(code) + logging.debug(f"{self.id} Creating entrypoint for python3") + await write_file(entrypoint, code) self._remove_entrypoint = entrypoint command = "python3 -u " + entrypoint - os.chown(entrypoint, uid=get_uid(self.user), gid=get_gid(self.user)) + await chown(entrypoint, uid=get_uid(self.user), gid=get_gid(self.user)) work_dir = os.path.dirname(entrypoint) diff --git a/further_link/runner/run_py_process_handler.py b/further_link/runner/run_py_process_handler.py deleted file mode 100644 index 24e9a125..00000000 --- a/further_link/runner/run_py_process_handler.py +++ /dev/null @@ -1,237 +0,0 @@ -import asyncio -import os -import pathlib -import pty -import signal -from functools import partial - -import aiofiles - -from ..util.async_helpers import ringbuf_read -from ..util.ipc import async_ipc_send, async_start_ipc_server, ipc_cleanup -from ..util.sdk import get_first_display -from ..util.upload import create_directory -from ..util.user_config import ( - default_user, - get_current_user, - get_gid, - get_temp_dir, - get_uid, - get_working_directory, - user_exists, -) - -SERVER_IPC_CHANNELS = [ - "video", - "keylisten", -] - -dirname = pathlib.Path(__file__).parent.absolute() - - -class InvalidOperation(Exception): - pass - - -class RunPyProcessHandler: - def __init__(self, user=None, pty=False): - self.user = default_user() if user is None else user - self.pty = pty - self.work_dir = get_working_directory(user) - self.temp_dir = get_temp_dir() - - self.id = str(id(self)) - self.process = None - self.pgid = None - - def __del__(self): - if self.is_running(): - self.stop() - - async def start(self, script=None, path=None): - if self.is_running(): - raise InvalidOperation() - - entrypoint = await self._get_entrypoint(script, path) - self._remove_entrypoint = entrypoint if script is not None else None - - stdio = asyncio.subprocess.PIPE - - if self.pty: - # communicate through a pty for terminal 'cooked mode' behaviour - master, slave = pty.openpty() - self.pty_master = await aiofiles.open(master, "w+b", 0) - self.pty_slave = await aiofiles.open(slave, "r+b", 0) - - stdio = self.pty_slave - - cmd = "python3 -u " + entrypoint - if self.user != get_current_user() and user_exists(self.user): - cmd = f"sudo -u {self.user} {cmd}" - - process_env = os.environ.copy() - - # Ensure that DISPLAY is set, so that user can open GUI windows - display = get_first_display() - if display is not None: - process_env["DISPLAY"] = display - - self.process = await asyncio.create_subprocess_exec( - *cmd.split(), - stdin=stdio, - stdout=stdio, - stderr=stdio, - env=process_env, - cwd=os.path.dirname(entrypoint), - preexec_fn=os.setsid, - ) # make a process group for this and children - - self.pgid = os.getpgid(self.process.pid) # retain for cleanup - - asyncio.create_task(self._ipc_communicate()) # after exec as uses pgid - asyncio.create_task(self._process_communicate()) - - if self.on_start: - await self.on_start() - - def is_running(self): - return hasattr(self, "process") and self.process is not None - - def stop(self): - if not self.is_running(): - raise InvalidOperation() - # send TERM to process group in case we have child processes - try: - os.killpg(self.pgid, signal.SIGTERM) - except ProcessLookupError: - pass - - async def send_input(self, content): - if not self.is_running() or not isinstance(content, str): - raise InvalidOperation() - - content_bytes = content.encode("utf-8") - - if self.pty: - await self.pty_master.write(content_bytes) - else: - self.process.stdin.write(content_bytes) - await self.process.stdin.drain() - - async def send_key_event(self, key, event): - if ( - not self.is_running() - or not isinstance(key, str) - or not isinstance(event, str) - ): - raise InvalidOperation() - - content_bytes = f"{key} {event}".encode("utf-8") - await async_ipc_send("keyevent", content_bytes, pgid=self.pgid) - - async def _get_entrypoint(self, script=None, path=None): - if isinstance(path, str): - # path is absolute or relative to work_dir - first_char = path[0] - if first_char != "/": - path = os.path.join(self.work_dir, path) - - path_dirs = path if isinstance(script, str) else os.path.dirname(path) - - # if there's a script to create, create path dirs for it to go in - if not os.path.exists(path_dirs) and isinstance(script, str): - create_directory(path_dirs, self.user) - - if isinstance(script, str): - # write script to file, at path if given, otherwise temp - entrypoint = self._get_script_filename(path) - async with aiofiles.open(entrypoint, "w+") as file: - await file.write(script) - - os.chown(entrypoint, uid=get_uid(self.user), gid=get_gid(self.user)) - - return entrypoint - - if path is not None: - return path - - raise InvalidOperation() - - def _get_script_filename(self, path=None): - dir = path if isinstance(path, str) else self.temp_dir - return dir + "/" + self.id + ".py" - - async def _ipc_communicate(self): - self.ipc_tasks = [] - for channel in SERVER_IPC_CHANNELS: - self.ipc_tasks.append( - asyncio.create_task( - async_start_ipc_server( - channel, partial(self.on_output, channel), pgid=self.pgid - ) - ) - ) - - async def _process_communicate(self): - output_tasks = [] - if self.pty: - output_tasks.append( - asyncio.create_task(self._handle_output(self.pty_master, "stdout")) - ) - else: - output_tasks.append( - asyncio.create_task(self._handle_output(self.process.stdout, "stdout")) - ) - output_tasks.append( - asyncio.create_task(self._handle_output(self.process.stderr, "stderr")) - ) - - # wait for process to exit - exit_code = await self.process.wait() - - # stop ongoing io tasks - await asyncio.sleep(0.1) - for task in output_tasks: - task.cancel() - await asyncio.wait(output_tasks) - - for task in self.ipc_tasks: - task.cancel() - await asyncio.wait(self.ipc_tasks) - - await self._clean_up() - self.process = None - - if self.on_stop: - await self.on_stop(exit_code) - - async def _handle_output(self, stream, channel): - await ringbuf_read( - stream, - output_callback=partial(self.on_output, channel), - buffer_time=0.1, - max_chunks=50, - chunk_size=256, - done_condition=self.process.wait, - ) - - async def _clean_up(self): - # aiofiles.os.remove not released to debian buster - # os.remove should not block significantly, just fires a single syscall - try: - if self._remove_entrypoint is not None: - os.remove(self._remove_entrypoint) - except Exception: - pass - - try: - if self.pty: - self.pty_master.close() - self.pty_slave.close() - os.remove(self.pty_master) - os.remove(self.pty_slave) - except Exception: - pass - - for channel in SERVER_IPC_CHANNELS: - ipc_cleanup(channel, pgid=self.pgid) diff --git a/further_link/runner/shell_process_handler.py b/further_link/runner/shell_process_handler.py index 6b6a132c..497c525f 100644 --- a/further_link/runner/shell_process_handler.py +++ b/further_link/runner/shell_process_handler.py @@ -1,3 +1,5 @@ +import logging + from ..util.upload import create_directory from ..util.user_config import get_absolute_path, get_shell, get_working_directory from .process_handler import ProcessHandler @@ -8,6 +10,7 @@ async def _start(self, path, code=None, novncOptions={}): work_dir = get_absolute_path(path, get_working_directory(self.user)) # create work dir if it doesn't already exist - create_directory(work_dir, self.user) + logging.debug(f"{self.id} Creating working directory for shell") + await create_directory(work_dir, self.user) await super()._start(get_shell(self.user), work_dir, novncOptions=novncOptions) diff --git a/further_link/util/async_files.py b/further_link/util/async_files.py new file mode 100644 index 00000000..ecb11449 --- /dev/null +++ b/further_link/util/async_files.py @@ -0,0 +1,99 @@ +import asyncio +import logging +import os +from functools import partial +from shutil import rmtree + +from aiohttp import ClientSession + +from .user_config import default_user, get_gid, get_uid + +# async file helpers. used instead of aiofiles mainly avoid +# packaging the new more complete versions for debian. aiofiles can still be used if preferred + +# some sync methods are acceptable, such as os.path methods that only do string +# manipulation or os.remove which blocks no more than starting a thread would + + +async def close(fd): + return await asyncio.to_thread(partial(os.close, fd)) + + +async def exists(path): + return await asyncio.to_thread(partial(os.path.exists, path)) + + +async def chmod(fd, mode): + await asyncio.to_thread(partial(os.chmod, fd, mode)) + + +async def chown(fd, uid, gid): + await asyncio.to_thread(partial(os.chown, fd, uid, gid)) + + +async def symlink(src, dst): + await asyncio.to_thread(partial(os.symlink, src, dst)) + + +async def write_file(path, content, mode="w+"): + def sync_write_file(p, c): + with open(p, mode) as file: + file.write(c) + + await asyncio.to_thread(partial(sync_write_file, path, content)) + + +async def download_file(url, file_path): + async with ClientSession() as session: + async with session.get(url) as response: + assert response.status == 200 + + await write_file(file_path, await response.read(), "wb") + + +async def create_directory(directory_path: str, user: str = None): + def _create_directory(directory_path: str, user: str = None): + """ + Create the directories from the provided path level by level, making sure the folders have the correct owner + """ + if user is None: + user = default_user() + + splitted_path = directory_path.split("/") + subpaths = [ + "/".join(splitted_path[:i]) for i in range(2, len(splitted_path) + 1) + ] + + for subpath in subpaths: + if not os.path.isdir(subpath): + logging.debug(f"Creating directory {subpath}") + os.mkdir(subpath) + logging.debug(f"Created directory {subpath}") + + # Update directory owner if on user home + if user and subpath.startswith(os.path.expanduser(f"~{user}")): + os.chown(subpath, uid=get_uid(user), gid=get_gid(user)) + + await asyncio.to_thread(partial(_create_directory, directory_path, user)) + + +async def clear_directory(directory_path: str): + def _clear_directory(directory_path: str): + # clear the upload directory every time + for filename in os.listdir(directory_path): + file_path = os.path.join(directory_path, filename) + try: + if os.path.isfile(file_path) or os.path.islink(file_path): + os.unlink(file_path) + elif os.path.isdir(file_path): + rmtree(file_path) + except Exception: + pass + + await asyncio.to_thread(partial(_clear_directory, directory_path)) + + +async def create_empty_directory(directory_path: str, user: str = None): + await create_directory(directory_path, user) + # in case the directory already existed, ensure it is empty + await clear_directory(directory_path) diff --git a/further_link/util/id_generator.py b/further_link/util/id_generator.py index 922cfc0e..4186f7e5 100644 --- a/further_link/util/id_generator.py +++ b/further_link/util/id_generator.py @@ -1,3 +1,4 @@ +import os from random import randint from typing import List @@ -13,8 +14,11 @@ def __init__(self, max_value: int, min_value: int = 1) -> None: self.MIN_VALUE = min_value self.MAX_VALUE = max_value + def has_ids(self) -> bool: + return len(self.used_ids) < self.MAX_VALUE - self.MIN_VALUE + 1 + def create(self) -> int: - if len(self.used_ids) == self.MAX_VALUE - self.MIN_VALUE + 1: + if not self.has_ids(): raise Exception("All ids are in use.") candidate = randint(self.MIN_VALUE, self.MAX_VALUE) @@ -26,3 +30,21 @@ def create(self) -> int: def free(self, id) -> None: if id in self.used_ids: self.used_ids.remove(id) + + +# each run will need a unique id +# one use of the id is for the pt-web-vnc virtual display id and port numbers +# so we must use +ve int < 1000, with 0-99 reserved for other uses +# envvar FURTHER_LINK_MAX_PROCESSES can be used to limit the range +var_max_processes = os.environ.get("FURTHER_LINK_MAX_PROCESSES") +MAX = 900 +if isinstance(var_max_processes, str) and var_max_processes.isdigit(): + max_processes = int(var_max_processes) +else: + max_processes = MAX +if 1 > max_processes > MAX: + max_processes = MAX + + +# global "singleton" +id_generator = IdGenerator(min_value=100, max_value=99 + max_processes) diff --git a/further_link/util/pty.py b/further_link/util/pty.py new file mode 100644 index 00000000..5ace1d9d --- /dev/null +++ b/further_link/util/pty.py @@ -0,0 +1,65 @@ +import asyncio +import logging +from pty import openpty + +import aiofiles + +from ..util.async_files import chown, close +from ..util.async_helpers import timeout +from ..util.terminal import set_winsize +from ..util.user_config import get_gid, get_uid + + +class Pty: + @classmethod + async def create(cls, user): + self = cls() + self.front_fd, self.back_fd = openpty() + + # on some distros process user must own 'back', otherwise you get: + # cannot set terminal process group (-1): Inappropriate ioctl for device + await chown(self.back_fd, get_uid(user), get_gid(user)) + + self.front = await aiofiles.open(self.front_fd, "w+b", 0) + self.back = await aiofiles.open(self.back_fd, "r+b", 0) + + return self + + def set_winsize(self, rows, cols): + set_winsize(self.back_fd, 4, 60) + + async def write(self, content_bytes): + await self.front.write(content_bytes) + + async def clean_up(self): + logging.debug("PTY Closing master") + await self._clean_up_end(self.front, self.front_fd) + logging.debug("PTY Closing slave") + await self._clean_up_end(self.back, self.back_fd) + logging.debug("PTY Cleanup complete") + + async def _clean_up_end(self, file, fd): + # closing sometimes hangs so use a timeout and try closing fd also + async def close_file(): + try: + logging.debug("PTY Closing file") + await file.close() + except Exception as e: + logging.debug(f"PTY Close file error: {e}") + + close_file_task = asyncio.create_task(close_file()) + done = await timeout(close_file_task, 0.1) + if close_file_task not in done: + logging.debug("PTY Close file timed out") + + async def close_fd(): + try: + logging.debug("PTY Closing fd") + await close(fd) + except Exception as e: + logging.debug(f"PTY Close fd error: {e}") + + close_fd_task = asyncio.create_task(close_fd()) + done = await timeout(close_fd_task, 0.1) + if close_fd_task not in done: + logging.debug("PTY Close fd timed out") diff --git a/further_link/util/upload.py b/further_link/util/upload.py index 0bd362d5..03051db9 100644 --- a/further_link/util/upload.py +++ b/further_link/util/upload.py @@ -1,11 +1,15 @@ import os -from shutil import rmtree -import aiofiles -from aiohttp import ClientSession - -from ..util.user_config import default_user, get_gid, get_uid -from .user_config import CACHE_DIR_NAME +from .async_files import ( + chown, + create_directory, + create_empty_directory, + download_file, + exists, + symlink, + write_file, +) +from .user_config import CACHE_DIR_NAME, default_user, get_gid, get_uid file_types = ["url", "text"] @@ -33,25 +37,6 @@ def directory_is_valid(directory): ) -def create_directory(directory_path: str, user: str = None): - """ - Create the directories from the provided path level by level, making sure the folders have the correct owner - """ - if user is None: - user = default_user() - - splitted_path = directory_path.split("/") - subpaths = ["/".join(splitted_path[:i]) for i in range(2, len(splitted_path) + 1)] - - for subpath in subpaths: - if not os.path.isdir(subpath): - os.mkdir(subpath) - - # Update directory owner if on user home - if user and subpath.startswith(os.path.expanduser(f"~{user}")): - os.chown(subpath, uid=get_uid(user), gid=get_gid(user)) - - def valid_url_content(content): return ( "url" in content @@ -67,13 +52,14 @@ def valid_text_content(content): return "text" in content and isinstance(content["text"], str) -async def download_file(url, file_path): - async with ClientSession() as session: - async with session.get(url) as response: - assert response.status == 200 +def is_sub_directory(sub_dir, from_dir): + return os.path.realpath(sub_dir).startswith(os.path.realpath(from_dir)) + - async with aiofiles.open(file_path, "wb") as file: - await file.write(await response.read()) +async def create_alias_subdirs(alias_path, user): + alias_dir = os.path.dirname(alias_path) + if not await exists(alias_dir): + await create_directory(alias_dir, user) def get_directory_path(work_dir, directory_name): @@ -117,10 +103,6 @@ def get_cache_file_path(bucket_cache_path, file_name): return cache_file_path -def is_sub_directory(sub_dir, from_dir): - return os.path.realpath(sub_dir).startswith(os.path.realpath(from_dir)) - - async def do_upload(directory, work_dir, user=None): try: if user is None: @@ -129,26 +111,13 @@ async def do_upload(directory, work_dir, user=None): directory_name = directory["name"] directory_path = get_directory_path(work_dir, directory_name) - create_directory(directory_path, user) - - # clear the upload directory every time - for filename in os.listdir(directory_path): - file_path = os.path.join(directory_path, filename) - try: - if os.path.isfile(file_path) or os.path.islink(file_path): - os.unlink(file_path) - elif os.path.isdir(file_path): - rmtree(file_path) - except Exception: - pass + await create_empty_directory(directory_path, user) for alias_name, file_info in directory["files"].items(): alias_path = get_alias_path(directory_path, alias_name) # support for creating subdirs in alias name - alias_dir = os.path.dirname(alias_path) - if not os.path.exists(alias_dir): - create_directory(alias_dir, user) + await create_alias_subdirs(alias_path, user) if file_info["type"] == "url": content = file_info["content"] @@ -162,19 +131,18 @@ async def do_upload(directory, work_dir, user=None): # url type files have a cache dir to prevent repeat download bucket_cache_path = get_bucket_cache_path(work_dir, bucket_name) - if not os.path.exists(bucket_cache_path): - create_directory(bucket_cache_path, user) + await create_directory(bucket_cache_path, user) cache_file_path = get_cache_file_path(bucket_cache_path, file_name) # only download the file if it's not in the cache - if not os.path.exists(cache_file_path): + if not (await exists(cache_file_path)): await download_file(url, cache_file_path) # set ownership of file to the correct user if user: - os.chown(cache_file_path, uid=get_uid(user), gid=get_gid(user)) + await chown(cache_file_path, uid=get_uid(user), gid=get_gid(user)) # create a symlink pointing to the cached downloaded file - os.symlink(cache_file_path, alias_path) + await symlink(cache_file_path, alias_path) elif file_info["type"] == "text": content = file_info["content"] @@ -182,12 +150,11 @@ async def do_upload(directory, work_dir, user=None): if not valid_text_content(content): raise Exception("Invalid text content") - async with aiofiles.open(alias_path, "w+") as file: - await file.write(content["text"]) + await write_file(alias_path, content["text"]) # set ownership of file to the correct user if user: - os.chown(alias_path, uid=get_uid(user), gid=get_gid(user)) + await chown(alias_path, uid=get_uid(user), gid=get_gid(user)) except Exception as e: raise BadUpload(e) diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py index e274019b..73270660 100644 --- a/tests/e2e/__init__.py +++ b/tests/e2e/__init__.py @@ -6,4 +6,3 @@ VERSION_PATH = "/version" UPLOAD_PATH = "/upload" RUN_PATH = "/run" -RUN_PY_PATH = "/run-py" diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 681ef5e4..86c12a9c 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -5,7 +5,7 @@ from further_link.__main__ import create_app -from . import RUN_PATH, RUN_PY_PATH +from . import RUN_PATH @pytest.fixture @@ -19,24 +19,6 @@ async def http_client(aiohttp_client): yield client -@pytest.fixture() -async def run_py_ws_client(aiohttp_client): - client = await aiohttp_client(create_app()) - async with client.ws_connect(RUN_PY_PATH, receive_timeout=0.1) as client: - yield client - - -run_py_ws_client2 = run_py_ws_client - - -@pytest.fixture() -async def run_py_ws_client_query(aiohttp_client, query_params): - url = RUN_PY_PATH + "?" + urllib.parse.urlencode(query_params) - client = await aiohttp_client(create_app()) - async with client.ws_connect(url, receive_timeout=0.1) as client: - yield client - - @pytest.fixture() async def run_ws_client(aiohttp_client): client = await aiohttp_client(create_app()) diff --git a/tests/e2e/test_run_py.py b/tests/e2e/test_run_py.py deleted file mode 100644 index 5a3b6572..00000000 --- a/tests/e2e/test_run_py.py +++ /dev/null @@ -1,433 +0,0 @@ -import asyncio -import os -from datetime import datetime -from shutil import copy - -import pytest - -from further_link import __version__ -from further_link.util.message import create_message, parse_message - -from ..dirs import WORKING_DIRECTORY -from . import E2E_PATH -from .helpers import receive_data, wait_for_data - - -@pytest.mark.asyncio -async def test_bad_message(run_py_ws_client): - start_cmd = create_message("start") - await run_py_ws_client.send_str(start_cmd) - - await wait_for_data(run_py_ws_client, "error", "message", "Bad message") - - -@pytest.mark.asyncio -async def test_run_code_script(run_py_ws_client): - code = """\ -from datetime import datetime -print(datetime.now().strftime("%A")) -""" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - day = datetime.now().strftime("%A") - await wait_for_data(run_py_ws_client, "stdout", "output", day + "\n") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_run_code_script_with_directory(run_py_ws_client): - code = """\ -from datetime import datetime -print(datetime.now().strftime("%A")) -""" - start_cmd = create_message( - "start", {"sourceScript": code, "directoryName": "my-dirname"} - ) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - day = datetime.now().strftime("%A") - await wait_for_data(run_py_ws_client, "stdout", "output", day + "\n") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_run_code_relative_path(run_py_ws_client): - copy("{}/test_data/print_date.py".format(E2E_PATH), WORKING_DIRECTORY) - - start_cmd = create_message("start", {"sourcePath": "print_date.py"}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - day = datetime.now().strftime("%A") - await wait_for_data(run_py_ws_client, "stdout", "output", day + "\n") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_run_code_absolute_path(run_py_ws_client): - start_cmd = create_message( - "start", {"sourcePath": "{}/test_data/print_date.py".format(E2E_PATH)} - ) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - day = datetime.now().strftime("%A") - await wait_for_data(run_py_ws_client, "stdout", "output", day + "\n") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -@pytest.mark.parametrize("query_params", [{"user": "root"}]) -@pytest.mark.skip(reason="Won't work in CI due to old sudo version") -async def test_run_as_user(run_py_ws_client_query): - # This test assumes non-root user with nopasswd sudo access... - code = "import getpass\nprint(getpass.getuser())" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client_query.send_str(start_cmd) - - await receive_data(run_py_ws_client_query, "started") - - await wait_for_data(run_py_ws_client_query, "stdout", "output", "root\n") - - await wait_for_data(run_py_ws_client_query, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_stop_early(run_py_ws_client): - code = "while True: pass" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - stop_cmd = create_message("stop") - await run_py_ws_client.send_str(stop_cmd) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", -15) - - -@pytest.mark.asyncio -async def test_bad_code(run_py_ws_client): - code = "i'm not valid python" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - await asyncio.sleep(0.1) # wait for data - message = await run_py_ws_client.receive() - m_type, m_data, m_process = parse_message(message.data) - assert m_type == "stderr" - lines = m_data["output"].split("\n") - assert lines[0].startswith(" File") - assert lines[1] == " i'm not valid python" - assert lines[2][-1] == "^" - assert lines[3] == "SyntaxError: EOL while scanning string literal" - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 1) - - -@pytest.mark.asyncio -async def test_input(run_py_ws_client): - code = """s = input() -while "BYE" != s: - print(["HUH?! SPEAK UP, SONNY!","NO, NOT SINCE 1930"][s.isupper()]) - s = input()""" - - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - user_input = create_message("stdin", {"input": "hello\n"}) - await run_py_ws_client.send_str(user_input) - - await wait_for_data( - run_py_ws_client, "stdout", "output", "HUH?! SPEAK UP, SONNY!\n" - ) - - user_input = create_message("stdin", {"input": "HEY GRANDMA\n"}) - await run_py_ws_client.send_str(user_input) - - await wait_for_data(run_py_ws_client, "stdout", "output", "NO, NOT SINCE 1930\n") - - user_input = create_message("stdin", {"input": "BYE\n"}) - await run_py_ws_client.send_str(user_input) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -@pytest.mark.parametrize("query_params", [{"pty": "1"}]) -async def test_input_pty(run_py_ws_client_query): - code = """s = input() -while "BYE" != s: - print(["HUH?! SPEAK UP, SONNY!","NO, NOT SINCE 1930"][s.isupper()]) - s = input()""" - - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client_query.send_str(start_cmd) - - await receive_data(run_py_ws_client_query, "started") - - user_input = create_message("stdin", {"input": "hello\r"}) - await run_py_ws_client_query.send_str(user_input) - - await wait_for_data( - run_py_ws_client_query, - "stdout", - "output", - "hello\r\nHUH?! SPEAK UP, SONNY!\r\n", - ) - - user_input = create_message("stdin", {"input": "HEY GRANDMA\r"}) - await run_py_ws_client_query.send_str(user_input) - - await wait_for_data( - run_py_ws_client_query, - "stdout", - "output", - "HEY GRANDMA\r\nNO, NOT SINCE 1930\r\n", - ) - - user_input = create_message("stdin", {"input": "BYE\r"}) - await run_py_ws_client_query.send_str(user_input) - - await wait_for_data(run_py_ws_client_query, "stdout", "output", "BYE\r\n") - - await wait_for_data(run_py_ws_client_query, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_two_clients(run_py_ws_client, run_py_ws_client2): - code = "while True: pass" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - await run_py_ws_client2.send_str(start_cmd) - - await receive_data(run_py_ws_client2, "started") - - stop_cmd = create_message("stop") - await run_py_ws_client.send_str(stop_cmd) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", -15, 100) - - stop_cmd = create_message("stop") - await run_py_ws_client2.send_str(stop_cmd) - - await wait_for_data(run_py_ws_client2, "stopped", "exitCode", -15, 100) - - -@pytest.mark.asyncio -async def test_out_of_order_commands(run_py_ws_client): - # send input - user_input = create_message("stdin", {"input": "hello\n"}) - await run_py_ws_client.send_str(user_input) - - # bad message - await receive_data(run_py_ws_client, "error", "message", "Bad message") - - # send stop - stop_cmd = create_message("stop") - await run_py_ws_client.send_str(stop_cmd) - - # bad message - await receive_data(run_py_ws_client, "error", "message", "Bad message") - - # send start - code = "while True: pass" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - # started - await receive_data(run_py_ws_client, "started") - - # send start - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - # bad message - await receive_data(run_py_ws_client, "error", "message", "Bad message") - - # send stop - stop_cmd = create_message("stop") - await run_py_ws_client.send_str(stop_cmd) - - # stopped - await wait_for_data(run_py_ws_client, "stopped", "exitCode", -15) - - # send stop - stop_cmd = create_message("stop") - await run_py_ws_client.send_str(stop_cmd) - - # bad message - await receive_data(run_py_ws_client, "error", "message", "Bad message") - - -@pytest.mark.asyncio -async def test_discard_old_input(run_py_ws_client): - code = 'print("hello world")' - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - unterminated_input = create_message("stdin", {"input": "unterminated input"}) - await run_py_ws_client.send_str(unterminated_input) - - await wait_for_data(run_py_ws_client, "stdout", "output", "hello world\n", 100) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - code = "print(input())" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - user_input = create_message("stdin", {"input": "hello\n"}) - await run_py_ws_client.send_str(user_input) - - await wait_for_data(run_py_ws_client, "stdout", "output", "hello\n") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_use_lib(run_py_ws_client): - code = """\ -from further_link import __version__ -print(__version__) -""" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - await wait_for_data(run_py_ws_client, "stdout", "output", f"{__version__}\n") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -@pytest.mark.skipif("DISPLAY" not in os.environ, reason="requires UI") -async def test_use_display(run_py_ws_client): - code = """\ -from turtle import color -color('red') -""" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0, 5000) - - -@pytest.mark.asyncio -async def test_keyevent(run_py_ws_client): - code = """\ -from further_link import KeyboardButton -from signal import pause -a = KeyboardButton('a') -b = KeyboardButton('b') -a.when_pressed = lambda: print('a pressed') -b.when_released = lambda: print('b released') -pause() -""" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await wait_for_data(run_py_ws_client, "started") - await wait_for_data(run_py_ws_client, "keylisten", "output", "a") - await wait_for_data(run_py_ws_client, "keylisten", "output", "b") - - # keyevent ipc server can take a moment to start up - await asyncio.sleep(0.1) - - await run_py_ws_client.send_str( - create_message("keyevent", {"key": "a", "event": "keydown"}) - ) - - await wait_for_data(run_py_ws_client, "stdout", "output", "a pressed\n") - - await run_py_ws_client.send_str( - create_message("keyevent", {"key": "b", "event": "keyup"}) - ) - - await wait_for_data(run_py_ws_client, "stdout", "output", "b released\n") - - await run_py_ws_client.send_str(create_message("stop")) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", -15) - - -jpeg_pixel_b64 = "/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAAgGBgcGBQgHBwcJCQgKDBQNDAsLDBkSEw8UHRofHh0aHBwgJC4nICIsIxwcKDcpLDAxNDQ0Hyc5PTgyPC4zNDL/wAALCAABAAEBAREA/8QAFAABAAAAAAAAAAAAAAAAAAAAAP/EABQQAQAAAAAAAAAAAAAAAAAAAAD/2gAIAQEAAD8AP//Z" # noqa: E501 - - -@pytest.mark.asyncio -async def test_send_image_pil(run_py_ws_client): - code = """\ -from further_link import send_image -from PIL.Image import effect_noise -send_image(effect_noise((1, 1), 0)) -""" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await wait_for_data(run_py_ws_client, "started") - - await wait_for_data(run_py_ws_client, "video", "output", jpeg_pixel_b64) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_send_image_opencv(run_py_ws_client): - code = """\ -from numpy import array -from further_link import send_image -from PIL.Image import effect_noise -send_image(array(effect_noise((1, 1), 0))) -""" - start_cmd = create_message("start", {"sourceScript": code}) - await run_py_ws_client.send_str(start_cmd) - - await wait_for_data(run_py_ws_client, "started") - - await wait_for_data(run_py_ws_client, "video", "output", jpeg_pixel_b64) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_send_image_with_directory(run_py_ws_client): - code = """\ -from further_link import send_image -from PIL.Image import effect_noise -send_image(effect_noise((1, 1), 0)) -""" - start_cmd = create_message( - "start", {"sourceScript": code, "directoryName": "my-dirname"} - ) - await run_py_ws_client.send_str(start_cmd) - - await wait_for_data(run_py_ws_client, "started") - - await wait_for_data(run_py_ws_client, "video", "output", jpeg_pixel_b64) - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) diff --git a/tests/e2e/test_run_py_upload.py b/tests/e2e/test_run_py_upload.py deleted file mode 100644 index 1b63a00e..00000000 --- a/tests/e2e/test_run_py_upload.py +++ /dev/null @@ -1,134 +0,0 @@ -import asyncio -import os - -import aiofiles -import pytest - -from further_link.util.message import create_message, parse_message -from further_link.util.upload import get_bucket_cache_path, get_directory_path - -from ..dirs import WORKING_DIRECTORY -from .helpers import receive_data, wait_for_data -from .test_data.upload_data import directory - - -@pytest.mark.asyncio -async def test_upload(run_py_ws_client): - upload_cmd = create_message("upload", {"directory": directory}) - await run_py_ws_client.send_str(upload_cmd) - - await wait_for_data(run_py_ws_client, "uploaded") - - directory_path = get_directory_path(WORKING_DIRECTORY, directory["name"]) - - for alias_name, file_info in directory["files"].items(): - alias_path = os.path.join(directory_path, alias_name) - - assert os.path.isfile(alias_path) - - if file_info["type"] == "url": - content = file_info["content"] - bucket_name = content["bucketName"] - file_name = content["fileName"] - bucket_cache_path = get_bucket_cache_path(WORKING_DIRECTORY, bucket_name) - file_path = os.path.join(bucket_cache_path, file_name) - assert os.path.isfile(file_path) - - elif file_info["type"] == "url": - async with aiofiles.open(file_path) as file: - content = await file.read() - assert content == file_info["content"]["text"] - - -@pytest.mark.asyncio -async def test_upload_read_file(run_py_ws_client): - upload_cmd = create_message("upload", {"directory": directory}) - await run_py_ws_client.send_str(upload_cmd) - - await wait_for_data(run_py_ws_client, "uploaded") - - code = """\ -import os -with open(os.path.dirname(__file__) + '/cereal.csv', 'r') as f: - print(f.read(1000)) -""" - start_cmd = create_message( - "start", {"sourceScript": code, "directoryName": directory["name"]} - ) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - await asyncio.sleep(0.2) # wait for data - message = await run_py_ws_client.receive() - m_type, m_data, m_process = parse_message(message.data) - assert m_type == "stdout" - assert m_data["output"][788:796] == "Cheerios" - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_upload_import_script(run_py_ws_client): - upload_cmd = create_message("upload", {"directory": directory}) - await run_py_ws_client.send_str(upload_cmd) - - await wait_for_data(run_py_ws_client, "uploaded") - - code = """\ -from some_lib import call_some_lib -print(call_some_lib()) -""" - start_cmd = create_message( - "start", {"sourceScript": code, "directoryName": directory["name"]} - ) - await run_py_ws_client.send_str(start_cmd) - - await receive_data(run_py_ws_client, "started") - - await wait_for_data(run_py_ws_client, "stdout", "output", "some lib called\n") - - await wait_for_data(run_py_ws_client, "stopped", "exitCode", 0) - - -@pytest.mark.asyncio -async def test_upload_bad_file(run_py_ws_client, aioresponses): - aioresponses.add("https://placekitten.com/50/50", status=500) - - upload_cmd = create_message("upload", {"directory": directory}) - await run_py_ws_client.send_str(upload_cmd) - - await receive_data(run_py_ws_client, "error", "message", "Bad upload") - - -@pytest.mark.asyncio -async def test_upload_existing_directory(run_py_ws_client): - existing_directory = directory.copy() - existing_directory["name"] = "existing_directory" - - os.mkdir("{}/existing_directory".format(WORKING_DIRECTORY)) - - upload_cmd = create_message("upload", {"directory": existing_directory}) - await run_py_ws_client.send_str(upload_cmd) - - await wait_for_data(run_py_ws_client, "uploaded") - - -@pytest.mark.asyncio -async def test_upload_restricted_directory(run_py_ws_client): - # name directory something that tries to escape from working dir - restricted_directory = directory.copy() - restricted_directory["name"] = "../injected" - - upload_cmd = create_message("upload", {"directory": restricted_directory}) - await run_py_ws_client.send_str(upload_cmd) - - await receive_data(run_py_ws_client, "error", "message", "Bad upload") - - -@pytest.mark.asyncio -async def test_upload_empty_directory(run_py_ws_client): - upload_cmd = create_message("upload", {"directory": {}}) - await run_py_ws_client.send_str(upload_cmd) - - await receive_data(run_py_ws_client, "error", "message", "Bad message") diff --git a/tests/unit/test_process_handler.py b/tests/unit/test_process_handler.py index 1ed4c13d..5cb6d40d 100644 --- a/tests/unit/test_process_handler.py +++ b/tests/unit/test_process_handler.py @@ -7,7 +7,6 @@ from unittest.mock import patch import pytest -from aiofiles.threadpool.binary import AsyncFileIO from mock import AsyncMock from further_link.runner.process_handler import ProcessHandler @@ -75,8 +74,6 @@ async def test_pty(): await p.start('python3 -u -c "print(input())"') assert type(p.process) == Process assert p.pty - assert type(p.pty_master) == AsyncFileIO - assert type(p.pty_slave) == AsyncFileIO p.on_start.assert_called()