Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 72 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,36 +107,80 @@ from pymongosql import connect

with connect(host="mongodb://localhost:27017/database") as conn:
with conn.cursor() as cursor:
cursor.execute('SELECT COUNT(*) as total FROM users')
result = cursor.fetchone()
print(f"Total users: {result[0]}")
```

### Using DictCursor for Dictionary Results

```python
from pymongosql import connect
from pymongosql.cursor import DictCursor

with connect(host="mongodb://localhost:27017/database") as conn:
with conn.cursor(DictCursor) as cursor:
cursor.execute('SELECT COUNT(*) as total FROM users')
result = cursor.fetchone()
print(f"Total users: {result['total']}")
```

### Cursor vs DictCursor

PyMongoSQL provides two cursor types for different result formats:

**Cursor** (default) - Returns results as tuples:
```python
cursor = connection.cursor()
cursor.execute('SELECT name, email FROM users')
row = cursor.fetchone()
print(row[0]) # Access by index
```

**DictCursor** - Returns results as dict:
```python
from pymongosql.cursor import DictCursor

cursor = connection.cursor(DictCursor)
cursor.execute('SELECT name, email FROM users')
row = cursor.fetchone()
print(row['name']) # Access by column name
```

### Query with Parameters

PyMongoSQL supports two styles of parameterized queries for safe value substitution:

**Positional Parameters with ?**

```python
from pymongosql import connect

connection = connect(host="mongodb://localhost:27017/database")
cursor = connection.cursor()

# Parameterized queries for security
min_age = 18
status = 'active'

cursor.execute('''
SELECT name, email, created_at
FROM users
WHERE age >= ? AND status = ?
''', [min_age, status])

users = cursor.fetchmany(5) # Fetch first 5 results
while users:
for user in users:
print(f"User: {user['name']} ({user['email']})")
users = cursor.fetchmany(5) # Fetch next 5
cursor.execute(
'SELECT name, email FROM users WHERE age > ? AND status = ?',
[25, 'active']
)
```

**Named Parameters with :name**

```python
from pymongosql import connect

connection = connect(host="mongodb://localhost:27017/database")
cursor = connection.cursor()

