Skip to content

Commit

Permalink
enhance: [2.5] add release_collection, drop_index, create_partition, …
Browse files Browse the repository at this point in the history
…drop_partition, load_partition and release_partition (#2529)

Signed-off-by: Ruichen Bao <ruichen.bao@zju.edu.cn>
  • Loading branch information
brcarry authored Dec 30, 2024
1 parent 3592604 commit 940af37
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 0 deletions.
128 changes: 128 additions & 0 deletions pymilvus/client/async_grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,16 @@ async def _get_info(self, collection_name: str, timeout: Optional[float] = None,

return fields_info, enable_dynamic

@retry_on_rpc_failure()
async def release_collection(
self, collection_name: str, timeout: Optional[float] = None, **kwargs
):
await self.ensure_channel_ready()
check_pass_param(collection_name=collection_name, timeout=timeout)
request = Prepare.release_collection("", collection_name)
response = await self._async_stub.ReleaseCollection(request, timeout=timeout)
check_status(response)

@retry_on_rpc_failure()
async def insert_rows(
self,
Expand Down Expand Up @@ -742,6 +752,124 @@ async def get_index_state(

raise AmbiguousIndexName(message=ExceptionsMessage.AmbiguousIndexName)

@retry_on_rpc_failure()
async def drop_index(
self,
collection_name: str,
field_name: str,
index_name: str,
timeout: Optional[float] = None,
**kwargs,
):
await self.ensure_channel_ready()
check_pass_param(collection_name=collection_name, timeout=timeout)
request = Prepare.drop_index_request(collection_name, field_name, index_name)
response = await self._async_stub.DropIndex(request, timeout=timeout)
check_status(response)

@retry_on_rpc_failure()
async def create_partition(
self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs
):
await self.ensure_channel_ready()
check_pass_param(
collection_name=collection_name, partition_name=partition_name, timeout=timeout
)
request = Prepare.create_partition_request(collection_name, partition_name)
response = await self._async_stub.CreatePartition(request, timeout=timeout)
check_status(response)

@retry_on_rpc_failure()
async def drop_partition(
self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs
):
await self.ensure_channel_ready()
check_pass_param(
collection_name=collection_name, partition_name=partition_name, timeout=timeout
)
request = Prepare.drop_partition_request(collection_name, partition_name)

response = await self._async_stub.DropPartition(request, timeout=timeout)
check_status(response)

@retry_on_rpc_failure()
async def load_partitions(
self,
collection_name: str,
partition_names: List[str],
replica_number: int = 1,
timeout: Optional[float] = None,
**kwargs,
):
await self.ensure_channel_ready()
check_pass_param(
collection_name=collection_name,
partition_name_array=partition_names,
replica_number=replica_number,
timeout=timeout,
)
refresh = kwargs.get("refresh", kwargs.get("_refresh", False))
resource_groups = kwargs.get("resource_groups", kwargs.get("_resource_groups"))
load_fields = kwargs.get("load_fields", kwargs.get("_load_fields"))
skip_load_dynamic_field = kwargs.get(
"skip_load_dynamic_field", kwargs.get("_skip_load_dynamic_field", False)
)

request = Prepare.load_partitions(
"",
collection_name,
partition_names,
replica_number,
refresh,
resource_groups,
load_fields,
skip_load_dynamic_field,
)
response = await self._async_stub.LoadPartitions(request, timeout=timeout)
check_status(response)

await self.wait_for_loading_partitions(collection_name, partition_names, is_refresh=refresh)

@retry_on_rpc_failure()
async def wait_for_loading_partitions(
self,
collection_name: str,
partition_names: List[str],
timeout: Optional[float] = None,
is_refresh: bool = False,
):
start = time.time()

def can_loop(t: int) -> bool:
return True if timeout is None else t <= (start + timeout)

while can_loop(time.time()):
progress = await self.get_loading_progress(
collection_name, partition_names, timeout=timeout, is_refresh=is_refresh
)
if progress >= 100:
return
await asyncio.sleep(Config.WaitTimeDurationWhenLoad)
raise MilvusException(
message=f"wait for loading partition timeout, collection: {collection_name}, partitions: {partition_names}"
)

@retry_on_rpc_failure()
async def release_partitions(
self,
collection_name: str,
partition_names: List[str],
timeout: Optional[float] = None,
**kwargs,
):
await self.ensure_channel_ready()
check_pass_param(
collection_name=collection_name, partition_name_array=partition_names, timeout=timeout
)
request = Prepare.release_partitions("", collection_name, partition_names)
response = await self._async_stub.ReleasePartitions(request, timeout=timeout)
check_status(response)

@retry_on_rpc_failure()
async def get(
self,
Expand Down
53 changes: 53 additions & 0 deletions pymilvus/milvus_client/async_milvus_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ async def load_collection(
logger.error("Failed to load collection: %s", collection_name)
raise ex from ex

async def release_collection(
self, collection_name: str, timeout: Optional[float] = None, **kwargs
):
conn = self._get_connection()
try:
await conn.release_collection(collection_name, timeout=timeout, **kwargs)
except MilvusException as ex:
logger.error("Failed to load collection: %s", collection_name)
raise ex from ex

async def create_index(
self,
collection_name: str,
Expand Down Expand Up @@ -201,6 +211,49 @@ async def _create_index(
logger.error("Failed to create an index on collection: %s", collection_name)
raise ex from ex

async def drop_index(
self, collection_name: str, index_name: str, timeout: Optional[float] = None, **kwargs
):
conn = self._get_connection()
await conn.drop_index(collection_name, "", index_name, timeout=timeout, **kwargs)

async def create_partition(
self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs
):
conn = self._get_connection()
await conn.create_partition(collection_name, partition_name, timeout=timeout, **kwargs)

async def drop_partition(
self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs
):
conn = self._get_connection()
await conn.drop_partition(collection_name, partition_name, timeout=timeout, **kwargs)

async def load_partitions(
self,
collection_name: str,
partition_names: Union[str, List[str]],
timeout: Optional[float] = None,
**kwargs,
):
if isinstance(partition_names, str):
partition_names = [partition_names]

conn = self._get_connection()
await conn.load_partitions(collection_name, partition_names, timeout=timeout, **kwargs)

async def release_partitions(
self,
collection_name: str,
partition_names: Union[str, List[str]],
timeout: Optional[float] = None,
**kwargs,
):
if isinstance(partition_names, str):
partition_names = [partition_names]
conn = self._get_connection()
await conn.release_partitions(collection_name, partition_names, timeout=timeout, **kwargs)

async def insert(
self,
collection_name: str,
Expand Down

0 comments on commit 940af37

Please sign in to comment.