Skip to content

Commit

Permalink
RawRecord PR follow-up - address DGollings' comment (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
jf-tech authored Jan 13, 2021
1 parent 4aa56d6 commit 675471c
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 56 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ situations.
- Golang 1.14

## Recent Major Feature Additions/Changes
- Added `Transform.CurrentRawRecord()` for caller of omniparser to access the raw ingested record.
- Added `Transform.RawRecord()` for caller of omniparser to access the raw ingested record.
- Deprecated `custom_parse` in favor of `custom_func` (`custom_parse` is still usable for
back-compatibility, it is just removed from all public docs and samples).
- Added `NonValidatingReader` EDI segment reader.
Expand Down
5 changes: 4 additions & 1 deletion doc/gettingstarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,10 @@ for {
break
}
if err != nil { ... }
// output contains a []byte of the ingested and transformed record.
// output contains a []byte of the ingested and transformed record.

// Also transform.RawRecord() gives you access to the raw record.
fmt.Println(transform.RawRecord().Checksum())
}
```
Expand Down
8 changes: 3 additions & 5 deletions doc/programmability.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ for {
}
if err != nil { ... }
// output contains a []byte of the ingested and transformed record.
raw, err := transform.CurrentRawRecord()
if err != nil { ... }
rawRecord := raw.(*omniv21.RawRecord) // assuming the schema is of `omni.2.1` version.
fmt.Println(rawRecord.UUIDv3()) // rawRecord.UUIDv3() returns a stable hash of the current raw record.
// Also transform.RawRecord() gives you access to the raw record.
fmt.Println(transform.RawRecord().Checksum())
}
```
Note this out-of-box omniparser setup contains only the `omni.2.1` schema handler, meaning only schemas
Expand Down
30 changes: 16 additions & 14 deletions extensions/omniv21/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@ import (
"github.com/jf-tech/omniparser/extensions/omniv21/fileformat"
"github.com/jf-tech/omniparser/extensions/omniv21/transform"
"github.com/jf-tech/omniparser/idr"
"github.com/jf-tech/omniparser/schemahandler"
"github.com/jf-tech/omniparser/transformctx"
)

