diff --git a/src/kaggle/api/kaggle_api_extended.py b/src/kaggle/api/kaggle_api_extended.py index 90e0f7b8..48b75def 100644 --- a/src/kaggle/api/kaggle_api_extended.py +++ b/src/kaggle/api/kaggle_api_extended.py @@ -123,7 +123,7 @@ ApiKernelMetadata, ApiDeleteKernelRequest, ) -from kagglesdk.kernels.types.kernels_enums import KernelsListSortType, KernelsListViewType +from kagglesdk.kernels.types.kernels_enums import KernelWorkerStatus, KernelsListSortType, KernelsListViewType from kagglesdk.models.types.model_api_service import ( ApiListModelsRequest, ApiCreateModelRequest, @@ -3601,6 +3601,89 @@ def kernels_status_cli(self, kernel, kernel_opt=None): else: print('%s has status "%s"' % (kernel, status)) + def kernels_logs(self, kernel: str) -> str: + """Retrieves the execution log for a specified kernel. + + Args: + kernel (str): The kernel identifier in the format owner/kernel-slug. + + Returns: + str: The log content from the kernel's latest session. + """ + if kernel is None: + raise ValueError("A kernel must be specified") + if "/" in kernel: + self.validate_kernel_string(kernel) + kernel_url_list = kernel.split("/") + owner_slug = kernel_url_list[0] + kernel_slug = kernel_url_list[1] + else: + owner_slug = self.get_config_value(self.CONFIG_NAME_USER) + kernel_slug = kernel + + with self.build_kaggle_client() as kaggle: + request = ApiListKernelSessionOutputRequest() + request.user_name = owner_slug + request.kernel_slug = kernel_slug + try: + response = kaggle.kernels.kernels_api_client.list_kernel_session_output(request) + except HTTPError as e: + if e.response.status_code in (401, 403): + raise ValueError( + f"Cannot access kernel '{kernel}' (Permission 'kernels.get' was denied). " + "The most likely cause is a wrong kernel slug. " + "Use the slug from the notebook URL (kaggle.com/code/owner/KERNEL-SLUG)." + ) + raise + return response.log or "" + + def kernels_logs_cli(self, kernel, kernel_opt=None, follow=False, interval=5): + """Print kernel execution logs to stdout. + + Args: + kernel: The kernel for which to retrieve the logs. + kernel_opt: An alternative option to providing a kernel. + follow: If True, continuously poll and print new log lines. + interval: Polling interval in seconds for follow mode (default 5). + """ + kernel = kernel or kernel_opt + terminal_statuses = { + KernelWorkerStatus.COMPLETE, + KernelWorkerStatus.ERROR, + KernelWorkerStatus.CANCEL_ACKNOWLEDGED, + } + printed_lines = 0 + + while True: + log = self.kernels_logs(kernel) + lines = log.split("\n") if log else [] + + if follow: + new_lines = lines[printed_lines:] + if new_lines: + print("\n".join(new_lines), flush=True) + printed_lines = len(lines) + + # Check if the kernel has reached a terminal status + try: + status_response = self.kernels_status(kernel) + status = status_response.status + except Exception: + break + if status in terminal_statuses: + # Fetch final logs one more time + log = self.kernels_logs(kernel) + lines = log.split("\n") if log else [] + final_new_lines = lines[printed_lines:] + if final_new_lines: + print("\n".join(final_new_lines), flush=True) + break + + time.sleep(interval) + else: + print(log) + break + def model_get(self, model: str) -> ApiModel: """Gets a model. diff --git a/src/kaggle/cli.py b/src/kaggle/cli.py index 754b4237..3b7a00ee 100644 --- a/src/kaggle/cli.py +++ b/src/kaggle/cli.py @@ -614,6 +614,24 @@ def parse_kernels(subparsers) -> None: parser_kernels_status._action_groups.append(parser_kernels_status_optional) parser_kernels_status.set_defaults(func=api.kernels_status_cli) + # Kernels logs + parser_kernels_logs = subparsers_kernels.add_parser( + "logs", formatter_class=argparse.RawTextHelpFormatter, help=Help.command_kernels_logs + ) + parser_kernels_logs_optional = parser_kernels_logs._action_groups.pop() + parser_kernels_logs_optional.add_argument("kernel", nargs="?", default=None, help=Help.param_kernel) + parser_kernels_logs_optional.add_argument( + "-k", "--kernel", dest="kernel_opt", required=False, help=argparse.SUPPRESS + ) + parser_kernels_logs_optional.add_argument( + "-f", "--follow", dest="follow", action="store_true", required=False, help=Help.param_kernel_logs_follow + ) + parser_kernels_logs_optional.add_argument( + "--interval", dest="interval", default=5, type=int, required=False, help=Help.param_kernel_logs_interval + ) + parser_kernels_logs._action_groups.append(parser_kernels_logs_optional) + parser_kernels_logs.set_defaults(func=api.kernels_logs_cli) + # Kernels delete parser_kernels_delete = subparsers_kernels.add_parser( "delete", formatter_class=argparse.RawTextHelpFormatter, help=Help.command_kernels_delete @@ -1073,7 +1091,7 @@ class Help(object): ] competitions_choices = ["list", "files", "download", "submit", "submissions", "leaderboard"] datasets_choices = ["list", "files", "download", "create", "version", "init", "metadata", "status", "delete"] - kernels_choices = ["list", "files", "get", "init", "push", "pull", "output", "status", "update", "delete"] + kernels_choices = ["list", "files", "get", "init", "push", "pull", "output", "status", "logs", "update", "delete"] models_choices = ["instances", "i", "variations", "v", "get", "list", "init", "create", "delete", "update"] model_instances_choices = ["versions", "v", "get", "files", "list", "init", "create", "delete", "update"] model_instance_versions_choices = ["init", "create", "download", "delete", "files", "list"] @@ -1138,6 +1156,7 @@ class Help(object): command_kernels_pull = "Pull down code from a kernel" command_kernels_output = "Get data output from the latest kernel run" command_kernels_status = "Display the status of the latest kernel run" + command_kernels_logs = "Print the execution logs from the latest kernel run" command_kernels_delete = "Delete a kernel" # Models commands @@ -1305,6 +1324,8 @@ class Help(object): "Regex pattern to match against filenames. Only files matching the pattern will be downloaded." ) param_kernel_acc = "Specify the type of accelerator to use for the kernel run" + param_kernel_logs_follow = "Continuously poll and print new log lines (like tail -f)" + param_kernel_logs_interval = "Polling interval in seconds for follow mode (default 5)" # Models params param_model = "Model URL suffix in format /" diff --git a/tests/test_kernels_logs.py b/tests/test_kernels_logs.py new file mode 100644 index 00000000..1431b8e6 --- /dev/null +++ b/tests/test_kernels_logs.py @@ -0,0 +1,202 @@ +# coding=utf-8 +import unittest +from unittest.mock import patch, MagicMock, call +import io +import sys + +sys.path.insert(0, "..") + +from kaggle.api.kaggle_api_extended import KaggleApi +from kagglesdk.kernels.types.kernels_enums import KernelWorkerStatus + + +class TestKernelsLogs(unittest.TestCase): + """Tests for the kernels_logs and kernels_logs_cli methods.""" + + def setUp(self): + self.api = KaggleApi.__new__(KaggleApi) + self.api.config_values = {"username": "testuser"} + + @patch.object(KaggleApi, "build_kaggle_client") + @patch.object(KaggleApi, "validate_kernel_string") + def test_kernels_logs_returns_log_string(self, mock_validate, mock_client): + """Test that kernels_logs returns the log string from the API response.""" + mock_response = MagicMock() + mock_response.log = "Line 1\nLine 2\nLine 3" + mock_kaggle = MagicMock() + mock_kaggle.kernels.kernels_api_client.list_kernel_session_output.return_value = mock_response + mock_client.return_value.__enter__ = MagicMock(return_value=mock_kaggle) + mock_client.return_value.__exit__ = MagicMock(return_value=False) + + result = self.api.kernels_logs("owner/kernel-slug") + self.assertEqual(result, "Line 1\nLine 2\nLine 3") + + @patch.object(KaggleApi, "build_kaggle_client") + @patch.object(KaggleApi, "validate_kernel_string") + def test_kernels_logs_returns_empty_string_when_no_log(self, mock_validate, mock_client): + """Test that kernels_logs returns empty string when log is None.""" + mock_response = MagicMock() + mock_response.log = None + mock_kaggle = MagicMock() + mock_kaggle.kernels.kernels_api_client.list_kernel_session_output.return_value = mock_response + mock_client.return_value.__enter__ = MagicMock(return_value=mock_kaggle) + mock_client.return_value.__exit__ = MagicMock(return_value=False) + + result = self.api.kernels_logs("owner/kernel-slug") + self.assertEqual(result, "") + + @patch.object(KaggleApi, "build_kaggle_client") + def test_kernels_logs_raises_when_kernel_none(self, mock_client): + """Test that kernels_logs raises ValueError when kernel is None.""" + with self.assertRaises(ValueError): + self.api.kernels_logs(None) + + @patch.object(KaggleApi, "build_kaggle_client") + @patch.object(KaggleApi, "get_config_value", return_value="defaultuser") + def test_kernels_logs_uses_default_user_for_bare_slug(self, mock_config, mock_client): + """Test that a bare kernel slug uses the default username.""" + mock_response = MagicMock() + mock_response.log = "some log" + mock_kaggle = MagicMock() + mock_kaggle.kernels.kernels_api_client.list_kernel_session_output.return_value = mock_response + mock_client.return_value.__enter__ = MagicMock(return_value=mock_kaggle) + mock_client.return_value.__exit__ = MagicMock(return_value=False) + + result = self.api.kernels_logs("my-kernel") + self.assertEqual(result, "some log") + + # Verify the request used the default user + call_args = mock_kaggle.kernels.kernels_api_client.list_kernel_session_output.call_args + request = call_args[0][0] + self.assertEqual(request.user_name, "defaultuser") + self.assertEqual(request.kernel_slug, "my-kernel") + + @patch.object(KaggleApi, "kernels_logs") + def test_kernels_logs_cli_oneshot(self, mock_logs): + """Test one-shot mode prints log to stdout.""" + mock_logs.return_value = "Line 1\nLine 2\nDone" + + captured = io.StringIO() + sys.stdout = captured + try: + self.api.kernels_logs_cli("owner/kernel-slug") + finally: + sys.stdout = sys.__stdout__ + + self.assertEqual(captured.getvalue(), "Line 1\nLine 2\nDone\n") + + @patch.object(KaggleApi, "kernels_logs") + def test_kernels_logs_cli_uses_kernel_opt(self, mock_logs): + """Test that kernel_opt is used when kernel is None.""" + mock_logs.return_value = "log output" + + captured = io.StringIO() + sys.stdout = captured + try: + self.api.kernels_logs_cli(None, kernel_opt="owner/kernel-slug") + finally: + sys.stdout = sys.__stdout__ + + mock_logs.assert_called_once_with("owner/kernel-slug") + + @patch("time.sleep") + @patch.object(KaggleApi, "kernels_status") + @patch.object(KaggleApi, "kernels_logs") + def test_kernels_logs_cli_follow_mode(self, mock_logs, mock_status, mock_sleep): + """Test follow mode polls and prints new lines, stops on terminal status.""" + # First poll: kernel is running, returns some log lines + # Second poll: kernel is complete, returns more log lines + mock_logs.side_effect = [ + "Line 1\nLine 2", + "Line 1\nLine 2\nLine 3\nLine 4", + "Line 1\nLine 2\nLine 3\nLine 4", # final fetch after terminal status + ] + + status_running = MagicMock() + status_running.status = KernelWorkerStatus.RUNNING + status_complete = MagicMock() + status_complete.status = KernelWorkerStatus.COMPLETE + mock_status.side_effect = [status_running, status_complete] + + captured = io.StringIO() + sys.stdout = captured + try: + self.api.kernels_logs_cli("owner/kernel-slug", follow=True, interval=1) + finally: + sys.stdout = sys.__stdout__ + + output = captured.getvalue() + # First poll prints "Line 1\nLine 2" + # Second poll prints "Line 3\nLine 4" + self.assertIn("Line 1", output) + self.assertIn("Line 2", output) + self.assertIn("Line 3", output) + self.assertIn("Line 4", output) + + # Verify sleep was called with the right interval + mock_sleep.assert_called_with(1) + + @patch("time.sleep") + @patch.object(KaggleApi, "kernels_status") + @patch.object(KaggleApi, "kernels_logs") + def test_kernels_logs_cli_follow_stops_on_error(self, mock_logs, mock_status, mock_sleep): + """Test follow mode stops when kernel status is ERROR.""" + mock_logs.side_effect = [ + "Line 1", + "Line 1", # final fetch + ] + + status_error = MagicMock() + status_error.status = KernelWorkerStatus.ERROR + mock_status.return_value = status_error + + captured = io.StringIO() + sys.stdout = captured + try: + self.api.kernels_logs_cli("owner/kernel-slug", follow=True, interval=1) + finally: + sys.stdout = sys.__stdout__ + + # Should only poll once before stopping + self.assertEqual(mock_status.call_count, 1) + + @patch("time.sleep") + @patch.object(KaggleApi, "kernels_status") + @patch.object(KaggleApi, "kernels_logs") + def test_kernels_logs_cli_follow_stops_on_cancel(self, mock_logs, mock_status, mock_sleep): + """Test follow mode stops when kernel status is CANCEL_ACKNOWLEDGED.""" + mock_logs.side_effect = [ + "Cancelled", + "Cancelled", # final fetch + ] + + status_cancel = MagicMock() + status_cancel.status = KernelWorkerStatus.CANCEL_ACKNOWLEDGED + mock_status.return_value = status_cancel + + captured = io.StringIO() + sys.stdout = captured + try: + self.api.kernels_logs_cli("owner/kernel-slug", follow=True, interval=1) + finally: + sys.stdout = sys.__stdout__ + + self.assertEqual(mock_status.call_count, 1) + + @patch.object(KaggleApi, "kernels_logs") + def test_kernels_logs_cli_empty_log(self, mock_logs): + """Test one-shot mode with empty log.""" + mock_logs.return_value = "" + + captured = io.StringIO() + sys.stdout = captured + try: + self.api.kernels_logs_cli("owner/kernel-slug") + finally: + sys.stdout = sys.__stdout__ + + self.assertEqual(captured.getvalue(), "\n") + + +if __name__ == "__main__": + unittest.main()