diff --git a/cloudinary/__init__.py b/cloudinary/__init__.py index 55eb1466..6a7c0f4a 100644 --- a/cloudinary/__init__.py +++ b/cloudinary/__init__.py @@ -183,6 +183,8 @@ def __init__(self): if not self.signature_algorithm: self.signature_algorithm = utils.SIGNATURE_SHA1 + if not self.signature_version: + self.signature_version = 2 def _config_from_parsed_url(self, parsed_url): if not self._is_url_scheme_valid(parsed_url): @@ -280,7 +282,7 @@ def __len__(self): return len(self.public_id) if self.public_id is not None else 0 def validate(self): - return self.signature == self.get_expected_signature() + return utils.verify_api_response_signature(self.public_id, self.version, self.signature) def get_prep_value(self): if None in [self.public_id, @@ -301,7 +303,7 @@ def get_presigned(self): def get_expected_signature(self): return utils.api_sign_request({"public_id": self.public_id, "version": self.version}, config().api_secret, - config().signature_algorithm) + config().signature_algorithm, signature_version=1) @property def url(self): diff --git a/cloudinary/utils.py b/cloudinary/utils.py index a6f8d933..dd895370 100644 --- a/cloudinary/utils.py +++ b/cloudinary/utils.py @@ -619,24 +619,72 @@ def sign_request(params, options): if not api_secret: raise ValueError("Must supply api_secret") signature_algorithm = options.get("signature_algorithm", cloudinary.config().signature_algorithm) + signature_version = options.get("signature_version", cloudinary.config().signature_version) params = cleanup_params(params) - params["signature"] = api_sign_request(params, api_secret, signature_algorithm) + params["signature"] = api_sign_request(params, api_secret, signature_algorithm, signature_version) params["api_key"] = api_key return params -def api_sign_request(params_to_sign, api_secret, algorithm=SIGNATURE_SHA1): - params = [(k + "=" + ( - ",".join(v) if isinstance(v, list) else - str(v).lower() if isinstance(v, bool) else - str(v) - )) for k, v in params_to_sign.items() if v] - to_sign = "&".join(sorted(params)) +def api_sign_request(params_to_sign, api_secret, algorithm=SIGNATURE_SHA1, signature_version=2): + """ + Signs API request parameters using the specified algorithm and signature version. + + :param params_to_sign: Parameters to include in the signature + :param api_secret: API secret key + :param algorithm: Signature algorithm (default: SHA1) + :param signature_version: Signature version (default: 2) + - Version 1: Original behavior without parameter encoding + - Version 2+: Includes parameter encoding to prevent parameter smuggling + :return: Computed signature + """ + to_sign = api_string_to_sign(params_to_sign, signature_version) return compute_hex_hash(to_sign + api_secret, algorithm) +def api_string_to_sign(params_to_sign, signature_version=2): + """ + Generates a string to be signed for API requests. + + :param params_to_sign: Parameters to include in the signature + :param signature_version: Version of signature algorithm to use: + - Version 1: Original behavior without parameter encoding + - Version 2+ (default): Includes parameter encoding to prevent parameter smuggling + :return: String to be signed + """ + params = [] + for k, v in params_to_sign.items(): + if v: + if isinstance(v, list): + value = ",".join(v) + elif isinstance(v, bool): + value = str(v).lower() + else: + value = str(v) + + param_string = k + "=" + value + if signature_version >= 2: + param_string = _encode_param(param_string) + params.append(param_string) + + return "&".join(sorted(params)) + + +def _encode_param(value): + """ + Encodes a parameter for safe inclusion in URL query strings. + + Specifically replaces "&" characters with their percent-encoded equivalent "%26" + to prevent them from being interpreted as parameter separators in URL query strings. + + :param value: The parameter to encode + :return: Encoded parameter + """ + return str(value).replace("&", "%26") + + def breakpoint_settings_mapper(breakpoint_settings): breakpoint_settings = copy.deepcopy(breakpoint_settings) transformation = breakpoint_settings.get("transformation") @@ -1566,7 +1614,7 @@ def verify_api_response_signature(public_id, version, signature, algorithm=None) :param version: The version of the asset as returned in the API response :param signature: Actual signature. Can be retrieved from the X-Cld-Signature header :param algorithm: Name of hashing algorithm to use for calculation of HMACs. - By default uses `cloudinary.config().signature_algorithm` + By default, uses `cloudinary.config().signature_algorithm` :return: Boolean result of the validation """ @@ -1576,10 +1624,12 @@ def verify_api_response_signature(public_id, version, signature, algorithm=None) parameters_to_sign = {'public_id': public_id, 'version': version} + # Use signature version 1 for backward compatibility return signature == api_sign_request( parameters_to_sign, cloudinary.config().api_secret, - algorithm or cloudinary.config().signature_algorithm + algorithm or cloudinary.config().signature_algorithm, + signature_version=1 ) @@ -1592,7 +1642,7 @@ def verify_notification_signature(body, timestamp, signature, valid_for=7200, al :param signature: Actual signature. Can be retrieved from the X-Cld-Signature header :param valid_for: The desired time in seconds for considering the request valid :param algorithm: Name of hashing algorithm to use for calculation of HMACs. - By default uses `cloudinary.config().signature_algorithm` + By default, uses `cloudinary.config().signature_algorithm` :return: Boolean result of the validation """ diff --git a/test/test_utils.py b/test/test_utils.py index 0fd596bb..57f19e1c 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -55,6 +55,8 @@ MOCKED_NOW = 1549533574 API_SECRET = 'X7qLTrsES31MzxxkxPPA-pAGGfU' +API_SIGN_REQUEST_TEST_SECRET = "hdcixPpR2iKERPwqvH6sHdK9cyac" +API_SIGN_REQUEST_CLOUD_NAME = "dn6ot3ged" class TestUtils(unittest.TestCase): @@ -76,7 +78,8 @@ def setUp(self): cname=None, # for these tests without actual upload, we ignore cname api_key="a", api_secret="b", secure_distribution=None, - private_cdn=False) + private_cdn=False, + signature_version=2) def __test_cloudinary_url(self, public_id=TEST_ID, options=None, expected_url=None, expected_options=None): if expected_options is None: @@ -1443,17 +1446,151 @@ def test_support_long_url_signature(self): expected_url=DEFAULT_UPLOAD_PATH + long_signature + "/" + image_name) def test_api_sign_request_sha1(self): - params = dict(cloud_name="dn6ot3ged", timestamp=1568810420, username="user@cloudinary.com") - signature = api_sign_request(params, "hdcixPpR2iKERPwqvH6sHdK9cyac") + params = dict(cloud_name=API_SIGN_REQUEST_CLOUD_NAME, timestamp=1568810420, username="user@cloudinary.com") + signature = api_sign_request(params, API_SIGN_REQUEST_TEST_SECRET) expected = "14c00ba6d0dfdedbc86b316847d95b9e6cd46d94" self.assertEqual(expected, signature) def test_api_sign_request_sha256(self): - params = dict(cloud_name="dn6ot3ged", timestamp=1568810420, username="user@cloudinary.com") - signature = api_sign_request(params, "hdcixPpR2iKERPwqvH6sHdK9cyac", cloudinary.utils.SIGNATURE_SHA256) + params = dict(cloud_name=API_SIGN_REQUEST_CLOUD_NAME, timestamp=1568810420, username="user@cloudinary.com") + signature = api_sign_request(params, API_SIGN_REQUEST_TEST_SECRET, cloudinary.utils.SIGNATURE_SHA256) expected = "45ddaa4fa01f0c2826f32f669d2e4514faf275fe6df053f1a150e7beae58a3bd" self.assertEqual(expected, signature) + def test_api_sign_request_prevents_parameter_smuggling(self): + """Should prevent parameter smuggling via & characters in parameter values""" + # Test with notification_url containing & characters + params_with_ampersand = { + "cloud_name": API_SIGN_REQUEST_CLOUD_NAME, + "timestamp": 1568810420, + "notification_url": "https://fake.com/callback?a=1&tags=hello,world" + } + + signature_with_ampersand = api_sign_request(params_with_ampersand, API_SIGN_REQUEST_TEST_SECRET) + + # Test that attempting to smuggle parameters by splitting the notification_url fails + params_smuggled = { + "cloud_name": API_SIGN_REQUEST_CLOUD_NAME, + "timestamp": 1568810420, + "notification_url": "https://fake.com/callback?a=1", + "tags": "hello,world" # This would be smuggled if & encoding didn't work + } + + signature_smuggled = api_sign_request(params_smuggled, API_SIGN_REQUEST_TEST_SECRET) + + # The signatures should be different, proving that parameter smuggling is prevented + self.assertNotEqual(signature_with_ampersand, signature_smuggled, + "Signatures should be different to prevent parameter smuggling") + + # Verify the expected signature for the properly encoded case + expected_signature = "4fdf465dd89451cc1ed8ec5b3e314e8a51695704" + self.assertEqual(expected_signature, signature_with_ampersand) + + # Verify the expected signature for the smuggled parameters case + expected_smuggled_signature = "7b4e3a539ff1fa6e6700c41b3a2ee77586a025f9" + self.assertEqual(expected_smuggled_signature, signature_smuggled) + + def test_api_sign_request_signature_versions(self): + """Should use signature version 1 (without parameter encoding) for backward compatibility""" + public_id_with_ampersand = 'tests/logo&version=2' + test_version = 1 + + expected_signature_v1 = api_sign_request( + {'public_id': public_id_with_ampersand, 'version': test_version}, + API_SIGN_REQUEST_TEST_SECRET, + cloudinary.utils.SIGNATURE_SHA1, + signature_version=1 + ) + + expected_signature_v2 = api_sign_request( + {'public_id': public_id_with_ampersand, 'version': test_version}, + API_SIGN_REQUEST_TEST_SECRET, + cloudinary.utils.SIGNATURE_SHA1, + signature_version=2 + ) + + self.assertNotEqual(expected_signature_v1, expected_signature_v2) + + # verify_api_response_signature should use version 1 for backward compatibility + with patch('cloudinary.config', return_value=cloudinary.config(api_secret=API_SIGN_REQUEST_TEST_SECRET)): + self.assertTrue( + verify_api_response_signature( + public_id_with_ampersand, + test_version, + expected_signature_v1 + ) + ) + + self.assertFalse( + verify_api_response_signature( + public_id_with_ampersand, + test_version, + expected_signature_v2 + ) + ) + + def test_signature_version_config_support(self): + """Should use signature_version from config and produce different signatures for v1 vs v2""" + # Use params with & characters to show the encoding difference between versions + params = {'public_id': 'test&image', 'notification_url': 'https://example.com/callback?param=value&other=data'} + + # Test with config signature_version = 1 + cloudinary.config().signature_version = 1 + + # Test sign_request function uses config values + options_with_config = {'api_key': 'test_key', 'api_secret': API_SIGN_REQUEST_TEST_SECRET} + signed_params_config_v1 = cloudinary.utils.sign_request(params.copy(), options_with_config) + + # Test explicit signature version + options_explicit_v1 = options_with_config.copy() + options_explicit_v1['signature_version'] = 1 + signed_params_explicit_v1 = cloudinary.utils.sign_request(params.copy(), options_explicit_v1) + + self.assertEqual(signed_params_config_v1['signature'], signed_params_explicit_v1['signature']) + + # Test with config signature_version = 2 + cloudinary.config().signature_version = 2 + + signed_params_config_v2 = cloudinary.utils.sign_request(params.copy(), options_with_config) + + options_explicit_v2 = options_with_config.copy() + options_explicit_v2['signature_version'] = 2 + signed_params_explicit_v2 = cloudinary.utils.sign_request(params.copy(), options_explicit_v2) + + self.assertEqual(signed_params_config_v2['signature'], signed_params_explicit_v2['signature']) + + # Verify that v1 and v2 actually produce different signatures due to parameter encoding + self.assertNotEqual(signed_params_config_v1['signature'], signed_params_config_v2['signature'], + "Signature v1 and v2 should be different for parameters with & characters") + + def test_sign_request_with_signature_version(self): + """Should support signature_version parameter in sign_request function""" + params = {'public_id': 'test_image', 'version': 1234} + options = {'api_key': 'test_key', 'api_secret': API_SIGN_REQUEST_TEST_SECRET} + + # Test with signature_version in options + options_v1 = options.copy() + options_v1['signature_version'] = 1 + signed_params_v1 = cloudinary.utils.sign_request(params.copy(), options_v1) + + options_v2 = options.copy() + options_v2['signature_version'] = 2 + signed_params_v2 = cloudinary.utils.sign_request(params.copy(), options_v2) + + # The signatures should be different for different versions (for params with & characters) + # For these simple params without & they might be the same, but let's test the structure + self.assertIn('signature', signed_params_v1) + self.assertIn('signature', signed_params_v2) + self.assertIn('api_key', signed_params_v1) + self.assertIn('api_key', signed_params_v2) + + # Test that signature_version is passed through correctly + expected_sig_v1 = api_sign_request(params, API_SIGN_REQUEST_TEST_SECRET, cloudinary.utils.SIGNATURE_SHA1, 1) + expected_sig_v2 = api_sign_request(params, API_SIGN_REQUEST_TEST_SECRET, cloudinary.utils.SIGNATURE_SHA1, 2) + + self.assertEqual(signed_params_v1['signature'], expected_sig_v1) + self.assertEqual(signed_params_v2['signature'], expected_sig_v2) + if __name__ == '__main__': unittest.main()