feat: enable pickling for Python aggregate and window UDFs #1545
Open
timsaucer wants to merge 2 commits into
This was referenced May 15, 2026
Extends the PythonLogicalCodec / PythonPhysicalCodec inline encoding
introduced for scalar UDFs to also cover Python-defined aggregate and
window UDFs. The cloudpickle tuple shape per family is:
    DFPYUDA (agg)     (name, accumulator_factory, input_schema_bytes,
                       return_schema_bytes, state_schema_bytes,
                       volatility_str)
    DFPYUDW (window)  (name, evaluator_factory, input_schema_bytes,
                       return_schema_bytes, volatility_str)
Same wire-framing as scalar (family magic + version byte + cloudpickle
blob), same schema serde (arrow-rs native IPC), same cached cloudpickle
handle. The agg state schema is encoded as a full IPC schema so the
post-decode UDF reports the same names + nullability + metadata as the
sender — relevant for accumulators whose StateFieldsArgs consumers key
off names rather than positional DataType.
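As a rough illustration of the framing described above (family magic, then a version byte, then a pickled tuple), the following Python sketch builds and parses such a frame. It is not the codec's implementation: the helper names `encode_frame` and `decode_frame`, the version value `1`, and the treatment of the family names as literal ASCII magic bytes are assumptions made for the example. The schema fields are placeholder bytes rather than real Arrow IPC schemas, and the stdlib `pickle` stands in for cloudpickle so the snippet has no third-party dependencies.

```python
import pickle

# Assumption: the family names are used verbatim as ASCII magic bytes.
MAGIC_AGG = b"DFPYUDA"
MAGIC_WINDOW = b"DFPYUDW"
# Assumption: a single version byte with value 1.
WIRE_VERSION = 1


def encode_frame(magic: bytes, payload: tuple) -> bytes:
    """Return magic + version byte + pickled payload tuple."""
    blob = pickle.dumps(payload)  # stand-in for cloudpickle.dumps
    return magic + bytes([WIRE_VERSION]) + blob


def decode_frame(data: bytes) -> tuple:
    """Return (magic, payload) or raise ValueError on a malformed frame."""
    for magic in (MAGIC_AGG, MAGIC_WINDOW):
        if data.startswith(magic):
            if len(data) <= len(magic):
                raise ValueError("frame is missing its version byte")
            version = data[len(magic)]
            if version != WIRE_VERSION:
                raise ValueError(f"unsupported version {version}")
            return magic, pickle.loads(data[len(magic) + 1:])
    raise ValueError("unknown family magic")


# Aggregate-family tuple in the order listed above. The builtin `sum`
# is a placeholder for an accumulator factory; the bytes fields are
# placeholders for IPC-encoded schemas.
agg_payload = ("my_sum", sum, b"<input>", b"<return>", b"<state>", "immutable")
frame = encode_frame(MAGIC_AGG, agg_payload)
family, decoded = decode_frame(frame)
```

The stdlib `pickle` serializes functions and classes by reference only, so it works here because `sum` is importable on the receiving side. cloudpickle, which the PR uses, can serialize callables by value.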
Required restructuring two existing UDF impls so the codec can grab
the Python callable directly:
* udaf.rs: replaces create_udaf + AccumulatorFactoryFunction closure
  with a named PythonFunctionAggregateUDF that stores the Py<PyAny>
  accumulator factory. Synthesizes state_{i} field names when the
  Python constructor passes only Vec<DataType>; from_parts preserves
  the full state schema on the decode side.
* udwf.rs: renames MultiColumnWindowUDF -> PythonFunctionWindowUDF,
  drops the PartitionEvaluatorFactory PtrEq wrapper, stores the
  Py<PyAny> evaluator directly. PartialEq and Hash get the same
  pointer-identity fast path + debug-log exception handling already
  on PythonFunctionScalarUDF.
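The comparison strategy mentioned in the last bullet can be pictured with a small Python sketch. This is only an interpretation of "pointer-identity fast path + debug-log exception handling": the function names `callables_equal` and `callable_hash`, the fallback to the object's own `==` and `hash()`, and the fallback values (`False` and `0`) are assumptions for illustration, not a description of the Rust code.

```python
import logging

logger = logging.getLogger("udf_identity_sketch")  # hypothetical logger name


def callables_equal(a, b) -> bool:
    """Compare two Python callables without letting __eq__ errors escape."""
    # Fast path: the same object is always equal to itself.
    if a is b:
        return True
    try:
        return bool(a == b)
    except Exception as exc:
        # Assumption: a failing __eq__ is logged at debug level and
        # treated as "not equal" instead of propagating.
        logger.debug("__eq__ raised, treating as unequal: %r", exc)
        return False


def callable_hash(obj) -> int:
    """Hash a Python callable, tolerating unhashable objects."""
    try:
        return hash(obj)
    except Exception as exc:
        # Assumption: fall back to a constant so hashing never fails.
        logger.debug("__hash__ raised, using fallback hash: %r", exc)
        return 0
```

Because identical objects take the fast path and always produce the same hash value, equal-by-identity callables stay consistent between the two functions even when their own `__eq__` or `__hash__` misbehaves.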
User-facing surface:
* AggregateUDF.name and WindowUDF.name properties (parallel to the
  ScalarUDF.name shipped in PR 1).
* Existing UDAF/UDWF construction paths are unchanged.
The per-session with_python_udf_inlining toggle, sender-side context,
strict refusal, and user-guide docs land in PRs 3-4 of this series.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
Re-export `to_rust_accumulator`, `to_rust_partition_evaluator`, and `PythonFunctionWindowUDF` (with a `MultiColumnWindowUDF` alias) by promoting `udaf` and `udwf` to `pub mod` so prior downstream Rust consumers keep their API surface after the inline-encoding refactor.
Adds an end-to-end window UDF pickle round-trip test that runs the decoded evaluator over a real session, mirroring the aggregate test.
Documents the cloudpickle-based shipping behavior of Python aggregate and window UDFs in the user-guide aggregations and windows pages.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
Which issue does this PR close?
Addresses part of #1517
This is PR 2 of 4. The four PRs stack sequentially on top of this one; subsequent PRs target this branch's tip until it merges.
Follow up PRs:
Rationale for this change
PR 1 closed the round-trip encoding and decoding of scalar UDFs. The same expression problem applies to Python aggregate and window UDFs: their accumulator / partition-evaluator factory is a Python callable, so a receiver that only has the UDF name cannot reconstruct one. This PR extends the inline-encoding mechanism so the natural `pickle.dumps(expr)` pattern also works for expressions referencing Python UDAFs and UDWFs.
What changes are included in this PR?
Codec extension is a straight parallel of the scalar path. New wire-format families:
* `DFPYUDA` `(name, accumulator_factory, input_schema_bytes, return_schema_bytes, state_schema_bytes, volatility_str)`
* `DFPYUDW` `(name, evaluator_factory, input_schema_bytes, return_schema_bytes, volatility_str)`
Are there any user-facing changes?
The `MultiColumnWindowUDF` rename is a Rust-side breaking change, so adding the `api change` label even though no Python-facing API breaks.