-
Notifications
You must be signed in to change notification settings - Fork 2
/
concurrentloader.go
236 lines (204 loc) · 7.43 KB
/
concurrentloader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
// Package raggo provides utilities for concurrent document loading and processing.
package raggo
import (
"context"
"fmt"
"io"
"math"
"os"
"path/filepath"
"strings"
"sync"
"github.com/teilomillet/raggo/rag"
)
// ConcurrentPDFLoader is a Loader that can additionally load a batch of PDF
// files in parallel.
type ConcurrentPDFLoader interface {
	// Loader contributes the single-item loading methods.
	Loader

	// LoadPDFsConcurrent loads count PDF files found under sourceDir
	// concurrently. When sourceDir holds fewer than count PDFs, copies of
	// them are created in targetDir until count files are available.
	//
	// Parameters:
	//   - ctx: context handed to each individual load
	//   - sourceDir: directory searched for PDF files
	//   - targetDir: directory that receives any copies that are needed
	//   - count: number of PDF files to load
	//
	// Returns:
	//   - []string: one entry per file that loaded successfully
	//   - error: non-nil when preparing the files or loading any of them failed
	//
	// Example usage:
	//
	//	loader := raggo.NewConcurrentPDFLoader()
	//	files, err := loader.LoadPDFsConcurrent(ctx, "source", "target", 10)
	LoadPDFsConcurrent(ctx context.Context, sourceDir, targetDir string, count int) ([]string, error)
}
// concurrentPDFLoaderWrapper is the concrete type returned by
// NewConcurrentPDFLoader. It holds a *rag.Loader and forwards the actual
// loading work to it.
type concurrentPDFLoaderWrapper struct {
// internal is the underlying loader that the wrapper's methods delegate to.
internal *rag.Loader
}
// NewConcurrentPDFLoader builds a ConcurrentPDFLoader backed by a fresh
// rag.Loader. The given options are passed to rag.NewLoader unchanged, so
// whatever LoaderOption values that constructor understands apply here too.
//
// Example:
//
//	loader := raggo.NewConcurrentPDFLoader()
func NewConcurrentPDFLoader(opts ...LoaderOption) ConcurrentPDFLoader {
	wrapper := new(concurrentPDFLoaderWrapper)
	wrapper.internal = rag.NewLoader(opts...)
	return wrapper
}
// LoadPDFsConcurrent loads count PDF files from sourceDir, one goroutine per
// file.
//
// Steps:
//  1. Reject a negative count.
//  2. List the PDF files under sourceDir (an empty listing is an error).
//  3. Create targetDir if it does not exist.
//  4. Obtain exactly count paths from duplicatePDFs, which copies files into
//     targetDir when sourceDir does not hold enough of them.
//  5. Load every path concurrently through the wrapped loader and wait for
//     all loads to finish.
//
// Returns the values produced by the successful loads, in the same order as
// the prepared file list. If any load failed, the successful results are still
// returned together with an error that reports the number of failures and
// wraps the first one. Errors from steps 1-4 return a nil slice.
//
// There is no limit on the number of goroutines: count loads run at once.
func (clw *concurrentPDFLoaderWrapper) LoadPDFsConcurrent(ctx context.Context, sourceDir string, targetDir string, count int) ([]string, error) {
	// A negative count would otherwise surface as a slice-bounds panic
	// further down; report it as an ordinary error instead.
	if count < 0 {
		return nil, fmt.Errorf("invalid PDF count %d: must not be negative", count)
	}

	pdfs, err := listPDFFiles(sourceDir)
	if err != nil {
		return nil, fmt.Errorf("failed to list PDF files in directory: %w", err)
	}
	if len(pdfs) == 0 {
		return nil, fmt.Errorf("no PDF files found in directory: %s", sourceDir)
	}

	// Create target directory if it doesn't exist
	if err := os.MkdirAll(targetDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create target directory: %w", err)
	}

	// Duplicate PDFs if necessary
	duplicatedPDFs, err := duplicatePDFs(pdfs, targetDir, count)
	if err != nil {
		return nil, fmt.Errorf("failed to duplicate PDFs: %w", err)
	}

	// Each goroutine writes only to its own slot, so no locking is needed,
	// and reading happens strictly after wg.Wait(). This replaces a
	// select over two channels that were closed concurrently: a receive on
	// a closed channel is always ready, so that select could pick up a
	// zero value (a nil error or an empty path) in place of a real result.
	type outcome struct {
		path string
		err  error
	}
	outcomes := make([]outcome, len(duplicatedPDFs))

	var wg sync.WaitGroup
	for i, pdf := range duplicatedPDFs {
		wg.Add(1)
		go func(slot int, pdfPath string) {
			defer wg.Done()
			outcomes[slot].path, outcomes[slot].err = clw.internal.LoadFile(ctx, pdfPath)
		}(i, pdf)
	}
	wg.Wait()

	var loadedFiles []string
	var firstErr error
	failures := 0
	for _, o := range outcomes {
		if o.err != nil {
			if firstErr == nil {
				firstErr = o.err
			}
			failures++
			continue
		}
		loadedFiles = append(loadedFiles, o.path)
	}

	if failures > 0 {
		return loadedFiles, fmt.Errorf("encountered %d errors during loading: %w", failures, firstErr)
	}
	return loadedFiles, nil
}
// LoadURL satisfies the Loader interface by handing the request straight to
// the wrapped loader and returning its result and error untouched.
func (clw *concurrentPDFLoaderWrapper) LoadURL(ctx context.Context, url string) (string, error) {
	loaded, err := clw.internal.LoadURL(ctx, url)
	return loaded, err
}
// LoadFile satisfies the Loader interface by handing the path straight to the
// wrapped loader and returning its result and error untouched.
func (clw *concurrentPDFLoaderWrapper) LoadFile(ctx context.Context, path string) (string, error) {
	loaded, err := clw.internal.LoadFile(ctx, path)
	return loaded, err
}
// LoadDir satisfies the Loader interface by handing the directory straight to
// the wrapped loader and returning its results and error untouched.
func (clw *concurrentPDFLoaderWrapper) LoadDir(ctx context.Context, dir string) ([]string, error) {
	loaded, err := clw.internal.LoadDir(ctx, dir)
	return loaded, err
}
// listPDFFiles walks the tree rooted at dir and collects the path of every
// non-directory entry whose extension is ".pdf", compared case-insensitively.
//
// The walk stops at the first error it meets; that error is returned together
// with whatever paths had been collected up to that point.
func listPDFFiles(dir string) ([]string, error) {
	var found []string

	visit := func(entryPath string, entry os.FileInfo, walkErr error) error {
		// Propagate walk errors first: entry may be nil when walkErr is set.
		if walkErr != nil {
			return walkErr
		}
		if entry.IsDir() {
			return nil
		}
		if ext := strings.ToLower(filepath.Ext(entryPath)); ext != ".pdf" {
			return nil
		}
		found = append(found, entryPath)
		return nil
	}

	walkErr := filepath.Walk(dir, visit)
	return found, walkErr
}
// duplicatePDFs returns exactly desiredCount PDF paths built from pdfs.
//
// When pdfs already holds at least desiredCount entries, the first
// desiredCount of them are returned as-is and nothing is copied. Otherwise the
// sources are copied into targetDir round after round until desiredCount
// copies exist, and only the paths of those copies are returned.
//
// Copies are named "<stem>_copy<round>.pdf", where stem is the source's base
// name without its ".pdf" extension (matched case-insensitively, so "X.PDF"
// yields "X_copy0.pdf"). If two sources would produce the same name, for
// example same-named files from different subdirectories, later ones get an
// extra "_<n>" suffix so no copy overwrites another from the same call.
// Names are compared case-insensitively so the result is also safe on
// case-insensitive file systems. Files that already exist in targetDir are
// overwritten.
//
// Errors:
//   - desiredCount is negative
//   - pdfs is empty while desiredCount is positive
//   - copying a file fails (wrapped with the source path)
func duplicatePDFs(pdfs []string, targetDir string, desiredCount int) ([]string, error) {
	// Guard the slice expression below, which would panic on a negative bound.
	if desiredCount < 0 {
		return nil, fmt.Errorf("invalid desired count %d: must not be negative", desiredCount)
	}

	numOriginalPDFs := len(pdfs)
	if numOriginalPDFs >= desiredCount {
		// Cap the capacity so an append by the caller cannot write into
		// the remainder of the input slice.
		return pdfs[:desiredCount:desiredCount], nil
	}
	// Without this guard the division below would be by zero.
	if numOriginalPDFs == 0 {
		return nil, fmt.Errorf("cannot produce %d PDFs: no source files to duplicate", desiredCount)
	}

	duplicationsNeeded := int(math.Ceil(float64(desiredCount) / float64(numOriginalPDFs)))
	duplicatedPDFs := make([]string, 0, desiredCount)
	usedNames := make(map[string]struct{}, desiredCount)

	for round := 0; round < duplicationsNeeded; round++ {
		for _, pdf := range pdfs {
			if len(duplicatedPDFs) >= desiredCount {
				break
			}

			base := filepath.Base(pdf)
			stem := base
			if ext := filepath.Ext(base); strings.EqualFold(ext, ".pdf") {
				stem = strings.TrimSuffix(base, ext)
			}

			// Pick the plain name first; fall back to numbered
			// variants only when it is already taken.
			newFileName := fmt.Sprintf("%s_copy%d.pdf", stem, round)
			for n := 1; ; n++ {
				if _, taken := usedNames[strings.ToLower(newFileName)]; !taken {
					break
				}
				newFileName = fmt.Sprintf("%s_copy%d_%d.pdf", stem, round, n)
			}
			usedNames[strings.ToLower(newFileName)] = struct{}{}

			newFilePath := filepath.Join(targetDir, newFileName)
			if err := copyFile(pdf, newFilePath); err != nil {
				return nil, fmt.Errorf("failed to copy file %s: %w", pdf, err)
			}
			duplicatedPDFs = append(duplicatedPDFs, newFilePath)
		}
	}
	return duplicatedPDFs, nil
}
// copyFile copies the contents of the file at src into the file at dst.
//
// dst is created if it does not exist and truncated if it does. Both files are
// closed before the function returns. The error from closing dst is reported
// when the copy itself succeeded, because a failed close on a written file can
// mean the data never reached storage.
//
// Returns the first error from opening src, creating dst, copying, or closing
// dst. A dst that was already created is left in place when copying fails.
func copyFile(src, dst string) (err error) {
	sourceFile, err := os.Open(src)
	if err != nil {
		return err
	}
	// Close error on a read-only handle carries no information worth reporting.
	defer sourceFile.Close()

	destFile, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer func() {
		// Keep an earlier copy error; otherwise surface the close error.
		if closeErr := destFile.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	_, err = io.Copy(destFile, sourceFile)
	return err
}