Skip to content

Commit

Permalink
feat(repository): Add Azure PIT support (kopia#3407)
Browse files Browse the repository at this point in the history
  • Loading branch information
KastenMike authored Nov 28, 2023
1 parent 8eee29a commit d4a380f
Show file tree
Hide file tree
Showing 7 changed files with 577 additions and 14 deletions.
23 changes: 23 additions & 0 deletions cli/storage_azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package cli

import (
"context"
"time"

"github.com/alecthomas/kingpin/v2"
"github.com/pkg/errors"

"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/azure"
Expand All @@ -25,11 +27,32 @@ func (c *storageAzureFlags) Setup(svc StorageProviderServices, cmd *kingpin.CmdC
cmd.Flag("client-secret", "Azure service principle client secret (overrides AZURE_CLIENT_SECRET environment variable)").Envar(svc.EnvName("AZURE_CLIENT_SECRET")).StringVar(&c.azOptions.ClientSecret)

commonThrottlingFlags(cmd, &c.azOptions.Limits)

var pointInTimeStr string

pitPreAction := func(pc *kingpin.ParseContext) error {
if pointInTimeStr != "" {
t, err := time.Parse(time.RFC3339, pointInTimeStr)
if err != nil {
return errors.Wrap(err, "invalid point-in-time argument")
}

c.azOptions.PointInTime = &t
}

return nil
}

cmd.Flag("point-in-time", "Use a point-in-time view of the storage repository when supported").PlaceHolder(time.RFC3339).PreAction(pitPreAction).StringVar(&pointInTimeStr)
}

func (c *storageAzureFlags) Connect(ctx context.Context, isCreate bool, formatVersion int) (blob.Storage, error) {
_ = formatVersion

if isCreate && c.azOptions.PointInTime != nil && !c.azOptions.PointInTime.IsZero() {
return nil, errors.New("Cannot specify a 'point-in-time' option when creating a repository")
}

//nolint:wrapcheck
return azure.New(ctx, &c.azOptions, isCreate)
}
5 changes: 5 additions & 0 deletions repo/blob/azure/azure_options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package azure

import (
"time"

"github.com/kopia/kopia/repo/blob/throttling"
)

Expand Down Expand Up @@ -29,4 +31,7 @@ type Options struct {
StorageDomain string `json:"storageDomain,omitempty"`

throttling.Limits

// PointInTime specifies a view of the (versioned) store at that time
PointInTime *time.Time `json:"pointInTime,omitempty"`
}
201 changes: 201 additions & 0 deletions repo/blob/azure/azure_pit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package azure

import (
"context"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
azblobmodels "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/pkg/errors"

"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/readonly"
"github.com/kopia/kopia/repo/format"
)

type azPointInTimeStorage struct {
azStorage

pointInTime time.Time
}

func (az *azPointInTimeStorage) ListBlobs(ctx context.Context, blobIDPrefix blob.ID, cb func(bm blob.Metadata) error) error {
var (
previousID blob.ID
vs []versionMetadata
)

err := az.listBlobVersions(ctx, blobIDPrefix, func(vm versionMetadata) error {
if vm.BlobID != previousID {
// different blob, process previous one
if v, found := newestAtUnlessDeleted(vs, az.pointInTime); found {
if err := cb(v.Metadata); err != nil {
return err
}
}

previousID = vm.BlobID
vs = vs[:0] // reset for next blob
}

vs = append(vs, vm)

return nil
})
if err != nil {
return errors.Wrapf(err, "could not list blob versions at time %s", az.pointInTime)
}

// process last blob
if v, found := newestAtUnlessDeleted(vs, az.pointInTime); found {
if err := cb(v.Metadata); err != nil {
return err
}
}

return nil
}

func (az *azPointInTimeStorage) GetBlob(ctx context.Context, blobID blob.ID, offset, length int64, output blob.OutputBuffer) error {
// getMetadata returns the specific blob version at time t
m, err := az.getVersionedMetadata(ctx, blobID)
if err != nil {
return errors.Wrap(err, "getting metadata")
}

return az.getBlobWithVersion(ctx, blobID, m.Version, offset, length, output)
}

// newestAtUnlessDeleted returns the last version in the list older than the PIT.
// Azure sorts in ascending order so return the last element in the list.
func newestAtUnlessDeleted(vs []versionMetadata, t time.Time) (v versionMetadata, found bool) {
vs = getOlderThan(vs, t)

if len(vs) == 0 {
return versionMetadata{}, false
}

v = vs[len(vs)-1]

return v, !v.IsDeleteMarker
}

// Removes versions that are newer than t. The filtering is done in place
// and uses the same slice storage as vs. Assumes entries in vs are in ascending
// timestamp order (and version order), unlike S3 which assumes descending.
// Versions in Azure follow the time.RFC3339Nano syntax.
func getOlderThan(vs []versionMetadata, t time.Time) []versionMetadata {
for i := range vs {
if vs[i].Timestamp.After(t) {
return vs[:i]
}

// The DeleteMarker blob takes the Timestamp of the previous version but has its own Version.
// If there was a Kopia Delete Marker (the blob was protected) it will be caught above but if
// the container has versioning enabled but no blob retention protection (or the blob was deleted outside
// of the protection window) then we need to check the time of the VersionID because there could be a situation
// where Azure's DeleteMarker version has Timestamp 2023-10-20 but Version 2023-10-27...then if PIT was 2023-10-22 the DeleteMarker
// would be returned without this extra test
if vs[i].IsDeleteMarker {
versionTime, err := time.Parse(time.RFC3339Nano, vs[i].Version)
if err != nil {
return nil
}

if versionTime.After(t) {
return vs[:i]
}
}
}

return vs
}

// listBlobVersions returns a list of blob versions but the blob is deleted, it returns Azure's delete marker version but excludes
// the Kopia delete marker version that is used to get around immutability protections.
func (az *azPointInTimeStorage) listBlobVersions(ctx context.Context, prefix blob.ID, callback func(vm versionMetadata) error) error {
prefixStr := az.getObjectNameString(prefix)

pager := az.service.NewListBlobsFlatPager(az.container, &azblob.ListBlobsFlatOptions{
Prefix: &prefixStr,
Include: azblob.ListBlobsInclude{
Metadata: true,
DeletedWithVersions: true, // this shows DeleteMarkers aka blobs with HasVersionsOnly set to true
Versions: true,
},
})

for pager.More() {
page, err := pager.NextPage(ctx)
if err != nil {
return translateError(err)
}

for _, it := range page.Segment.BlobItems {
vm := az.getVersionedBlobMeta(it)

if err := callback(vm); err != nil {
return err
}
}
}

return nil
}

func (az *azPointInTimeStorage) getVersionedMetadata(ctx context.Context, blobID blob.ID) (versionMetadata, error) {
var vml []versionMetadata

if err := az.getBlobVersions(ctx, blobID, func(vm versionMetadata) error {
if !vm.Timestamp.After(az.pointInTime) {
vml = append(vml, vm)
}

return nil
}); err != nil {
return versionMetadata{}, errors.Wrapf(err, "could not get version metadata for blob %s", blobID)
}

if v, found := newestAtUnlessDeleted(vml, az.pointInTime); found {
return v, nil
}

return versionMetadata{}, blob.ErrBlobNotFound
}

// isAzureDeleteMarker checks for Azure created delete markers.
func (az *azPointInTimeStorage) isAzureDeleteMarker(it *azblobmodels.BlobItem) bool {
var isDeleteMarker bool
// HasVersionsOnly - Indicates that this root blob has been deleted
if it.HasVersionsOnly != nil {
isDeleteMarker = *it.HasVersionsOnly
}

return isDeleteMarker
}

// maybePointInTimeStore wraps s with a point-in-time store when s is versioned
// and a point-in-time value is specified. Otherwise, s is returned.
func maybePointInTimeStore(ctx context.Context, s *azStorage, pointInTime *time.Time) (blob.Storage, error) {
if pit := s.Options.PointInTime; pit == nil || pit.IsZero() {
return s, nil
}

// Versioning is needed for PIT. This check will fail if someone deleted the Kopia Repository file.
props, err := s.service.ServiceClient().
NewContainerClient(s.container).
NewBlobClient(s.getObjectNameString(format.KopiaRepositoryBlobID)).
GetProperties(ctx, nil)
if err != nil {
return nil, errors.Wrapf(err, "could not get determine if container '%s' supports versioning", s.container)
}

if props.VersionID == nil {
return nil, errors.Errorf("cannot create point-in-time view for non-versioned container '%s'", s.container)
}

return readonly.NewWrapper(&azPointInTimeStorage{
azStorage: *s,
pointInTime: *pointInTime,
}), nil
}
48 changes: 44 additions & 4 deletions repo/blob/azure/azure_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package azure
import (
"context"
"fmt"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
Expand All @@ -13,6 +14,7 @@ import (
azblobblob "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
azblockblob "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
azblobmodels "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/pkg/errors"

"github.com/kopia/kopia/internal/clock"
Expand All @@ -25,7 +27,8 @@ import (
)

const (
azStorageType = "azureBlob"
azStorageType = "azureBlob"
latestVersionID = ""

timeMapKey = "Kopiamtime" // this must be capital letter followed by lowercase, to comply with AZ tags naming convention.
)
Expand All @@ -39,6 +42,10 @@ type azStorage struct {
}

func (az *azStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int64, output blob.OutputBuffer) error {
return az.getBlobWithVersion(ctx, b, latestVersionID, offset, length, output)
}

func (az *azStorage) getBlobWithVersion(ctx context.Context, b blob.ID, versionID string, offset, length int64, output blob.OutputBuffer) error {
if offset < 0 {
return errors.Wrap(blob.ErrInvalidRange, "invalid offset")
}
Expand All @@ -56,7 +63,15 @@ func (az *azStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int6
opt.Range.Count = l1
}

resp, err := az.service.DownloadStream(ctx, az.container, az.getObjectNameString(b), opt)
bc, err := az.service.ServiceClient().
NewContainerClient(az.container).
NewBlobClient(az.getObjectNameString(b)).
WithVersionID(versionID)
if err != nil {
return errors.Wrap(err, "failed to get versioned blob client")
}

resp, err := bc.DownloadStream(ctx, opt)
if err != nil {
return translateError(err)
}
Expand All @@ -71,7 +86,6 @@ func (az *azStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int6
if err := iocopy.JustCopy(output, body); err != nil {
return translateError(err)
}

//nolint:wrapcheck
return blob.EnsureLengthExactly(output.Length(), length)
}
Expand Down Expand Up @@ -215,6 +229,27 @@ func (az *azStorage) DisplayName() string {
return fmt.Sprintf("Azure: %v", az.Options.Container)
}

func (az *azStorage) getBlobName(it *azblobmodels.BlobItem) blob.ID {
n := *it.Name
return blob.ID(strings.TrimPrefix(n, az.Prefix))
}

func (az *azStorage) getBlobMeta(it *azblobmodels.BlobItem) blob.Metadata {
bm := blob.Metadata{
BlobID: az.getBlobName(it),
Length: *it.Properties.ContentLength,
}

// see if we have 'Kopiamtime' metadata, if so - trust it.
if t, ok := timestampmeta.FromValue(stringDefault(it.Metadata["kopiamtime"], "")); ok {
bm.Timestamp = t
} else {
bm.Timestamp = *it.Properties.LastModified
}

return bm
}

func (az *azStorage) putBlob(ctx context.Context, b blob.ID, data blob.Bytes, opts blob.PutOptions) (azblockblob.UploadResponse, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -368,7 +403,12 @@ func New(ctx context.Context, opt *Options, isCreate bool) (blob.Storage, error)
service: service,
}

az := retrying.NewWrapper(raw)
st, err := maybePointInTimeStore(ctx, raw, opt.PointInTime)
if err != nil {
return nil, err
}

az := retrying.NewWrapper(st)

// verify Azure connection is functional by listing blobs in a bucket, which will fail if the container
// does not exist. We list with a prefix that will not exist, to avoid iterating through any objects.
Expand Down
Loading

0 comments on commit d4a380f

Please sign in to comment.