Skip to content

Commit

Permalink
Merge 10.3.4 master changes into dev (#883)
Browse files Browse the repository at this point in the history
* Enable job cleanup on windows by unmapping files and closing logs during a list call. (#840)

* Unmap memory mapped files during list call; unmap unused mmfs.

* Allow JobPartPlan to reuse an existing MMF to avoid leaking

(cherry picked from commit e9c4b4e)

* Added new option ifSourceNewer for overwrite flag (#836)

* Added new option ifSourceNewer for overwrite flag

(cherry picked from commit 77d5f8d , with manual fixups)

* Updated changelog

(cherry picked from commit 784dc44)

* Don't attempt to close the logs if the log level is none (#845)

They were never opened in the first place.

(cherry picked from commit 7eb7b49)

* Change expected counts, from JSON, to strings
  • Loading branch information
JohnRusk authored Mar 5, 2020
1 parent 451254c commit 486c7a2
Show file tree
Hide file tree
Showing 20 changed files with 216 additions and 28 deletions.
11 changes: 10 additions & 1 deletion ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@

# Change Log

## Version 10.4

### New features
Expand All @@ -24,6 +23,16 @@
for when files are skipped and for when the setting of folder properties is skipped. This affects the input and
output of `azcopy jobs show` and the status values shown in the JSON output format.

## Version 10.3.4

### New features

1. Fixed feature parity issue by adding support for "ifSourceNewer" option on the `overwrite` flag. It serves as a replacement of the '\XO' flag in V8.

### Bug fixes

1. Fixed `jobs clean` command on Windows which was previously crashing when the `with-status` flag was used.

## Version 10.3.3

### New features
Expand Down
2 changes: 1 addition & 1 deletion cmd/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1390,7 +1390,7 @@ func init() {
// This flag is implemented only for Storage Explorer.
cpCmd.PersistentFlags().StringVar(&raw.listOfFilesToCopy, "list-of-files", "", "Defines the location of text file which has the list of only files to be copied.")
cpCmd.PersistentFlags().StringVar(&raw.exclude, "exclude-pattern", "", "Exclude these files when copying. This option supports wildcard characters (*)")
cpCmd.PersistentFlags().StringVar(&raw.forceWrite, "overwrite", "true", "Overwrite the conflicting files and blobs at the destination if this flag is set to true. (default 'true') Possible values include 'true', 'false', and 'prompt'. For destinations that support folders, any conflicting folder-level properties will be overwritten only if this flag is 'true'.")
cpCmd.PersistentFlags().StringVar(&raw.forceWrite, "overwrite", "true", "Overwrite the conflicting files and blobs at the destination if this flag is set to true. (default 'true') Possible values include 'true', 'false', 'prompt', and 'ifSourceNewer'. For destinations that support folders, any conflicting folder-level properties will be overwritten only if this flag is 'true'.")
cpCmd.PersistentFlags().BoolVar(&raw.autoDecompress, "decompress", false, "Automatically decompress files when downloading, if their content-encoding indicates that they are compressed. The supported content-encoding values are 'gzip' and 'deflate'. File extensions of '.gz'/'.gzip' or '.zz' aren't necessary, but will be removed if present.")
cpCmd.PersistentFlags().BoolVar(&raw.recursive, "recursive", false, "Look into sub-directories recursively when uploading from local file system.")
cpCmd.PersistentFlags().StringVar(&raw.fromTo, "from-to", "", "Optionally specifies the source destination combination. For Example: LocalBlob, BlobLocal, LocalBlobFS.")
Expand Down
3 changes: 2 additions & 1 deletion cmd/jobsClean.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (
"fmt"
"strings"

"github.com/Azure/azure-storage-azcopy/common"
"github.com/spf13/cobra"

"github.com/Azure/azure-storage-azcopy/common"
)

func init() {
Expand Down
7 changes: 4 additions & 3 deletions common/fe-ste-models.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,10 @@ var EOverwriteOption = OverwriteOption(0)

type OverwriteOption uint8

func (OverwriteOption) True() OverwriteOption { return OverwriteOption(0) }
func (OverwriteOption) False() OverwriteOption { return OverwriteOption(1) }
func (OverwriteOption) Prompt() OverwriteOption { return OverwriteOption(2) }
func (OverwriteOption) True() OverwriteOption { return OverwriteOption(0) }
func (OverwriteOption) False() OverwriteOption { return OverwriteOption(1) }
func (OverwriteOption) Prompt() OverwriteOption { return OverwriteOption(2) }
func (OverwriteOption) IfSourceNewer() OverwriteOption { return OverwriteOption(3) }

func (o *OverwriteOption) Parse(s string) error {
val, err := enum.Parse(reflect.TypeOf(o), s, true)
Expand Down
1 change: 1 addition & 0 deletions common/folderCreationTracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (f *simpleFolderTracker) ShouldSetProperties(folder string, overwrite Overw
case EOverwriteOption.True():
return true
case EOverwriteOption.Prompt(), // "prompt" is treated as "false" because otherwise we'd have to display, and maintain state for, two different prompts - one for folders and one for files, since its too hard to find wording for ONE prompt to cover both cases. (And having two prompts would confuse users).
EOverwriteOption.IfSourceNewer(), // likewise "if source newer" is treated as "false"
EOverwriteOption.False():

f.mu.Lock()
Expand Down
4 changes: 4 additions & 0 deletions common/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func (jl *jobLogger) ShouldLog(level pipeline.LogLevel) bool {
}

func (jl *jobLogger) CloseLog() {
if jl.minimumLevelToLog == pipeline.LogNone {
return
}

jl.logger.Println("Closing Log")
err := jl.file.Close()
PanicIfErr(err)
Expand Down
2 changes: 1 addition & 1 deletion common/version.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package common

const AzcopyVersion = "10.3.3"
const AzcopyVersion = "10.3.4"
const UserAgent = "AzCopy/" + AzcopyVersion
const S3ImportUserAgent = "S3Import " + UserAgent
const BenchmarkUserAgent = "Benchmark " + UserAgent
4 changes: 2 additions & 2 deletions ste/JobsAdmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ func (ja *jobsAdmin) ResurrectJob(jobId common.JobID, sourceSAS string, destinat
}
mmf := planFile.Map()
jm := ja.JobMgrEnsureExists(jobID, mmf.Plan().LogLevel, "")
jm.AddJobPart(partNum, planFile, sourceSAS, destinationSAS, false)
jm.AddJobPart(partNum, planFile, mmf, sourceSAS, destinationSAS, false)
}
return true
}
Expand Down Expand Up @@ -665,7 +665,7 @@ func (ja *jobsAdmin) ResurrectJobParts() {
mmf := planFile.Map()
//todo : call the compute transfer function here for each job.
jm := ja.JobMgrEnsureExists(jobID, mmf.Plan().LogLevel, "")
jm.AddJobPart(partNum, planFile, EMPTY_SAS_STRING, EMPTY_SAS_STRING, false)
jm.AddJobPart(partNum, planFile, mmf, EMPTY_SAS_STRING, EMPTY_SAS_STRING, false)
}
}

Expand Down
10 changes: 9 additions & 1 deletion ste/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/Azure/azure-pipeline-go/pipeline"

"github.com/Azure/azure-storage-azcopy/common"
)

Expand Down Expand Up @@ -151,7 +152,8 @@ func ExecuteNewCopyJobPartOrder(order common.CopyJobPartOrderRequest) common.Cop
InMemoryTransitJobState{
credentialInfo: order.CredentialInfo,
})
jpm.AddJobPart(order.PartNum, jppfn, order.SourceSAS, order.DestinationSAS, true) // Add this part to the Job and schedule its transfers
// Supply no plan MMF because we don't have one, and AddJobPart will create one on its own.
jpm.AddJobPart(order.PartNum, jppfn, nil, order.SourceSAS, order.DestinationSAS, true) // Add this part to the Job and schedule its transfers
return common.CopyJobPartOrderResponse{JobStarted: true}
}

Expand Down Expand Up @@ -607,6 +609,12 @@ func ListJobs() common.ListJobsResponse {
listJobResponse.JobIDDetails = append(listJobResponse.JobIDDetails,
common.JobIDDetails{JobId: jobId, CommandString: jpm.Plan().CommandString(),
StartTime: jpm.Plan().StartTime, JobStatus: jpm.Plan().JobStatus()})

// Close the job part managers and the log.
jm.(*jobMgr).jobPartMgrs.Iterate(false, func(k common.PartNumber, v IJobPartMgr) {
v.Close()
})
jm.CloseLog()
}
return listJobResponse
}
Expand Down
13 changes: 10 additions & 3 deletions ste/mgr-JobMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type IJobMgr interface {
JobID() common.JobID
JobPartMgr(partNum PartNumber) (IJobPartMgr, bool)
//Throughput() XferThroughput
AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, sourceSAS string,
// If existingPlanMMF is nil, a new MMF is opened.
AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, existingPlanMMF *JobPartPlanMMF, sourceSAS string,
destinationSAS string, scheduleTransfers bool) IJobPartMgr
SetIncludeExclude(map[string]int, map[string]int)
IncludeExclude() (map[string]int, map[string]int)
Expand Down Expand Up @@ -317,14 +318,20 @@ func (jm *jobMgr) TryGetPerformanceAdvice(bytesInJob uint64, filesInJob uint32,
}

// initializeJobPartPlanInfo func initializes the JobPartPlanInfo handler for given JobPartOrder
func (jm *jobMgr) AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, sourceSAS string,
func (jm *jobMgr) AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, existingPlanMMF *JobPartPlanMMF, sourceSAS string,
destinationSAS string, scheduleTransfers bool) IJobPartMgr {
jpm := &jobPartMgr{jobMgr: jm, filename: planFile, sourceSAS: sourceSAS,
destinationSAS: destinationSAS, pacer: JobsAdmin.(*jobsAdmin).pacer,
slicePool: JobsAdmin.(*jobsAdmin).slicePool,
cacheLimiter: JobsAdmin.(*jobsAdmin).cacheLimiter,
fileCountLimiter: JobsAdmin.(*jobsAdmin).fileCountLimiter}
jpm.planMMF = jpm.filename.Map()
// If an existing plan MMF was supplied, re use it. Otherwise, init a new one.
if existingPlanMMF == nil {
jpm.planMMF = jpm.filename.Map()
} else {
jpm.planMMF = existingPlanMMF
}

jm.jobPartMgrs.Set(partNum, jpm)
jm.setFinalPartOrdered(partNum, jpm.planMMF.Plan().IsFinalPart)
jm.setDirection(jpm.Plan().FromTo)
Expand Down
13 changes: 9 additions & 4 deletions ste/remoteObjectExists.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,30 @@ package ste

import (
"net/http"
"time"
)

// an error with an HTTP Response
type responseError interface {
Response() *http.Response
}

type lastModifiedTimerProvider interface {
LastModified() time.Time
}

// remoteObjectExists takes the error returned when trying to access a remote object, sees whether is
// a "not found" error. If the object exists (i.e. error is nil) it returns (true, nil). If the
// error is a "not found" error, it returns (false, nil). Else it returns false and the original error.
// The initial, dummy, parameter, is to allow callers to conveniently call it with functions that return a tuple
// - even though we only need the error.
func remoteObjectExists(_ interface{}, errWhenAccessingRemoteObject error) (bool, error) {
func remoteObjectExists(props lastModifiedTimerProvider, errWhenAccessingRemoteObject error) (bool, time.Time, error) {

if typedErr, ok := errWhenAccessingRemoteObject.(responseError); ok && typedErr.Response().StatusCode == http.StatusNotFound {
return false, nil // 404 error, so it does NOT exist
return false, time.Time{}, nil // 404 error, so it does NOT exist
} else if errWhenAccessingRemoteObject != nil {
return false, errWhenAccessingRemoteObject // some other error happened, so we return it
return false, time.Time{}, errWhenAccessingRemoteObject // some other error happened, so we return it
} else {
return true, nil // If err equals nil, the file exists
return true, props.LastModified(), nil // If err equals nil, the file exists
}
}
2 changes: 1 addition & 1 deletion ste/sender-appendBlob.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (s *appendBlobSenderBase) NumChunks() uint32 {
return s.numChunks
}

func (s *appendBlobSenderBase) RemoteFileExists() (bool, error) {
func (s *appendBlobSenderBase) RemoteFileExists() (bool, time.Time, error) {
return remoteObjectExists(s.destAppendBlobURL.GetProperties(s.jptm.Context(), azblob.BlobAccessConditions{}))
}

Expand Down
2 changes: 1 addition & 1 deletion ste/sender-azureFile.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (u *azureFileSenderBase) NumChunks() uint32 {
return u.numChunks
}

func (u *azureFileSenderBase) RemoteFileExists() (bool, error) {
func (u *azureFileSenderBase) RemoteFileExists() (bool, time.Time, error) {
return remoteObjectExists(u.fileURL().GetProperties(u.ctx))
}

Expand Down
28 changes: 26 additions & 2 deletions ste/sender-blobFS.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,32 @@ func (u *blobFSSenderBase) NumChunks() uint32 {
return u.numChunks
}

func (u *blobFSSenderBase) RemoteFileExists() (bool, error) {
return remoteObjectExists(u.fileURL().GetProperties(u.jptm.Context()))
// simply provides the parse lmt from the path properties
// TODO it's not the best solution as usually the SDK should provide the time in parsed format already
type blobFSLastModifiedTimeProvider struct {
lmt time.Time
}

func (b blobFSLastModifiedTimeProvider) LastModified() time.Time {
return b.lmt
}

func newBlobFSLastModifiedTimeProvider(props *azbfs.PathGetPropertiesResponse) blobFSLastModifiedTimeProvider {
var lmt time.Time
// parse the lmt if the props is not empty
if props != nil {
parsedLmt, err := time.Parse(time.RFC1123, props.LastModified())
if err == nil {
lmt = parsedLmt
}
}

return blobFSLastModifiedTimeProvider{lmt: lmt}
}

func (u *blobFSSenderBase) RemoteFileExists() (bool, time.Time, error) {
props, err := u.fileURL().GetProperties(u.jptm.Context())
return remoteObjectExists(newBlobFSLastModifiedTimeProvider(props), err)
}

func (u *blobFSSenderBase) Prologue(state common.PrologueState) (destinationModified bool) {
Expand Down
2 changes: 1 addition & 1 deletion ste/sender-blockBlob.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *blockBlobSenderBase) NumChunks() uint32 {
return s.numChunks
}

func (s *blockBlobSenderBase) RemoteFileExists() (bool, error) {
func (s *blockBlobSenderBase) RemoteFileExists() (bool, time.Time, error) {
return remoteObjectExists(s.destBlockBlobURL.GetProperties(s.jptm.Context(), azblob.BlobAccessConditions{}))
}

Expand Down
2 changes: 1 addition & 1 deletion ste/sender-pageBlob.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (s *pageBlobSenderBase) NumChunks() uint32 {
return s.numChunks
}

func (s *pageBlobSenderBase) RemoteFileExists() (bool, error) {
func (s *pageBlobSenderBase) RemoteFileExists() (bool, time.Time, error) {
return remoteObjectExists(s.destPageBlobURL.GetProperties(s.jptm.Context(), azblob.BlobAccessConditions{}))
}

Expand Down
6 changes: 4 additions & 2 deletions ste/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package ste

import (
"errors"
"time"

"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
Expand Down Expand Up @@ -51,8 +52,9 @@ type IFileSender interface {
// NumChunks returns the number of chunks that will be required for the target file
NumChunks() uint32

// RemoteFileExists is called to see whether the file already exists at the remote location
RemoteFileExists() (bool, error)
// RemoteFileExists is called to see whether the file already exists at the remote location (so we know whether we'll be overwriting it)
// the lmt is returned if the file exists
RemoteFileExists() (bool, time.Time, error)

// Prologue is called automatically before the first chunkFunc is generated.
// Implementation should do any initialization that is necessary - e.g.
Expand Down
9 changes: 7 additions & 2 deletions ste/xfer-anyToRemote-file.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func anyToRemote_file(jptm IJobPartTransferMgr, info TransferInfo, p pipeline.Pi
// then check the file exists at the remote location
// if it does, react accordingly
if jptm.GetOverwriteOption() != common.EOverwriteOption.True() {
exists, existenceErr := s.RemoteFileExists()
exists, dstLmt, existenceErr := s.RemoteFileExists()
if existenceErr != nil {
jptm.LogSendError(info.Source, info.Destination, "Could not check file existence. "+existenceErr.Error(), 0)
jptm.LogSendError(info.Source, info.Destination, "Could not check destination file existence. "+existenceErr.Error(), 0)
jptm.SetStatus(common.ETransferStatus.Failed()) // is a real failure, not just a SkippedFileAlreadyExists, in this case
jptm.ReportTransferDone()
return
Expand All @@ -109,6 +109,11 @@ func anyToRemote_file(jptm IJobPartTransferMgr, info TransferInfo, p pipeline.Pi
parsed, _ := url.Parse(info.Destination)
parsed.RawQuery = ""
shouldOverwrite = jptm.GetOverwritePrompter().shouldOverwrite(parsed.String())
} else if jptm.GetOverwriteOption() == common.EOverwriteOption.IfSourceNewer() {
// only overwrite if source lmt is newer (after) the destination
if jptm.LastModifiedTime().After(dstLmt) {
shouldOverwrite = true
}
}

if !shouldOverwrite {
Expand Down
7 changes: 6 additions & 1 deletion ste/xfer-remoteToLocal-file.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,19 @@ func remoteToLocal_file(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer pac
// then check the file exists at the remote location
// if it does, react accordingly
if jptm.GetOverwriteOption() != common.EOverwriteOption.True() {
_, err := os.Stat(info.Destination)
dstProps, err := os.Stat(info.Destination)
if err == nil {
// if the error is nil, then file exists locally
shouldOverwrite := false

// if necessary, prompt to confirm user's intent
if jptm.GetOverwriteOption() == common.EOverwriteOption.Prompt() {
shouldOverwrite = jptm.GetOverwritePrompter().shouldOverwrite(info.Destination)
} else if jptm.GetOverwriteOption() == common.EOverwriteOption.IfSourceNewer() {
// only overwrite if source lmt is newer (after) the destination
if jptm.LastModifiedTime().After(dstProps.ModTime()) {
shouldOverwrite = true
}
}

if !shouldOverwrite {
Expand Down
Loading

0 comments on commit 486c7a2

Please sign in to comment.