
Ft/bottlenecks selective cols #364

Merged
azmat-meesho merged 14 commits into develop from ft/bottlenecks-selective-cols
Apr 6, 2026

Conversation

@dheerajchouhan08 (Contributor) commented Apr 6, 2026

🔁 Pull Request Template – BharatMLStack

Please fill out the following sections to help us review your changes efficiently.

Context:

Give a brief overview of the motivation behind this change. Include any relevant discussion links (Slack, documents, tickets, etc.) that help reviewers understand the background and the issue being addressed.

Describe your changes:

Mention the changes made in the codebase.

Testing:

Please describe how you tested the code. If manual tests were performed, explain how. If automated tests were added or existing ones cover the change, explain how you ran them.

Monitoring:

Explain how this change will be tracked after deployment. Indicate whether current dashboards, alerts, and logs are enough, or if additional instrumentation is required.

Rollback plan

Explain the rollback plan in case of issues.

Checklist before requesting a review

  • I have reviewed my own changes
  • Relevant or critical functionality is covered by tests
  • Monitoring needs have been evaluated
  • Any necessary documentation updates have been considered

📂 Modules Affected

  • horizon (Real-time systems / networking)
  • online-feature-store (Feature serving infra)
  • trufflebox-ui (Admin panel / UI)
  • infra (Docker, CI/CD, GCP/AWS setup)
  • docs (Documentation updates)
  • Other: ___________

✅ Type of Change

  • Feature addition
  • Bug fix
  • Infra / build system change
  • Performance improvement
  • Refactor
  • Documentation
  • Other: ___________

📊 Benchmark / Metrics (if applicable)

Summary by CodeRabbit

  • New Features

    • Distributed, Spark-based decoding for scalable MPLog processing.
    • Optional column filtering to restrict decoded output to requested features.
    • CLI enhanced to run with a Spark session and to stream/write Spark-formatted output.
  • Breaking Changes

    • Public decoding APIs now operate on and return Spark DataFrames and require a Spark session.
  • Documentation

    • README expanded into a full SDK guide with updated API examples.
  • Chores

    • Version bumped to 0.3.1.

@coderabbitai bot commented Apr 6, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 7183460e-c589-419e-a22b-409c78612d93

📥 Commits

Reviewing files that changed from the base of the PR and between 8546926 and bc810b6.

📒 Files selected for processing (2)
  • .pre-commit-config.yaml
  • py-sdk/inference_logging_client/pyproject.toml
✅ Files skipped from review due to trivial changes (1)
  • .pre-commit-config.yaml
🚧 Files skipped from review as they are similar to previous changes (1)
  • py-sdk/inference_logging_client/pyproject.toml

Walkthrough

The inference logging client's public API and CLI were migrated from pandas-based to Spark-based processing: decode_mplog and decode_mplog_dataframe now require a SparkSession and return Spark DataFrames. Decoders gained needed_columns filtering. CLI, utilities, formats, and packaging were updated; version bumped to 0.3.1.

Changes

Cohort / File(s) Summary
Core Decoding API
py-sdk/inference_logging_client/inference_logging_client/__init__.py
Refactored decode_mplog() and decode_mplog_dataframe() to accept spark: "SparkSession", return Spark DataFrames, add needed_columns: Optional[Collection[str]]. Internals now build Spark rows, use spark.createDataFrame() and mapInPandas(); added _extract_metadata_byte() and exported format_dataframe_floats.
CLI Implementation
py-sdk/inference_logging_client/inference_logging_client/cli.py
Creates and configures a SparkSession (new --spark-master), passes spark into decode_mplog(), and switches output handling to Spark DataFrame writes or streamed JSON from part files. Adjusted summary/display logic to Spark APIs and ensures spark.stop() in finally.
Format Decoders
py-sdk/inference_logging_client/inference_logging_client/formats.py
Added needed_columns filtering to decode_proto_format, decode_arrow_format, decode_parquet_format and to per-entity decoders; introduced decode_arrow_features() and decode_parquet_features() helpers. Proto decoder updated to skip non-needed features using type-size logic.
Utilities
py-sdk/inference_logging_client/inference_logging_client/utils.py
Rewrote format_dataframe_floats() from pandas-based implementation to Spark-based implementation using pyspark.sql.functions.round() on FloatType/DoubleType schema fields. Minor reformatting of scalar/type maps.
I/O and Schema Fetching
py-sdk/inference_logging_client/inference_logging_client/io.py
Minor formatting and literal normalization; get_feature_schema() signature layout unchanged but code style normalized; zstd magic-number formatting normalized.
Decoder Styling
py-sdk/inference_logging_client/inference_logging_client/decoder.py
Formatting and expression normalization only (quote styles, spacing, slice formatting, hex casing). No functional changes.
Exceptions & Types
py-sdk/inference_logging_client/inference_logging_client/exceptions.py, py-sdk/inference_logging_client/inference_logging_client/types.py
Inserted blank lines in docstrings and minor spacing normalization. No API or behavior changes.
Packaging & Dependencies
py-sdk/inference_logging_client/pyproject.toml
Version bumped 0.1.0 → 0.3.1. Dependency changes: removed pandas, added pyspark==3.3.0.
Documentation
py-sdk/inference_logging_client/readme.md
Completely expanded README into full SDK guide. Documents new Spark-based function signatures, new/updated public functions and types, CLI usage, and development instructions.
Pre-commit
.pre-commit-config.yaml
Added a local post-commit hook entry to run post-commit-scripts/runner.sh at the post-commit stage.

Sequence Diagram(s)

sequenceDiagram
rect rgba(200,200,255,0.5)
participant CLI
participant Spark
participant Decoder
participant SchemaService
participant Output
end

CLI->>Spark: create SparkSession (--spark-master)
CLI->>Decoder: call decode_mplog(logs, spark, needed_columns)
Decoder->>SchemaService: get_feature_schema(mp_config_id, version)
SchemaService-->>Decoder: return schema
Decoder->>Spark: spark.createDataFrame(rows) / mapInPandas(_decode_batch)
Spark-->>Decoder: partitioned decoded rows
Decoder-->>CLI: return Spark DataFrame
CLI->>Output: write CSV/JSON or stream JSON parts / df.show()
Output-->>CLI: completion
🚥 Pre-merge checks | ✅ 1
✅ Passed checks (1 passed)
Check name | Status | Explanation
Dynamic Configuration Validation | ✅ Passed | No application-dyn-*.yml files exist in the repository and no changes to such files are present in this PR.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@azmat-meesho (Contributor) previously approved these changes Apr 6, 2026:

lgtm

@coderabbitai bot left a comment

Actionable comments posted: 6

🧹 Nitpick comments (2)
py-sdk/inference_logging_client/inference_logging_client/__init__.py (1)

525-526: Default 10,000 partitions may be excessive for small datasets.

Repartitioning to 10,000 partitions is appropriate for large datasets (millions of rows with 3-5MB each as noted), but creates significant overhead for small datasets. Consider adding a heuristic or documenting when to override via num_partitions parameter.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 525 - 526, The default hard-coded n_partitions = 10000 can cause overhead
for small DataFrames; update the logic in the block that computes n_partitions
(referencing num_partitions, n_partitions, df_repart, and df.repartition) to use
a simple heuristic: if num_partitions is provided use it, otherwise compute
partitions based on data size or existing partitions (e.g., base on
df.rdd.getNumPartitions() or df.count() and a target rows-per-partition /
bytes-per-partition threshold) and cap it between a sensible minimum and maximum
(e.g., at least 1 and at most 10k) before calling df.repartition(n_partitions),
or document clearly in the function docstring that callers should pass
num_partitions for small datasets.
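The heuristic suggested above can be sketched in plain Python. The function name `choose_num_partitions` and the rows-per-partition target are illustrative, not part of the SDK; an explicit `num_partitions` wins, otherwise the count is sized to the data and clamped to [1, 10,000].

```python
from typing import Optional

# Hypothetical target; tune to the observed 3-5MB-per-row workloads.
TARGET_ROWS_PER_PARTITION = 50_000

def choose_num_partitions(row_count: int,
                          num_partitions: Optional[int] = None) -> int:
    """Pick a partition count: honor an explicit override, else scale
    with row count and clamp between 1 and 10,000."""
    if num_partitions is not None:
        return max(1, num_partitions)
    estimated = -(-row_count // TARGET_ROWS_PER_PARTITION)  # ceil division
    return min(max(estimated, 1), 10_000)

print(choose_num_partitions(1_000))                  # → 1 (small input)
print(choose_num_partitions(10_000_000))             # → 200
print(choose_num_partitions(5, num_partitions=64))   # → 64 (explicit wins)
```

The result would then be passed to `df.repartition(...)` in place of the hard-coded 10,000.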
py-sdk/inference_logging_client/inference_logging_client/cli.py (1)

106-116: Missing --columns CLI argument for needed_columns feature.

The decode_mplog function accepts a needed_columns parameter (as shown in the API), but this feature is not exposed in the CLI. Users cannot filter columns from the command line.

♻️ Proposed addition to expose column filtering

Add argument after line 62:

parser.add_argument(
    "--columns",
    "-c",
    nargs="+",
    help="Only decode these feature columns (space-separated list)",
)

Then pass to decode_mplog:

 df = decode_mplog(
     log_data=data,
     model_proxy_id=args.model_proxy_id,
     version=args.version,
     spark=spark,
     format_type=format_type,
     inference_host=inference_host,
     decompress=not args.no_decompress,
+    needed_columns=args.columns,
 )
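For reference, the proposed flag behaves like this in a standalone argparse sketch (the program name and sample column names are illustrative):

```python
import argparse

# Mirror of the suggested --columns/-c flag: nargs="+" collects one or
# more space-separated feature names into a list.
parser = argparse.ArgumentParser(prog="decode-mplog")
parser.add_argument(
    "--columns",
    "-c",
    nargs="+",
    help="Only decode these feature columns (space-separated list)",
)
args = parser.parse_args(["--columns", "ctr", "price"])
print(args.columns)  # → ['ctr', 'price']
```

When the flag is omitted, `args.columns` is `None`, which matches the `needed_columns=None` default of decode_mplog.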
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/inference_logging_client/cli.py` around lines
106 - 116, Add a new CLI flag to capture user-specified columns and forward it
into decode_mplog as the needed_columns parameter: add a
parser.add_argument("--columns", "-c", nargs="+", help="Only decode these
feature columns (space-separated list)") to the argument parsing block (near
where parser is constructed and other args like model_proxy_id/version are
defined), and then pass args.columns to decode_mplog via the needed_columns=...
argument in the try block where decode_mplog(...) is called so the CLI can
filter decoded columns.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@py-sdk/inference_logging_client/inference_logging_client/cli.py`:
- Around line 122-129: The current CLI write path forces a single partition by
calling df.coalesce(1) which can cause OOM for large datasets; change the write
logic in the args.output block (where df.coalesce(1).write... is used) to either
use df.repartition(1).write... instead of coalesce(1) or, better, avoid forcing
a single partition entirely and write multi-part output by removing the
coalesce/repartition; also add a brief warning/log message when the user
requests a single output file so they are informed of the memory risk and
document the limitation in the CLI help text.
- Around line 171-176: The comment says to avoid full scans but the code still
calls df.count(); change the implementation to perform a lazy emptiness check by
replacing df.count() with df.limit(1).count() (use df.limit(1).count() to detect
empty vs non-empty without a full scan) and update the printed message to
reflect the limited check (e.g., print "Rows: 0" if count==0 else "Rows: (>=1 -
full count skipped)" to stderr), or if you prefer to remove ambiguity simply
delete the misleading comment; locate the logic around df.count() and the stderr
print in the CLI function and apply one of these two fixes.

In `@py-sdk/inference_logging_client/inference_logging_client/formats.py`:
- Around line 213-230: The skip branch currently uses
get_scalar_size(feature.feature_type) or 4 which silently skips 4 bytes for
unknown scalar types and can corrupt subsequent decoding; change the logic in
the skip handling (the block using is_sized_type, get_scalar_size, and reader)
to explicitly check the result of get_scalar_size(feature.feature_type) and, if
it is None (unknown type), emit a warning (or use the module logger) referencing
feature.name and feature.feature_type and then break out of the loop instead of
skipping a fixed 4 bytes; keep the existing sized-type branch unchanged and
ensure reader.read is only called when a valid size is present.
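A minimal sketch of the suggested skip logic, using hypothetical stand-ins for the SDK's reader and type-size helpers (the real mapping lives in the package's type tables):

```python
import io
import logging
from typing import Optional

logger = logging.getLogger(__name__)

# Illustrative scalar-size table, not the SDK's actual one.
SCALAR_SIZES = {"int32": 4, "int64": 8, "float": 4, "double": 8}

def get_scalar_size(feature_type: str) -> Optional[int]:
    return SCALAR_SIZES.get(feature_type)

def skip_feature(reader: io.BytesIO, name: str, feature_type: str) -> bool:
    """Skip a non-needed scalar feature. Returns False to abort decoding
    when the type size is unknown, instead of silently skipping 4 bytes
    and corrupting every subsequent read."""
    size = get_scalar_size(feature_type)
    if size is None:
        logger.warning("unknown scalar type %r for feature %r; stopping decode",
                       feature_type, name)
        return False
    reader.read(size)
    return True

buf = io.BytesIO(b"\x00" * 12)
print(skip_feature(buf, "ctr", "double"), buf.tell())  # → True 8
print(skip_feature(buf, "mystery", "uuid"))            # → False
```

Callers would `break` out of the feature loop on a `False` return, matching the behavior the review asks for.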

In `@py-sdk/inference_logging_client/inference_logging_client/io.py`:
- Around line 48-54: The User-Agent header is hardcoded to
"inference-logging-client/0.2.0" in the Request creation (see the req =
urllib.request.Request(...) block); change it to use the package __version__
variable so the header stays in sync with the package release (e.g., construct
the header as "inference-logging-client/{__version__}"). Ensure you import or
reference the module-level __version__ (from inference_logging_client import
__version__ or similar) and replace the hardcoded string in the headers dict
used by urllib.request.Request.
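The fix can be illustrated with the stdlib alone; the URL and the local `__version__` value below are placeholders for the package's actual exports:

```python
import urllib.request

# Stand-in for: from inference_logging_client import __version__
__version__ = "0.3.1"

# Build the User-Agent from the package version so the header can never
# drift from the release, unlike the hardcoded ".../0.2.0" string.
req = urllib.request.Request(
    "https://example.com/schema",
    headers={"User-Agent": f"inference-logging-client/{__version__}"},
)
print(req.get_header("User-agent"))  # → inference-logging-client/0.3.1
```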

In `@py-sdk/inference_logging_client/readme.md`:
- Around line 304-327: The docs for decode_mplog_dataframe are missing the
needed_columns parameter; update the parameter table and function signature
documentation to include needed_columns (type: Optional[List[str]] or similar),
mark it optional, state its default (e.g., None) and describe that it specifies
which columns to decode/return (mirroring decode_mplog behavior), and ensure any
referenced names (decode_mplog_dataframe, needed_columns, features_column,
metadata_column, mp_config_id_column) are consistent with the implementation.
- Around line 196-220: The README for decode_mplog is missing the needed_columns
parameter; update the documented function signature for decode_mplog to include
needed_columns: Optional[Collection[str]] = None and add an entry in the
Parameters table for needed_columns with type Collection[str], Required No,
Default None, and description "Only decode these feature columns (reduces
memory/output)"; ensure the signature and table match the actual implementation
(refer to decode_mplog) and keep wording consistent with existing parameter
descriptions.

---

Nitpick comments:
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py`:
- Around line 525-526: The default hard-coded n_partitions = 10000 can cause
overhead for small DataFrames; update the logic in the block that computes
n_partitions (referencing num_partitions, n_partitions, df_repart, and
df.repartition) to use a simple heuristic: if num_partitions is provided use it,
otherwise compute partitions based on data size or existing partitions (e.g.,
base on df.rdd.getNumPartitions() or df.count() and a target rows-per-partition
/ bytes-per-partition threshold) and cap it between a sensible minimum and
maximum (e.g., at least 1 and at most 10k) before calling
df.repartition(n_partitions), or document clearly in the function docstring that
callers should pass num_partitions for small datasets.

In `@py-sdk/inference_logging_client/inference_logging_client/cli.py`:
- Around line 106-116: Add a new CLI flag to capture user-specified columns and
forward it into decode_mplog as the needed_columns parameter: add a
parser.add_argument("--columns", "-c", nargs="+", help="Only decode these
feature columns (space-separated list)") to the argument parsing block (near
where parser is constructed and other args like model_proxy_id/version are
defined), and then pass args.columns to decode_mplog via the needed_columns=...
argument in the try block where decode_mplog(...) is called so the CLI can
filter decoded columns.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f800e1f1-e587-4d0c-bbe9-23eb002d7000

📥 Commits

Reviewing files that changed from the base of the PR and between aafb59f and 8546926.

📒 Files selected for processing (10)
  • py-sdk/inference_logging_client/inference_logging_client/__init__.py
  • py-sdk/inference_logging_client/inference_logging_client/cli.py
  • py-sdk/inference_logging_client/inference_logging_client/decoder.py
  • py-sdk/inference_logging_client/inference_logging_client/exceptions.py
  • py-sdk/inference_logging_client/inference_logging_client/formats.py
  • py-sdk/inference_logging_client/inference_logging_client/io.py
  • py-sdk/inference_logging_client/inference_logging_client/types.py
  • py-sdk/inference_logging_client/inference_logging_client/utils.py
  • py-sdk/inference_logging_client/pyproject.toml
  • py-sdk/inference_logging_client/readme.md

@azmat-meesho azmat-meesho merged commit 6bdd525 into develop Apr 6, 2026
25 checks passed
3 participants