From 00386a961918253dfac464df29e82e8780c630e2 Mon Sep 17 00:00:00 2001
From: Shinto C V
Date: Sun, 26 May 2024 12:32:05 +0530
Subject: [PATCH] 706-Add: progress bar and summary info at the end

---
 backup/backup.go | 297 ++++++++++++++++++++++++-----------------------
 1 file changed, 150 insertions(+), 147 deletions(-)

diff --git a/backup/backup.go b/backup/backup.go
index e21b88d..904e20f 100644
--- a/backup/backup.go
+++ b/backup/backup.go
@@ -38,11 +38,11 @@ import (
 
 	"github.com/cenkalti/backoff"
 	"github.com/dustin/go-humanize"
+	"github.com/k0kubun/go-ansi"
 	"github.com/miolini/datacounter"
 	"github.com/nightlyone/lockfile"
-	"golang.org/x/sync/errgroup"
 	"github.com/schollz/progressbar/v3"
-	"github.com/k0kubun/go-ansi"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/someone1/zfsbackup-go/backends"
 	"github.com/someone1/zfsbackup-go/config"
@@ -419,16 +419,19 @@ func Backup(pctx context.Context, jobInfo *files.JobInfo) error {
 	} else {
 		fmt.Fprintf(
 			config.Stdout,
-			"Done.\n\tTotal ZFS Stream Bytes: %d (%s)\n\tTotal Bytes Written: %d (%s)\n\tElapsed Time: %v\n\tTotal Files Uploaded: %d\n",
+			"Done.\n\tTotal ZFS Stream Bytes: %d (%s)\n\tTotal Bytes Written: %d (%s)\n\tElapsed Time: %v\n\tTotal Files Uploaded: %d\n\tAverage Upload Rate: %s/s\n",
 			jobInfo.ZFSStreamBytes,
 			humanize.IBytes(jobInfo.ZFSStreamBytes),
 			totalWrittenBytes,
 			humanize.IBytes(totalWrittenBytes),
 			time.Since(jobInfo.StartTime),
 			len(jobInfo.Volumes)+1,
+			humanize.IBytes(uint64(float64(totalWrittenBytes)/time.Since(jobInfo.StartTime).Seconds())),
 		)
 	}
 
+	fmt.Fprintf(config.Stdout, "Backup of %s completed successfully.\n", jobInfo.VolumeName)
+
 	log.AppLogger.Debugf("Cleaning up resources...")
 
 	for _, backend := range usedBackends {
@@ -483,28 +486,28 @@ func saveManifest(ctx context.Context, j *files.JobInfo, final bool) (*files.Vol
 
 // nolint:funlen,gocyclo // Difficult to break this apart
 func sendStream(ctx context.Context, j *files.JobInfo, c chan<- *files.VolumeInfo, buffer <-chan bool) error {
-    var group *errgroup.Group
-    group, ctx = errgroup.WithContext(ctx)
-
-    buf := bytes.NewBuffer(nil)
-    cmd := zfs.GetZFSSendCommand(ctx, j)
-    cin, cout := io.Pipe()
-    cmd.Stdout = cout
-    cmd.Stderr = buf
-    counter := datacounter.NewReaderCounter(cin)
-    usingPipe := false
-    if j.MaxFileBuffer == 0 {
-        usingPipe = true
-    }
-
-    // Get total dataset size for progress tracking
-    totalSize, err := zfs.GetDatasetSize(ctx, j.VolumeName)
-    if err != nil {
-        return err
-    }
-
-    // Initialize progress bar
-    bar := progressbar.NewOptions64(int64(totalSize),
+	var group *errgroup.Group
+	group, ctx = errgroup.WithContext(ctx)
+
+	buf := bytes.NewBuffer(nil)
+	cmd := zfs.GetZFSSendCommand(ctx, j)
+	cin, cout := io.Pipe()
+	cmd.Stdout = cout
+	cmd.Stderr = buf
+	counter := datacounter.NewReaderCounter(cin)
+	usingPipe := false
+	if j.MaxFileBuffer == 0 {
+		usingPipe = true
+	}
+
+	// Get total dataset size for progress tracking
+	totalSize, err := zfs.GetDatasetSize(ctx, j.VolumeName)
+	if err != nil {
+		return err
+	}
+
+	// Initialize progress bar
+	bar := progressbar.NewOptions64(int64(totalSize),
 		progressbar.OptionSetWriter(ansi.NewAnsiStdout()),
 		progressbar.OptionEnableColorCodes(true),
 		progressbar.OptionShowBytes(true),
@@ -526,128 +529,128 @@ func sendStream(ctx context.Context, j *files.JobInfo, c chan<- *files.VolumeInf
 		}),
 	)
 
-    // Initialize chunk tracking variables
-    totalChunks := int(totalSize / (j.VolumeSize * humanize.MiByte))
-    var processedChunks int
-
-    group.Go(func() error {
-        var lastTotalBytes uint64
-        defer close(c)
-        var err error
-        var volume *files.VolumeInfo
-        skipBytes, volNum := j.TotalBytesStreamedAndVols()
-        lastTotalBytes = skipBytes
-        for {
-            // Skip bytes if we are resuming
-            if skipBytes > 0 {
-                log.AppLogger.Debugf("Want to skip %d bytes.", skipBytes)
-                written, serr := io.CopyN(ioutil.Discard, counter, int64(skipBytes))
-                if serr != nil && serr != io.EOF {
-                    log.AppLogger.Errorf("Error while trying to read from the zfs stream to skip %d bytes - %v", skipBytes, serr)
-                    return serr
-                }
-                skipBytes -= uint64(written)
-                log.AppLogger.Debugf("Skipped %d bytes of the ZFS send stream.", written)
-                continue
-            }
-
-            // Setup next Volume
-            if volume == nil || volume.Counter() >= (j.VolumeSize*humanize.MiByte)-50*humanize.KiByte {
-                if volume != nil {
-                    log.AppLogger.Debugf("Finished creating volume %s", volume.ObjectName)
-                    volume.ZFSStreamBytes = counter.Count() - lastTotalBytes
-                    lastTotalBytes = counter.Count()
-                    if err = volume.Close(); err != nil {
-                        log.AppLogger.Errorf("Error while trying to close volume %s - %v", volume.ObjectName, err)
-                        return err
-                    }
-                    if !usingPipe {
-                        c <- volume
-                    }
-                    processedChunks++
-                    bar.Describe(fmt.Sprintf("Backing up... (%d/%d chunks)", processedChunks, totalChunks))
-                }
-                <-buffer
-                volume, err = files.CreateBackupVolume(ctx, j, volNum)
-                if err != nil {
-                    log.AppLogger.Errorf("Error while creating volume %d - %v", volNum, err)
-                    return err
-                }
-                log.AppLogger.Debugf("Starting volume %s", volume.ObjectName)
-                volNum++
-                if usingPipe {
-                    c <- volume
-                }
-            }
-
-            // Write a little at a time and break the output between volumes as needed
-            bytesWritten, ierr := io.CopyN(volume, counter, files.BufferSize*2)
-            if ierr == io.EOF {
-                // We are done!
-                log.AppLogger.Debugf("Finished creating volume %s", volume.ObjectName)
-                volume.ZFSStreamBytes = counter.Count() - lastTotalBytes
-                if err = volume.Close(); err != nil {
-                    log.AppLogger.Errorf("Error while trying to close volume %s - %v", volume.ObjectName, err)
-                    return err
-                }
-                if !usingPipe {
-                    c <- volume
-                }
-                processedChunks++
-                bar.Describe(fmt.Sprintf("Backing up... (%d/%d chunks)", processedChunks, totalChunks))
(%d/%d chunks)", processedChunks, totalChunks)) - return nil - } else if ierr != nil { - log.AppLogger.Errorf("Error while trying to read from the zfs stream for volume %s - %v", volume.ObjectName, ierr) - return ierr - } - // Update progress bar - bar.Add64(int64(bytesWritten)) - } - }) - - // Start the zfs send command - log.AppLogger.Infof("Starting zfs send command: %s", strings.Join(cmd.Args, " ")) - err = cmd.Start() - if err != nil { - log.AppLogger.Errorf("Error starting zfs command - %v", err) - return err - } - - group.Go(func() error { - defer cout.Close() - return cmd.Wait() - }) - - defer func() { - if cmd.ProcessState == nil || !cmd.ProcessState.Exited() { - err = cmd.Process.Kill() - if err != nil { - log.AppLogger.Errorf("Could not kill zfs send command due to error - %v", err) - return - } - err = cmd.Process.Release() - if err != nil { - log.AppLogger.Errorf("Could not release resources from zfs send command due to error - %v", err) - return - } - } - }() - - manifestmutex.Lock() - j.ZFSCommandLine = strings.Join(cmd.Args, " ") - manifestmutex.Unlock() - // Wait for the command to finish - - err = group.Wait() - if err != nil { - log.AppLogger.Errorf("Error waiting for zfs command to finish - %v: %s", err, buf.String()) - return err - } - log.AppLogger.Infof("zfs send completed without error") - manifestmutex.Lock() - j.ZFSStreamBytes = counter.Count() - manifestmutex.Unlock() - return nil + // Initialize chunk tracking variables + totalChunks := int(totalSize / (j.VolumeSize * humanize.MiByte)) + var processedChunks int + + group.Go(func() error { + var lastTotalBytes uint64 + defer close(c) + var err error + var volume *files.VolumeInfo + skipBytes, volNum := j.TotalBytesStreamedAndVols() + lastTotalBytes = skipBytes + for { + // Skip bytes if we are resuming + if skipBytes > 0 { + log.AppLogger.Debugf("Want to skip %d bytes.", skipBytes) + written, serr := io.CopyN(ioutil.Discard, counter, int64(skipBytes)) + if serr != nil && serr != io.EOF { + log.AppLogger.Errorf("Error while trying to read from the zfs stream to skip %d bytes - %v", skipBytes, serr) + return serr + } + skipBytes -= uint64(written) + log.AppLogger.Debugf("Skipped %d bytes of the ZFS send stream.", written) + continue + } + + // Setup next Volume + if volume == nil || volume.Counter() >= (j.VolumeSize*humanize.MiByte)-50*humanize.KiByte { + if volume != nil { + log.AppLogger.Debugf("Finished creating volume %s", volume.ObjectName) + volume.ZFSStreamBytes = counter.Count() - lastTotalBytes + lastTotalBytes = counter.Count() + if err = volume.Close(); err != nil { + log.AppLogger.Errorf("Error while trying to close volume %s - %v", volume.ObjectName, err) + return err + } + if !usingPipe { + c <- volume + } + processedChunks++ + bar.Describe(fmt.Sprintf("Backing up... (%d/%d chunks)", processedChunks, totalChunks)) + } + <-buffer + volume, err = files.CreateBackupVolume(ctx, j, volNum) + if err != nil { + log.AppLogger.Errorf("Error while creating volume %d - %v", volNum, err) + return err + } + log.AppLogger.Debugf("Starting volume %s", volume.ObjectName) + volNum++ + if usingPipe { + c <- volume + } + } + + // Write a little at a time and break the output between volumes as needed + bytesWritten, ierr := io.CopyN(volume, counter, files.BufferSize*2) + if ierr == io.EOF { + // We are done! 
+				log.AppLogger.Debugf("Finished creating volume %s", volume.ObjectName)
+				volume.ZFSStreamBytes = counter.Count() - lastTotalBytes
+				if err = volume.Close(); err != nil {
+					log.AppLogger.Errorf("Error while trying to close volume %s - %v", volume.ObjectName, err)
+					return err
+				}
+				if !usingPipe {
+					c <- volume
+				}
+				processedChunks++
+				bar.Describe(fmt.Sprintf("Backing up... (%d/%d chunks)", processedChunks, totalChunks))
+				return nil
+			} else if ierr != nil {
+				log.AppLogger.Errorf("Error while trying to read from the zfs stream for volume %s - %v", volume.ObjectName, ierr)
+				return ierr
+			}
+			// Update progress bar
+			bar.Add64(int64(bytesWritten))
+		}
+	})
+
+	// Start the zfs send command
+	log.AppLogger.Infof("Starting zfs send command: %s", strings.Join(cmd.Args, " "))
+	err = cmd.Start()
+	if err != nil {
+		log.AppLogger.Errorf("Error starting zfs command - %v", err)
+		return err
+	}
+
+	group.Go(func() error {
+		defer cout.Close()
+		return cmd.Wait()
+	})
+
+	defer func() {
+		if cmd.ProcessState == nil || !cmd.ProcessState.Exited() {
+			err = cmd.Process.Kill()
+			if err != nil {
+				log.AppLogger.Errorf("Could not kill zfs send command due to error - %v", err)
+				return
+			}
+			err = cmd.Process.Release()
+			if err != nil {
+				log.AppLogger.Errorf("Could not release resources from zfs send command due to error - %v", err)
+				return
+			}
+		}
+	}()
+
+	manifestmutex.Lock()
+	j.ZFSCommandLine = strings.Join(cmd.Args, " ")
+	manifestmutex.Unlock()
+	// Wait for the command to finish
+
+	err = group.Wait()
+	if err != nil {
+		log.AppLogger.Errorf("Error waiting for zfs command to finish - %v: %s", err, buf.String())
+		return err
+	}
+	log.AppLogger.Infof("zfs send completed without error")
+	manifestmutex.Lock()
+	j.ZFSStreamBytes = counter.Count()
+	manifestmutex.Unlock()
+	return nil
 }
 
 func tryResume(ctx context.Context, j *files.JobInfo) error {
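
Reviewer note, not part of the patch: the new "Average Upload Rate" summary
line reduces to the arithmetic below. This is a minimal standalone sketch;
startTime and totalWrittenBytes here are stand-in values for illustration,
not the identifiers from backup.go.

	package main

	import (
		"fmt"
		"time"

		"github.com/dustin/go-humanize"
	)

	func main() {
		// Stand-in values: pretend 3 GiB was uploaded over a 90-second backup.
		startTime := time.Now().Add(-90 * time.Second)
		var totalWrittenBytes uint64 = 3 * humanize.GiByte

		// Average upload rate in bytes per second, formatted the same way as
		// the summary line (prints something like "34 MiB/s").
		rate := uint64(float64(totalWrittenBytes) / time.Since(startTime).Seconds())
		fmt.Printf("Average Upload Rate: %s/s\n", humanize.IBytes(rate))
	}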