From f4ea3b2eea091286acd7e3ee7dac03b24c1966d8 Mon Sep 17 00:00:00 2001 From: Jiabin Hu Date: Sat, 7 Feb 2026 14:26:12 -0800 Subject: [PATCH 1/3] Add statement level query tag support by introducing it as a parameter on execute* methods --- examples/query_tags_example.py | 70 ++++++++++++++++++- .../sql/backend/databricks_client.py | 2 + src/databricks/sql/backend/thrift_backend.py | 22 ++++-- src/databricks/sql/client.py | 11 +++ src/databricks/sql/utils.py | 40 +++++++++++ tests/unit/test_util.py | 63 +++++++++++++++++ 6 files changed, 201 insertions(+), 7 deletions(-) diff --git a/examples/query_tags_example.py b/examples/query_tags_example.py index f615d082c..0661ebde4 100644 --- a/examples/query_tags_example.py +++ b/examples/query_tags_example.py @@ -7,11 +7,23 @@ Query Tags are key-value pairs that can be attached to SQL executions and will appear in the system.query.history table for analytical purposes. -Format: "key1:value1,key2:value2,key3:value3" +There are two ways to set query tags: +1. Session-level: Set in session_configuration (applies to all queries in the session) +2. Per-query level: Pass query_tags parameter to execute() or execute_async() (applies to specific query) + +Format: Dictionary with string keys and optional string values +Example: {"team": "engineering", "application": "etl", "priority": "high"} + +Special cases: +- If a value is None, only the key is included (no colon or value) +- Special characters (:, ,, \\) in values are automatically escaped +- Keys are not escaped (should be controlled identifiers) """ print("=== Query Tags Example ===\n") +# Example 1: Session-level query tags (old approach) +print("Example 1: Session-level query tags") with sql.connect( server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"), http_path=os.getenv("DATABRICKS_HTTP_PATH"), @@ -21,10 +33,64 @@ 'ansi_mode': False } ) as connection: - + with connection.cursor() as cursor: cursor.execute("SELECT 1") result = cursor.fetchone() print(f" Result: {result[0]}") +print() + +# Example 2: Per-query query tags (new approach) +print("Example 2: Per-query query tags") +with sql.connect( + server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"), + http_path=os.getenv("DATABRICKS_HTTP_PATH"), + access_token=os.getenv("DATABRICKS_TOKEN"), +) as connection: + + with connection.cursor() as cursor: + # Query 1: Tags for a critical ETL job + cursor.execute( + "SELECT 1", + query_tags={"team": "data-eng", "application": "etl", "priority": "high"} + ) + result = cursor.fetchone() + print(f" ETL Query Result: {result[0]}") + + # Query 2: Tags with None value (key-only tag) + cursor.execute( + "SELECT 2", + query_tags={"team": "analytics", "experimental": None} + ) + result = cursor.fetchone() + print(f" Experimental Query Result: {result[0]}") + + # Query 3: Tags with special characters (automatically escaped) + cursor.execute( + "SELECT 3", + query_tags={"description": "test:with:colons,and,commas"} + ) + result = cursor.fetchone() + print(f" Special Chars Query Result: {result[0]}") + +print() + +# Example 3: Async execution with query tags +print("Example 3: Async execution with query tags") +with sql.connect( + server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"), + http_path=os.getenv("DATABRICKS_HTTP_PATH"), + access_token=os.getenv("DATABRICKS_TOKEN"), +) as connection: + + with connection.cursor() as cursor: + cursor.execute_async( + "SELECT 4", + query_tags={"team": "data-eng", "mode": "async"} + ) + cursor.get_async_execution_result() + result = cursor.fetchone() + print(f" Async Query Result: {result[0]}") + print("\n=== Query Tags Example Complete ===") \ No newline at end of file diff --git a/src/databricks/sql/backend/databricks_client.py b/src/databricks/sql/backend/databricks_client.py index 2213635fe..b772e7ddd 100644 --- a/src/databricks/sql/backend/databricks_client.py +++ b/src/databricks/sql/backend/databricks_client.py @@ -83,6 +83,7 @@ def execute_command( async_op: bool, enforce_embedded_schema_correctness: bool, row_limit: Optional[int] = None, + query_tags: Optional[Dict[str, Optional[str]]] = None, ) -> Union[ResultSet, None]: """ Executes a SQL command or query within the specified session. @@ -102,6 +103,7 @@ def execute_command( async_op: Whether to execute the command asynchronously enforce_embedded_schema_correctness: Whether to enforce schema correctness row_limit: Maximum number of rows in the response. + query_tags: Optional dictionary of query tags to apply for this query only. Returns: If async_op is False, returns a ResultSet object containing the diff --git a/src/databricks/sql/backend/thrift_backend.py b/src/databricks/sql/backend/thrift_backend.py index edee02bfa..e23f3389b 100644 --- a/src/databricks/sql/backend/thrift_backend.py +++ b/src/databricks/sql/backend/thrift_backend.py @@ -5,7 +5,7 @@ import math import time import threading -from typing import List, Optional, Union, Any, TYPE_CHECKING +from typing import Dict, List, Optional, Union, Any, TYPE_CHECKING from uuid import UUID from databricks.sql.common.unified_http_client import UnifiedHttpClient @@ -53,6 +53,7 @@ convert_arrow_based_set_to_arrow_table, convert_decimals_in_arrow_table, convert_column_based_set_to_arrow_table, + serialize_query_tags, ) from databricks.sql.types import SSLOptions from databricks.sql.backend.databricks_client import DatabricksClient @@ -1003,6 +1004,7 @@ def execute_command( async_op=False, enforce_embedded_schema_correctness=False, row_limit: Optional[int] = None, + query_tags: Optional[Dict[str, Optional[str]]] = None, ) -> Union["ResultSet", None]: thrift_handle = session_id.to_thrift_handle() if not thrift_handle: @@ -1022,6 +1024,19 @@ def execute_command( # DBR should be changed to use month_day_nano_interval intervalTypesAsArrow=False, ) + + # Build confOverlay with default configs and query_tags + merged_conf_overlay = { + # We want to receive proper Timestamp arrow types. + "spark.thriftserver.arrowBasedRowSet.timestampAsString": "false" + } + + # Serialize and add query_tags to confOverlay if provided + if query_tags: + serialized_tags = serialize_query_tags(query_tags) + if serialized_tags: + merged_conf_overlay["query_tags"] = serialized_tags + req = ttypes.TExecuteStatementReq( sessionHandle=thrift_handle, statement=operation, @@ -1036,10 +1051,7 @@ def execute_command( canReadArrowResult=True if pyarrow else False, canDecompressLZ4Result=lz4_compression, canDownloadResult=use_cloud_fetch, - confOverlay={ - # We want to receive proper Timestamp arrow types. - "spark.thriftserver.arrowBasedRowSet.timestampAsString": "false" - }, + confOverlay=merged_conf_overlay, useArrowNativeTypes=spark_arrow_types, parameters=parameters, enforceEmbeddedSchemaCorrectness=enforce_embedded_schema_correctness, diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 1a246b7c1..afa319723 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -1263,6 +1263,7 @@ def execute( parameters: Optional[TParameterCollection] = None, enforce_embedded_schema_correctness=False, input_stream: Optional[BinaryIO] = None, + query_tags: Optional[Dict[str, Optional[str]]] = None, ) -> "Cursor": """ Execute a query and wait for execution to complete. @@ -1293,6 +1294,10 @@ def execute( Both will result in the query equivalent to "SELECT * FROM table WHERE field = 'foo' being sent to the server + :param query_tags: Optional dictionary of query tags to apply for this query only. + Tags are key-value pairs that can be used to identify and categorize queries. + Example: {"team": "data-eng", "application": "etl"} + :returns self """ @@ -1333,6 +1338,7 @@ def execute( async_op=False, enforce_embedded_schema_correctness=enforce_embedded_schema_correctness, row_limit=self.row_limit, + query_tags=query_tags, ) if self.active_result_set and self.active_result_set.is_staging_operation: @@ -1349,6 +1355,7 @@ def execute_async( operation: str, parameters: Optional[TParameterCollection] = None, enforce_embedded_schema_correctness=False, + query_tags: Optional[Dict[str, Optional[str]]] = None, ) -> "Cursor": """ @@ -1356,6 +1363,9 @@ def execute_async( :param operation: :param parameters: + :param query_tags: Optional dictionary of query tags to apply for this query only. + Tags are key-value pairs that can be used to identify and categorize queries. + Example: {"team": "data-eng", "application": "etl"} :return: """ @@ -1392,6 +1402,7 @@ def execute_async( async_op=True, enforce_embedded_schema_correctness=enforce_embedded_schema_correctness, row_limit=self.row_limit, + query_tags=query_tags, ) return self diff --git a/src/databricks/sql/utils.py b/src/databricks/sql/utils.py index 043183ac2..f5a335f33 100644 --- a/src/databricks/sql/utils.py +++ b/src/databricks/sql/utils.py @@ -898,6 +898,46 @@ def concat_table_chunks( return pyarrow.concat_tables(table_chunks) +def serialize_query_tags(query_tags: Optional[Dict[str, Optional[str]]]) -> Optional[str]: + """ + Serialize query_tags dictionary to a string format. + + Format: "key1:value1,key2:value2" + Special cases: + - If value is None, omit the colon and value (e.g., "key1:value1,key2,key3:value3") + - Escape special characters (:, ,, \\) in values with a leading backslash + - Keys are not escaped (assumed to be controlled identifiers) + + Args: + query_tags: Dictionary of query tags where keys are strings and values are optional strings + + Returns: + Serialized string or None if query_tags is None or empty + """ + if not query_tags: + return None + + def escape_value(value: str) -> str: + """Escape special characters in tag values.""" + # Escape backslash first to avoid double-escaping + value = value.replace("\\", "\\\\") + # Escape colon and comma + value = value.replace(":", "\\:") + value = value.replace(",", "\\,") + return value + + serialized_parts = [] + for key, value in query_tags.items(): + if value is None: + # No colon or value when value is None + serialized_parts.append(key) + else: + escaped_value = escape_value(value) + serialized_parts.append(f"{key}:{escaped_value}") + + return ",".join(serialized_parts) + + def build_client_context(server_hostname: str, version: str, **kwargs): """Build ClientContext for HTTP client configuration.""" from databricks.sql.auth.common import ClientContext diff --git a/tests/unit/test_util.py b/tests/unit/test_util.py index 713342b2e..57eee9745 100644 --- a/tests/unit/test_util.py +++ b/tests/unit/test_util.py @@ -6,6 +6,7 @@ convert_to_assigned_datatypes_in_column_table, ColumnTable, concat_table_chunks, + serialize_query_tags, ) try: @@ -161,3 +162,65 @@ def test_concat_table_chunks__incorrect_column_names_error(self): with pytest.raises(ValueError): concat_table_chunks([column_table1, column_table2]) + + def test_serialize_query_tags_basic(self): + """Test basic query tags serialization""" + query_tags = {"team": "data-eng", "application": "etl"} + result = serialize_query_tags(query_tags) + assert result == "team:data-eng,application:etl" + + def test_serialize_query_tags_with_none_value(self): + """Test query tags with None value (should omit colon and value)""" + query_tags = {"key1": "value1", "key2": None, "key3": "value3"} + result = serialize_query_tags(query_tags) + assert result == "key1:value1,key2,key3:value3" + + def test_serialize_query_tags_with_special_chars(self): + """Test query tags with special characters (colon, comma, backslash)""" + query_tags = { + "key1": "value:with:colons", + "key2": "value,with,commas", + "key3": "value\\with\\backslashes", + } + result = serialize_query_tags(query_tags) + assert ( + result + == "key1:value\\:with\\:colons,key2:value\\,with\\,commas,key3:value\\\\with\\\\backslashes" + ) + + def test_serialize_query_tags_with_mixed_special_chars(self): + """Test query tags with mixed special characters""" + query_tags = {"key1": "a:b,c\\d"} + result = serialize_query_tags(query_tags) + assert result == "key1:a\\:b\\,c\\\\d" + + def test_serialize_query_tags_empty_dict(self): + """Test serialization with empty dictionary""" + query_tags = {} + result = serialize_query_tags(query_tags) + assert result is None + + def test_serialize_query_tags_none(self): + """Test serialization with None input""" + result = serialize_query_tags(None) + assert result is None + + def test_serialize_query_tags_with_special_chars_in_key(self): + """Test query tags with special characters in keys (keys are not escaped)""" + query_tags = { + "key:with:colons": "value1", + "key,with,commas": "value2", + "key\\with\\backslashes": "value3", + } + result = serialize_query_tags(query_tags) + # Keys are not escaped, only values are + assert ( + result + == "key:with:colons:value1,key,with,commas:value2,key\\with\\backslashes:value3" + ) + + def test_serialize_query_tags_all_none_values(self): + """Test query tags where all values are None""" + query_tags = {"key1": None, "key2": None, "key3": None} + result = serialize_query_tags(query_tags) + assert result == "key1,key2,key3" From 051f90825b5929133fe565058937afd4e98b0ea2 Mon Sep 17 00:00:00 2001 From: Jiabin Hu Date: Sat, 7 Feb 2026 14:44:44 -0800 Subject: [PATCH 2/3] Add query_tags support to executemany method - Added query_tags parameter to executemany() method - Query tags are applied to all queries in the batch - Updated example to demonstrate executemany usage with query_tags - All tests pass (122/122 client tests) --- examples/query_tags_example.py | 20 ++++++++++++++++++++ src/databricks/sql/client.py | 13 +++++++++++-- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/examples/query_tags_example.py b/examples/query_tags_example.py index 0661ebde4..ab40e9bcd 100644 --- a/examples/query_tags_example.py +++ b/examples/query_tags_example.py @@ -93,4 +93,24 @@ result = cursor.fetchone() print(f" Async Query Result: {result[0]}") +print() + +# Example 4: executemany with query tags +print("Example 4: executemany with query tags") +with sql.connect( + server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"), + http_path=os.getenv("DATABRICKS_HTTP_PATH"), + access_token=os.getenv("DATABRICKS_TOKEN"), +) as connection: + + with connection.cursor() as cursor: + # Execute multiple queries with the same tags + cursor.executemany( + "SELECT ?", + [[5], [6], [7]], + query_tags={"team": "data-eng", "batch": "executemany"} + ) + result = cursor.fetchone() + print(f" Executemany Query Result (last): {result[0]}") + print("\n=== Query Tags Example Complete ===") \ No newline at end of file diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index afa319723..efaf6ae4d 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -1459,7 +1459,12 @@ def get_async_execution_result(self): session_id_hex=self.connection.get_session_id_hex(), ) - def executemany(self, operation, seq_of_parameters): + def executemany( + self, + operation, + seq_of_parameters, + query_tags: Optional[Dict[str, Optional[str]]] = None, + ): """ Execute the operation once for every set of passed in parameters. @@ -1468,10 +1473,14 @@ def executemany(self, operation, seq_of_parameters): Only the final result set is retained. + :param query_tags: Optional dictionary of query tags to apply for all queries in this batch. + Tags are key-value pairs that can be used to identify and categorize queries. + Example: {"team": "data-eng", "application": "etl"} + :returns self """ for parameters in seq_of_parameters: - self.execute(operation, parameters) + self.execute(operation, parameters, query_tags=query_tags) return self @log_latency(StatementType.METADATA) From 3e5b657846838c2e29b3dfa590bb85e6b999adc9 Mon Sep 17 00:00:00 2001 From: Jiabin Hu Date: Mon, 9 Feb 2026 11:47:33 -0800 Subject: [PATCH 3/3] add example that doesn't have tag --- examples/query_tags_example.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/examples/query_tags_example.py b/examples/query_tags_example.py index ab40e9bcd..83c1b364c 100644 --- a/examples/query_tags_example.py +++ b/examples/query_tags_example.py @@ -74,6 +74,11 @@ result = cursor.fetchone() print(f" Special Chars Query Result: {result[0]}") + # Query 4: No tags (demonstrates tags don't persist from previous queries) + cursor.execute("SELECT 4") + result = cursor.fetchone() + print(f" No Tags Query Result: {result[0]}") + print() # Example 3: Async execution with query tags @@ -86,7 +91,7 @@ with connection.cursor() as cursor: cursor.execute_async( - "SELECT 4", + "SELECT 5", query_tags={"team": "data-eng", "mode": "async"} ) cursor.get_async_execution_result() @@ -107,7 +112,7 @@ # Execute multiple queries with the same tags cursor.executemany( "SELECT ?", - [[5], [6], [7]], + [[6], [7], [8]], query_tags={"team": "data-eng", "batch": "executemany"} ) result = cursor.fetchone()