From 6fcbb497ce99568000f0a6891c3594b2ba7a8225 Mon Sep 17 00:00:00 2001
From: Chun Han <116052805+MrPresent-Han@users.noreply.github.com>
Date: Thu, 12 Dec 2024 23:38:36 -0500
Subject: [PATCH] enhance: refine query iterator cp(#2412) (#2441)

related: #2412
master_pr: https://github.com/milvus-io/pymilvus/pull/2413

Signed-off-by: MrPresent-Han
Co-authored-by: MrPresent-Han
---
 examples/orm/iterator.py |  4 +++-
 pymilvus/orm/iterator.py | 17 +++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/examples/orm/iterator.py b/examples/orm/iterator.py
index 3b2c92fd3..a482bedc2 100644
--- a/examples/orm/iterator.py
+++ b/examples/orm/iterator.py
@@ -118,7 +118,7 @@ def query_iterate_collection_no_offset(collection):
     query_iterator = collection.query_iterator(expr=expr, output_fields=[USER_ID, AGE],
                                                offset=0, batch_size=5, consistency_level=CONSISTENCY_LEVEL,
                                                reduce_stop_for_best="true", iterator_cp_file="/tmp/it_cp")
-    
+
     best_ids: set = set({})
     page_idx = 0
     while True:
@@ -127,6 +127,8 @@ def query_iterate_collection_no_offset(collection):
             print("query iteration finished, close")
             query_iterator.close()
             break
+        cursor = query_iterator.get_cursor()
+        print(f"got pk_cursor:{cursor.str_pk}")
         for i in range(len(res)):
             print(res[i])
             best_ids.add(res[i]['id'])
diff --git a/pymilvus/orm/iterator.py b/pymilvus/orm/iterator.py
index a1aa748fa..6bd6115a5 100644
--- a/pymilvus/orm/iterator.py
+++ b/pymilvus/orm/iterator.py
@@ -11,6 +11,7 @@
     MilvusException,
     ParamError,
 )
+from pymilvus.grpc_gen import milvus_pb2 as milvus_types
 
 from .connections import Connections
 from .constants import (
@@ -301,6 +302,22 @@ def __maybe_cache(self, result: List):
     def __is_res_sufficient(self, res: List):
         return res is not None and len(res) >= self._kwargs[BATCH_SIZE]
 
+    def get_cursor(self) -> milvus_types.QueryCursor:
+        """Return a QueryCursor describing this iterator's current position.
+
+        The cursor carries ``self._session_ts`` plus ``self._next_id``, stored
+        in ``str_pk`` when ``self._pk_str`` is truthy and in ``int_pk`` otherwise.
+        """
+        # Instantiate the message: assigning fields on the QueryCursor class
+        # itself would mutate shared class state and return the class.
+        cursor = milvus_types.QueryCursor()
+        cursor.session_ts = self._session_ts
+        if self._pk_str:
+            cursor.str_pk = str(self._next_id)
+        else:
+            cursor.int_pk = self._next_id
+        return cursor
+
     def next(self):
         cached_res = iterator_cache.fetch_cache(self._cache_id_in_use)
         ret = None