Skip to content

feat: enable pickling for Python aggregate and window UDFs#1545

Open
timsaucer wants to merge 2 commits into
apache:mainfrom
timsaucer:pr2-agg-window-inline
Open

feat: enable pickling for Python aggregate and window UDFs#1545
timsaucer wants to merge 2 commits into
apache:mainfrom
timsaucer:pr2-agg-window-inline

Conversation

@timsaucer
Copy link
Copy Markdown
Member

@timsaucer timsaucer commented May 15, 2026

Which issue does this PR close?

Addresses part of #1517

This is PR 2 of 4. The four PRs stack sequentially on top of this one; subsequent PRs target this branch's tip until it merges.

Follow up PRs:

Rationale for this change

PR 1 closed the round-trip encoding and decoding of scalar UDFs. The same expression problem applies to Python aggregate and window UDFs: their accumulator / partition-evaluator factory is a Python callable, so a receiver that only has the UDF name cannot reconstruct one. This PR extends the inline-encoding mechanism so the natural pickle.dumps(expr) pattern also works for expressions referencing Python UDAFs and UDWFs.

What changes are included in this PR?

Codec extension is a straight parallel of the scalar path. New wire-format families:

Kind Family magic Cloudpickle tuple shape
Agg DFPYUDA (name, accumulator_factory, input_schema_bytes, return_schema_bytes, state_schema_bytes, volatility_str)
Window DFPYUDW (name, evaluator_factory, input_schema_bytes, return_schema_bytes, volatility_str)

Are there any user-facing changes?

The MultiColumnWindowUDF rename is a Rust-side breaking change, so adding api change even though no Python-facing API breaks.

Extends the PythonLogicalCodec / PythonPhysicalCodec inline encoding
introduced for scalar UDFs to also cover Python-defined aggregate and
window UDFs. The cloudpickle tuple shape per family is:

  DFPYUDA  (agg)     (name, accumulator_factory, input_schema_bytes,
                      return_schema_bytes, state_schema_bytes,
                      volatility_str)
  DFPYUDW  (window)  (name, evaluator_factory, input_schema_bytes,
                      return_schema_bytes, volatility_str)

Same wire-framing as scalar (family magic + version byte + cloudpickle
blob), same schema serde (arrow-rs native IPC), same cached cloudpickle
handle. The agg state schema is encoded as a full IPC schema so the
post-decode UDF reports the same names + nullability + metadata as the
sender — relevant for accumulators whose StateFieldsArgs consumers key
off names rather than positional DataType.

Required restructuring two existing UDF impls so the codec can grab
the Python callable directly:

* udaf.rs: replaces create_udaf + AccumulatorFactoryFunction closure
  with a named PythonFunctionAggregateUDF that stores the Py<PyAny>
  accumulator factory. Synthesizes state_{i} field names when the
  Python constructor passes only Vec<DataType>; from_parts preserves
  the full state schema on the decode side.
* udwf.rs: renames MultiColumnWindowUDF -> PythonFunctionWindowUDF,
  drops the PartitionEvaluatorFactory PtrEq wrapper, stores the
  Py<PyAny> evaluator directly. PartialEq and Hash get the same
  pointer-identity fast path + debug-log exception handling already
  on PythonFunctionScalarUDF.

User-facing surface:

* AggregateUDF.name and WindowUDF.name properties (parallel to the
  ScalarUDF.name shipped in PR1).
* Existing UDAF/UDWF construction paths are unchanged.

The per-session with_python_udf_inlining toggle, sender-side context,
strict refusal, and user-guide docs land in PRs 3-4 of this series.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
@timsaucer timsaucer force-pushed the pr2-agg-window-inline branch from 3226978 to d5bb146 Compare May 19, 2026 14:31
Re-export `to_rust_accumulator`, `to_rust_partition_evaluator`, and
`PythonFunctionWindowUDF` (with a `MultiColumnWindowUDF` alias) by
promoting `udaf` and `udwf` to `pub mod` so prior downstream Rust
consumers keep their API surface after the inline-encoding refactor.

Adds an end-to-end window UDF pickle round-trip test that runs the
decoded evaluator over a real session, mirroring the aggregate test.

Documents the cloudpickle-based shipping behavior of Python aggregate
and window UDFs in the user-guide aggregations and windows pages.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
@timsaucer timsaucer changed the title feat: inline encoding for Python aggregate and window UDFs (2/4) feat: enable pickling for Python aggregate and window UDFs May 19, 2026
@timsaucer timsaucer marked this pull request as ready for review May 19, 2026 19:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant