Skip to content

Commit

Permalink
fix goroutine and cpu heavy usage during file download #bug #154 , fi…
Browse files Browse the repository at this point in the history
…x last file part download dc mismatch bug, add more sensible logging to file download, TL192, v2.3.19
  • Loading branch information
AmarnathCJD committed Oct 30, 2024
1 parent 0878b58 commit e7a9108
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 12 deletions.
2 changes: 1 addition & 1 deletion telegram/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "regexp"

const (
ApiVersion = 192
Version = "v2.3.18"
Version = "v2.3.19"

LogDebug = "debug"
LogInfo = "info"
Expand Down
40 changes: 29 additions & 11 deletions telegram/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,12 +409,23 @@ func (c *Client) DownloadMedia(file interface{}, Opts ...*DownloadOptions) (stri

if opts.Buffer != nil {
dest = ":mem-buffer:"
c.Logger.Warn("downloading to buffer (memory) - use with caution")
c.Logger.Warn("downloading to buffer (memory) - use with caution (memory usage)")
}

c.Logger.Info(fmt.Sprintf("file - download: (%s) - (%d) - (%d)", dest, size, parts))
sizetoHuman := func(size int64) string {
if size < 1024 {
return fmt.Sprintf("%d B", size)
} else if size < 1024*1024 {
return fmt.Sprintf("%.2f KB", float64(size)/1024)
} else if size < 1024*1024*1024 {
return fmt.Sprintf("%.2f MB", float64(size)/1024/1024)
} else {
return fmt.Sprintf("%.2f GB", float64(size)/1024/1024/1024)
}
}

c.Logger.Info(fmt.Sprintf("file - download: (%s) - (%s) - (%d)", dest, sizetoHuman(size), parts))
c.Logger.Info(fmt.Sprintf("expected workers: %d, preallocated workers: %d", numWorkers, wPreallocated))
c.Logger.Debug(fmt.Sprintf("expected workers: %d, preallocated workers: %d", numWorkers, wPreallocated))

go c.allocateRemainingWorkers(dc, w, numWorkers, wPreallocated)

Expand All @@ -423,7 +434,7 @@ func (c *Client) DownloadMedia(file interface{}, Opts ...*DownloadOptions) (stri
var doneArray = make([]bool, totalParts+1)
var doneBytes = atomic.Int64{}

c.downloadParts(&wg, &mu, w, partSize, doneArray, numWorkers, location, &fs, opts, parts, &doneBytes)
c.downloadParts(&mu, w, partSize, doneArray, numWorkers, location, &fs, opts, parts, &doneBytes)

wg.Wait()

Expand Down Expand Up @@ -478,22 +489,30 @@ func (c *Client) createAndAppendSender(dcId int, senders []Sender, senderIndex i
}
}

func (c *Client) downloadParts(wg *sync.WaitGroup, mu *sync.Mutex, w []Sender, partSize int, doneArray []bool, numWorkers int, location InputFileLocation, fs *Destination, opts *DownloadOptions, parts int64, doneBytes *atomic.Int64) {
func (c *Client) downloadParts(mu *sync.Mutex, w []Sender, partSize int, doneArray []bool, numWorkers int, location InputFileLocation, fs *Destination, opts *DownloadOptions, parts int64, doneBytes *atomic.Int64) {
wg := sync.WaitGroup{}
sem := make(chan struct{}, 1)

for p := int64(0); p < parts; p++ {
sem <- struct{}{}
wg.Add(1)
go func(p int64) {
defer wg.Done()
defer func() { <-sem }()
for {
mu.Lock()
found, workerIndex := c.findAvailableWorker(w, numWorkers)
mu.Unlock()

if found {
go c.downloadPart(wg, mu, w, workerIndex, int(p), partSize, doneArray, location, fs, opts, doneBytes, parts*int64(partSize))
go c.downloadPart(mu, w, workerIndex, int(p), partSize, doneArray, location, fs, opts, doneBytes, parts*int64(partSize))
break
}
}
}(p)
}

wg.Wait()
}

func (c *Client) findAvailableWorker(w []Sender, numWorkers int) (bool, int) {
Expand All @@ -506,8 +525,7 @@ func (c *Client) findAvailableWorker(w []Sender, numWorkers int) (bool, int) {
return false, -1
}

func (c *Client) downloadPart(wg *sync.WaitGroup, mu *sync.Mutex, w []Sender, workerIndex, p int, partSize int, doneArray []bool, location InputFileLocation, fs *Destination, opts *DownloadOptions, doneBytes *atomic.Int64, totalBytes int64) {
defer wg.Done()
func (c *Client) downloadPart(mu *sync.Mutex, w []Sender, workerIndex, p int, partSize int, doneArray []bool, location InputFileLocation, fs *Destination, opts *DownloadOptions, doneBytes *atomic.Int64, totalBytes int64) {
defer func() {
mu.Lock()
w[workerIndex].buzy = false
Expand Down Expand Up @@ -549,7 +567,7 @@ partDownloadStartPoint:
if handleIfFlood(err, c) {
goto partDownloadStartPoint
}
c.Logger.Error(err)
c.Logger.Error(err, " - ", p)
case <-time.After(reqTimeout):
retryCount++
if retryCount > 2 {
Expand Down Expand Up @@ -610,7 +628,7 @@ downloadLastPartStartPoint:
errorChan := make(chan error, 1)

go func() {
upl, err := c.UploadGetFile(&UploadGetFileParams{
upl, err := w[0].c.UploadGetFile(&UploadGetFileParams{
Location: location,
Offset: int64(int(parts) * partSize),
Limit: int32(partSize),
Expand All @@ -635,7 +653,7 @@ downloadLastPartStartPoint:
if handleIfFlood(err, c) {
goto downloadLastPartStartPoint
}
c.Logger.Error(err)
c.Logger.Error(err, " - ", parts, " - (last part)")
case <-time.After(reqTimeout):
retryCount++
if retryCount > 2 {
Expand Down

0 comments on commit e7a9108

Please sign in to comment.