cursor.execute(
'SELECT name, email FROM users WHERE age > :age AND status = :status',
{'age': 25, 'status': 'active'}
)
```

Parameters are substituted into the MongoDB filter during execution, providing protection against injection attacks.

## Supported SQL Features

### SELECT Statements
Expand Down Expand Up @@ -166,19 +210,6 @@ while users:
- LIMIT: `LIMIT 10`
- Combined: `ORDER BY created_at DESC LIMIT 5`

## Limitations & Roadmap

**Note**: Currently PyMongoSQL focuses on Data Query Language (DQL) operations. The following SQL features are **not yet supported** but are planned for future releases:

- **DML Operations** (Data Manipulation Language)
- `INSERT`, `UPDATE`, `DELETE`
- **DDL Operations** (Data Definition Language)
- `CREATE TABLE/COLLECTION`, `DROP TABLE/COLLECTION`
- `CREATE INDEX`, `DROP INDEX`
- `LIST TABLES/COLLECTIONS`

These features are on our development roadmap and contributions are welcome!

## Apache Superset Integration

PyMongoSQL can be used as a database driver in Apache Superset for querying and visualizing MongoDB data:
Expand All @@ -200,6 +231,19 @@ PyMongoSQL can be used as a database driver in Apache Superset for querying and

This allows seamless integration between MongoDB data and Superset's BI capabilities without requiring data migration to traditional SQL databases.

<h2 style="color: red;">Limitations & Roadmap</h2>

**Note**: Currently PyMongoSQL focuses on Data Query Language (DQL) operations. The following SQL features are **not yet supported** but are planned for future releases:

- **DML Operations** (Data Manipulation Language)
- `INSERT`, `UPDATE`, `DELETE`
- **DDL Operations** (Data Definition Language)
- `CREATE TABLE/COLLECTION`, `DROP TABLE/COLLECTION`
- `CREATE INDEX`, `DROP INDEX`
- `LIST TABLES/COLLECTIONS`

These features are on our development roadmap and contributions are welcome!

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
Expand Down
2 changes: 1 addition & 1 deletion pymongosql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
if TYPE_CHECKING:
from .connection import Connection

__version__: str = "0.2.4"
__version__: str = "0.2.5"

# Globals https://www.python.org/dev/peps/pep-0249/#globals
apilevel: str = "2.0"
Expand Down
6 changes: 3 additions & 3 deletions pymongosql/common.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import logging
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple, Union

from .error import ProgrammingError

Expand Down Expand Up @@ -38,12 +38,12 @@ def description(
def execute(
self,
operation: str,
parameters: Optional[Dict[str, Any]] = None,
parameters: Optional[Union[Sequence[Any], Dict[str, Any]]] = None,
):
raise NotImplementedError # pragma: no cover

@abstractmethod
def executemany(self, operation: str, seq_of_parameters: List[Optional[Dict[str, Any]]]) -> None:
def executemany(self, operation: str, seq_of_parameters: List[Union[Sequence[Any], Dict[str, Any]]]) -> None:
raise NotImplementedError # pragma: no cover

@abstractmethod
Expand Down
11 changes: 5 additions & 6 deletions pymongosql/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,20 @@ def _check_closed(self) -> None:
if self._is_closed:
raise ProgrammingError("Cursor is closed")

def execute(self: _T, operation: str, parameters: Optional[Dict[str, Any]] = None) -> _T:
def execute(self: _T, operation: str, parameters: Optional[Any] = None) -> _T:
"""Execute a SQL statement

Args:
operation: SQL statement to execute
parameters: Parameters for the SQL statement (not yet implemented)
parameters: Parameters to substitute placeholders in the SQL
- Sequence for positional parameters with ? placeholders
- Dict for named parameters with :name placeholders

Returns:
Self for method chaining
"""
self._check_closed()

if parameters:
_logger.warning("Parameter substitution not yet implemented, ignoring parameters")

try:
# Create execution context
context = ExecutionContext(operation, self.mode)
Expand All @@ -98,7 +97,7 @@ def execute(self: _T, operation: str, parameters: Optional[Dict[str, Any]] = Non
strategy = ExecutionPlanFactory.get_strategy(context)

# Execute using selected strategy (Standard or Subquery)
result = strategy.execute(context, self.connection)
result = strategy.execute(context, self.connection, parameters)

# Store execution plan for reference
self._current_execution_plan = strategy.execution_plan
Expand Down
71 changes: 64 additions & 7 deletions pymongosql/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Sequence, Union

from pymongo.errors import PyMongoError

Expand All @@ -19,6 +19,7 @@ class ExecutionContext:

query: str
execution_mode: str = "standard"
parameters: Optional[Union[Sequence[Any], Dict[str, Any]]] = None

def __repr__(self) -> str:
return f"ExecutionContext(mode={self.execution_mode}, " f"query={self.query})"
Expand All @@ -38,13 +39,15 @@ def execute(
self,
context: ExecutionContext,
connection: Any,
parameters: Optional[Union[Sequence[Any], Dict[str, Any]]] = None,
) -> Optional[Dict[str, Any]]:
"""
Execute query and return result set.

Args:
context: ExecutionContext with query and subquery info
connection: MongoDB connection
parameters: Sequence for positional (?) or Dict for named (:param) parameters

Returns:
command_result with query results
Expand Down Expand Up @@ -86,19 +89,59 @@ def _parse_sql(self, sql: str) -> ExecutionPlan:
_logger.error(f"SQL parsing failed: {e}")
raise SqlSyntaxError(f"Failed to parse SQL: {e}")

def _execute_execution_plan(self, execution_plan: ExecutionPlan, db: Any) -> Optional[Dict[str, Any]]:
def _replace_placeholders(self, obj: Any, parameters: Sequence[Any]) -> Any:
"""Recursively replace ? placeholders with parameter values in filter/projection dicts"""
param_index = [0] # Use list to allow modification in nested function

def replace_recursive(value: Any) -> Any:
if isinstance(value, str):
# Replace ? with the next parameter value
if value == "?":
if param_index[0] < len(parameters):
result = parameters[param_index[0]]
param_index[0] += 1
return result
else:
raise ProgrammingError(
f"Not enough parameters provided: expected at least {param_index[0] + 1}"
)
return value
elif isinstance(value, dict):
return {k: replace_recursive(v) for k, v in value.items()}
elif isinstance(value, list):
return [replace_recursive(item) for item in value]
else:
return value

return replace_recursive(obj)

def _execute_execution_plan(
self,
execution_plan: ExecutionPlan,
db: Any,
parameters: Optional[Sequence[Any]] = None,
) -> Optional[Dict[str, Any]]:
"""Execute an ExecutionPlan against MongoDB using db.command"""
try:
# Get database
if not execution_plan.collection:
raise ProgrammingError("No collection specified in query")

# Replace placeholders with parameters in filter_stage only (not in projection)
filter_stage = execution_plan.filter_stage or {}

if parameters:
# Positional parameters with ? (named parameters are converted to positional in execute())
filter_stage = self._replace_placeholders(filter_stage, parameters)

projection_stage = execution_plan.projection_stage or {}

# Build MongoDB find command
find_command = {"find": execution_plan.collection, "filter": execution_plan.filter_stage or {}}
find_command = {"find": execution_plan.collection, "filter": filter_stage}

# Apply projection if specified
if execution_plan.projection_stage:
find_command["projection"] = execution_plan.projection_stage
if projection_stage:
find_command["projection"] = projection_stage

# Apply sort if specified
if execution_plan.sort_stage:
Expand Down Expand Up @@ -135,14 +178,28 @@ def execute(
self,
context: ExecutionContext,
connection: Any,
parameters: Optional[Union[Sequence[Any], Dict[str, Any]]] = None,
) -> Optional[Dict[str, Any]]:
"""Execute standard query directly against MongoDB"""
_logger.debug(f"Using standard execution for query: {context.query[:100]}")

# Preprocess query to convert named parameters to positional
processed_query = context.query
processed_params = parameters
if isinstance(parameters, dict):
# Convert :param_name to ? for parsing
import re

param_names = re.findall(r":(\w+)", context.query)
# Convert dict parameters to list in order of appearance
processed_params = [parameters[name] for name in param_names]
# Replace :param_name with ?
processed_query = re.sub(r":(\w+)", "?", context.query)

# Parse the query
self._execution_plan = self._parse_sql(context.query)
self._execution_plan = self._parse_sql(processed_query)

return self._execute_execution_plan(self._execution_plan, connection.database)
return self._execute_execution_plan(self._execution_plan, connection.database, processed_params)


class ExecutionPlanFactory:
Expand Down
3 changes: 2 additions & 1 deletion pymongosql/superset_mongodb/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def execute(
self,
context: ExecutionContext,
connection: Any,
parameters: Optional[Any] = None,
) -> Optional[Dict[str, Any]]:
"""Execute query in two stages: MongoDB for subquery, intermediate DB for outer query"""
_logger.debug(f"Using subquery execution for query: {context.query[:100]}")
Expand All @@ -54,7 +55,7 @@ def execute(
# If no subquery detected, fall back to standard execution
if not query_info.has_subquery:
_logger.debug("No subquery detected, falling back to standard execution")
return super().execute(context, connection)
return super().execute(context, connection, parameters)

# Stage 1: Execute MongoDB subquery
mongo_query = query_info.subquery_text
Expand Down
2 changes: 1 addition & 1 deletion tests/run_test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ def main():
setup_test_data()
print(f"\n[SUCCESS] MongoDB {version} test instance is ready!")
print(
f"Connection: mongodb://{TEST_USERNAME}:{TEST_PASSWORD}@{MONGODB_HOST}:{MONGODB_PORT}/{MONGODB_DATABASE}?authSource={TEST_AUTH_SOURCE}" # noqa: E501
f"Connection: mongodb://{MONGODB_HOST}:{MONGODB_PORT}/{MONGODB_DATABASE}?authSource={TEST_AUTH_SOURCE}" # noqa: E501
)
else:
print("[ERROR] Failed to create database user")
Expand Down
26 changes: 26 additions & 0 deletions tests/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,3 +398,29 @@ def test_execute_with_nested_field_alias(self, conn):
rows = cursor.result_set.fetchall()
assert len(rows) == 3
assert len(rows[0]) == 2 # Should have 2 columns

def test_execute_with_positional_parameters(self, conn):
"""Test executing SELECT with positional parameters (?)"""
sql = "SELECT name, email FROM users WHERE age > ? AND active = ?"
cursor = conn.cursor()
result = cursor.execute(sql, [25, True])

assert result == cursor # execute returns self
assert isinstance(cursor.result_set, ResultSet)

rows = cursor.result_set.fetchall()
assert len(rows) > 0 # Should have results matching the filter
assert len(rows[0]) == 2 # Should have name and email columns

def test_execute_with_named_parameters(self, conn):
"""Test executing SELECT with named parameters (:name)"""
sql = "SELECT name, email FROM users WHERE age > :min_age AND active = :is_active"
cursor = conn.cursor()
result = cursor.execute(sql, {"min_age": 25, "is_active": True})

assert result == cursor # execute returns self
assert isinstance(cursor.result_set, ResultSet)

rows = cursor.result_set.fetchall()
assert len(rows) > 0 # Should have results matching the filter
assert len(rows[0]) == 2 # Should have name and email columns
Loading