diff --git a/cli/storage_azure.go b/cli/storage_azure.go index 7fd1a92123c..1c9ec017fd1 100644 --- a/cli/storage_azure.go +++ b/cli/storage_azure.go @@ -2,8 +2,10 @@ package cli import ( "context" + "time" "github.com/alecthomas/kingpin/v2" + "github.com/pkg/errors" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/blob/azure" @@ -25,11 +27,32 @@ func (c *storageAzureFlags) Setup(svc StorageProviderServices, cmd *kingpin.CmdC cmd.Flag("client-secret", "Azure service principle client secret (overrides AZURE_CLIENT_SECRET environment variable)").Envar(svc.EnvName("AZURE_CLIENT_SECRET")).StringVar(&c.azOptions.ClientSecret) commonThrottlingFlags(cmd, &c.azOptions.Limits) + + var pointInTimeStr string + + pitPreAction := func(pc *kingpin.ParseContext) error { + if pointInTimeStr != "" { + t, err := time.Parse(time.RFC3339, pointInTimeStr) + if err != nil { + return errors.Wrap(err, "invalid point-in-time argument") + } + + c.azOptions.PointInTime = &t + } + + return nil + } + + cmd.Flag("point-in-time", "Use a point-in-time view of the storage repository when supported").PlaceHolder(time.RFC3339).PreAction(pitPreAction).StringVar(&pointInTimeStr) } func (c *storageAzureFlags) Connect(ctx context.Context, isCreate bool, formatVersion int) (blob.Storage, error) { _ = formatVersion + if isCreate && c.azOptions.PointInTime != nil && !c.azOptions.PointInTime.IsZero() { + return nil, errors.New("Cannot specify a 'point-in-time' option when creating a repository") + } + //nolint:wrapcheck return azure.New(ctx, &c.azOptions, isCreate) } diff --git a/repo/blob/azure/azure_options.go b/repo/blob/azure/azure_options.go index cd55c1da9ad..80d11a504af 100644 --- a/repo/blob/azure/azure_options.go +++ b/repo/blob/azure/azure_options.go @@ -1,6 +1,8 @@ package azure import ( + "time" + "github.com/kopia/kopia/repo/blob/throttling" ) @@ -29,4 +31,7 @@ type Options struct { StorageDomain string `json:"storageDomain,omitempty"` throttling.Limits + + // PointInTime specifies a view of the (versioned) store at that time + PointInTime *time.Time `json:"pointInTime,omitempty"` } diff --git a/repo/blob/azure/azure_pit.go b/repo/blob/azure/azure_pit.go new file mode 100644 index 00000000000..9d3b8901a4f --- /dev/null +++ b/repo/blob/azure/azure_pit.go @@ -0,0 +1,201 @@ +package azure + +import ( + "context" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + azblobmodels "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" + "github.com/pkg/errors" + + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/blob/readonly" + "github.com/kopia/kopia/repo/format" +) + +type azPointInTimeStorage struct { + azStorage + + pointInTime time.Time +} + +func (az *azPointInTimeStorage) ListBlobs(ctx context.Context, blobIDPrefix blob.ID, cb func(bm blob.Metadata) error) error { + var ( + previousID blob.ID + vs []versionMetadata + ) + + err := az.listBlobVersions(ctx, blobIDPrefix, func(vm versionMetadata) error { + if vm.BlobID != previousID { + // different blob, process previous one + if v, found := newestAtUnlessDeleted(vs, az.pointInTime); found { + if err := cb(v.Metadata); err != nil { + return err + } + } + + previousID = vm.BlobID + vs = vs[:0] // reset for next blob + } + + vs = append(vs, vm) + + return nil + }) + if err != nil { + return errors.Wrapf(err, "could not list blob versions at time %s", az.pointInTime) + } + + // process last blob + if v, found := newestAtUnlessDeleted(vs, az.pointInTime); found { + if err := cb(v.Metadata); err != nil { + return err + } + } + + return nil +} + +func (az *azPointInTimeStorage) GetBlob(ctx context.Context, blobID blob.ID, offset, length int64, output blob.OutputBuffer) error { + // getMetadata returns the specific blob version at time t + m, err := az.getVersionedMetadata(ctx, blobID) + if err != nil { + return errors.Wrap(err, "getting metadata") + } + + return az.getBlobWithVersion(ctx, blobID, m.Version, offset, length, output) +} + +// newestAtUnlessDeleted returns the last version in the list older than the PIT. +// Azure sorts in ascending order so return the last element in the list. +func newestAtUnlessDeleted(vs []versionMetadata, t time.Time) (v versionMetadata, found bool) { + vs = getOlderThan(vs, t) + + if len(vs) == 0 { + return versionMetadata{}, false + } + + v = vs[len(vs)-1] + + return v, !v.IsDeleteMarker +} + +// Removes versions that are newer than t. The filtering is done in place +// and uses the same slice storage as vs. Assumes entries in vs are in ascending +// timestamp order (and version order), unlike S3 which assumes descending. +// Versions in Azure follow the time.RFC3339Nano syntax. +func getOlderThan(vs []versionMetadata, t time.Time) []versionMetadata { + for i := range vs { + if vs[i].Timestamp.After(t) { + return vs[:i] + } + + // The DeleteMarker blob takes the Timestamp of the previous version but has its own Version. + // If there was a Kopia Delete Marker (the blob was protected) it will be caught above but if + // the container has versioning enabled but no blob retention protection (or the blob was deleted outside + // of the protection window) then we need to check the time of the VersionID because there could be a situation + // where Azure's DeleteMarker version has Timestamp 2023-10-20 but Version 2023-10-27...then if PIT was 2023-10-22 the DeleteMarker + // would be returned without this extra test + if vs[i].IsDeleteMarker { + versionTime, err := time.Parse(time.RFC3339Nano, vs[i].Version) + if err != nil { + return nil + } + + if versionTime.After(t) { + return vs[:i] + } + } + } + + return vs +} + +// listBlobVersions returns a list of blob versions but the blob is deleted, it returns Azure's delete marker version but excludes +// the Kopia delete marker version that is used to get around immutability protections. +func (az *azPointInTimeStorage) listBlobVersions(ctx context.Context, prefix blob.ID, callback func(vm versionMetadata) error) error { + prefixStr := az.getObjectNameString(prefix) + + pager := az.service.NewListBlobsFlatPager(az.container, &azblob.ListBlobsFlatOptions{ + Prefix: &prefixStr, + Include: azblob.ListBlobsInclude{ + Metadata: true, + DeletedWithVersions: true, // this shows DeleteMarkers aka blobs with HasVersionsOnly set to true + Versions: true, + }, + }) + + for pager.More() { + page, err := pager.NextPage(ctx) + if err != nil { + return translateError(err) + } + + for _, it := range page.Segment.BlobItems { + vm := az.getVersionedBlobMeta(it) + + if err := callback(vm); err != nil { + return err + } + } + } + + return nil +} + +func (az *azPointInTimeStorage) getVersionedMetadata(ctx context.Context, blobID blob.ID) (versionMetadata, error) { + var vml []versionMetadata + + if err := az.getBlobVersions(ctx, blobID, func(vm versionMetadata) error { + if !vm.Timestamp.After(az.pointInTime) { + vml = append(vml, vm) + } + + return nil + }); err != nil { + return versionMetadata{}, errors.Wrapf(err, "could not get version metadata for blob %s", blobID) + } + + if v, found := newestAtUnlessDeleted(vml, az.pointInTime); found { + return v, nil + } + + return versionMetadata{}, blob.ErrBlobNotFound +} + +// isAzureDeleteMarker checks for Azure created delete markers. +func (az *azPointInTimeStorage) isAzureDeleteMarker(it *azblobmodels.BlobItem) bool { + var isDeleteMarker bool + // HasVersionsOnly - Indicates that this root blob has been deleted + if it.HasVersionsOnly != nil { + isDeleteMarker = *it.HasVersionsOnly + } + + return isDeleteMarker +} + +// maybePointInTimeStore wraps s with a point-in-time store when s is versioned +// and a point-in-time value is specified. Otherwise, s is returned. +func maybePointInTimeStore(ctx context.Context, s *azStorage, pointInTime *time.Time) (blob.Storage, error) { + if pit := s.Options.PointInTime; pit == nil || pit.IsZero() { + return s, nil + } + + // Versioning is needed for PIT. This check will fail if someone deleted the Kopia Repository file. + props, err := s.service.ServiceClient(). + NewContainerClient(s.container). + NewBlobClient(s.getObjectNameString(format.KopiaRepositoryBlobID)). + GetProperties(ctx, nil) + if err != nil { + return nil, errors.Wrapf(err, "could not get determine if container '%s' supports versioning", s.container) + } + + if props.VersionID == nil { + return nil, errors.Errorf("cannot create point-in-time view for non-versioned container '%s'", s.container) + } + + return readonly.NewWrapper(&azPointInTimeStorage{ + azStorage: *s, + pointInTime: *pointInTime, + }), nil +} diff --git a/repo/blob/azure/azure_storage.go b/repo/blob/azure/azure_storage.go index 6e81e8c23ae..374fd2b991e 100644 --- a/repo/blob/azure/azure_storage.go +++ b/repo/blob/azure/azure_storage.go @@ -4,6 +4,7 @@ package azure import ( "context" "fmt" + "strings" "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore" @@ -13,6 +14,7 @@ import ( azblobblob "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" azblockblob "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" + azblobmodels "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" "github.com/pkg/errors" "github.com/kopia/kopia/internal/clock" @@ -25,7 +27,8 @@ import ( ) const ( - azStorageType = "azureBlob" + azStorageType = "azureBlob" + latestVersionID = "" timeMapKey = "Kopiamtime" // this must be capital letter followed by lowercase, to comply with AZ tags naming convention. ) @@ -39,6 +42,10 @@ type azStorage struct { } func (az *azStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int64, output blob.OutputBuffer) error { + return az.getBlobWithVersion(ctx, b, latestVersionID, offset, length, output) +} + +func (az *azStorage) getBlobWithVersion(ctx context.Context, b blob.ID, versionID string, offset, length int64, output blob.OutputBuffer) error { if offset < 0 { return errors.Wrap(blob.ErrInvalidRange, "invalid offset") } @@ -56,7 +63,15 @@ func (az *azStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int6 opt.Range.Count = l1 } - resp, err := az.service.DownloadStream(ctx, az.container, az.getObjectNameString(b), opt) + bc, err := az.service.ServiceClient(). + NewContainerClient(az.container). + NewBlobClient(az.getObjectNameString(b)). + WithVersionID(versionID) + if err != nil { + return errors.Wrap(err, "failed to get versioned blob client") + } + + resp, err := bc.DownloadStream(ctx, opt) if err != nil { return translateError(err) } @@ -71,7 +86,6 @@ func (az *azStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int6 if err := iocopy.JustCopy(output, body); err != nil { return translateError(err) } - //nolint:wrapcheck return blob.EnsureLengthExactly(output.Length(), length) } @@ -215,6 +229,27 @@ func (az *azStorage) DisplayName() string { return fmt.Sprintf("Azure: %v", az.Options.Container) } +func (az *azStorage) getBlobName(it *azblobmodels.BlobItem) blob.ID { + n := *it.Name + return blob.ID(strings.TrimPrefix(n, az.Prefix)) +} + +func (az *azStorage) getBlobMeta(it *azblobmodels.BlobItem) blob.Metadata { + bm := blob.Metadata{ + BlobID: az.getBlobName(it), + Length: *it.Properties.ContentLength, + } + + // see if we have 'Kopiamtime' metadata, if so - trust it. + if t, ok := timestampmeta.FromValue(stringDefault(it.Metadata["kopiamtime"], "")); ok { + bm.Timestamp = t + } else { + bm.Timestamp = *it.Properties.LastModified + } + + return bm +} + func (az *azStorage) putBlob(ctx context.Context, b blob.ID, data blob.Bytes, opts blob.PutOptions) (azblockblob.UploadResponse, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -368,7 +403,12 @@ func New(ctx context.Context, opt *Options, isCreate bool) (blob.Storage, error) service: service, } - az := retrying.NewWrapper(raw) + st, err := maybePointInTimeStore(ctx, raw, opt.PointInTime) + if err != nil { + return nil, err + } + + az := retrying.NewWrapper(st) // verify Azure connection is functional by listing blobs in a bucket, which will fail if the container // does not exist. We list with a prefix that will not exist, to avoid iterating through any objects. diff --git a/repo/blob/azure/azure_storage_test.go b/repo/blob/azure/azure_storage_test.go index 78a162bf6ba..2c7ba9131e5 100644 --- a/repo/blob/azure/azure_storage_test.go +++ b/repo/blob/azure/azure_storage_test.go @@ -23,13 +23,17 @@ import ( ) const ( - testContainerEnv = "KOPIA_AZURE_TEST_CONTAINER" - testStorageAccountEnv = "KOPIA_AZURE_TEST_STORAGE_ACCOUNT" - testStorageKeyEnv = "KOPIA_AZURE_TEST_STORAGE_KEY" - testStorageSASTokenEnv = "KOPIA_AZURE_TEST_SAS_TOKEN" - testStorageTenantIDEnv = "KOPIA_AZURE_TEST_TENANT_ID" - testStorageClientIDEnv = "KOPIA_AZURE_TEST_CLIENT_ID" - testStorageClientSecretEnv = "KOPIA_AZURE_TEST_CLIENT_SECRET" + testContainerEnv = "KOPIA_AZURE_TEST_CONTAINER" + testStorageAccountEnv = "KOPIA_AZURE_TEST_STORAGE_ACCOUNT" + testStorageKeyEnv = "KOPIA_AZURE_TEST_STORAGE_KEY" + testStorageSASTokenEnv = "KOPIA_AZURE_TEST_SAS_TOKEN" + testImmutableContainerEnv = "KOPIA_AZURE_TEST_IMMUTABLE_CONTAINER" + testImmutableStorageAccountEnv = "KOPIA_AZURE_TEST_IMMUTABLE_STORAGE_ACCOUNT" + testImmutableStorageKeyEnv = "KOPIA_AZURE_TEST_IMMUTABLE_STORAGE_KEY" + testImmutableStorageSASTokenEnv = "KOPIA_AZURE_TEST_IMMUTABLE_SAS_TOKEN" + testStorageTenantIDEnv = "KOPIA_AZURE_TEST_TENANT_ID" + testStorageClientIDEnv = "KOPIA_AZURE_TEST_CLIENT_ID" + testStorageClientSecretEnv = "KOPIA_AZURE_TEST_CLIENT_SECRET" ) func getEnvOrSkip(t *testing.T, name string) string { @@ -119,7 +123,7 @@ func TestAzureStorage(t *testing.T) { Container: container, StorageAccount: storageAccount, StorageKey: storageKey, - Prefix: fmt.Sprintf("test-%v-%x-", clock.Now().Unix(), data), + Prefix: fmt.Sprintf("test-%v-%x/", clock.Now().Unix(), data), }, false) cancel() @@ -152,7 +156,7 @@ func TestAzureStorageSASToken(t *testing.T) { Container: container, StorageAccount: storageAccount, SASToken: sasToken, - Prefix: fmt.Sprintf("sastest-%v-%x-", clock.Now().Unix(), data), + Prefix: fmt.Sprintf("sastest-%v-%x/", clock.Now().Unix(), data), }, false) require.NoError(t, err) @@ -190,7 +194,7 @@ func TestAzureStorageClientSecret(t *testing.T) { TenantID: tenantID, ClientID: clientID, ClientSecret: clientSecret, - Prefix: fmt.Sprintf("sastest-%v-%x-", clock.Now().Unix(), data), + Prefix: fmt.Sprintf("sastest-%v-%x/", clock.Now().Unix(), data), }, false) require.NoError(t, err) @@ -270,3 +274,17 @@ func TestAzureStorageInvalidCreds(t *testing.T) { t.Errorf("unexpected success connecting to Azure blob storage, wanted error") } } + +func getBlobCount(ctx context.Context, t *testing.T, st blob.Storage, prefix blob.ID) int { + t.Helper() + + var count int + + err := st.ListBlobs(ctx, prefix, func(bm blob.Metadata) error { + count++ + return nil + }) + require.NoError(t, err) + + return count +} diff --git a/repo/blob/azure/azure_versioned.go b/repo/blob/azure/azure_versioned.go new file mode 100644 index 00000000000..e3c1981b756 --- /dev/null +++ b/repo/blob/azure/azure_versioned.go @@ -0,0 +1,49 @@ +package azure + +import ( + "context" + + azblobmodels "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" + + "github.com/kopia/kopia/repo/blob" +) + +// versionMetadata has metadata for a single BLOB version. +type versionMetadata struct { + blob.Metadata + + // Version has the format of time.RFC3339Nano + Version string + IsDeleteMarker bool +} + +type versionMetadataCallback func(versionMetadata) error + +func (az *azPointInTimeStorage) getVersionedBlobMeta(it *azblobmodels.BlobItem) versionMetadata { + bm := az.getBlobMeta(it) + + return versionMetadata{ + Metadata: bm, + Version: *it.VersionID, + IsDeleteMarker: az.isAzureDeleteMarker(it), + } +} + +// getBlobVersions lists all the versions for the blob with the given prefix. +func (az *azPointInTimeStorage) getBlobVersions(ctx context.Context, prefix blob.ID, callback versionMetadataCallback) error { + var foundBlobs bool + + if err := az.listBlobVersions(ctx, prefix, func(vm versionMetadata) error { + foundBlobs = true + + return callback(vm) + }); err != nil { + return err + } + + if !foundBlobs { + return blob.ErrBlobNotFound + } + + return nil +} diff --git a/repo/blob/azure/azure_versioned_test.go b/repo/blob/azure/azure_versioned_test.go new file mode 100644 index 00000000000..f1d36dbe518 --- /dev/null +++ b/repo/blob/azure/azure_versioned_test.go @@ -0,0 +1,227 @@ +package azure_test + +import ( + "context" + "crypto/rand" + "fmt" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/internal/testutil" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/blob/azure" + "github.com/kopia/kopia/repo/format" +) + +func TestGetBlobVersions(t *testing.T) { + t.Parallel() + testutil.ProviderTest(t) + + // must be with Immutable Storage with Versioning enabled + container := getEnvOrSkip(t, testImmutableContainerEnv) + storageAccount := getEnvOrSkip(t, testImmutableStorageAccountEnv) + storageKey := getEnvOrSkip(t, testImmutableStorageKeyEnv) + + createContainer(t, container, storageAccount, storageKey) + + ctx := testlogging.Context(t) + data := make([]byte, 8) + rand.Read(data) + // use context that gets canceled after opening storage to ensure it's not used beyond New(). + newctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + + prefix := fmt.Sprintf("test-%v-%x/", clock.Now().Unix(), data) + opts := &azure.Options{ + Container: container, + StorageAccount: storageAccount, + StorageKey: storageKey, + Prefix: prefix, + } + st, err := azure.New(newctx, opts, false) + require.NoError(t, err) + + t.Cleanup(func() { + st.Close(ctx) + }) + + // required for PIT versioning check + err = st.PutBlob(ctx, format.KopiaRepositoryBlobID, gather.FromSlice([]byte(nil)), blob.PutOptions{}) + require.NoError(t, err) + + const ( + originalData = "original" + updatedData = "some update" + latestData = "latest version" + ) + + dataBlobs := []string{originalData, updatedData, latestData} + + const blobName = "TestGetBlobVersions" + blobID := blob.ID(blobName) + dataTimestamps, err := putBlobs(ctx, st, blobID, dataBlobs) + require.NoError(t, err) + + pastPIT := dataTimestamps[0].Add(-1 * time.Second) + futurePIT := dataTimestamps[2].Add(1 * time.Second) + + for _, tt := range []struct { + testName string + pointInTime *time.Time + expectedBlobData string + expectedError error + }{ + { + testName: "unset PIT", + pointInTime: nil, + expectedBlobData: latestData, + expectedError: nil, + }, + { + testName: "set in the future", + pointInTime: &futurePIT, + expectedBlobData: latestData, + expectedError: nil, + }, + { + testName: "set in the past", + pointInTime: &pastPIT, + expectedBlobData: "", + expectedError: blob.ErrBlobNotFound, + }, + { + testName: "original data", + pointInTime: &dataTimestamps[0], + expectedBlobData: originalData, + expectedError: nil, + }, + { + testName: "updated data", + pointInTime: &dataTimestamps[1], + expectedBlobData: updatedData, + expectedError: nil, + }, + { + testName: "latest data", + pointInTime: &dataTimestamps[2], + expectedBlobData: latestData, + expectedError: nil, + }, + } { + fmt.Printf("Running test: %s\n", tt.testName) + opts.PointInTime = tt.pointInTime + st, err = azure.New(ctx, opts, false) + require.NoError(t, err) + + var tmp gather.WriteBuffer + err = st.GetBlob(ctx, blobID, 0, -1, &tmp) + require.ErrorIs(t, err, tt.expectedError) + require.Equal(t, tt.expectedBlobData, string(tmp.ToByteSlice())) + } +} + +func TestGetBlobVersionsWithDeletion(t *testing.T) { + t.Parallel() + testutil.ProviderTest(t) + + // must be with Immutable Storage with Versioning enabled + container := getEnvOrSkip(t, testImmutableContainerEnv) + storageAccount := getEnvOrSkip(t, testImmutableStorageAccountEnv) + storageKey := getEnvOrSkip(t, testImmutableStorageKeyEnv) + + createContainer(t, container, storageAccount, storageKey) + + ctx := testlogging.Context(t) + data := make([]byte, 8) + rand.Read(data) + // use context that gets canceled after opening storage to ensure it's not used beyond New(). + newctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + + prefix := fmt.Sprintf("test-%v-%x/", clock.Now().Unix(), data) + opts := &azure.Options{ + Container: container, + StorageAccount: storageAccount, + StorageKey: storageKey, + Prefix: prefix, + } + st, err := azure.New(newctx, opts, false) + require.NoError(t, err) + + t.Cleanup(func() { + st.Close(ctx) + }) + + // required for PIT versioning check + err = st.PutBlob(ctx, format.KopiaRepositoryBlobID, gather.FromSlice([]byte(nil)), blob.PutOptions{}) + require.NoError(t, err) + + const ( + originalData = "original" + updatedData = "some update" + ) + + dataBlobs := []string{originalData, updatedData} + + const blobName = "TestGetBlobVersionsWithDeletion" + blobID := blob.ID(blobName) + dataTimestamps, err := putBlobs(ctx, st, blobID, dataBlobs) + require.NoError(t, err) + + count := getBlobCount(ctx, t, st, blobID) + require.Equal(t, 1, count) + + err = st.DeleteBlob(ctx, blobID) + require.NoError(t, err) + + // blob no longer found + count = getBlobCount(ctx, t, st, blobID) + require.Equal(t, 0, count) + + opts.PointInTime = &dataTimestamps[1] + st, err = azure.New(ctx, opts, false) + require.NoError(t, err) + + // blob visible again with PIT set. + count = getBlobCount(ctx, t, st, blobID) + require.Equal(t, 1, count) + + var tmp gather.WriteBuffer + err = st.GetBlob(ctx, blobID, 0, -1, &tmp) + require.NoError(t, err) + require.Equal(t, updatedData, string(tmp.ToByteSlice())) + + opts.PointInTime = &dataTimestamps[0] + st, err = azure.New(ctx, opts, false) + require.NoError(t, err) + err = st.GetBlob(ctx, blobID, 0, -1, &tmp) + require.NoError(t, err) + require.Equal(t, originalData, string(tmp.ToByteSlice())) +} + +func putBlobs(ctx context.Context, cli blob.Storage, blobID blob.ID, blobs []string) ([]time.Time, error) { + var putTimes []time.Time + + for _, b := range blobs { + if err := cli.PutBlob(ctx, blobID, gather.FromSlice([]byte(b)), blob.PutOptions{}); err != nil { + return nil, errors.Wrap(err, "putting blob") + } + + m, err := cli.GetMetadata(ctx, blobID) + if err != nil { + return nil, errors.Wrap(err, "getting metadata") + } + + putTimes = append(putTimes, m.Timestamp) + // sleep because granularity is 1 second and we should separate to show PIT views. + time.Sleep(1 * time.Second) + } + + return putTimes, nil +}