// RawRecord contains the raw data ingested in from the input stream in the form of an IDR tree.
// Note callers outside this package should absolutely make **NO** modifications to the content of
// RawRecord. Treat it like read-only.
type RawRecord struct {
Node *idr.Node
type rawRecord struct {
node *idr.Node
}

// UUIDv3 returns a stable MD5(v3) hash of the RawRecord.
func (rr *RawRecord) UUIDv3() string {
hash, _ := customfuncs.UUIDv3(nil, idr.JSONify2(rr.Node))
func (rr *rawRecord) Raw() interface{} {
return rr.node
}

// Checksum returns a stable MD5(v3) hash of the rawRecord.
func (rr *rawRecord) Checksum() string {
hash, _ := customfuncs.UUIDv3(nil, idr.JSONify2(rr.node))
return hash
}

Expand All @@ -31,19 +33,19 @@ type ingester struct {
customParseFuncs transform.CustomParseFuncs // Deprecated.
ctx *transformctx.Ctx
reader fileformat.FormatReader
rawRecord RawRecord
rawRecord rawRecord
}

// Read ingests a raw record from the input stream, transforms it according the given schema and return
// the raw record, transformed JSON bytes.
func (g *ingester) Read() (interface{}, []byte, error) {
if g.rawRecord.Node != nil {
g.reader.Release(g.rawRecord.Node)
g.rawRecord.Node = nil
func (g *ingester) Read() (schemahandler.RawRecord, []byte, error) {
if g.rawRecord.node != nil {
g.reader.Release(g.rawRecord.node)
g.rawRecord.node = nil
}
n, err := g.reader.Read()
if n != nil {
g.rawRecord.Node = n
g.rawRecord.node = n
}
if err != nil {
// Read() supposed to have already done CtxAwareErr error wrapping. So directly return.
Expand Down
3 changes: 2 additions & 1 deletion extensions/omniv21/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ func TestIngester_Read_Success(t *testing.T) {
}
raw, b, err := g.Read()
assert.NoError(t, err)
assert.Equal(t, "41665284-dab9-300d-b647-7ace9cb514b4", raw.(*RawRecord).UUIDv3())
assert.Equal(t, "41665284-dab9-300d-b647-7ace9cb514b4", raw.Checksum())
assert.Equal(t, "{}", idr.JSONify2(raw.Raw().(*idr.Node)))
assert.Equal(t, "123", string(b))
assert.Equal(t, 0, g.reader.(*testReader).releaseCalled)
raw, b, err = g.Read()
Expand Down
8 changes: 3 additions & 5 deletions extensions/omniv21/samples/testCommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/stretchr/testify/assert"

"github.com/jf-tech/omniparser"
"github.com/jf-tech/omniparser/extensions/omniv21"
"github.com/jf-tech/omniparser/idr"
"github.com/jf-tech/omniparser/transformctx"
)
Expand Down Expand Up @@ -49,12 +48,11 @@ func SampleTestCommon(t *testing.T, schemaFile, inputFile string) string {
err = json.Unmarshal(recordBytes, &transformed)
assert.NoError(t, err)

raw, err := transform.CurrentRawRecord()
raw, err := transform.RawRecord()
assert.NoError(t, err)
rawRecord := raw.(*omniv21.RawRecord)
records = append(records, record{
RawRecord: idr.JSONify2(rawRecord.Node),
RawRecordHash: rawRecord.UUIDv3(),
RawRecord: idr.JSONify2(raw.Raw().(*idr.Node)),
RawRecordHash: raw.Checksum(),
TransformedRecord: transformed,
})
}
Expand Down
10 changes: 9 additions & 1 deletion schemahandler/schemaHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ type SchemaHandler interface {
NewIngester(ctx *transformctx.Ctx, input io.Reader) (Ingester, error)
}

// RawRecord represents a raw record ingested from the input.
type RawRecord interface {
// Raw returns the actual raw record that is version specific to each of the schema handler.
Raw() interface{}
// Checksum returns a UUIDv3 (MD5) stable hash of the raw record.
Checksum() string
}

// Ingester is an interface of ingestion and transformation for a given input stream.
type Ingester interface {
// Read is called repeatedly during the processing of an input stream. Each call it should return
Expand All @@ -46,7 +54,7 @@ type Ingester interface {
// one record at a time, OR, processes and returns one record for each call. However, the overall
// design principle of omniparser is to have streaming processing capability so memory won't be a
// constraint when dealing with large input file. All built-in ingesters are implemented this way.
Read() (interface{}, []byte, error)
Read() (RawRecord, []byte, error)

// IsContinuableError is called to determine if the error returned by Read is fatal or not. After Read
// is called, the result record or error will be returned to caller. After caller consumes record or
Expand Down
16 changes: 7 additions & 9 deletions transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ type Transform interface {
// return the same error.
// Note if returned error isn't nil, then returned []byte will be nil.
Read() ([]byte, error)
// CurrentRawRecord returns the current raw record ingested from the input stream. If
// the last Read call failed, or Read hasn't been called yet, it will return an error.
// Each schema handler and extension has its own definition of what a raw record is
// so please check their corresponding doc.
CurrentRawRecord() (interface{}, error)
// RawRecord returns the current raw record ingested from the input stream. If the last
// Read call failed, or Read hasn't been called yet, it will return an error.
RawRecord() (schemahandler.RawRecord, error)
}

type transform struct {
ingester schemahandler.Ingester
lastRawRecord interface{}
lastRawRecord schemahandler.RawRecord
lastErr error
}

Expand Down Expand Up @@ -70,9 +68,9 @@ func (o *transform) Read() ([]byte, error) {
return transformed, err
}

// CurrentRawRecord returns the current raw record ingested from the input stream. If
// the last Read call failed, or Read hasn't been called yet, it will return an error.
func (o *transform) CurrentRawRecord() (interface{}, error) {
// RawRecord returns the current raw record ingested from the input stream. If the last
// Read call failed, or Read hasn't been called yet, it will return an error.
func (o *transform) RawRecord() (schemahandler.RawRecord, error) {
if o.lastErr != nil {
return nil, o.lastErr
}
Expand Down
56 changes: 37 additions & 19 deletions transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,41 @@ import (
"github.com/stretchr/testify/assert"

"github.com/jf-tech/omniparser/errs"
"github.com/jf-tech/omniparser/schemahandler"
)

type testReadCall struct {
record []byte
result []byte
err error
}

func (trc testReadCall) Checksum() string {
if trc.err != nil {
panic("Checksum() called when err != nil")
}
return fmt.Sprintf("checksum of raw record of '%s'", string(trc.result))
}

func (trc testReadCall) Raw() interface{} {
if trc.err != nil {
panic("Raw() called when err != nil")
}
return fmt.Sprintf("raw record of '%s'", string(trc.result))
}

type testIngester struct {
readCalled int
readCalls []testReadCall
continuableErrs map[error]bool
}

func (g *testIngester) Read() (interface{}, []byte, error) {
func (g *testIngester) Read() (schemahandler.RawRecord, []byte, error) {
if g.readCalled >= len(g.readCalls) {
panic(fmt.Sprintf("Read() called %d time(s), but not enough mock entries setup", g.readCalled))
}
r := g.readCalls[g.readCalled]
g.readCalled++
return fmt.Sprintf("raw record %d", g.readCalled-1), r.record, r.err
return r, r.result, r.err
}

func (g *testIngester) IsContinuableError(err error) bool {
Expand All @@ -45,9 +60,9 @@ func TestTransform_Read_EndWithEOF(t *testing.T) {
tfm := &transform{
ingester: &testIngester{
readCalls: []testReadCall{
{record: []byte("1st good read")},
{result: []byte("1st good read")},
{err: continuableErr1},
{record: []byte("2nd good read")},
{result: []byte("2nd good read")},
{err: io.EOF},
},
continuableErrs: map[error]bool{continuableErr1: true},
Expand All @@ -56,32 +71,34 @@ func TestTransform_Read_EndWithEOF(t *testing.T) {
record, err := tfm.Read()
assert.NoError(t, err)
assert.Equal(t, "1st good read", string(record))
raw, err := tfm.CurrentRawRecord()
raw, err := tfm.RawRecord()
assert.NoError(t, err)
assert.Equal(t, "raw record 0", raw.(string))
assert.Equal(t, "raw record of '1st good read'", raw.Raw())
assert.Equal(t, "checksum of raw record of '1st good read'", raw.Checksum())

record, err = tfm.Read()
assert.Error(t, err)
assert.True(t, errs.IsErrTransformFailed(err))
assert.Equal(t, continuableErr1.Error(), err.Error())
assert.Nil(t, record)
raw, err = tfm.CurrentRawRecord()
raw, err = tfm.RawRecord()
assert.Error(t, err)
assert.True(t, errs.IsErrTransformFailed(err))
assert.Nil(t, raw)

record, err = tfm.Read()
assert.NoError(t, err)
assert.Equal(t, "2nd good read", string(record))
raw, err = tfm.CurrentRawRecord()
raw, err = tfm.RawRecord()
assert.NoError(t, err)
assert.Equal(t, "raw record 2", raw.(string))
assert.Equal(t, "raw record of '2nd good read'", raw.Raw())
assert.Equal(t, "checksum of raw record of '2nd good read'", raw.Checksum())

record, err = tfm.Read()
assert.Error(t, err)
assert.Equal(t, io.EOF, err)
assert.Nil(t, record)
raw, err = tfm.CurrentRawRecord()
raw, err = tfm.RawRecord()
assert.Error(t, err)
assert.Equal(t, io.EOF, err)
assert.Nil(t, raw)
Expand All @@ -91,7 +108,7 @@ func TestTransform_Read_EndWithEOF(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, io.EOF, err)
assert.Nil(t, record)
raw, err = tfm.CurrentRawRecord()
raw, err = tfm.RawRecord()
assert.Error(t, err)
assert.Equal(t, io.EOF, err)
assert.Nil(t, raw)
Expand All @@ -101,24 +118,25 @@ func TestTransform_Read_EndWithNonContinuableError(t *testing.T) {
tfm := &transform{
ingester: &testIngester{
readCalls: []testReadCall{
{record: []byte("1st good read")},
{result: []byte("1st good read")},
{err: errors.New("fatal error")},
},
},
}
record, err := tfm.Read()
assert.NoError(t, err)
assert.Equal(t, "1st good read", string(record))
raw, err := tfm.CurrentRawRecord()
raw, err := tfm.RawRecord()
assert.NoError(t, err)
assert.Equal(t, "raw record 0", raw.(string))
assert.Equal(t, "raw record of '1st good read'", raw.Raw())
assert.Equal(t, "checksum of raw record of '1st good read'", raw.Checksum())

record, err = tfm.Read()
assert.Error(t, err)
assert.False(t, errs.IsErrTransformFailed(err))
assert.Equal(t, "fatal error", err.Error())
assert.Nil(t, record)
raw, err = tfm.CurrentRawRecord()
raw, err = tfm.RawRecord()
assert.Error(t, err)
assert.False(t, errs.IsErrTransformFailed(err))
assert.Equal(t, "fatal error", err.Error())
Expand All @@ -129,15 +147,15 @@ func TestTransform_Read_EndWithNonContinuableError(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, "fatal error", err.Error())
assert.Nil(t, record)
raw, err = tfm.CurrentRawRecord()
raw, err = tfm.RawRecord()
assert.Error(t, err)
assert.Equal(t, "fatal error", err.Error())
assert.Nil(t, raw)
}

func TestTransform_CurrentRawRecord_CalledBeforeRead(t *testing.T) {
func TestTransform_RawRecord_CalledBeforeRead(t *testing.T) {
tfm := &transform{ingester: &testIngester{readCalls: []testReadCall{}}}
raw, err := tfm.CurrentRawRecord()
raw, err := tfm.RawRecord()
assert.Error(t, err)
assert.Equal(t, "must call Read first", err.Error())
assert.Nil(t, raw)
Expand Down

0 comments on commit 675471c

Please sign in to comment.