Skip to content

Commit

Permalink
[Go SDK]: Refactor avroio and parquetio Read to use fileio abstractions (#28177)
Browse files Browse the repository at this point in the history

* Refactor avroio.Read to use fileio abstractions

* Refactor parquetio.Read to use fileio abstractions
  • Loading branch information
johannaojeling authored Aug 30, 2023
1 parent 93fd021 commit a636f25
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 76 deletions.
41 changes: 7 additions & 34 deletions sdks/go/pkg/beam/io/avroio/avroio.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@ import (
"context"
"encoding/json"
"reflect"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/linkedin/goavro/v2"
)

func init() {
register.Function3x1(expandFn)
register.DoFn3x1[context.Context, string, func(beam.X), error]((*avroReadFn)(nil))
register.DoFn3x1[context.Context, fileio.ReadableFile, func(beam.X), error]((*avroReadFn)(nil))
register.DoFn3x1[context.Context, int, func(*string) bool, error]((*writeAvroFn)(nil))
register.Emitter1[beam.X]()
register.Iter1[string]()
Expand All @@ -49,50 +48,24 @@ func Read(s beam.Scope, glob string, t reflect.Type) beam.PCollection {
}

func read(s beam.Scope, t reflect.Type, col beam.PCollection) beam.PCollection {
files := beam.ParDo(s, expandFn, col)
matches := fileio.MatchAll(s, col, fileio.MatchEmptyAllow())
files := fileio.ReadMatches(s, matches, fileio.ReadUncompressed())
return beam.ParDo(s,
&avroReadFn{Type: beam.EncodedType{T: t}},
files,
beam.TypeDefinition{Var: beam.XType, T: t},
)
}

// expandFn resolves a glob into individual entries and emits each one.
//
// A glob that is empty or consists solely of whitespace is skipped: nothing
// is emitted and nil is returned. Otherwise a filesystem is opened for the
// glob, every entry returned by its List call is emitted in order, and the
// filesystem is closed before returning. Errors from opening the filesystem
// or from listing are returned unchanged.
func expandFn(ctx context.Context, glob string, emit func(string)) error {
	// Blank elements are tolerated rather than reported as an error.
	if len(strings.TrimSpace(glob)) == 0 {
		return nil
	}

	fsys, openErr := filesystem.New(ctx, glob)
	if openErr != nil {
		return openErr
	}
	defer fsys.Close()

	matched, listErr := fsys.List(ctx, glob)
	if listErr != nil {
		return listErr
	}
	for i := range matched {
		emit(matched[i])
	}
	return nil
}

// avroReadFn is the DoFn registered in init for reading Avro input; read
// constructs one instance per pipeline with the caller's element type.
type avroReadFn struct {
// Avro schema type. read populates this by wrapping the reflect.Type it
// receives in a beam.EncodedType.
Type beam.EncodedType
}

func (f *avroReadFn) ProcessElement(ctx context.Context, filename string, emit func(beam.X)) (err error) {
log.Infof(ctx, "Reading AVRO from %v", filename)

fs, err := filesystem.New(ctx, filename)
if err != nil {
return
}
defer fs.Close()
func (f *avroReadFn) ProcessElement(ctx context.Context, file fileio.ReadableFile, emit func(beam.X)) (err error) {
log.Infof(ctx, "Reading AVRO from %v", file.Metadata.Path)

fd, err := fs.OpenRead(ctx, filename)
fd, err := file.Open(ctx)
if err != nil {
return
}
Expand Down
48 changes: 6 additions & 42 deletions sdks/go/pkg/beam/io/parquetio/parquetio.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ package parquetio

import (
"context"
"io"
"reflect"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/xitongsys/parquet-go-source/buffer"
Expand All @@ -31,14 +30,11 @@ import (
)

func init() {
register.Function3x1(expandFn)
register.Emitter1[string]()

beam.RegisterType(reflect.TypeOf((*parquetReadFn)(nil)).Elem())
register.DoFn3x1[context.Context, string, func(beam.X), error](&parquetReadFn{})
register.DoFn3x1[context.Context, fileio.ReadableFile, func(beam.X), error](&parquetReadFn{})
register.Emitter1[beam.X]()

beam.RegisterType(reflect.TypeOf((*parquetWriteFn)(nil)).Elem())
register.DoFn3x1[context.Context, int, func(*beam.X) bool, error](&parquetWriteFn{})
register.Iter1[beam.X]()
}
Expand All @@ -63,53 +59,21 @@ func Read(s beam.Scope, glob string, t reflect.Type) beam.PCollection {
}

func read(s beam.Scope, t reflect.Type, col beam.PCollection) beam.PCollection {
files := beam.ParDo(s, expandFn, col)
matches := fileio.MatchAll(s, col, fileio.MatchEmptyAllow())
files := fileio.ReadMatches(s, matches, fileio.ReadUncompressed())
return beam.ParDo(s,
&parquetReadFn{Type: beam.EncodedType{T: t}},
files,
beam.TypeDefinition{Var: beam.XType, T: t},
)
}

// expandFn turns a glob element into the entries it lists, emitting one
// output per entry.
//
// Globs that are empty after trimming whitespace are ignored (no output,
// nil error). For any other glob, a filesystem is created for it, its List
// result is emitted element by element, and the filesystem is closed on
// return. Failures creating the filesystem or listing are passed back to the
// caller as-is.
func expandFn(ctx context.Context, glob string, emit func(string)) error {
	// Blank input elements are deliberately ignored here.
	if trimmed := strings.TrimSpace(glob); trimmed == "" {
		return nil
	}

	fsys, newErr := filesystem.New(ctx, glob)
	if newErr != nil {
		return newErr
	}
	defer fsys.Close()

	names, listErr := fsys.List(ctx, glob)
	if listErr != nil {
		return listErr
	}
	for idx := 0; idx < len(names); idx++ {
		emit(names[idx])
	}
	return nil
}

// parquetReadFn is the DoFn registered in init for reading Parquet input;
// read constructs it with the caller's element type.
type parquetReadFn struct {
// Type is set by read to a beam.EncodedType wrapping the reflect.Type it
// was given.
Type beam.EncodedType
}

func (a *parquetReadFn) ProcessElement(ctx context.Context, filename string, emit func(beam.X)) error {
fs, err := filesystem.New(ctx, filename)
if err != nil {
return err
}
defer fs.Close()

fd, err := fs.OpenRead(ctx, filename)
if err != nil {
return err
}
defer fd.Close()

data, err := io.ReadAll(fd)
func (a *parquetReadFn) ProcessElement(ctx context.Context, file fileio.ReadableFile, emit func(beam.X)) error {
data, err := file.Read(ctx)
if err != nil {
return err
}
Expand Down

0 comments on commit a636f25

Please sign in to comment.