Skip to content

Commit

Permalink
Add imports (#161)
Browse files Browse the repository at this point in the history
## Problem

Add four endpoints of the `BulkOperationsApi`.

## Solution

Added the following four endpoints of the `BulkOperationsApi`:
1. `startImport(String uri, String integrationId,
ImportErrorMode.OnErrorEnum errorMode)`
2. `describeImport(Integer limit, String paginationToken)`
3. `listImport(String id)`
4. `cancelImport(String id)`

## Type of Change

- [ ] Bug fix (non-breaking change which fixes an issue)
- [X] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [ ] This change requires a documentation update
- [ ] Infrastructure change (CI configs, etc)
- [ ] Non-code change (docs, etc)
- [ ] None of the above: (explain here)

## Test Plan

Added unit tests.
  • Loading branch information
rohanshah18 committed Oct 24, 2024
1 parent b341a05 commit 848d81b
Show file tree
Hide file tree
Showing 6 changed files with 443 additions and 4 deletions.
76 changes: 76 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,82 @@ RerankResult result = inference.rerank(model, query, documents, rankFields, topN
System.out.println(result.getData());
```

# Imports
## Start an import

The following example initiates an asynchronous import of vectors from object storage into the index.

```java
import org.openapitools.db_data.client.ApiException;
import org.openapitools.db_data.client.model.ImportErrorMode;
import org.openapitools.db_data.client.model.StartImportResponse;
...

// Initialize pinecone object
Pinecone pinecone = new Pinecone.Builder("PINECONE_API_KEY").build();
// Get async imports connection object
AsyncIndex asyncIndex = pinecone.getAsyncIndexConnection("PINECONE_INDEX_NAME");

// s3 uri
String uri = "s3://path/to/file.parquet";

// Start an import
StartImportResponse response = asyncIndex.startImport(uri, "123-456-789", ImportErrorMode.OnErrorEnum.CONTINUE);
```

## List imports

The following example lists all recent and ongoing import operations for the specified index.

```java
import org.openapitools.db_data.client.ApiException;
import org.openapitools.db_data.client.model.ListImportsResponse;
...

// Initialize pinecone object
Pinecone pinecone = new Pinecone.Builder("PINECONE_API_KEY").build();
// Get async imports connection object
AsyncIndex asyncIndex = pinecone.getAsyncIndexConnection("PINECONE_INDEX_NAME");

// List imports
ListImportsResponse response = asyncIndex.listImports(100, "some-pagination-token");
```

## Describe an import

The following example retrieves detailed information about a specific import operation using its unique identifier.

```java
import org.openapitools.db_data.client.ApiException;
import org.openapitools.db_data.client.model.ImportModel;
...

// Initialize pinecone object
Pinecone pinecone = new Pinecone.Builder("PINECONE_API_KEY").build();
// Get async imports connection object
AsyncIndex asyncIndex = pinecone.getAsyncIndexConnection("PINECONE_INDEX_NAME");

// Describe import
ImportModel importDetails = asyncIndex.describeImport("1");
```

## Cancel an import

The following example attempts to cancel an ongoing import operation using its unique identifier.

```java
import org.openapitools.db_data.client.ApiException;
...

// Initialize pinecone object
Pinecone pinecone = new Pinecone.Builder("PINECONE_API_KEY").build();
// Get async imports connection object
AsyncIndex asyncIndex = pinecone.getAsyncIndexConnection("PINECONE_INDEX_NAME");

// Cancel import
asyncIndex.cancelImport("2");
```

## Examples

- The data and control plane operation examples can be found in `io/pinecone/integration` folder.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.pinecone.clients.AsyncIndex;
import io.pinecone.clients.Index;
import io.pinecone.configs.PineconeConfig;
import io.pinecone.configs.PineconeConnection;
import io.pinecone.exceptions.PineconeValidationException;
import io.pinecone.proto.VectorServiceGrpc;
Expand All @@ -24,6 +25,7 @@ public class QueryErrorTest {

@BeforeAll
public static void setUp() throws IOException, InterruptedException {
PineconeConfig config = mock(PineconeConfig.class);
PineconeConnection connectionMock = mock(PineconeConnection.class);

VectorServiceGrpc.VectorServiceBlockingStub stubMock = mock(VectorServiceGrpc.VectorServiceBlockingStub.class);
Expand All @@ -33,7 +35,7 @@ public static void setUp() throws IOException, InterruptedException {
when(connectionMock.getAsyncStub()).thenReturn(asyncStubMock);

index = new Index(connectionMock, "some-index-name");
asyncIndex = new AsyncIndex(connectionMock, "some-index-name");
asyncIndex = new AsyncIndex(config, connectionMock, "some-index-name");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.pinecone.clients.AsyncIndex;
import io.pinecone.clients.Index;
import io.pinecone.configs.PineconeConfig;
import io.pinecone.configs.PineconeConnection;
import io.pinecone.exceptions.PineconeException;
import io.pinecone.exceptions.PineconeValidationException;
Expand All @@ -27,6 +28,7 @@ public class UpsertErrorTest {

@BeforeAll
public static void setUp() throws IOException, InterruptedException {
PineconeConfig config = mock(PineconeConfig.class);
PineconeConnection connectionMock = mock(PineconeConnection.class);

VectorServiceGrpc.VectorServiceBlockingStub stubMock = mock(VectorServiceGrpc.VectorServiceBlockingStub.class);
Expand All @@ -36,7 +38,7 @@ public static void setUp() throws IOException, InterruptedException {
when(connectionMock.getAsyncStub()).thenReturn(asyncStubMock);

index = new Index(connectionMock, "some-index-name");
asyncIndex = new AsyncIndex(connectionMock, "some-index-name");
asyncIndex = new AsyncIndex(config, connectionMock, "some-index-name");
}

@Test
Expand Down
200 changes: 199 additions & 1 deletion src/main/java/io/pinecone/clients/AsyncIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,32 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Struct;
import io.pinecone.commons.IndexInterface;
import io.pinecone.configs.PineconeConfig;
import io.pinecone.configs.PineconeConnection;
import io.pinecone.exceptions.PineconeValidationException;
import io.pinecone.proto.*;
import io.pinecone.proto.DeleteRequest;
import io.pinecone.proto.DescribeIndexStatsRequest;
import io.pinecone.proto.FetchResponse;
import io.pinecone.proto.ListResponse;
import io.pinecone.proto.QueryRequest;
import io.pinecone.proto.QueryResponse;
import io.pinecone.proto.UpdateRequest;
import io.pinecone.proto.UpsertRequest;
import io.pinecone.proto.UpsertResponse;
import io.pinecone.unsigned_indices_model.QueryResponseWithUnsignedIndices;
import io.pinecone.unsigned_indices_model.VectorWithUnsignedIndices;
import okhttp3.OkHttpClient;
import org.openapitools.db_data.client.ApiClient;
import org.openapitools.db_data.client.ApiException;
import org.openapitools.db_data.client.Configuration;
import org.openapitools.db_data.client.api.BulkOperationsApi;
import org.openapitools.db_data.client.model.*;

import java.util.List;

import static io.pinecone.clients.Pinecone.buildOkHttpClient;


/**
* A client for interacting with a Pinecone index via GRPC asynchronously. Allows for upserting, querying, fetching, updating, and deleting vectors.
Expand All @@ -38,6 +56,7 @@ public class AsyncIndex implements IndexInterface<ListenableFuture<UpsertRespons
private final PineconeConnection connection;
private final VectorServiceGrpc.VectorServiceFutureStub asyncStub;
private final String indexName;
BulkOperationsApi bulkOperations;

/**
* Constructs an {@link AsyncIndex} instance for interacting with a Pinecone index.
Expand All @@ -55,14 +74,24 @@ public class AsyncIndex implements IndexInterface<ListenableFuture<UpsertRespons
* @param indexName The name of the index to interact with. The index host will be automatically resolved.
* @throws PineconeValidationException if the connection object is null.
*/
public AsyncIndex(PineconeConnection connection, String indexName) {
public AsyncIndex(PineconeConfig config, PineconeConnection connection, String indexName) {
if (connection == null) {
throw new PineconeValidationException("Pinecone connection object cannot be null.");
}

this.indexName = indexName;
this.connection = connection;
this.asyncStub = connection.getAsyncStub();

OkHttpClient customOkHttpClient = config.getCustomOkHttpClient();
ApiClient apiClient = (customOkHttpClient != null) ? new ApiClient(customOkHttpClient) : new ApiClient(buildOkHttpClient(config.getProxyConfig()));
apiClient.setApiKey(config.getApiKey());
apiClient.setUserAgent(config.getUserAgent());
apiClient.addDefaultHeader("X-Pinecone-Api-Version", Configuration.VERSION);

this.bulkOperations = new BulkOperationsApi(apiClient);
String protocol = config.isTLSEnabled() ? "https://" : "http://";
bulkOperations.setCustomBaseUrl(protocol + config.getHost());
}

/**
Expand Down Expand Up @@ -1039,6 +1068,175 @@ public ListenableFuture<ListResponse> list(String namespace, String prefix, Stri
return asyncStub.list(listRequest);
}

/**
* <p>Initiates an asynchronous import of vectors from object storage into a specified index.</p>
*
* <p>The method constructs a {@link StartImportRequest} using the provided URI for the data and optional
* storage integration ID. It also allows for specifying how to respond to errors during the import process
* through the {@link ImportErrorMode}. The import operation is then initiated via a call to the
* underlying {@link BulkOperationsApi}.</p>
*
* <p>Example:
* <pre>{@code
* import org.openapitools.db_data.client.ApiException;
* import org.openapitools.db_data.client.model.ImportErrorMode;
*
* ...
*
* String uri = "s3://path/to/file.parquet";
* String integrationId = "123-456-789";
* StartImportResponse response = asyncIndex.startImport(uri, integrationId, ImportErrorMode.OnErrorEnum.CONTINUE);
* }</pre>
*
* @param uri The URI prefix under which the data to import is available.
* @param integrationId The ID of the storage integration to access the data. Can be null or empty.
* @param errorMode Indicates how to respond to errors during the import process. Can be null.
* @return {@link StartImportResponse} containing the details of the initiated import operation.
* @throws ApiException if there are issues processing the request or communicating with the server.
* This includes network issues, server errors, or serialization issues with the request or response.
*/
public StartImportResponse startImport(String uri, String integrationId, ImportErrorMode.OnErrorEnum errorMode) throws ApiException {
StartImportRequest importRequest = new StartImportRequest();
importRequest.setUri(uri);
if(integrationId != null && !integrationId.isEmpty()) {
importRequest.setIntegrationId(integrationId);
}
if(errorMode != null) {
ImportErrorMode importErrorMode = new ImportErrorMode().onError(errorMode);
importRequest.setErrorMode(importErrorMode);
}

return bulkOperations.startBulkImport(importRequest);
}

/**
* <p>Lists all recent and ongoing import operations for the specified index with default limit and pagination.</p>
*
* <p>The method constructs a request to fetch a list of import operations, limited by the default value set to 100
* number of operations to return per page. The pagination token is set to null as well by default.</p>
*
*
* <p>Example:
* <pre>{@code
* import org.openapitools.db_data.client.ApiException;
* import org.openapitools.db_data.client.model.ListImportsResponse;
*
* ...
*
* ListImportsResponse response = asyncIndex.listImports();
* }</pre>
*
* @return {@link ListImportsResponse} containing the list of recent and ongoing import operations.
* @throws ApiException if there are issues processing the request or communicating with the server.
* This includes network issues, server errors, or serialization issues with the request or response.
*/
public ListImportsResponse listImports() throws ApiException {
return listImports(100, null);
}

/**
* <p>Lists all recent and ongoing import operations for the specified index based on limit.</p>
*
* <p>The method constructs a request to fetch a list of import operations, limited by the specified
* maximum number of operations to return per page. The pagination token is set to null by default.</p>
*
*
* <p>Example:
* <pre>{@code
* import org.openapitools.db_data.client.ApiException;
* import org.openapitools.db_data.client.model.ListImportsResponse;
*
* ...
* int limit = 10;
* ListImportsResponse response = asyncIndex.listImports(limit);
* }</pre>
*
* @param limit The maximum number of operations to return per page. Default is 100.
* @return {@link ListImportsResponse} containing the list of recent and ongoing import operations.
* @throws ApiException if there are issues processing the request or communicating with the server.
* This includes network issues, server errors, or serialization issues with the request or response.
*/
public ListImportsResponse listImports(Integer limit) throws ApiException {
return listImports(limit, null);
}

/**
* <p>Lists all recent and ongoing import operations for the specified index.</p>
*
* <p>The method constructs a request to fetch a list of import operations, limited by the specified
* maximum number of operations to return per page. The pagination token allows for
* deterministic pagination through the list of import operations.</p>
*
* <p>Example:
* <pre>{@code
* import org.openapitools.db_data.client.ApiException;
* import org.openapitools.db_data.client.model.ListImportsResponse;
*
* ...
* int limit = 10;
* String paginationToken = "some-pagination-token";
* ListImportsResponse response = asyncIndex.listImports(limit, paginationToken);
* }</pre>
*
* @param limit The maximum number of operations to return per page. Default is 100.
* @param paginationToken The token to continue a previous listing operation. Can be null or empty.
* @return {@link ListImportsResponse} containing the list of recent and ongoing import operations.
* @throws ApiException if there are issues processing the request or communicating with the server.
* This includes network issues, server errors, or serialization issues with the request or response.
*/
public ListImportsResponse listImports(Integer limit, String paginationToken) throws ApiException {
return bulkOperations.listBulkImports(limit, paginationToken);
}

/**
* <p>Retrieves detailed information about a specific import operation using its unique identifier.</p>
*
* <p>The method constructs a request to fetch details of the specified import operation by its ID,
* allowing users to monitor the status and results of the import process.</p>
*
* <p>Example:
* <pre>{@code
* import org.openapitools.db_data.client.ApiException;
* import org.openapitools.db_data.client.model.ImportModel;
*
* ...
*
* String importId = "1";
* ImportModel importDetails = asyncIndex.describeImport(importId);
* }</pre>
*
* @param id The unique identifier for the import operation.
* @return {@link ImportModel} containing details of the specified import operation.
* @throws ApiException if there are issues processing the request or communicating with the server.
* This includes network issues, server errors, or serialization issues with the request or response.
*/
public ImportModel describeImport(String id) throws ApiException {
return bulkOperations.describeBulkImport(id);
}

/**
* <p>Attempts to cancel an ongoing import operation using its unique identifier.</p>
*
* <p>The method issues a request to cancel the specified import operation if it has not yet finished.
* If the operation is already completed, the method has no effect.</p>
*
* <p>Example:
* <pre>{@code
* import org.openapitools.db_data.client.ApiException;
*
* ...
* String importId = "2";
* asyncIndex.cancelImport(importId);
* }</pre>
*
* @param id The unique identifier for the import operation to cancel.
* @throws ApiException if there are issues processing the request or communicating with the server.
* This includes network issues, server errors, or serialization issues with the request or response.
*/
public void cancelImport(String id) throws ApiException {
bulkOperations.cancelBulkImport(id);
}

/**
* {@inheritDoc}
* Closes the current index connection gracefully, releasing any resources associated with it. This method should
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/pinecone/clients/Pinecone.java
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ public AsyncIndex getAsyncIndexConnection(String indexName) throws PineconeValid

config.setHost(getIndexHost(indexName));
PineconeConnection connection = getConnection(indexName);
return new AsyncIndex(connection, indexName);
return new AsyncIndex(config, connection, indexName);
}

/**
Expand Down
Loading

0 comments on commit 848d81b

Please sign in to comment.