Skip to content

Commit

Permalink
fix: improve concurrency in file cleaning and thumbnail verification …
Browse files Browse the repository at this point in the history
…processes
  • Loading branch information
PlusOne committed Dec 31, 2024
1 parent 50dba88 commit b40f7b5
Showing 1 changed file with 84 additions and 66 deletions.
150 changes: 84 additions & 66 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2322,15 +2322,30 @@ func runFileCleaner(ctx context.Context, storeDir string, ttl time.Duration) {
return
case <-ticker.C:
now := time.Now()
var wg sync.WaitGroup
err := filepath.Walk(storeDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
// Check if file metadata is cached
if metadata, found := fileMetadataCache.Get(path); found {
if fileMetadata, ok := metadata.(FileMetadata); ok {
if now.Sub(fileMetadata.CreationDate) > ttl {
wg.Add(1)
go func(path string, info os.FileInfo) {
defer wg.Done()
// Check if file metadata is cached
if metadata, found := fileMetadataCache.Get(path); found {
if fileMetadata, ok := metadata.(FileMetadata); ok {
if now.Sub(fileMetadata.CreationDate) > ttl {
err := os.Remove(path)
if err != nil {
log.WithError(err).Errorf("Failed to remove expired file: %s", path)
} else {
log.Infof("Removed expired file: %s", path)
}
}
}
} else {
// If metadata is not cached, use file modification time
if now.Sub(info.ModTime()) > ttl {
err := os.Remove(path)
if err != nil {
log.WithError(err).Errorf("Failed to remove expired file: %s", path)
Expand All @@ -2339,23 +2354,14 @@ func runFileCleaner(ctx context.Context, storeDir string, ttl time.Duration) {
}
}
}
} else {
// If metadata is not cached, use file modification time
if now.Sub(info.ModTime()) > ttl {
err := os.Remove(path)
if err != nil {
log.WithError(err).Errorf("Failed to remove expired file: %s", path)
} else {
log.Infof("Removed expired file: %s", path)
}
}
}
}(path, info)
}
return nil
})
if err != nil {
log.WithError(err).Error("Error cleaning files")
}
wg.Wait()
}
}
}
Expand Down Expand Up @@ -3145,67 +3151,79 @@ func verifyAndRepairThumbnails(thumbnailPaths []string, redisClient *redis.Clien
return
}

for _, thumbPath := range thumbnailPaths {
// Compute SHA-256 hash of the thumbnail file
file, err := os.Open(thumbPath)
if err != nil {
log.Warnf("Error opening thumbnail %s: %v", thumbPath, err)
continue
}
hasher := sha256.New()
if _, err := io.Copy(hasher, file); err != nil {
log.Warnf("Error hashing thumbnail %s: %v", thumbPath, err)
file.Close()
continue
}
file.Close()
computedHash := hex.EncodeToString(hasher.Sum(nil))
// Use a buffered channel to limit concurrent I/O operations
sem := make(chan struct{}, 10) // Limit to 10 concurrent operations

// Get stored hash from Redis
storedHash, err := redisClient.Get(context.Background(), thumbPath).Result()
if err == redis.Nil || storedHash != computedHash {
log.Warnf("Thumbnail %s is corrupted or missing. Regenerating...", thumbPath)

// Assume original image is in originalDir with the same base name
originalPath := filepath.Join(originalDir, filepath.Base(thumbPath))
origImage, err := imaging.Open(originalPath)
if err != nil {
log.Warnf("Error opening original image %s: %v", originalPath, err)
continue
}

// Generate thumbnail (e.g., 200x200 pixels)
thumbnail := imaging.Thumbnail(origImage, 200, 200, imaging.Lanczos)

// Save the regenerated thumbnail
err = imaging.Save(thumbnail, thumbPath)
if err != nil {
log.Warnf("Error saving regenerated thumbnail %s: %v", thumbPath, err)
continue
}
var wg sync.WaitGroup
for _, thumbPath := range thumbnailPaths {
wg.Add(1)
go func(thumbPath string) {
defer wg.Done()
sem <- struct{}{} // Acquire a slot
defer func() { <-sem }() // Release the slot

// Compute new hash
// Compute SHA-256 hash of the thumbnail file
file, err := os.Open(thumbPath)
if err != nil {
log.Warnf("Error opening regenerated thumbnail %s: %v", thumbPath, err)
continue
log.Warnf("Error opening thumbnail %s: %v", thumbPath, err)
return
}
hasher.Reset()
hasher := sha256.New()
if _, err := io.Copy(hasher, file); err != nil {
log.Warnf("Error hashing regenerated thumbnail %s: %v", thumbPath, err)
log.Warnf("Error hashing thumbnail %s: %v", thumbPath, err)
file.Close()
continue
return
}
file.Close()
newHash := hex.EncodeToString(hasher.Sum(nil))
computedHash := hex.EncodeToString(hasher.Sum(nil))

// Store new hash in Redis
err = redisClient.Set(context.Background(), thumbPath, newHash, 0).Err()
if err != nil {
log.Warnf("Error storing new hash for thumbnail %s in Redis: %v", thumbPath, err)
continue
// Get stored hash from Redis
storedHash, err := redisClient.Get(context.Background(), thumbPath).Result()
if err == redis.Nil || storedHash != computedHash {
log.Warnf("Thumbnail %s is corrupted or missing. Regenerating...", thumbPath)

// Assume original image is in originalDir with the same base name
originalPath := filepath.Join(originalDir, filepath.Base(thumbPath))
origImage, err := imaging.Open(originalPath)
if err != nil {
log.Warnf("Error opening original image %s: %v", originalPath, err)
return
}

// Generate thumbnail (e.g., 200x200 pixels)
thumbnail := imaging.Thumbnail(origImage, 200, 200, imaging.Lanczos)

// Save the regenerated thumbnail
err = imaging.Save(thumbnail, thumbPath)
if err != nil {
log.Warnf("Error saving regenerated thumbnail %s: %v", thumbPath, err)
return
}

// Compute new hash
file, err := os.Open(thumbPath)
if err != nil {
log.Warnf("Error opening regenerated thumbnail %s: %v", thumbPath, err)
return
}
hasher.Reset()
if _, err := io.Copy(hasher, file); err != nil {
log.Warnf("Error hashing regenerated thumbnail %s: %v", thumbPath, err)
file.Close()
return
}
file.Close()
newHash := hex.EncodeToString(hasher.Sum(nil))

// Store new hash in Redis
err = redisClient.Set(context.Background(), thumbPath, newHash, 0).Err()
if err != nil {
log.Warnf("Error storing new hash for thumbnail %s in Redis: %v", thumbPath, err)
return
}
log.Infof("Successfully regenerated and updated thumbnail %s", thumbPath)
}
log.Infof("Successfully regenerated and updated thumbnail %s", thumbPath)
}
}(thumbPath)
}
wg.Wait()
}

0 comments on commit b40f7b5

Please sign in to comment.