From e72d06d4d00f24d434836f9f1d29af50b526d697 Mon Sep 17 00:00:00 2001 From: William Parsley Date: Wed, 15 May 2024 11:11:28 -0500 Subject: [PATCH 1/9] Add search functionality to SQL registry --- sdk/python/feast/infra/registry/sql.py | 160 ++++++++++++------ .../requirements/py3.10-ci-requirements.txt | 2 +- 2 files changed, 108 insertions(+), 54 deletions(-) diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 80818acdac..e1041de311 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -200,12 +200,20 @@ class SqlRegistryConfig(RegistryConfig): If registry_type is 'sql', then this is a database URL as expected by SQLAlchemy """ +def _sanitize_search_params(**kwargs): + search_params = {} + for key, value in kwargs.items(): + if value is not None: + search_params[key] = value + return search_params + + class SqlRegistry(BaseRegistry): def __init__( - self, - registry_config: Optional[Union[RegistryConfig, SqlRegistryConfig]], - project: str = None, - repo_path: Optional[Path] = None, + self, + registry_config: Optional[Union[RegistryConfig, SqlRegistryConfig]], + project: str = None, + repo_path: Optional[Path] = None, ): assert registry_config is not None, "SqlRegistry needs a valid registry_config" # pool_recycle will recycle connections after the given number of seconds has passed @@ -361,7 +369,7 @@ def get_stream_feature_view( ) def list_stream_feature_views( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[StreamFeatureView]: if allow_cache: self._check_if_registry_refreshed() @@ -407,7 +415,7 @@ def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Enti ) def get_feature_view( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> FeatureView: if allow_cache: self._check_if_registry_refreshed() @@ -426,7 +434,7 @@ def get_feature_view( ) def get_on_demand_feature_view( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> OnDemandFeatureView: if allow_cache: self._check_if_registry_refreshed() @@ -445,7 +453,7 @@ def get_on_demand_feature_view( ) def get_request_feature_view( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ): if allow_cache: self._check_if_registry_refreshed() @@ -464,7 +472,7 @@ def get_request_feature_view( ) def get_feature_service( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> FeatureService: if allow_cache: self._check_if_registry_refreshed() @@ -483,7 +491,7 @@ def get_feature_service( ) def get_saved_dataset( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> SavedDataset: if allow_cache: self._check_if_registry_refreshed() @@ -502,7 +510,7 @@ def get_saved_dataset( ) def get_validation_reference( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> ValidationReference: if allow_cache: self._check_if_registry_refreshed() @@ -521,7 +529,7 @@ def get_validation_reference( ) def list_validation_references( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[ValidationReference]: if allow_cache: self._check_if_registry_refreshed() @@ -575,7 +583,7 @@ def delete_feature_service(self, name: str, project: str, commit: bool = True): ) def get_data_source( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> DataSource: if allow_cache: self._check_if_registry_refreshed() @@ -594,7 +602,7 @@ def get_data_source( ) def list_data_sources( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[DataSource]: if allow_cache: self._check_if_registry_refreshed() @@ -606,14 +614,14 @@ def list_data_sources( ) def apply_data_source( - self, data_source: DataSource, project: str, commit: bool = True + self, data_source: DataSource, project: str, commit: bool = True ): return self._apply_object( data_sources, project, "data_source_name", data_source, "data_source_proto" ) def apply_feature_view( - self, feature_view: BaseFeatureView, project: str, commit: bool = True + self, feature_view: BaseFeatureView, project: str, commit: bool = True ): fv_table = self._infer_fv_table(feature_view) @@ -622,7 +630,7 @@ def apply_feature_view( ) def apply_feature_service( - self, feature_service: FeatureService, project: str, commit: bool = True + self, feature_service: FeatureService, project: str, commit: bool = True ): return self._apply_object( feature_services, @@ -643,7 +651,7 @@ def delete_data_source(self, name: str, project: str, commit: bool = True): raise DataSourceObjectNotFoundException(name, project) def list_feature_services( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[FeatureService]: if allow_cache: self._check_if_registry_refreshed() @@ -659,7 +667,7 @@ def list_feature_services( ) def list_feature_views( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[FeatureView]: if allow_cache: self._check_if_registry_refreshed() @@ -671,7 +679,7 @@ def list_feature_views( ) def list_saved_datasets( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[SavedDataset]: if allow_cache: self._check_if_registry_refreshed() @@ -687,7 +695,7 @@ def list_saved_datasets( ) def list_request_feature_views( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[RequestFeatureView]: if allow_cache: self._check_if_registry_refreshed() @@ -703,7 +711,7 @@ def list_request_feature_views( ) def list_on_demand_feature_views( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[OnDemandFeatureView]: if allow_cache: self._check_if_registry_refreshed() @@ -719,7 +727,7 @@ def list_on_demand_feature_views( ) def list_project_metadata( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[ProjectMetadata]: if allow_cache: self._check_if_registry_refreshed() @@ -738,8 +746,8 @@ def list_project_metadata( project_metadata.project_uuid = row["metadata_value"] if ( - row["metadata_key"] - == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value + row["metadata_key"] + == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value ): project_metadata.last_updated_timestamp = ( datetime.utcfromtimestamp(int(row["metadata_value"])) @@ -750,10 +758,10 @@ def list_project_metadata( return [] def apply_saved_dataset( - self, - saved_dataset: SavedDataset, - project: str, - commit: bool = True, + self, + saved_dataset: SavedDataset, + project: str, + commit: bool = True, ): return self._apply_object( saved_datasets, @@ -764,10 +772,10 @@ def apply_saved_dataset( ) def apply_validation_reference( - self, - validation_reference: ValidationReference, - project: str, - commit: bool = True, + self, + validation_reference: ValidationReference, + project: str, + commit: bool = True, ): return self._apply_object( validation_references, @@ -778,12 +786,12 @@ def apply_validation_reference( ) def apply_materialization( - self, - feature_view: FeatureView, - project: str, - start_date: datetime, - end_date: datetime, - commit: bool = True, + self, + feature_view: FeatureView, + project: str, + start_date: datetime, + end_date: datetime, + commit: bool = True, ): table = self._infer_fv_table(feature_view) python_class, proto_class = self._infer_fv_classes(feature_view) @@ -842,10 +850,10 @@ def get_infra(self, project: str, allow_cache: bool = False) -> Infra: return Infra() def apply_user_metadata( - self, - project: str, - feature_view: BaseFeatureView, - metadata_bytes: Optional[bytes], + self, + project: str, + feature_view: BaseFeatureView, + metadata_bytes: Optional[bytes], ): table = self._infer_fv_table(feature_view) @@ -904,7 +912,7 @@ def _infer_fv_classes(self, feature_view): return python_class, proto_class def get_user_metadata( - self, project: str, feature_view: BaseFeatureView + self, project: str, feature_view: BaseFeatureView ) -> Optional[bytes]: table = self._infer_fv_table(feature_view) @@ -919,6 +927,7 @@ def get_user_metadata( def proto(self) -> RegistryProto: r = RegistryProto() + # last_updated_timestamps = [] def process_project(project): @@ -959,7 +968,7 @@ def process_project(project): # Use a ThreadPoolExecutor to process projects concurrently with concurrent.futures.ThreadPoolExecutor( - max_workers=MAX_WORKERS + max_workers=MAX_WORKERS ) as executor: # Adjust max_workers as needed executor.map(process_project, projects) @@ -976,13 +985,13 @@ def commit(self): pass def _apply_object( - self, - table: Table, - project: str, - id_field_name: str, - obj: Any, - proto_field_name: str, - name: Optional[str] = None, + self, + table: Table, + project: str, + id_field_name: str, + obj: Any, + proto_field_name: str, + name: Optional[str] = None, ): name = name or (obj.name if hasattr(obj, "name") else None) assert name, f"name needs to be provided for {obj}" @@ -1017,7 +1026,7 @@ def _apply_object( obj_proto = obj.to_proto() if hasattr(obj_proto, "meta") and hasattr( - obj_proto.meta, "created_timestamp" + obj_proto.meta, "created_timestamp" ): obj_proto.meta.created_timestamp.FromDatetime(update_datetime) @@ -1063,6 +1072,51 @@ def create_project_if_not_exists(self, project): if new_project: self._set_last_updated_metadata(update_datetime, project) + def search(self, **kwargs) -> List[Union[FeatureView, ProjectMetadata]]: + """ + Search for feature views or projects based on the provided search parameters. + Since the SQL database stores only metadata and protos, we have to pull all potentially matching objects + and filter in memory. + """ + + search_params = _sanitize_search_params(**kwargs) + + # Set default values for created_at and updated_at + created_at_obj = datetime.strptime(search_params.get('created_at', '0001-01-01T00:00:00'), '%Y-%m-%dT%H:%M:%S') + updated_at_obj = datetime.strptime(search_params.get('updated_at', '0001-01-01T00:00:00'), '%Y-%m-%dT%H:%M:%S') + + with self.engine.connect() as conn: + stmt = select(feature_views).where(feature_views.c.feature_view_name == search_params.get('name', '*')) + rows = conn.execute(stmt).all() + if rows: + fv_list = [ + FeatureView.from_proto( + FeatureViewProto.FromString(row["feature_view_proto"]) + ) + for row in rows + ] + + fv_list = [view for view in fv_list if view.created_timestamp >= created_at_obj] + fv_list = [view for view in fv_list if view.last_updated_timestamp >= updated_at_obj] + + if 'application' in search_params: + fv_list = [view for view in fv_list if view.tags.get('application') == search_params.get('application')] + if 'owning_team' in search_params: + fv_list = [view for view in fv_list if view.tags.get('team') == search_params.get('owning_team')] + if 'online' in search_params: + fv_list = [view for view in fv_list if view.online == search_params.get('online')] + + else: + fv_list = [] + + project_list = self.get_all_project_metadata(self) + + if 'name' in search_params: + project_list = [project for project in project_list if project.project_name == search_params.get('name')] + project_list = [project for project in project_list if project.last_updated_timestamp >= updated_at_obj] + + return project_list.extend(fv_list) + def _delete_object( self, table: Table, diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index 0ef7d35489..df5e7fcd51 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -534,7 +534,7 @@ mypy-extensions==1.0.0 # mypy mypy-protobuf==3.1.0 # via eg-feast (setup.py) -mysqlclient==2.2.0 +mysqlclient==2.2.4 # via eg-feast (setup.py) nbclient==0.8.0 # via nbconvert From 8baa53812dd40d7705c0fd231ecb39e04dcae99c Mon Sep 17 00:00:00 2001 From: William Parsley Date: Fri, 17 May 2024 15:12:36 -0500 Subject: [PATCH 2/9] switch back from kwargs --- sdk/python/feast/infra/registry/sql.py | 184 +++++++++++++------------ 1 file changed, 99 insertions(+), 85 deletions(-) diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index e1041de311..9892064561 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -200,20 +200,12 @@ class SqlRegistryConfig(RegistryConfig): If registry_type is 'sql', then this is a database URL as expected by SQLAlchemy """ -def _sanitize_search_params(**kwargs): - search_params = {} - for key, value in kwargs.items(): - if value is not None: - search_params[key] = value - return search_params - - class SqlRegistry(BaseRegistry): def __init__( - self, - registry_config: Optional[Union[RegistryConfig, SqlRegistryConfig]], - project: str = None, - repo_path: Optional[Path] = None, + self, + registry_config: Optional[Union[RegistryConfig, SqlRegistryConfig]], + project: str = None, + repo_path: Optional[Path] = None, ): assert registry_config is not None, "SqlRegistry needs a valid registry_config" # pool_recycle will recycle connections after the given number of seconds has passed @@ -369,7 +361,7 @@ def get_stream_feature_view( ) def list_stream_feature_views( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[StreamFeatureView]: if allow_cache: self._check_if_registry_refreshed() @@ -415,7 +407,7 @@ def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Enti ) def get_feature_view( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> FeatureView: if allow_cache: self._check_if_registry_refreshed() @@ -434,7 +426,7 @@ def get_feature_view( ) def get_on_demand_feature_view( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> OnDemandFeatureView: if allow_cache: self._check_if_registry_refreshed() @@ -453,7 +445,7 @@ def get_on_demand_feature_view( ) def get_request_feature_view( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ): if allow_cache: self._check_if_registry_refreshed() @@ -472,7 +464,7 @@ def get_request_feature_view( ) def get_feature_service( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> FeatureService: if allow_cache: self._check_if_registry_refreshed() @@ -491,7 +483,7 @@ def get_feature_service( ) def get_saved_dataset( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> SavedDataset: if allow_cache: self._check_if_registry_refreshed() @@ -510,7 +502,7 @@ def get_saved_dataset( ) def get_validation_reference( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> ValidationReference: if allow_cache: self._check_if_registry_refreshed() @@ -529,7 +521,7 @@ def get_validation_reference( ) def list_validation_references( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[ValidationReference]: if allow_cache: self._check_if_registry_refreshed() @@ -583,7 +575,7 @@ def delete_feature_service(self, name: str, project: str, commit: bool = True): ) def get_data_source( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> DataSource: if allow_cache: self._check_if_registry_refreshed() @@ -602,7 +594,7 @@ def get_data_source( ) def list_data_sources( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[DataSource]: if allow_cache: self._check_if_registry_refreshed() @@ -614,14 +606,14 @@ def list_data_sources( ) def apply_data_source( - self, data_source: DataSource, project: str, commit: bool = True + self, data_source: DataSource, project: str, commit: bool = True ): return self._apply_object( data_sources, project, "data_source_name", data_source, "data_source_proto" ) def apply_feature_view( - self, feature_view: BaseFeatureView, project: str, commit: bool = True + self, feature_view: BaseFeatureView, project: str, commit: bool = True ): fv_table = self._infer_fv_table(feature_view) @@ -630,7 +622,7 @@ def apply_feature_view( ) def apply_feature_service( - self, feature_service: FeatureService, project: str, commit: bool = True + self, feature_service: FeatureService, project: str, commit: bool = True ): return self._apply_object( feature_services, @@ -651,7 +643,7 @@ def delete_data_source(self, name: str, project: str, commit: bool = True): raise DataSourceObjectNotFoundException(name, project) def list_feature_services( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[FeatureService]: if allow_cache: self._check_if_registry_refreshed() @@ -667,7 +659,7 @@ def list_feature_services( ) def list_feature_views( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[FeatureView]: if allow_cache: self._check_if_registry_refreshed() @@ -679,7 +671,7 @@ def list_feature_views( ) def list_saved_datasets( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[SavedDataset]: if allow_cache: self._check_if_registry_refreshed() @@ -695,7 +687,7 @@ def list_saved_datasets( ) def list_request_feature_views( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[RequestFeatureView]: if allow_cache: self._check_if_registry_refreshed() @@ -711,7 +703,7 @@ def list_request_feature_views( ) def list_on_demand_feature_views( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[OnDemandFeatureView]: if allow_cache: self._check_if_registry_refreshed() @@ -727,7 +719,7 @@ def list_on_demand_feature_views( ) def list_project_metadata( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[ProjectMetadata]: if allow_cache: self._check_if_registry_refreshed() @@ -746,8 +738,8 @@ def list_project_metadata( project_metadata.project_uuid = row["metadata_value"] if ( - row["metadata_key"] - == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value + row["metadata_key"] + == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value ): project_metadata.last_updated_timestamp = ( datetime.utcfromtimestamp(int(row["metadata_value"])) @@ -758,10 +750,10 @@ def list_project_metadata( return [] def apply_saved_dataset( - self, - saved_dataset: SavedDataset, - project: str, - commit: bool = True, + self, + saved_dataset: SavedDataset, + project: str, + commit: bool = True, ): return self._apply_object( saved_datasets, @@ -772,10 +764,10 @@ def apply_saved_dataset( ) def apply_validation_reference( - self, - validation_reference: ValidationReference, - project: str, - commit: bool = True, + self, + validation_reference: ValidationReference, + project: str, + commit: bool = True, ): return self._apply_object( validation_references, @@ -786,12 +778,12 @@ def apply_validation_reference( ) def apply_materialization( - self, - feature_view: FeatureView, - project: str, - start_date: datetime, - end_date: datetime, - commit: bool = True, + self, + feature_view: FeatureView, + project: str, + start_date: datetime, + end_date: datetime, + commit: bool = True, ): table = self._infer_fv_table(feature_view) python_class, proto_class = self._infer_fv_classes(feature_view) @@ -850,10 +842,10 @@ def get_infra(self, project: str, allow_cache: bool = False) -> Infra: return Infra() def apply_user_metadata( - self, - project: str, - feature_view: BaseFeatureView, - metadata_bytes: Optional[bytes], + self, + project: str, + feature_view: BaseFeatureView, + metadata_bytes: Optional[bytes], ): table = self._infer_fv_table(feature_view) @@ -912,7 +904,7 @@ def _infer_fv_classes(self, feature_view): return python_class, proto_class def get_user_metadata( - self, project: str, feature_view: BaseFeatureView + self, project: str, feature_view: BaseFeatureView ) -> Optional[bytes]: table = self._infer_fv_table(feature_view) @@ -968,7 +960,7 @@ def process_project(project): # Use a ThreadPoolExecutor to process projects concurrently with concurrent.futures.ThreadPoolExecutor( - max_workers=MAX_WORKERS + max_workers=MAX_WORKERS ) as executor: # Adjust max_workers as needed executor.map(process_project, projects) @@ -985,13 +977,13 @@ def commit(self): pass def _apply_object( - self, - table: Table, - project: str, - id_field_name: str, - obj: Any, - proto_field_name: str, - name: Optional[str] = None, + self, + table: Table, + project: str, + id_field_name: str, + obj: Any, + proto_field_name: str, + name: Optional[str] = None, ): name = name or (obj.name if hasattr(obj, "name") else None) assert name, f"name needs to be provided for {obj}" @@ -1026,7 +1018,7 @@ def _apply_object( obj_proto = obj.to_proto() if hasattr(obj_proto, "meta") and hasattr( - obj_proto.meta, "created_timestamp" + obj_proto.meta, "created_timestamp" ): obj_proto.meta.created_timestamp.FromDatetime(update_datetime) @@ -1072,48 +1064,70 @@ def create_project_if_not_exists(self, project): if new_project: self._set_last_updated_metadata(update_datetime, project) - def search(self, **kwargs) -> List[Union[FeatureView, ProjectMetadata]]: + def search( + self, + name="", + application="", + owning_team="", + created_at=datetime(1, 1, 1), + updated_at=datetime(1, 1, 1), + online=True, + ) -> List[Union[FeatureView, ProjectMetadata]]: """ - Search for feature views or projects based on the provided search parameters. - Since the SQL database stores only metadata and protos, we have to pull all potentially matching objects - and filter in memory. + Search for feature views or projects based on the provided search + parameters. Since the SQL database stores only metadata and protos, we + have to pull all potentially matching objects and filter in memory. """ - search_params = _sanitize_search_params(**kwargs) - - # Set default values for created_at and updated_at - created_at_obj = datetime.strptime(search_params.get('created_at', '0001-01-01T00:00:00'), '%Y-%m-%dT%H:%M:%S') - updated_at_obj = datetime.strptime(search_params.get('updated_at', '0001-01-01T00:00:00'), '%Y-%m-%dT%H:%M:%S') - with self.engine.connect() as conn: - stmt = select(feature_views).where(feature_views.c.feature_view_name == search_params.get('name', '*')) + stmt = select(feature_views).where( + feature_views.c.feature_view_name == name + ) rows = conn.execute(stmt).all() if rows: - fv_list = [ + fv_list: List[FeatureView] = [ FeatureView.from_proto( FeatureViewProto.FromString(row["feature_view_proto"]) ) for row in rows ] - fv_list = [view for view in fv_list if view.created_timestamp >= created_at_obj] - fv_list = [view for view in fv_list if view.last_updated_timestamp >= updated_at_obj] + fv_list = [ + view + for view in fv_list + if getattr(view, "created_timestamp", datetime(1, 1, 1)) + >= created_at + ] + fv_list = [ + view + for view in fv_list + if getattr(view, "last_updated_timestamp", datetime(1, 1, 1)) + >= updated_at + ] - if 'application' in search_params: - fv_list = [view for view in fv_list if view.tags.get('application') == search_params.get('application')] - if 'owning_team' in search_params: - fv_list = [view for view in fv_list if view.tags.get('team') == search_params.get('owning_team')] - if 'online' in search_params: - fv_list = [view for view in fv_list if view.online == search_params.get('online')] + fv_list = [ + view for view in fv_list if view.tags.get("team") == owning_team + ] + fv_list = [ + view + for view in fv_list + if view.tags.get("application") == application + ] + fv_list = [view for view in fv_list if view.online == online] else: fv_list = [] - project_list = self.get_all_project_metadata(self) + project_list: List[ProjectMetadataModel] = self.get_all_project_metadata() - if 'name' in search_params: - project_list = [project for project in project_list if project.project_name == search_params.get('name')] - project_list = [project for project in project_list if project.last_updated_timestamp >= updated_at_obj] + project_list = [ + project for project in project_list if project.project_name == name + ] + project_list = [ + project + for project in project_list + if project.last_updated_timestamp >= updated_at + ] return project_list.extend(fv_list) From 0c720a6b42985257e79cd9abb1f87749beb54cd3 Mon Sep 17 00:00:00 2001 From: William Parsley Date: Tue, 21 May 2024 18:12:35 -0500 Subject: [PATCH 3/9] fix return value --- sdk/python/feast/infra/registry/sql.py | 48 ++++++++++++++------------ 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 9892064561..6841a2bd00 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -1066,26 +1066,26 @@ def create_project_if_not_exists(self, project): def search( self, + online, name="", application="", owning_team="", created_at=datetime(1, 1, 1), updated_at=datetime(1, 1, 1), - online=True, ) -> List[Union[FeatureView, ProjectMetadata]]: """ Search for feature views or projects based on the provided search parameters. Since the SQL database stores only metadata and protos, we have to pull all potentially matching objects and filter in memory. """ - + fv_list = [] with self.engine.connect() as conn: stmt = select(feature_views).where( - feature_views.c.feature_view_name == name + feature_views.c.feature_view_name == name if name else "*" ) rows = conn.execute(stmt).all() if rows: - fv_list: List[FeatureView] = [ + fv_list = [ FeatureView.from_proto( FeatureViewProto.FromString(row["feature_view_proto"]) ) @@ -1095,30 +1095,29 @@ def search( fv_list = [ view for view in fv_list - if getattr(view, "created_timestamp", datetime(1, 1, 1)) - >= created_at + if getattr(view, "created_timestamp", datetime.max) >= created_at ] fv_list = [ view for view in fv_list - if getattr(view, "last_updated_timestamp", datetime(1, 1, 1)) + if getattr(view, "last_updated_timestamp", datetime.max) >= updated_at ] - fv_list = [ - view for view in fv_list if view.tags.get("team") == owning_team - ] - fv_list = [ - view - for view in fv_list - if view.tags.get("application") == application - ] - fv_list = [view for view in fv_list if view.online == online] - - else: - fv_list = [] - - project_list: List[ProjectMetadataModel] = self.get_all_project_metadata() + if owning_team: + fv_list = [ + view for view in fv_list if view.tags.get("team") == owning_team + ] + if application: + fv_list = [ + view + for view in fv_list + if view.tags.get("application") == application + ] + if online: + fv_list = [view for view in fv_list if view.online == online] + + project_list = self.get_all_project_metadata() project_list = [ project for project in project_list if project.project_name == name @@ -1126,10 +1125,13 @@ def search( project_list = [ project for project in project_list - if project.last_updated_timestamp >= updated_at + if getattr(project, "last_updated_timestamp", datetime.max) >= updated_at ] - return project_list.extend(fv_list) + final_list: List[FeatureView | ProjectMetadataModel] = [] + final_list.extend(project_list) + final_list.extend(fv_list) + return final_list def _delete_object( self, From 8180b67cd9b595c771a5ec06cff4c73a6e6ec061 Mon Sep 17 00:00:00 2001 From: William Parsley Date: Tue, 21 May 2024 18:48:10 -0500 Subject: [PATCH 4/9] fix return typing --- sdk/python/feast/infra/registry/sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 6841a2bd00..28ee92b440 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -1072,7 +1072,7 @@ def search( owning_team="", created_at=datetime(1, 1, 1), updated_at=datetime(1, 1, 1), - ) -> List[Union[FeatureView, ProjectMetadata]]: + ) -> List[Union[FeatureView, ProjectMetadataModel]]: """ Search for feature views or projects based on the provided search parameters. Since the SQL database stores only metadata and protos, we From 19a669477f203af6651830846244cafe074b9ec2 Mon Sep 17 00:00:00 2001 From: William Parsley Date: Tue, 21 May 2024 22:06:27 -0500 Subject: [PATCH 5/9] fix timestamp logic --- sdk/python/feast/infra/registry/sql.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 28ee92b440..44d4a37df8 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -1070,8 +1070,8 @@ def search( name="", application="", owning_team="", - created_at=datetime(1, 1, 1), - updated_at=datetime(1, 1, 1), + created_at=datetime.min, + updated_at=datetime.min, ) -> List[Union[FeatureView, ProjectMetadataModel]]: """ Search for feature views or projects based on the provided search @@ -1095,13 +1095,14 @@ def search( fv_list = [ view for view in fv_list - if getattr(view, "created_timestamp", datetime.max) >= created_at + if view.created_timestamp is not None + and view.created_timestamp >= created_at ] fv_list = [ view for view in fv_list - if getattr(view, "last_updated_timestamp", datetime.max) - >= updated_at + if view.last_updated_timestamp is not None + and view.last_updated_timestamp >= updated_at ] if owning_team: @@ -1125,7 +1126,8 @@ def search( project_list = [ project for project in project_list - if getattr(project, "last_updated_timestamp", datetime.max) >= updated_at + if project.last_updated_timestamp is not None + and project.last_updated_timestamp >= updated_at ] final_list: List[FeatureView | ProjectMetadataModel] = [] From 54e8728da324f9e4760e5762f67d947e0a269d5c Mon Sep 17 00:00:00 2001 From: William Parsley Date: Tue, 21 May 2024 22:55:58 -0500 Subject: [PATCH 6/9] debug statements, name in query --- sdk/python/feast/infra/registry/sql.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 44d4a37df8..98eeaca116 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -1081,9 +1081,11 @@ def search( fv_list = [] with self.engine.connect() as conn: stmt = select(feature_views).where( - feature_views.c.feature_view_name == name if name else "*" + name in feature_views.c.feature_view_name ) rows = conn.execute(stmt).all() + print('Before if rows') + print(rows) if rows: fv_list = [ FeatureView.from_proto( @@ -1091,6 +1093,8 @@ def search( ) for row in rows ] + print('after converting from proto') + print(fv_list) fv_list = [ view @@ -1098,25 +1102,35 @@ def search( if view.created_timestamp is not None and view.created_timestamp >= created_at ] + print('1') + print(fv_list) fv_list = [ view for view in fv_list if view.last_updated_timestamp is not None and view.last_updated_timestamp >= updated_at ] + print('2') + print(fv_list) if owning_team: fv_list = [ view for view in fv_list if view.tags.get("team") == owning_team ] + print('3') + print(fv_list) if application: fv_list = [ view for view in fv_list if view.tags.get("application") == application ] + print('4') + print(fv_list) if online: fv_list = [view for view in fv_list if view.online == online] + print('5') + print(fv_list) project_list = self.get_all_project_metadata() From b81c8859b79bda8b0b8657a1487b8f720baf6191 Mon Sep 17 00:00:00 2001 From: William Parsley Date: Tue, 21 May 2024 23:29:54 -0500 Subject: [PATCH 7/9] wip --- sdk/python/feast/infra/registry/sql.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 98eeaca116..d4e4ebf56d 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -1080,9 +1080,13 @@ def search( """ fv_list = [] with self.engine.connect() as conn: - stmt = select(feature_views).where( - name in feature_views.c.feature_view_name - ) + if name: + stmt = select(feature_views).where( + feature_views.c.feature_view_name == name + ) + else: + stmt = select(feature_views) + rows = conn.execute(stmt).all() print('Before if rows') print(rows) From 6319ab33c7074f1ab6b3972cd5712432f3be6480 Mon Sep 17 00:00:00 2001 From: William Parsley Date: Tue, 21 May 2024 23:58:06 -0500 Subject: [PATCH 8/9] wip --- sdk/python/feast/infra/registry/sql.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index d4e4ebf56d..0d14cc9cf1 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -1103,16 +1103,16 @@ def search( fv_list = [ view for view in fv_list - if view.created_timestamp is not None - and view.created_timestamp >= created_at + if view.created_timestamp is None + or view.created_timestamp >= created_at ] print('1') print(fv_list) fv_list = [ view for view in fv_list - if view.last_updated_timestamp is not None - and view.last_updated_timestamp >= updated_at + if view.last_updated_timestamp is None + or view.last_updated_timestamp >= updated_at ] print('2') print(fv_list) @@ -1144,8 +1144,8 @@ def search( project_list = [ project for project in project_list - if project.last_updated_timestamp is not None - and project.last_updated_timestamp >= updated_at + if project.last_updated_timestamp is None + or project.last_updated_timestamp >= updated_at ] final_list: List[FeatureView | ProjectMetadataModel] = [] From dfdc5116b5b4e35244c92dfa6e644d434c37f19d Mon Sep 17 00:00:00 2001 From: William Parsley Date: Wed, 26 Jun 2024 13:14:44 -0500 Subject: [PATCH 9/9] Hua changes --- go.mod | 2 ++ go.sum | 4 +++ sdk/python/feast/infra/registry/sql.py | 29 +++++-------------- .../requirements/py3.8-ci-requirements.txt | 8 ++--- 4 files changed, 18 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index 50052a64c2..dc781e6fd1 100644 --- a/go.mod +++ b/go.mod @@ -55,6 +55,8 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/gonuts/commander v0.1.0 // indirect + github.com/gonuts/flag v0.1.0 // indirect github.com/google/flatbuffers v2.0.8+incompatible // indirect github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/compress v1.16.3 // indirect diff --git a/go.sum b/go.sum index 992c943199..8fc87c1d14 100644 --- a/go.sum +++ b/go.sum @@ -173,6 +173,10 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gonuts/commander v0.1.0 h1:EcDTiVw9oAVORFjQOEOuHQqcl6OXMyTgELocTq6zJ0I= +github.com/gonuts/commander v0.1.0/go.mod h1:qkb5mSlcWodYgo7vs8ulLnXhfinhZsZcm6+H/z1JjgY= +github.com/gonuts/flag v0.1.0 h1:fqMv/MZ+oNGu0i9gp0/IQ/ZaPIDoAZBOBaJoV7viCWM= +github.com/gonuts/flag v0.1.0/go.mod h1:ZTmTGtrSPejTo/SRNhCqwLTmiAgyBdCkLYhHrAoBdz4= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/flatbuffers v2.0.5+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 0d14cc9cf1..a30b4996a0 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import concurrent.futures import logging import threading @@ -1066,12 +1068,12 @@ def create_project_if_not_exists(self, project): def search( self, - online, name="", application="", owning_team="", created_at=datetime.min, updated_at=datetime.min, + online=False, ) -> List[Union[FeatureView, ProjectMetadataModel]]: """ Search for feature views or projects based on the provided search @@ -1080,16 +1082,12 @@ def search( """ fv_list = [] with self.engine.connect() as conn: + stmt = select(feature_views) if name: - stmt = select(feature_views).where( - feature_views.c.feature_view_name == name - ) - else: - stmt = select(feature_views) + stmt = stmt.where(feature_views.c.feature_view_name == name) rows = conn.execute(stmt).all() - print('Before if rows') - print(rows) + if rows: fv_list = [ FeatureView.from_proto( @@ -1097,8 +1095,6 @@ def search( ) for row in rows ] - print('after converting from proto') - print(fv_list) fv_list = [ view @@ -1106,35 +1102,26 @@ def search( if view.created_timestamp is None or view.created_timestamp >= created_at ] - print('1') - print(fv_list) + fv_list = [ view for view in fv_list if view.last_updated_timestamp is None or view.last_updated_timestamp >= updated_at ] - print('2') - print(fv_list) if owning_team: fv_list = [ view for view in fv_list if view.tags.get("team") == owning_team ] - print('3') - print(fv_list) if application: fv_list = [ view for view in fv_list if view.tags.get("application") == application ] - print('4') - print(fv_list) - if online: + if online is not None: fv_list = [view for view in fv_list if view.online == online] - print('5') - print(fv_list) project_list = self.get_all_project_metadata() diff --git a/sdk/python/requirements/py3.8-ci-requirements.txt b/sdk/python/requirements/py3.8-ci-requirements.txt index 5a8ca76192..a4ffddc908 100644 --- a/sdk/python/requirements/py3.8-ci-requirements.txt +++ b/sdk/python/requirements/py3.8-ci-requirements.txt @@ -216,7 +216,7 @@ executing==1.2.0 # via stack-data fastapi==0.95.2 # via feast (setup.py) -fastavro==1.7.4 +fastavro==1.8.2 # via # feast (setup.py) # pandavro @@ -529,7 +529,7 @@ mypy-extensions==1.0.0 # mypy mypy-protobuf==3.1 # via feast (setup.py) -mysqlclient==2.1.1 +mysqlclient==2.2.4 # via feast (setup.py) nbclassic==1.0.0 # via notebook @@ -732,7 +732,7 @@ pyjwt[crypto]==2.7.0 # snowflake-connector-python pymilvus==2.2.14 # via eg-feast (setup.py) -pymssql==2.2.7 +pymssql==2.2.8 # via feast (setup.py) pymysql==1.0.3 # via feast (setup.py) @@ -798,7 +798,7 @@ pytz==2023.3 # pandas # snowflake-connector-python # trino -pyyaml==6.0 +pyyaml==6.0.1 # via # dask # feast (setup.py)