fix: Pre-create S3A event log dir before SparkContext init#6317
Conversation
c8351c5 to
448212d
Compare
R-behera
left a comment
There was a problem hiding this comment.
This looks like a useful guard for the S3A event log edge case, and the focused tests help. One follow-up worth considering is whether some Feast users rely on credentials or endpoint details only through Spark/Hadoop config rather than environment variables. If so, a short note or test around that path could prevent surprises when the pre-create step runs before Spark fully applies the config.
| "spark.hadoop.fs.s3a.endpoint", | ||
| os.environ.get("FEAST_S3A_ENDPOINT", ""), | ||
| ) | ||
| access_key = os.environ.get("AWS_ACCESS_KEY_ID", "") |
There was a problem hiding this comment.
access_key = spark_config.get(
"spark.hadoop.fs.s3a.access.key",
os.environ.get("AWS_ACCESS_KEY_ID", ""),
)
secret_key = spark_config.get(
"spark.hadoop.fs.s3a.secret.key",
os.environ.get("AWS_SECRET_ACCESS_KEY", ""),
)
session_token = spark_config.get(
"spark.hadoop.fs.s3a.session.token",
os.environ.get("AWS_SESSION_TOKEN", ""),
) or None
|
@abhijeet-dhumal Let's handle both comment from devin and @R-behera suggestion |
b60d47c to
19bdd11
Compare
@ntkathole Addressed both your comments ✅ |
@R-behera Good catch on the Spark/Hadoop config credentials path ✅ |
|
|
||
| endpoint = spark_config.get( | ||
| "spark.hadoop.fs.s3a.endpoint", | ||
| os.environ.get("FEAST_S3A_ENDPOINT", ""), |
There was a problem hiding this comment.
Wondering if this can be AWS_ENDPOINT_URL instead or atleast we need to document this new env var in our docs ?
There was a problem hiding this comment.
Good call — switched to AWS_ENDPOINT_URL . No custom env vars to document now. Spark config (spark.hadoop.fs.s3a.endpoint) still takes precedence when set.
|
@abhijeet-dhumal let's fix the linting |
| aws_access_key_id=access_key or None, | ||
| aws_secret_access_key=secret_key or None, | ||
| aws_session_token=session_token, | ||
| config=BotoConfig(signature_version="s3v4"), |
There was a problem hiding this comment.
Also, consider supporting minio or other path style
addressing_style = (
"path"
if spark_config.get("spark.hadoop.fs.s3a.path.style.access", "false").lower() == "true"
else "auto"
)
config=BotoConfig(
signature_version="s3v4",
s3={"addressing_style": addressing_style},
)
There was a problem hiding this comment.
Added .. - _ensure_s3a_event_log_dir now reads spark.hadoop.fs.s3a.path.style.access and passes addressing_style: "path" to BotoConfig when it's "true", otherwise defaults to "auto". Tests cover both paths
…prevent silent materialize failure Spark's EventLogFileWriter.requireLogBaseDirAsDirectory() is called inside SparkContext.__init__. When spark.eventLog.dir points to an S3A path that doesn't exist yet (S3 has no real directories), SparkContext fails to initialise — silently from Feast's perspective because _materialize_one() catches the exception and returns an ERROR job. Add _ensure_s3a_event_log_dir() to utils.py: before building the SparkSession, check if the S3A prefix exists and write a zero-byte placeholder if it doesn't. Uses boto3 (already a Feast dep via S3 offline store). Non-fatal: logs a warning and lets Spark surface its own error if the write fails. Signed-off-by: abhijeet-dhumal <[email protected]>
… config, add session token support Signed-off-by: abhijeet-dhumal <[email protected]>
…linting Signed-off-by: abhijeet-dhumal <[email protected]>
Signed-off-by: abhijeet-dhumal <[email protected]>
Signed-off-by: abhijeet-dhumal <[email protected]>
22b7e8e to
70215e2
Compare
# [0.63.0](v0.62.0...v0.63.0) (2026-05-04) ### Bug Fixes * Add project filter to apply_data_source and delete_data_source (closes [#6206](#6206)) ([#6322](#6322)) ([96562c4](96562c4)) * Add project_id filter to SnowflakeRegistry UPDATE path ([#6243](#6243)) ([6658b71](6658b71)), closes [#6208](#6208) [#6208](#6208) * Add subprocess timeouts to prevent test_e2e_local hanging on Dask atexit handler ([3de6556](3de6556)) * Ambiguous truth value of array during materialization ([#6259](#6259)) ([d0c8984](d0c8984)) * Auto-detect GCS/S3 registry store when registry is passed as string ([#6260](#6260)) ([7ebcf03](7ebcf03)) * **bigquery:** Prefer query over table in get_table_query_string ([#6360](#6360)) ([77ed779](77ed779)), closes [#6200](#6200) * correct project_id scoping in get_user_metadata and delete_project ([0c469a7](0c469a7)) * disable Redis RDB persistence in test deployments ([44cd682](44cd682)) * Disable snowflake tests temporarily in CI ([#6356](#6356)) ([31d5a98](31d5a98)) * Filter empty SQL commands at execute_snowflake_statement call sites ([#6249](#6249)) ([92ffbb9](92ffbb9)) * Fix five bugs in milvus online store ([#6275](#6275)) ([212504b](212504b)) * Fix issue with apply feature view ([835cda8](835cda8)) * Fix streaming materialization for exotic sources with lazy UDF pipelines ([c07972d](c07972d)) * Handle missing features gracefully instead of panicking ([7d00b3a](7d00b3a)) * Harden informer cache with label selectors and memory optimizations ([#6242](#6242)) ([3f11356](3f11356)) * **helm:** Avoid nil pointer for metrics.enabled inside podAnnotations ([#6251](#6251)) ([c833f1a](c833f1a)) * Include git in feast server image ([fb03c46](fb03c46)) * Include StreamFeatureView in freshness metric ([#6269](#6269)) ([463f16c](463f16c)) * Pre-create S3A event log dir before SparkContext init ([#6317](#6317)) ([9feca77](9feca77)) * Remote Online Store Type Inference Error with All-NULL Columns ([#6063](#6063)) ([de67bdd](de67bdd)) * Remove selector with kustomize overlay using a JSON 6902 patch ([9107a43](9107a43)) * Resolve multiple bugs in SnowflakeRegistry and Snowflake connection handling ([#6315](#6315)) ([7e66a2e](7e66a2e)) * **spark:** BatchFeatureView with TransformationMode.PYTHON now reads all source columns ([a310eaf](a310eaf)) * **spark:** Use SELECT * when feature_name_columns is empty in pull_all_from_table_or_query ([e1b1d2d](e1b1d2d)) * Support pandas mode in feature builder and fix dask column extraction ([863315e](863315e)) * support SQL string as entity_df in RemoteOfflineStore.get_historical_features ([c559889](c559889)) * Wrap LocalOutputNode return value in ArrowTableValue for consist… ([#6286](#6286)) ([a16cd55](a16cd55)) ### Features * Add agent skills and Cursor/Claude rules for Feast development ([312eea3](312eea3)) * Add feature view versioning support to FAISS online store ([b36acb7](b36acb7)) * Add feature view versioning support to Redis and DynamoDB online stores ([#6257](#6257)) ([edf25af](edf25af)), closes [#6164](#6164) [#6163](#6163) * Add optional 'org' in feature view ([#6288](#6288)) ([#6301](#6301)) ([608b105](608b105)) * Add RaySource, to_ray_dataset first-class method, docs, and tests ([1c98157](1c98157)) * Add TLS support for Go Feature Server ([#6229](#6229)) ([28a58d0](28a58d0)) * Add Vector Search support to MongoDBOnlineStore ([#6344](#6344)) ([c102738](c102738)) * Add versioning support to Milvus online store ([#6330](#6330)) ([3268ced](3268ced)) * Addresses performance issues in the Redis online store ([2e50da0](2e50da0)) * Allow to set gpu for ray ([5580ab4](5580ab4)) * Bump redis-py version cap from <5 to <8 ([#6339](#6339)) ([9538180](9538180)) * Expose feature_server, materialization, and openlineage configuration via FeatureStore CRD ([ec6ecfd](ec6ecfd)) * Make online_write_batch_size configurable in MaterializationConfig ([#6268](#6268)) ([d41becf](d41becf)) * Make udf optional if agg defined ([#5689](#5689)) ([#6328](#6328)) ([f630056](f630056)) * MongoDB offline store ([#6138](#6138)) ([8eebad7](8eebad7)) * Optional input_schema for ODFV ([#6308](#6308)) ([#6312](#6312)) ([f08b4e8](f08b4e8)) * Provision minimal TokenReview RBAC for OIDC auth and add SSL error logging in token parser ([#6240](#6240)) ([dca57e8](dca57e8)) * **spark:** Add compute-on-read support for BatchFeatureView in get_… ([#6357](#6357)) ([630d9f8](630d9f8))
What this PR does / why we need it:
When spark.eventLog.enabled: "true" and spark.eventLog.dir points to an S3A path, feast materialize-incremental silently writes nothing to the online store and exits with code 0.
The failure chain:
S3 has no real directories. An empty prefix is indistinguishable from "does not exist", so Spark's pre-flight check always fails on a fresh bucket.
Which issue(s) this PR fixes:
In get_or_create_new_spark_session() (compute_engines/spark/utils.py), before building the SparkSession, call _ensure_s3a_event_log_dir() which:
No-ops for non-S3A paths (hdfs://, file://, etc.) and when event logging is disabled.
Checks
git commit -s)Testing Strategy
Misc