diff --git a/mssql/base.py b/mssql/base.py index e6068e43..645cc0ff 100644 --- a/mssql/base.py +++ b/mssql/base.py @@ -99,8 +99,13 @@ def encode_value(v): def handle_datetimeoffset(dto_value): # Decode bytes returned from SQL Server # source: https://github.com/mkleehammer/pyodbc/wiki/Using-an-Output-Converter-function - tup = struct.unpack("<6hI2h", dto_value) # e.g., (2017, 3, 16, 10, 35, 18, 500000000) - return datetime.datetime(tup[0], tup[1], tup[2], tup[3], tup[4], tup[5], tup[6] // 1000) + # Format: 6 shorts (year, month, day, hour, minute, second), + # 1 unsigned int (nanoseconds), 2 shorts (tz_offset_hour, tz_offset_minute) + tup = struct.unpack("<6hI2h", dto_value) + return datetime.datetime( + tup[0], tup[1], tup[2], tup[3], tup[4], tup[5], tup[6] // 1000, + datetime.timezone(datetime.timedelta(hours=tup[7], minutes=tup[8])), + ) class DatabaseWrapper(BaseDatabaseWrapper): diff --git a/mssql/functions.py b/mssql/functions.py index 6040859f..79ff2055 100644 --- a/mssql/functions.py +++ b/mssql/functions.py @@ -5,6 +5,7 @@ import itertools from django import VERSION +from django.conf import settings from django.core import validators from django.db import NotSupportedError, connections, transaction from django.db.models import BooleanField, CheckConstraint, Q, Value @@ -169,9 +170,9 @@ def sqlserver_exists(self, compiler, connection, template=None, **extra_context) return sql, params def sqlserver_now(self, compiler, connection, **extra_context): - return self.as_sql( - compiler, connection, template="SYSDATETIME()", **extra_context - ) + if settings.USE_TZ: + return self.as_sql(compiler, connection, template="SYSDATETIMEOFFSET()", **extra_context) + return self.as_sql(compiler, connection, template="SYSDATETIME()", **extra_context) def sqlserver_lookup(self, compiler, connection): # MSSQL doesn't allow EXISTS() to be compared to another expression diff --git a/testapp/tests/test_base.py b/testapp/tests/test_base.py index 196eb1cf..4a27d9c5 100644 --- a/testapp/tests/test_base.py +++ b/testapp/tests/test_base.py @@ -138,10 +138,8 @@ def test_empty_token(self): class TestHandleDatetimeoffset(SimpleTestCase): """Tests for the handle_datetimeoffset function.""" - def test_datetime_conversion(self): - """Test conversion of binary datetime offset to Python datetime.""" - # Pack a known datetime: 2023-06-15 14:30:45.123456 - # Format: year, month, day, hour, minute, second, nanoseconds (as microseconds * 1000), tz_hour, tz_min + def test_datetime_conversion_utc(self): + """Test conversion of binary datetime offset with UTC (zero offset).""" dto_bytes = struct.pack("<6hI2h", 2023, 6, 15, 14, 30, 45, 123456000, 0, 0) result = handle_datetimeoffset(dto_bytes) @@ -153,10 +151,50 @@ def test_datetime_conversion(self): self.assertEqual(result.minute, 30) self.assertEqual(result.second, 45) self.assertEqual(result.microsecond, 123456) + self.assertIsNotNone(result.tzinfo) + self.assertEqual(result.utcoffset(), datetime.timedelta(0)) - def test_datetime_edge_case(self): - """Test with edge case values.""" - # Midnight on Jan 1, 2000 + def test_datetime_positive_offset(self): + """Test conversion with a positive timezone offset (+05:30 IST).""" + dto_bytes = struct.pack("<6hI2h", 2024, 1, 10, 9, 0, 0, 0, 5, 30) + result = handle_datetimeoffset(dto_bytes) + + self.assertEqual(result.year, 2024) + self.assertEqual(result.hour, 9) + self.assertIsNotNone(result.tzinfo) + self.assertEqual(result.utcoffset(), datetime.timedelta(hours=5, minutes=30)) + + def test_datetime_negative_offset(self): + """Test conversion with a negative timezone offset (-05:00 EST).""" + dto_bytes = struct.pack("<6hI2h", 2024, 12, 25, 18, 0, 0, 0, -5, 0) + result = handle_datetimeoffset(dto_bytes) + + self.assertEqual(result.year, 2024) + self.assertEqual(result.hour, 18) + self.assertIsNotNone(result.tzinfo) + self.assertEqual(result.utcoffset(), datetime.timedelta(hours=-5)) + + def test_datetime_negative_half_hour_offset(self): + """Test conversion with a negative half-hour offset (-09:30 Marquesas).""" + dto_bytes = struct.pack("<6hI2h", 2024, 7, 1, 12, 0, 0, 0, -9, -30) + result = handle_datetimeoffset(dto_bytes) + + self.assertEqual(result.hour, 12) + self.assertIsNotNone(result.tzinfo) + expected = datetime.timedelta(hours=-9, minutes=-30) + self.assertEqual(result.utcoffset(), expected) + + def test_datetime_positive_three_quarter_offset(self): + """Test conversion with +05:45 (Nepal) offset.""" + dto_bytes = struct.pack("<6hI2h", 2024, 3, 15, 10, 30, 0, 0, 5, 45) + result = handle_datetimeoffset(dto_bytes) + + self.assertEqual(result.hour, 10) + self.assertIsNotNone(result.tzinfo) + self.assertEqual(result.utcoffset(), datetime.timedelta(hours=5, minutes=45)) + + def test_datetime_edge_case_midnight_utc(self): + """Test with edge case: midnight on Jan 1, 2000 at UTC.""" dto_bytes = struct.pack("<6hI2h", 2000, 1, 1, 0, 0, 0, 0, 0, 0) result = handle_datetimeoffset(dto_bytes) @@ -167,6 +205,7 @@ def test_datetime_edge_case(self): self.assertEqual(result.minute, 0) self.assertEqual(result.second, 0) self.assertEqual(result.microsecond, 0) + self.assertEqual(result.utcoffset(), datetime.timedelta(0)) class TestDatabaseWrapperIsDriverNotFoundError(SimpleTestCase): diff --git a/testapp/tests/test_queries.py b/testapp/tests/test_queries.py index fe2e884e..7ea92119 100644 --- a/testapp/tests/test_queries.py +++ b/testapp/tests/test_queries.py @@ -5,6 +5,7 @@ from django.db import connections, connection, models from django.db.models.functions import Now from django.test import TransactionTestCase, TestCase, skipUnlessDBFeature +from django.test.utils import override_settings from django.utils import timezone from ..models import Author, BinaryData, Editor @@ -193,3 +194,24 @@ def test_explain_raises_not_supported(self): qs = Author.objects.all() with self.assertRaises(django.db.utils.NotSupportedError): qs.explain() + + +class NowSQLTemplateTests(TestCase): + """Regression tests for #371 / PR #484: Now() should emit + SYSDATETIMEOFFSET() when USE_TZ=True, SYSDATETIME() otherwise.""" + + @override_settings(USE_TZ=True) + def test_now_uses_sysdatetimeoffset_when_use_tz(self): + qs = Author.objects.annotate(ts=Now()).filter(name="x") + compiler = qs.query.get_compiler(using="default") + sql_compiled, _ = compiler.as_sql() + self.assertIn("SYSDATETIMEOFFSET()", sql_compiled) + self.assertNotIn("SYSDATETIME()", sql_compiled) + + @override_settings(USE_TZ=False) + def test_now_uses_sysdatetime_when_no_tz(self): + qs = Author.objects.annotate(ts=Now()).filter(name="x") + compiler = qs.query.get_compiler(using="default") + sql_compiled, _ = compiler.as_sql() + self.assertIn("SYSDATETIME()", sql_compiled) + self.assertNotIn("SYSDATETIMEOFFSET()", sql_compiled)