feat: Add Postgres Online Store Async Feature Retrieval #2
Closed
job-almekinders wants to merge 28 commits into
Signed-off-by: Job Almekinders <[email protected]>
Set connection read only Signed-off-by: Job Almekinders <[email protected]>
Addition Signed-off-by: Job Almekinders <[email protected]>
Use new ConnectionPool Pass kwargs as named argument Use executemany over execute_values Remove not-required open argument in psycopg.connect Improve Use SpooledTemporaryFile Use max_size and add docstring Properly write with StringIO Utils: Use SpooledTemporaryFile over StringIO object Add replace Fix df_to_postgres_table Remove import Utils Signed-off-by: Job Almekinders <[email protected]>
Add log statement Lint: Fix _to_arrow_internal Lint: Fix _get_entity_df_event_timestamp_range Update exception Use ZeroColumnQueryResult Signed-off-by: Job Almekinders <[email protected]>
Update warning Fix Format warning Add typehints Use better variable name Signed-off-by: Job Almekinders <[email protected]>
…nline store Remove ConnectionType.pool_async Signed-off-by: Job Almekinders <[email protected]>
… in online_read_async Apply fixes Online: Create helper functions for online_read, and re-use them also in online_read_async Signed-off-by: Job Almekinders <[email protected]>
job-almekinders commented on Jun 25, 2024
Comment on lines +97 to +98

    if self._conn_async:
        await self._conn_async.close()

I'm not too sure about this part; however, the following integration test gets stuck without it:
sdk/python/tests/integration/online_store/test_universal_online.py::test_async_online_retrieval_with_event_timestamps
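The guard in the two commented lines can be illustrated with a small, self-contained sketch. It does not use psycopg and does not explain why the test hangs; `FakeAsyncConnection` and `Store` are hypothetical stand-ins, and the only point shown is that the close is awaited solely when an async connection was actually created.

```python
import asyncio
from typing import Optional, Tuple


class FakeAsyncConnection:
    """Hypothetical stand-in for an async connection or pool."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        # A real connection would release its resources here.
        self.closed = True


class Store:
    """Hypothetical store that creates its async connection lazily."""

    def __init__(self) -> None:
        self._conn_async: Optional[FakeAsyncConnection] = None

    async def connect(self) -> FakeAsyncConnection:
        if self._conn_async is None:
            self._conn_async = FakeAsyncConnection()
        return self._conn_async

    async def teardown(self) -> None:
        # Same shape as the commented lines: close only if it exists.
        if self._conn_async:
            await self._conn_async.close()


async def demo() -> Tuple[Optional[FakeAsyncConnection], bool]:
    unused = Store()
    await unused.teardown()  # nothing was opened, so nothing is closed

    used = Store()
    conn = await used.connect()
    await used.teardown()
    return unused._conn_async, conn.closed


result = asyncio.run(demo())
```

Without the `if` guard, `teardown` on a store that never opened an async connection would fail on `None`.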
Comment on lines +198 to +266

    @staticmethod
    def _construct_query_and_params(
        config: RepoConfig,
        table: FeatureView,
        keys: List[bytes],
        requested_features: Optional[List[str]] = None,
    ) -> Tuple[sql.Composed, Union[Tuple[List[bytes], List[str]], Tuple[List[bytes]]]]:
        """Construct the SQL query based on the given parameters."""
        if requested_features:
            query = sql.SQL(
                """
                SELECT entity_key, feature_name, value, event_ts
                FROM {} WHERE entity_key = ANY(%s) and feature_name = ANY(%s);
                """
            ).format(
                sql.Identifier(_table_id(config.project, table)),
            )
            params = (keys, requested_features)
        else:
            query = sql.SQL(
                """
                SELECT entity_key, feature_name, value, event_ts
                FROM {} WHERE entity_key = ANY(%s);
                """
            ).format(
                sql.Identifier(_table_id(config.project, table)),
            )
            params = (keys, [])
        return query, params

    @staticmethod
    def _prepare_keys(
        config: RepoConfig, entity_keys: List[EntityKeyProto]
    ) -> List[bytes]:
        """Prepare all keys in a list to make fewer round trips to the database."""
        return [
            serialize_entity_key(
                entity_key,
                entity_key_serialization_version=config.entity_key_serialization_version,
            )
            for entity_key in entity_keys
        ]

    @staticmethod
    def _process_rows(
        keys: List[bytes], rows: List[Tuple]
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        """Transform the retrieved rows in the desired output.

        PostgreSQL may return rows in an unpredictable order. Therefore, `values_dict`
        is created to quickly look up the correct row using the keys, since these are
        actually in the correct order.
        """
        values_dict = defaultdict(list)
        for row in rows if rows is not None else []:
            values_dict[
                row[0] if isinstance(row[0], bytes) else row[0].tobytes()
            ].append(row[1:])

        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
        for key in keys:
            if key in values_dict:
                value = values_dict[key]
                res = {}
                for feature_name, value_bin, event_ts in value:
                    val = ValueProto()
                    val.ParseFromString(bytes(value_bin))
                    res[feature_name] = val
                result.append((event_ts, res))
            else:
                result.append((None, None))

No new code is introduced here. This is the online_read method broken down into helper functions.
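The row-ordering step described in the `_process_rows` docstring can be tried in isolation with a minimal sketch. This is a simplified stand-in, not the PR's code: `process_rows` is a hypothetical free function, and raw `bytes` values replace `ValueProto` so that the example runs without Feast or protobuf installed.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Tuple


def process_rows(
    keys: List[bytes], rows: Optional[List[Tuple]]
) -> List[Tuple[Optional[datetime], Optional[Dict[str, bytes]]]]:
    """Group rows by entity key, then emit results in the order of `keys`."""
    values_dict = defaultdict(list)
    for row in rows or []:
        # The key column may arrive as bytes or as a memoryview.
        key = row[0] if isinstance(row[0], bytes) else row[0].tobytes()
        values_dict[key].append(row[1:])

    result: List[Tuple[Optional[datetime], Optional[Dict[str, bytes]]]] = []
    for key in keys:
        if key in values_dict:
            features: Dict[str, bytes] = {}
            event_ts = None
            for feature_name, value_bin, event_ts in values_dict[key]:
                # Assumption: values stay raw bytes instead of being parsed.
                features[feature_name] = bytes(value_bin)
            result.append((event_ts, features))
        else:
            # Keys with no rows keep their slot so positions line up.
            result.append((None, None))
    return result


ts = datetime(2024, 6, 25)
rows = [
    (b"k2", "f1", b"\x01", ts),
    (memoryview(b"k1"), "f1", b"\x02", ts),
    (b"k2", "f2", b"\x03", ts),
]
out = process_rows([b"k1", b"missing", b"k2"], rows)
```

The rows arrive in the order k2, k1, k2, while `out` follows the requested order k1, missing, k2, with `(None, None)` in the slot of the key that has no rows.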
Force-pushed from 2d86e41 to fb0d977
Force-pushed from 7165c51 to 3328530
Author (Owner) comment:
Will open PR with this branch once this PR is merged
What this PR does / why we need it:
This PR implements the online_read_async method on the PostgreSQLOnlineStore class. To make this happen:
- The _get_conn_async method is added to PostgreSQLOnlineStore, to be able to retrieve an AsyncConnectionPool.
- The online_read method is broken down using several helper functions. These helper functions are re-used in the online_read_async method.
- The test_async_online_retrieval_with_event_timestamps test is activated for postgres.

Which issue(s) this PR fixes:
Fixes feast-dev#4260
Dependencies
Blocked by feast-dev#4303
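
The helper-reuse idea from the description (one set of pure helpers shared by a sync and an async read path) can be sketched as follows. Everything here is a hypothetical stand-in: the in-memory `TABLE` replaces PostgreSQL, and `fetch` / `fetch_async` replace the real queries, so only the structure is meant to carry over.

```python
import asyncio
from typing import Dict, List, Optional, Tuple

# Hypothetical in-memory "table": key -> features.
TABLE: Dict[str, Dict[str, int]] = {"a": {"f1": 1}, "b": {"f1": 2}}
Row = Tuple[str, Dict[str, int]]


def prepare_keys(entity_ids: List[str]) -> List[str]:
    """Shared helper: normalise keys (stand-in for key serialisation)."""
    return [e.strip().lower() for e in entity_ids]


def process_rows(keys: List[str], rows: List[Row]) -> List[Optional[Dict[str, int]]]:
    """Shared helper: return one result per key, in the order of `keys`."""
    found = dict(rows)
    return [found.get(k) for k in keys]


def fetch(keys: List[str]) -> List[Row]:
    """Stand-in for the blocking query."""
    return [(k, TABLE[k]) for k in keys if k in TABLE]


async def fetch_async(keys: List[str]) -> List[Row]:
    """Stand-in for the awaitable query."""
    await asyncio.sleep(0)  # yield to the event loop like real I/O would
    return fetch(keys)


def online_read(entity_ids: List[str]) -> List[Optional[Dict[str, int]]]:
    keys = prepare_keys(entity_ids)
    return process_rows(keys, fetch(keys))


async def online_read_async(entity_ids: List[str]) -> List[Optional[Dict[str, int]]]:
    keys = prepare_keys(entity_ids)
    return process_rows(keys, await fetch_async(keys))


sync_result = online_read([" A", "zz", "b"])
async_result = asyncio.run(online_read_async([" A", "zz", "b"]))
```

Only the fetch step differs between the two paths; key preparation and row processing contain no I/O, which is what lets both variants share them unchanged.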