Skip to content

Commit

Permalink
706-Add: progress
Browse files Browse the repository at this point in the history
Add summary info (average upload rate, completion message) at the end of the backup output
  • Loading branch information
cshintov committed May 26, 2024
1 parent c4a2798 commit 00386a9
Showing 1 changed file with 150 additions and 147 deletions.
297 changes: 150 additions & 147 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ import (

"github.com/cenkalti/backoff"
"github.com/dustin/go-humanize"
"github.com/k0kubun/go-ansi"
"github.com/miolini/datacounter"
"github.com/nightlyone/lockfile"
"golang.org/x/sync/errgroup"
"github.com/schollz/progressbar/v3"
"github.com/k0kubun/go-ansi"
"golang.org/x/sync/errgroup"

"github.com/someone1/zfsbackup-go/backends"
"github.com/someone1/zfsbackup-go/config"
Expand Down Expand Up @@ -419,16 +419,19 @@ func Backup(pctx context.Context, jobInfo *files.JobInfo) error {
} else {
fmt.Fprintf(
config.Stdout,
"Done.\n\tTotal ZFS Stream Bytes: %d (%s)\n\tTotal Bytes Written: %d (%s)\n\tElapsed Time: %v\n\tTotal Files Uploaded: %d\n",
"Done.\n\tTotal ZFS Stream Bytes: %d (%s)\n\tTotal Bytes Written: %d (%s)\n\tElapsed Time: %v\n\tTotal Files Uploaded: %d\n\tAverage Upload Rate: %s/TB\n",
jobInfo.ZFSStreamBytes,
humanize.IBytes(jobInfo.ZFSStreamBytes),
totalWrittenBytes,
humanize.IBytes(totalWrittenBytes),
time.Since(jobInfo.StartTime),
len(jobInfo.Volumes)+1,
humanize.IBytes(uint64(float64(totalWrittenBytes)/time.Since(jobInfo.StartTime).Hours()/1024)),
)
}

fmt.Printf("Backup of %s completed successfully.\n", jobInfo.VolumeName)

log.AppLogger.Debugf("Cleaning up resources...")

for _, backend := range usedBackends {
Expand Down Expand Up @@ -483,28 +486,28 @@ func saveManifest(ctx context.Context, j *files.JobInfo, final bool) (*files.Vol
// nolint:funlen,gocyclo // Difficult to break this apart

func sendStream(ctx context.Context, j *files.JobInfo, c chan<- *files.VolumeInfo, buffer <-chan bool) error {
var group *errgroup.Group
group, ctx = errgroup.WithContext(ctx)

buf := bytes.NewBuffer(nil)
cmd := zfs.GetZFSSendCommand(ctx, j)
cin, cout := io.Pipe()
cmd.Stdout = cout
cmd.Stderr = buf
counter := datacounter.NewReaderCounter(cin)
usingPipe := false
if j.MaxFileBuffer == 0 {
usingPipe = true
}

// Get total dataset size for progress tracking
totalSize, err := zfs.GetDatasetSize(ctx, j.VolumeName)
if err != nil {
return err
}

// Initialize progress bar
bar := progressbar.NewOptions64(int64(totalSize),
var group *errgroup.Group
group, ctx = errgroup.WithContext(ctx)

buf := bytes.NewBuffer(nil)
cmd := zfs.GetZFSSendCommand(ctx, j)
cin, cout := io.Pipe()
cmd.Stdout = cout
cmd.Stderr = buf
counter := datacounter.NewReaderCounter(cin)
usingPipe := false
if j.MaxFileBuffer == 0 {
usingPipe = true
}

// Get total dataset size for progress tracking
totalSize, err := zfs.GetDatasetSize(ctx, j.VolumeName)
if err != nil {
return err
}

// Initialize progress bar
bar := progressbar.NewOptions64(int64(totalSize),
progressbar.OptionSetWriter(ansi.NewAnsiStdout()),
progressbar.OptionEnableColorCodes(true),
progressbar.OptionShowBytes(true),
Expand All @@ -526,128 +529,128 @@ func sendStream(ctx context.Context, j *files.JobInfo, c chan<- *files.VolumeInf
}),
)

// Initialize chunk tracking variables
totalChunks := int(totalSize / (j.VolumeSize * humanize.MiByte))
var processedChunks int

group.Go(func() error {
var lastTotalBytes uint64
defer close(c)
var err error
var volume *files.VolumeInfo
skipBytes, volNum := j.TotalBytesStreamedAndVols()
lastTotalBytes = skipBytes
for {
// Skip bytes if we are resuming
if skipBytes > 0 {
log.AppLogger.Debugf("Want to skip %d bytes.", skipBytes)
written, serr := io.CopyN(ioutil.Discard, counter, int64(skipBytes))
if serr != nil && serr != io.EOF {
log.AppLogger.Errorf("Error while trying to read from the zfs stream to skip %d bytes - %v", skipBytes, serr)
return serr
}
skipBytes -= uint64(written)
log.AppLogger.Debugf("Skipped %d bytes of the ZFS send stream.", written)
continue
}

// Setup next Volume
if volume == nil || volume.Counter() >= (j.VolumeSize*humanize.MiByte)-50*humanize.KiByte {
if volume != nil {
log.AppLogger.Debugf("Finished creating volume %s", volume.ObjectName)
volume.ZFSStreamBytes = counter.Count() - lastTotalBytes
lastTotalBytes = counter.Count()
if err = volume.Close(); err != nil {
log.AppLogger.Errorf("Error while trying to close volume %s - %v", volume.ObjectName, err)
return err
}
if !usingPipe {
c <- volume
}
processedChunks++
bar.Describe(fmt.Sprintf("Backing up... (%d/%d chunks)", processedChunks, totalChunks))
}
<-buffer
volume, err = files.CreateBackupVolume(ctx, j, volNum)
if err != nil {
log.AppLogger.Errorf("Error while creating volume %d - %v", volNum, err)
return err
}
log.AppLogger.Debugf("Starting volume %s", volume.ObjectName)
volNum++
if usingPipe {
c <- volume
}
}

// Write a little at a time and break the output between volumes as needed
bytesWritten, ierr := io.CopyN(volume, counter, files.BufferSize*2)
if ierr == io.EOF {
// We are done!
log.AppLogger.Debugf("Finished creating volume %s", volume.ObjectName)
volume.ZFSStreamBytes = counter.Count() - lastTotalBytes
if err = volume.Close(); err != nil {
log.AppLogger.Errorf("Error while trying to close volume %s - %v", volume.ObjectName, err)
return err
}
if !usingPipe {
c <- volume
}
processedChunks++
bar.Describe(fmt.Sprintf("Backing up... (%d/%d chunks)", processedChunks, totalChunks))
return nil
} else if ierr != nil {
log.AppLogger.Errorf("Error while trying to read from the zfs stream for volume %s - %v", volume.ObjectName, ierr)
return ierr
}
// Update progress bar
bar.Add64(int64(bytesWritten))
}
})

// Start the zfs send command
log.AppLogger.Infof("Starting zfs send command: %s", strings.Join(cmd.Args, " "))
err = cmd.Start()
if err != nil {
log.AppLogger.Errorf("Error starting zfs command - %v", err)
return err
}

group.Go(func() error {
defer cout.Close()
return cmd.Wait()
})

defer func() {
if cmd.ProcessState == nil || !cmd.ProcessState.Exited() {
err = cmd.Process.Kill()
if err != nil {
log.AppLogger.Errorf("Could not kill zfs send command due to error - %v", err)
return
}
err = cmd.Process.Release()
if err != nil {
log.AppLogger.Errorf("Could not release resources from zfs send command due to error - %v", err)
return
}
}
}()

manifestmutex.Lock()
j.ZFSCommandLine = strings.Join(cmd.Args, " ")
manifestmutex.Unlock()
// Wait for the command to finish

err = group.Wait()
if err != nil {
log.AppLogger.Errorf("Error waiting for zfs command to finish - %v: %s", err, buf.String())
return err
}
log.AppLogger.Infof("zfs send completed without error")
manifestmutex.Lock()
j.ZFSStreamBytes = counter.Count()
manifestmutex.Unlock()
return nil
// Initialize chunk tracking variables
totalChunks := int(totalSize / (j.VolumeSize * humanize.MiByte))
var processedChunks int

group.Go(func() error {
var lastTotalBytes uint64
defer close(c)
var err error
var volume *files.VolumeInfo
skipBytes, volNum := j.TotalBytesStreamedAndVols()
lastTotalBytes = skipBytes
for {
// Skip bytes if we are resuming
if skipBytes > 0 {
log.AppLogger.Debugf("Want to skip %d bytes.", skipBytes)
written, serr := io.CopyN(ioutil.Discard, counter, int64(skipBytes))
if serr != nil && serr != io.EOF {
log.AppLogger.Errorf("Error while trying to read from the zfs stream to skip %d bytes - %v", skipBytes, serr)
return serr
}
skipBytes -= uint64(written)
log.AppLogger.Debugf("Skipped %d bytes of the ZFS send stream.", written)
continue
}

// Setup next Volume
if volume == nil || volume.Counter() >= (j.VolumeSize*humanize.MiByte)-50*humanize.KiByte {
if volume != nil {
log.AppLogger.Debugf("Finished creating volume %s", volume.ObjectName)
volume.ZFSStreamBytes = counter.Count() - lastTotalBytes
lastTotalBytes = counter.Count()
if err = volume.Close(); err != nil {
log.AppLogger.Errorf("Error while trying to close volume %s - %v", volume.ObjectName, err)
return err
}
if !usingPipe {
c <- volume
}
processedChunks++
bar.Describe(fmt.Sprintf("Backing up... (%d/%d chunks)", processedChunks, totalChunks))
}
<-buffer
volume, err = files.CreateBackupVolume(ctx, j, volNum)
if err != nil {
log.AppLogger.Errorf("Error while creating volume %d - %v", volNum, err)
return err
}
log.AppLogger.Debugf("Starting volume %s", volume.ObjectName)
volNum++
if usingPipe {
c <- volume
}
}

// Write a little at a time and break the output between volumes as needed
bytesWritten, ierr := io.CopyN(volume, counter, files.BufferSize*2)
if ierr == io.EOF {
// We are done!
log.AppLogger.Debugf("Finished creating volume %s", volume.ObjectName)
volume.ZFSStreamBytes = counter.Count() - lastTotalBytes
if err = volume.Close(); err != nil {
log.AppLogger.Errorf("Error while trying to close volume %s - %v", volume.ObjectName, err)
return err
}
if !usingPipe {
c <- volume
}
processedChunks++
bar.Describe(fmt.Sprintf("Backing up... (%d/%d chunks)", processedChunks, totalChunks))
return nil
} else if ierr != nil {
log.AppLogger.Errorf("Error while trying to read from the zfs stream for volume %s - %v", volume.ObjectName, ierr)
return ierr
}
// Update progress bar
bar.Add64(int64(bytesWritten))
}
})

// Start the zfs send command
log.AppLogger.Infof("Starting zfs send command: %s", strings.Join(cmd.Args, " "))
err = cmd.Start()
if err != nil {
log.AppLogger.Errorf("Error starting zfs command - %v", err)
return err
}

group.Go(func() error {
defer cout.Close()
return cmd.Wait()
})

defer func() {
if cmd.ProcessState == nil || !cmd.ProcessState.Exited() {
err = cmd.Process.Kill()
if err != nil {
log.AppLogger.Errorf("Could not kill zfs send command due to error - %v", err)
return
}
err = cmd.Process.Release()
if err != nil {
log.AppLogger.Errorf("Could not release resources from zfs send command due to error - %v", err)
return
}
}
}()

manifestmutex.Lock()
j.ZFSCommandLine = strings.Join(cmd.Args, " ")
manifestmutex.Unlock()
// Wait for the command to finish

err = group.Wait()
if err != nil {
log.AppLogger.Errorf("Error waiting for zfs command to finish - %v: %s", err, buf.String())
return err
}
log.AppLogger.Infof("zfs send completed without error")
manifestmutex.Lock()
j.ZFSStreamBytes = counter.Count()
manifestmutex.Unlock()
return nil
}

func tryResume(ctx context.Context, j *files.JobInfo) error {
Expand Down

0 comments on commit 00386a9

Please sign in to comment.