Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add imports #161

Merged
merged 5 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,82 @@ RerankResult result = inference.rerank(model, query, documents, rankFields, topN
System.out.println(result.getData());
```

# Imports
## Start an import

The following example initiates an asynchronous import of vectors from object storage into the index.

```java
import org.openapitools.db_data.client.ApiException;
import org.openapitools.db_data.client.model.ImportErrorMode;
import org.openapitools.db_data.client.model.StartImportResponse;
...

// Initialize pinecone object
Pinecone pinecone = new Pinecone.Builder("PINECONE_API_KEY").build();
// Get async imports connection object
AsyncIndex asyncIndex = pinecone.getAsyncIndexConnection("PINECONE_INDEX_NAME");

// s3 uri
String uri = "s3://path/to/file.parquet";

// Start an import
StartImportResponse response = asyncIndex.startImport(uri, "123-456-789", ImportErrorMode.OnErrorEnum.CONTINUE);
```

## List imports

The following example lists all recent and ongoing import operations for the specified index.

```java
import org.openapitools.db_data.client.ApiException;
import org.openapitools.db_data.client.model.ListImportsResponse;
...

// Initialize pinecone object
Pinecone pinecone = new Pinecone.Builder("PINECONE_API_KEY").build();
// Get async imports connection object
AsyncIndex asyncIndex = pinecone.getAsyncIndexConnection("PINECONE_INDEX_NAME");

// List imports
ListImportsResponse response = asyncIndex.listImports(100, "some-pagination-token");
```

## Describe an import

The following example retrieves detailed information about a specific import operation using its unique identifier.

```java
import org.openapitools.db_data.client.ApiException;
import org.openapitools.db_data.client.model.ImportModel;
...

// Initialize pinecone object
Pinecone pinecone = new Pinecone.Builder("PINECONE_API_KEY").build();
// Get async imports connection object
AsyncIndex asyncIndex = pinecone.getAsyncIndexConnection("PINECONE_INDEX_NAME");

// Describe import
ImportModel importDetails = asyncIndex.describeImport("1");
```

## Cancel an import

The following example attempts to cancel an ongoing import operation using its unique identifier.

```java
import org.openapitools.db_data.client.ApiException;
...

// Initialize pinecone object
Pinecone pinecone = new Pinecone.Builder("PINECONE_API_KEY").build();
// Get async imports connection object
AsyncIndex asyncIndex = pinecone.getAsyncIndexConnection("PINECONE_INDEX_NAME");

// Cancel import
asyncIndex.cancelImport("2");
```

## Examples

- The data and control plane operation examples can be found in `io/pinecone/integration` folder.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.pinecone.clients.AsyncIndex;
import io.pinecone.clients.Index;
import io.pinecone.configs.PineconeConfig;
import io.pinecone.configs.PineconeConnection;
import io.pinecone.exceptions.PineconeValidationException;
import io.pinecone.proto.VectorServiceGrpc;
Expand All @@ -24,6 +25,7 @@ public class QueryErrorTest {

@BeforeAll
public static void setUp() throws IOException, InterruptedException {
PineconeConfig config = mock(PineconeConfig.class);
PineconeConnection connectionMock = mock(PineconeConnection.class);

VectorServiceGrpc.VectorServiceBlockingStub stubMock = mock(VectorServiceGrpc.VectorServiceBlockingStub.class);
Expand All @@ -33,7 +35,7 @@ public static void setUp() throws IOException, InterruptedException {
when(connectionMock.getAsyncStub()).thenReturn(asyncStubMock);

index = new Index(connectionMock, "some-index-name");
asyncIndex = new AsyncIndex(connectionMock, "some-index-name");
asyncIndex = new AsyncIndex(config, connectionMock, "some-index-name");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.pinecone.clients.AsyncIndex;
import io.pinecone.clients.Index;
import io.pinecone.configs.PineconeConfig;
import io.pinecone.configs.PineconeConnection;
import io.pinecone.exceptions.PineconeException;
import io.pinecone.exceptions.PineconeValidationException;
Expand All @@ -27,6 +28,7 @@ public class UpsertErrorTest {

@BeforeAll
public static void setUp() throws IOException, InterruptedException {
PineconeConfig config = mock(PineconeConfig.class);
PineconeConnection connectionMock = mock(PineconeConnection.class);

VectorServiceGrpc.VectorServiceBlockingStub stubMock = mock(VectorServiceGrpc.VectorServiceBlockingStub.class);
Expand All @@ -36,7 +38,7 @@ public static void setUp() throws IOException, InterruptedException {
when(connectionMock.getAsyncStub()).thenReturn(asyncStubMock);

index = new Index(connectionMock, "some-index-name");
asyncIndex = new AsyncIndex(connectionMock, "some-index-name");
asyncIndex = new AsyncIndex(config, connectionMock, "some-index-name");
}

@Test
Expand Down
200 changes: 199 additions & 1 deletion src/main/java/io/pinecone/clients/AsyncIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,32 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Struct;
import io.pinecone.commons.IndexInterface;
import io.pinecone.configs.PineconeConfig;
import io.pinecone.configs.PineconeConnection;
import io.pinecone.exceptions.PineconeValidationException;
import io.pinecone.proto.*;
import io.pinecone.proto.DeleteRequest;
import io.pinecone.proto.DescribeIndexStatsRequest;
import io.pinecone.proto.FetchResponse;
import io.pinecone.proto.ListResponse;
import io.pinecone.proto.QueryRequest;
import io.pinecone.proto.QueryResponse;
import io.pinecone.proto.UpdateRequest;
import io.pinecone.proto.UpsertRequest;
import io.pinecone.proto.UpsertResponse;
import io.pinecone.unsigned_indices_model.QueryResponseWithUnsignedIndices;
import io.pinecone.unsigned_indices_model.VectorWithUnsignedIndices;
import okhttp3.OkHttpClient;
import org.openapitools.db_data.client.ApiClient;
import org.openapitools.db_data.client.ApiException;
import org.openapitools.db_data.client.Configuration;
import org.openapitools.db_data.client.api.BulkOperationsApi;
import org.openapitools.db_data.client.model.*;

import java.util.List;

import static io.pinecone.clients.Pinecone.buildOkHttpClient;


/**
* A client for interacting with a Pinecone index via GRPC asynchronously. Allows for upserting, querying, fetching, updating, and deleting vectors.
Expand All @@ -38,6 +56,7 @@ public class AsyncIndex implements IndexInterface<ListenableFuture<UpsertRespons
private final PineconeConnection connection;
private final VectorServiceGrpc.VectorServiceFutureStub asyncStub;
private final String indexName;
BulkOperationsApi bulkOperations;

/**
* Constructs an {@link AsyncIndex} instance for interacting with a Pinecone index.
Expand All @@ -55,14 +74,24 @@ public class AsyncIndex implements IndexInterface<ListenableFuture<UpsertRespons
* @param indexName The name of the index to interact with. The index host will be automatically resolved.
* @throws PineconeValidationException if the connection object is null.
*/
public AsyncIndex(PineconeConnection connection, String indexName) {
public AsyncIndex(PineconeConfig config, PineconeConnection connection, String indexName) {
if (connection == null) {
throw new PineconeValidationException("Pinecone connection object cannot be null.");
}

this.indexName = indexName;
this.connection = connection;
this.asyncStub = connection.getAsyncStub();

OkHttpClient customOkHttpClient = config.getCustomOkHttpClient();
ApiClient apiClient = (customOkHttpClient != null) ? new ApiClient(customOkHttpClient) : new ApiClient(buildOkHttpClient(config.getProxyConfig()));
apiClient.setApiKey(config.getApiKey());
apiClient.setUserAgent(config.getUserAgent());
apiClient.addDefaultHeader("X-Pinecone-Api-Version", Configuration.VERSION);

this.bulkOperations = new BulkOperationsApi(apiClient);
String protocol = config.isTLSEnabled() ? "https://" : "http://";
bulkOperations.setCustomBaseUrl(protocol + config.getHost());
}

/**
Expand Down Expand Up @@ -1039,6 +1068,175 @@ public ListenableFuture<ListResponse> list(String namespace, String prefix, Stri
return asyncStub.list(listRequest);
}

/**
rohanshah18 marked this conversation as resolved.
Show resolved Hide resolved
* <p>Initiates an asynchronous import of vectors from object storage into a specified index.</p>
*
* <p>The method constructs a {@link StartImportRequest} using the provided URI for the data and optional
* storage integration ID. It also allows for specifying how to respond to errors during the import process
* through the {@link ImportErrorMode}. The import operation is then initiated via a call to the
* underlying {@link BulkOperationsApi}.</p>
*
* <p>Example:
* <pre>{@code
* import org.openapitools.db_data.client.ApiException;
* import org.openapitools.db_data.client.model.ImportErrorMode;
*
* ...
*
* String uri = "s3://path/to/file.parquet";
* String integrationId = "123-456-789";
* StartImportResponse response = asyncIndex.startImport(uri, integrationId, ImportErrorMode.OnErrorEnum.CONTINUE);
* }</pre>
*
* @param uri The URI prefix under which the data to import is available.
* @param integrationId The ID of the storage integration to access the data. Can be null or empty.
* @param errorMode Indicates how to respond to errors during the import process. Can be null.
* @return {@link StartImportResponse} containing the details of the initiated import operation.
* @throws ApiException if there are issues processing the request or communicating with the server.
* This includes network issues, server errors, or serialization issues with the request or response.
*/
public StartImportResponse startImport(String uri, String integrationId, ImportErrorMode.OnErrorEnum errorMode) throws ApiException {
StartImportRequest importRequest = new StartImportRequest();
importRequest.setUri(uri);
if(integrationId != null && !integrationId.isEmpty()) {
importRequest.setIntegrationId(integrationId);
}
if(errorMode != null) {
ImportErrorMode importErrorMode = new ImportErrorMode().onError(errorMode);
importRequest.setErrorMode(importErrorMode);
}

return bulkOperations.startBulkImport(importRequest);
}

/**
* <p>Lists all recent and ongoing import operations for the specified index with default limit and pagination.</p>
*
* <p>The method constructs a request to fetch a list of import operations, limited by the default value set to 100
* number of operations to return per page. The pagination token is set to null as well by default.</p>
*
*
* <p>Example:
* <pre>{@code
* import org.openapitools.db_data.client.ApiException;
* import org.openapitools.db_data.client.model.ListImportsResponse;
*
* ...
*
* ListImportsResponse response = asyncIndex.listImports();
* }</pre>
*
* @return {@link ListImportsResponse} containing the list of recent and ongoing import operations.
* @throws ApiException if there are issues processing the request or communicating with the server.
* This includes network issues, server errors, or serialization issues with the request or response.
*/
public ListImportsResponse listImports() throws ApiException {
return listImports(100, null);
}

/**
* <p>Lists all recent and ongoing import operations for the specified index based on limit.</p>
*
* <p>The method constructs a request to fetch a list of import operations, limited by the specified
* maximum number of operations to return per page. The pagination token is set to null by default.</p>
*
*
* <p>Example:
* <pre>{@code
* import org.openapitools.db_data.client.ApiException;
* import org.openapitools.db_data.client.model.ListImportsResponse;
*
* ...
* int limit = 10;
* ListImportsResponse response = asyncIndex.listImports(limit);
* }</pre>
*
* @param limit The maximum number of operations to return per page. Default is 100.
* @return {@link ListImportsResponse} containing the list of recent and ongoing import operations.
* @throws ApiException if there are issues processing the request or communicating with the server.
* This includes network issues, server errors, or serialization issues with the request or response.
*/
public ListImportsResponse listImports(Integer limit) throws ApiException {
return listImports(limit, null);
}

/**
* <p>Lists all recent and ongoing import operations for the specified index.</p>
*
* <p>The method constructs a request to fetch a list of import operations, limited by the specified
* maximum number of operations to return per page. The pagination token allows for
* deterministic pagination through the list of import operations.</p>
*
* <p>Example:
* <pre>{@code
* import org.openapitools.db_data.client.ApiException;
* import org.openapitools.db_data.client.model.ListImportsResponse;
*
* ...
* int limit = 10;
* String paginationToken = "some-pagination-token";
* ListImportsResponse response = asyncIndex.listImports(limit, paginationToken);
* }</pre>
*
* @param limit The maximum number of operations to return per page. Default is 100.
* @param paginationToken The token to continue a previous listing operation. Can be null or empty.
* @return {@link ListImportsResponse} containing the list of recent and ongoing import operations.
* @throws ApiException if there are issues processing the request or communicating with the server.
* This includes network issues, server errors, or serialization issues with the request or response.
*/
public ListImportsResponse listImports(Integer limit, String paginationToken) throws ApiException {
return bulkOperations.listBulkImports(limit, paginationToken);
}

/**
* <p>Retrieves detailed information about a specific import operation using its unique identifier.</p>
*
* <p>The method constructs a request to fetch details of the specified import operation by its ID,
* allowing users to monitor the status and results of the import process.</p>
*
* <p>Example:
* <pre>{@code
* import org.openapitools.db_data.client.ApiException;
* import org.openapitools.db_data.client.model.ImportModel;
*
* ...
*
* String importId = "1";
* ImportModel importDetails = asyncIndex.describeImport(importId);
* }</pre>
*
* @param id The unique identifier for the import operation.
* @return {@link ImportModel} containing details of the specified import operation.
* @throws ApiException if there are issues processing the request or communicating with the server.
* This includes network issues, server errors, or serialization issues with the request or response.
*/
public ImportModel describeImport(String id) throws ApiException {
return bulkOperations.describeBulkImport(id);
}

/**
* <p>Attempts to cancel an ongoing import operation using its unique identifier.</p>
*
* <p>The method issues a request to cancel the specified import operation if it has not yet finished.
* If the operation is already completed, the method has no effect.</p>
*
* <p>Example:
* <pre>{@code
* import org.openapitools.db_data.client.ApiException;
*
* ...
* String importId = "2";
* asyncIndex.cancelImport(importId);
* }</pre>
*
* @param id The unique identifier for the import operation to cancel.
* @throws ApiException if there are issues processing the request or communicating with the server.
* This includes network issues, server errors, or serialization issues with the request or response.
*/
public void cancelImport(String id) throws ApiException {
bulkOperations.cancelBulkImport(id);
}

/**
* {@inheritDoc}
* Closes the current index connection gracefully, releasing any resources associated with it. This method should
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/pinecone/clients/Pinecone.java
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ public AsyncIndex getAsyncIndexConnection(String indexName) throws PineconeValid

config.setHost(getIndexHost(indexName));
PineconeConnection connection = getConnection(indexName);
return new AsyncIndex(connection, indexName);
return new AsyncIndex(config, connection, indexName);
}

/**
Expand Down
Loading