Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 84 additions & 1 deletion src/kaggle/api/kaggle_api_extended.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
ApiKernelMetadata,
ApiDeleteKernelRequest,
)
from kagglesdk.kernels.types.kernels_enums import KernelsListSortType, KernelsListViewType
from kagglesdk.kernels.types.kernels_enums import KernelWorkerStatus, KernelsListSortType, KernelsListViewType
from kagglesdk.models.types.model_api_service import (
ApiListModelsRequest,
ApiCreateModelRequest,
Expand Down Expand Up @@ -3601,6 +3601,89 @@ def kernels_status_cli(self, kernel, kernel_opt=None):
else:
print('%s has status "%s"' % (kernel, status))

def kernels_logs(self, kernel: str) -> str:
"""Retrieves the execution log for a specified kernel.

Args:
kernel (str): The kernel identifier in the format owner/kernel-slug.

Returns:
str: The log content from the kernel's latest session.
"""
if kernel is None:
raise ValueError("A kernel must be specified")
if "/" in kernel:
self.validate_kernel_string(kernel)
kernel_url_list = kernel.split("/")
owner_slug = kernel_url_list[0]
kernel_slug = kernel_url_list[1]
else:
owner_slug = self.get_config_value(self.CONFIG_NAME_USER)
kernel_slug = kernel

with self.build_kaggle_client() as kaggle:
request = ApiListKernelSessionOutputRequest()
request.user_name = owner_slug
request.kernel_slug = kernel_slug
try:
response = kaggle.kernels.kernels_api_client.list_kernel_session_output(request)
except HTTPError as e:
if e.response.status_code in (401, 403):
raise ValueError(
f"Cannot access kernel '{kernel}' (Permission 'kernels.get' was denied). "
"The most likely cause is a wrong kernel slug. "
"Use the slug from the notebook URL (kaggle.com/code/owner/KERNEL-SLUG)."
)
raise
return response.log or ""

def kernels_logs_cli(self, kernel, kernel_opt=None, follow=False, interval=5):
"""Print kernel execution logs to stdout.

Args:
kernel: The kernel for which to retrieve the logs.
kernel_opt: An alternative option to providing a kernel.
follow: If True, continuously poll and print new log lines.
interval: Polling interval in seconds for follow mode (default 5).
"""
kernel = kernel or kernel_opt
terminal_statuses = {
KernelWorkerStatus.COMPLETE,
KernelWorkerStatus.ERROR,
KernelWorkerStatus.CANCEL_ACKNOWLEDGED,
}
printed_lines = 0

while True:
log = self.kernels_logs(kernel)
lines = log.split("\n") if log else []

if follow:
new_lines = lines[printed_lines:]
if new_lines:
print("\n".join(new_lines), flush=True)
printed_lines = len(lines)

# Check if the kernel has reached a terminal status
try:
status_response = self.kernels_status(kernel)
status = status_response.status
except Exception:
break
if status in terminal_statuses:
# Fetch final logs one more time
log = self.kernels_logs(kernel)
lines = log.split("\n") if log else []
final_new_lines = lines[printed_lines:]
if final_new_lines:
print("\n".join(final_new_lines), flush=True)
break

time.sleep(interval)
else:
print(log)
break

def model_get(self, model: str) -> ApiModel:
"""Gets a model.

Expand Down
23 changes: 22 additions & 1 deletion src/kaggle/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,24 @@ def parse_kernels(subparsers) -> None:
parser_kernels_status._action_groups.append(parser_kernels_status_optional)
parser_kernels_status.set_defaults(func=api.kernels_status_cli)

# Kernels logs
parser_kernels_logs = subparsers_kernels.add_parser(
"logs", formatter_class=argparse.RawTextHelpFormatter, help=Help.command_kernels_logs
)
parser_kernels_logs_optional = parser_kernels_logs._action_groups.pop()
parser_kernels_logs_optional.add_argument("kernel", nargs="?", default=None, help=Help.param_kernel)
parser_kernels_logs_optional.add_argument(
"-k", "--kernel", dest="kernel_opt", required=False, help=argparse.SUPPRESS
)
parser_kernels_logs_optional.add_argument(
"-f", "--follow", dest="follow", action="store_true", required=False, help=Help.param_kernel_logs_follow
)
parser_kernels_logs_optional.add_argument(
"--interval", dest="interval", default=5, type=int, required=False, help=Help.param_kernel_logs_interval
)
parser_kernels_logs._action_groups.append(parser_kernels_logs_optional)
parser_kernels_logs.set_defaults(func=api.kernels_logs_cli)

# Kernels delete
parser_kernels_delete = subparsers_kernels.add_parser(
"delete", formatter_class=argparse.RawTextHelpFormatter, help=Help.command_kernels_delete
Expand Down Expand Up @@ -1073,7 +1091,7 @@ class Help(object):
]
competitions_choices = ["list", "files", "download", "submit", "submissions", "leaderboard"]
datasets_choices = ["list", "files", "download", "create", "version", "init", "metadata", "status", "delete"]
kernels_choices = ["list", "files", "get", "init", "push", "pull", "output", "status", "update", "delete"]
kernels_choices = ["list", "files", "get", "init", "push", "pull", "output", "status", "logs", "update", "delete"]
models_choices = ["instances", "i", "variations", "v", "get", "list", "init", "create", "delete", "update"]
model_instances_choices = ["versions", "v", "get", "files", "list", "init", "create", "delete", "update"]
model_instance_versions_choices = ["init", "create", "download", "delete", "files", "list"]
Expand Down Expand Up @@ -1138,6 +1156,7 @@ class Help(object):
command_kernels_pull = "Pull down code from a kernel"
command_kernels_output = "Get data output from the latest kernel run"
command_kernels_status = "Display the status of the latest kernel run"
command_kernels_logs = "Print the execution logs from the latest kernel run"
command_kernels_delete = "Delete a kernel"

# Models commands
Expand Down Expand Up @@ -1305,6 +1324,8 @@ class Help(object):
"Regex pattern to match against filenames. Only files matching the pattern will be downloaded."
)
param_kernel_acc = "Specify the type of accelerator to use for the kernel run"
param_kernel_logs_follow = "Continuously poll and print new log lines (like tail -f)"
param_kernel_logs_interval = "Polling interval in seconds for follow mode (default 5)"

# Models params
param_model = "Model URL suffix in format <owner>/<model-name>"
Expand Down
202 changes: 202 additions & 0 deletions tests/test_kernels_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
# coding=utf-8
import unittest
from unittest.mock import patch, MagicMock, call
import io
import sys

sys.path.insert(0, "..")

from kaggle.api.kaggle_api_extended import KaggleApi
from kagglesdk.kernels.types.kernels_enums import KernelWorkerStatus


class TestKernelsLogs(unittest.TestCase):
"""Tests for the kernels_logs and kernels_logs_cli methods."""

def setUp(self):
self.api = KaggleApi.__new__(KaggleApi)
self.api.config_values = {"username": "testuser"}

@patch.object(KaggleApi, "build_kaggle_client")
@patch.object(KaggleApi, "validate_kernel_string")
def test_kernels_logs_returns_log_string(self, mock_validate, mock_client):
"""Test that kernels_logs returns the log string from the API response."""
mock_response = MagicMock()
mock_response.log = "Line 1\nLine 2\nLine 3"
mock_kaggle = MagicMock()
mock_kaggle.kernels.kernels_api_client.list_kernel_session_output.return_value = mock_response
mock_client.return_value.__enter__ = MagicMock(return_value=mock_kaggle)
mock_client.return_value.__exit__ = MagicMock(return_value=False)

result = self.api.kernels_logs("owner/kernel-slug")
self.assertEqual(result, "Line 1\nLine 2\nLine 3")

@patch.object(KaggleApi, "build_kaggle_client")
@patch.object(KaggleApi, "validate_kernel_string")
def test_kernels_logs_returns_empty_string_when_no_log(self, mock_validate, mock_client):
"""Test that kernels_logs returns empty string when log is None."""
mock_response = MagicMock()
mock_response.log = None
mock_kaggle = MagicMock()
mock_kaggle.kernels.kernels_api_client.list_kernel_session_output.return_value = mock_response
mock_client.return_value.__enter__ = MagicMock(return_value=mock_kaggle)
mock_client.return_value.__exit__ = MagicMock(return_value=False)

result = self.api.kernels_logs("owner/kernel-slug")
self.assertEqual(result, "")

@patch.object(KaggleApi, "build_kaggle_client")
def test_kernels_logs_raises_when_kernel_none(self, mock_client):
"""Test that kernels_logs raises ValueError when kernel is None."""
with self.assertRaises(ValueError):
self.api.kernels_logs(None)

@patch.object(KaggleApi, "build_kaggle_client")
@patch.object(KaggleApi, "get_config_value", return_value="defaultuser")
def test_kernels_logs_uses_default_user_for_bare_slug(self, mock_config, mock_client):
"""Test that a bare kernel slug uses the default username."""
mock_response = MagicMock()
mock_response.log = "some log"
mock_kaggle = MagicMock()
mock_kaggle.kernels.kernels_api_client.list_kernel_session_output.return_value = mock_response
mock_client.return_value.__enter__ = MagicMock(return_value=mock_kaggle)
mock_client.return_value.__exit__ = MagicMock(return_value=False)

result = self.api.kernels_logs("my-kernel")
self.assertEqual(result, "some log")

# Verify the request used the default user
call_args = mock_kaggle.kernels.kernels_api_client.list_kernel_session_output.call_args
request = call_args[0][0]
self.assertEqual(request.user_name, "defaultuser")
self.assertEqual(request.kernel_slug, "my-kernel")

@patch.object(KaggleApi, "kernels_logs")
def test_kernels_logs_cli_oneshot(self, mock_logs):
"""Test one-shot mode prints log to stdout."""
mock_logs.return_value = "Line 1\nLine 2\nDone"

captured = io.StringIO()
sys.stdout = captured
try:
self.api.kernels_logs_cli("owner/kernel-slug")
finally:
sys.stdout = sys.__stdout__

self.assertEqual(captured.getvalue(), "Line 1\nLine 2\nDone\n")

@patch.object(KaggleApi, "kernels_logs")
def test_kernels_logs_cli_uses_kernel_opt(self, mock_logs):
"""Test that kernel_opt is used when kernel is None."""
mock_logs.return_value = "log output"

captured = io.StringIO()
sys.stdout = captured
try:
self.api.kernels_logs_cli(None, kernel_opt="owner/kernel-slug")
finally:
sys.stdout = sys.__stdout__

mock_logs.assert_called_once_with("owner/kernel-slug")

@patch("time.sleep")
@patch.object(KaggleApi, "kernels_status")
@patch.object(KaggleApi, "kernels_logs")
def test_kernels_logs_cli_follow_mode(self, mock_logs, mock_status, mock_sleep):
"""Test follow mode polls and prints new lines, stops on terminal status."""
# First poll: kernel is running, returns some log lines
# Second poll: kernel is complete, returns more log lines
mock_logs.side_effect = [
"Line 1\nLine 2",
"Line 1\nLine 2\nLine 3\nLine 4",
"Line 1\nLine 2\nLine 3\nLine 4", # final fetch after terminal status
]

status_running = MagicMock()
status_running.status = KernelWorkerStatus.RUNNING
status_complete = MagicMock()
status_complete.status = KernelWorkerStatus.COMPLETE
mock_status.side_effect = [status_running, status_complete]

captured = io.StringIO()
sys.stdout = captured
try:
self.api.kernels_logs_cli("owner/kernel-slug", follow=True, interval=1)
finally:
sys.stdout = sys.__stdout__

output = captured.getvalue()
# First poll prints "Line 1\nLine 2"
# Second poll prints "Line 3\nLine 4"
self.assertIn("Line 1", output)
self.assertIn("Line 2", output)
self.assertIn("Line 3", output)
self.assertIn("Line 4", output)

# Verify sleep was called with the right interval
mock_sleep.assert_called_with(1)

@patch("time.sleep")
@patch.object(KaggleApi, "kernels_status")
@patch.object(KaggleApi, "kernels_logs")
def test_kernels_logs_cli_follow_stops_on_error(self, mock_logs, mock_status, mock_sleep):
"""Test follow mode stops when kernel status is ERROR."""
mock_logs.side_effect = [
"Line 1",
"Line 1", # final fetch
]

status_error = MagicMock()
status_error.status = KernelWorkerStatus.ERROR
mock_status.return_value = status_error

captured = io.StringIO()
sys.stdout = captured
try:
self.api.kernels_logs_cli("owner/kernel-slug", follow=True, interval=1)
finally:
sys.stdout = sys.__stdout__

# Should only poll once before stopping
self.assertEqual(mock_status.call_count, 1)

@patch("time.sleep")
@patch.object(KaggleApi, "kernels_status")
@patch.object(KaggleApi, "kernels_logs")
def test_kernels_logs_cli_follow_stops_on_cancel(self, mock_logs, mock_status, mock_sleep):
"""Test follow mode stops when kernel status is CANCEL_ACKNOWLEDGED."""
mock_logs.side_effect = [
"Cancelled",
"Cancelled", # final fetch
]

status_cancel = MagicMock()
status_cancel.status = KernelWorkerStatus.CANCEL_ACKNOWLEDGED
mock_status.return_value = status_cancel

captured = io.StringIO()
sys.stdout = captured
try:
self.api.kernels_logs_cli("owner/kernel-slug", follow=True, interval=1)
finally:
sys.stdout = sys.__stdout__

self.assertEqual(mock_status.call_count, 1)

@patch.object(KaggleApi, "kernels_logs")
def test_kernels_logs_cli_empty_log(self, mock_logs):
"""Test one-shot mode with empty log."""
mock_logs.return_value = ""

captured = io.StringIO()
sys.stdout = captured
try:
self.api.kernels_logs_cli("owner/kernel-slug")
finally:
sys.stdout = sys.__stdout__

self.assertEqual(captured.getvalue(), "\n")


if __name__ == "__main__":
unittest.main()
Loading