diff --git a/sdks/go/pkg/beam/io/avroio/avroio.go b/sdks/go/pkg/beam/io/avroio/avroio.go index b00c6d2eea00..809c9479f7a4 100644 --- a/sdks/go/pkg/beam/io/avroio/avroio.go +++ b/sdks/go/pkg/beam/io/avroio/avroio.go @@ -20,9 +20,9 @@ import ( "context" "encoding/json" "reflect" - "strings" "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/register" @@ -30,8 +30,7 @@ import ( ) func init() { - register.Function3x1(expandFn) - register.DoFn3x1[context.Context, string, func(beam.X), error]((*avroReadFn)(nil)) + register.DoFn3x1[context.Context, fileio.ReadableFile, func(beam.X), error]((*avroReadFn)(nil)) register.DoFn3x1[context.Context, int, func(*string) bool, error]((*writeAvroFn)(nil)) register.Emitter1[beam.X]() register.Iter1[string]() @@ -49,7 +48,8 @@ func Read(s beam.Scope, glob string, t reflect.Type) beam.PCollection { } func read(s beam.Scope, t reflect.Type, col beam.PCollection) beam.PCollection { - files := beam.ParDo(s, expandFn, col) + matches := fileio.MatchAll(s, col, fileio.MatchEmptyAllow()) + files := fileio.ReadMatches(s, matches, fileio.ReadUncompressed()) return beam.ParDo(s, &avroReadFn{Type: beam.EncodedType{T: t}}, files, @@ -57,42 +57,15 @@ func read(s beam.Scope, t reflect.Type, col beam.PCollection) beam.PCollection { ) } -func expandFn(ctx context.Context, glob string, emit func(string)) error { - if strings.TrimSpace(glob) == "" { - return nil // ignore empty string elements here - } - - fs, err := filesystem.New(ctx, glob) - if err != nil { - return err - } - defer fs.Close() - - files, err := fs.List(ctx, glob) - if err != nil { - return err - } - for _, filename := range files { - emit(filename) - } - return nil -} - type avroReadFn struct { // Avro schema type Type beam.EncodedType } -func (f *avroReadFn) ProcessElement(ctx context.Context, filename string, emit func(beam.X)) (err error) { - log.Infof(ctx, "Reading AVRO from %v", filename) - - fs, err := filesystem.New(ctx, filename) - if err != nil { - return - } - defer fs.Close() +func (f *avroReadFn) ProcessElement(ctx context.Context, file fileio.ReadableFile, emit func(beam.X)) (err error) { + log.Infof(ctx, "Reading AVRO from %v", file.Metadata.Path) - fd, err := fs.OpenRead(ctx, filename) + fd, err := file.Open(ctx) if err != nil { return } diff --git a/sdks/go/pkg/beam/io/parquetio/parquetio.go b/sdks/go/pkg/beam/io/parquetio/parquetio.go index 0d5a4fbb8316..9c48d134014b 100644 --- a/sdks/go/pkg/beam/io/parquetio/parquetio.go +++ b/sdks/go/pkg/beam/io/parquetio/parquetio.go @@ -18,11 +18,10 @@ package parquetio import ( "context" - "io" "reflect" - "strings" "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/xitongsys/parquet-go-source/buffer" @@ -31,14 +30,11 @@ import ( ) func init() { - register.Function3x1(expandFn) register.Emitter1[string]() - beam.RegisterType(reflect.TypeOf((*parquetReadFn)(nil)).Elem()) - register.DoFn3x1[context.Context, string, func(beam.X), error](&parquetReadFn{}) + register.DoFn3x1[context.Context, fileio.ReadableFile, func(beam.X), error](&parquetReadFn{}) register.Emitter1[beam.X]() - beam.RegisterType(reflect.TypeOf((*parquetWriteFn)(nil)).Elem()) register.DoFn3x1[context.Context, int, func(*beam.X) bool, error](&parquetWriteFn{}) register.Iter1[beam.X]() } @@ -63,7 +59,8 @@ func Read(s beam.Scope, glob string, t reflect.Type) beam.PCollection { } func read(s beam.Scope, t reflect.Type, col beam.PCollection) beam.PCollection { - files := beam.ParDo(s, expandFn, col) + matches := fileio.MatchAll(s, col, fileio.MatchEmptyAllow()) + files := fileio.ReadMatches(s, matches, fileio.ReadUncompressed()) return beam.ParDo(s, &parquetReadFn{Type: beam.EncodedType{T: t}}, files, @@ -71,45 +68,12 @@ func read(s beam.Scope, t reflect.Type, col beam.PCollection) beam.PCollection { ) } -func expandFn(ctx context.Context, glob string, emit func(string)) error { - if strings.TrimSpace(glob) == "" { - return nil // ignore empty string elements here - } - - fs, err := filesystem.New(ctx, glob) - if err != nil { - return err - } - defer fs.Close() - - files, err := fs.List(ctx, glob) - if err != nil { - return err - } - for _, filename := range files { - emit(filename) - } - return nil -} - type parquetReadFn struct { Type beam.EncodedType } -func (a *parquetReadFn) ProcessElement(ctx context.Context, filename string, emit func(beam.X)) error { - fs, err := filesystem.New(ctx, filename) - if err != nil { - return err - } - defer fs.Close() - - fd, err := fs.OpenRead(ctx, filename) - if err != nil { - return err - } - defer fd.Close() - - data, err := io.ReadAll(fd) +func (a *parquetReadFn) ProcessElement(ctx context.Context, file fileio.ReadableFile, emit func(beam.X)) error { + data, err := file.Read(ctx) if err != nil { return err }