Skip to content

Commit

Permalink
Merge pull request #81 from glouvigny/glouvigny/bump-ipfslog-v1.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton authored Sep 15, 2021
2 parents ad349a0 + 975b2fb commit 5030118
Show file tree
Hide file tree
Showing 9 changed files with 458 additions and 168 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ jobs:
strategy:
matrix:
golang:
- 1.15.x
- 1.16.x
- 1.17.x
#- tip
env:
OS: ubuntu-latest
Expand Down Expand Up @@ -80,8 +80,8 @@ jobs:
strategy:
matrix:
golang:
- 1.15.x
- 1.16.x
- 1.17.x
#- tip
env:
OS: macos-latest
Expand Down
4 changes: 2 additions & 2 deletions baseorbitdb/orbitdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
coreapi "github.com/ipfs/interface-go-ipfs-core"
p2pcore "github.com/libp2p/go-libp2p-core"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -357,7 +357,7 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity,
}

if options.Tracer == nil {
options.Tracer = trace.NoopTracer{}
options.Tracer = trace.NewNoopTracerProvider().Tracer("")
}

if options.DirectChannelFactory == nil {
Expand Down
15 changes: 7 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
module berty.tech/go-orbit-db

go 1.15
go 1.16

require (
berty.tech/go-ipfs-log v1.4.1
github.com/davidlazar/go-crypto v0.0.0-20190912175916-7055855a373f // indirect
berty.tech/go-ipfs-log v1.5.0
github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259
github.com/golang/snappy v0.0.1 // indirect
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-leveldb v0.4.2
github.com/ipfs/go-ipfs v0.8.0
github.com/ipfs/go-ipfs v0.9.1
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipld-cbor v0.0.5
github.com/ipfs/interface-go-ipfs-core v0.4.0
github.com/libp2p/go-libp2p v0.13.0
github.com/libp2p/go-libp2p v0.14.3
github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-libp2p-pubsub v0.4.1
github.com/libp2p/go-sockaddr v0.1.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.4.2
github.com/pkg/errors v0.9.1
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/otel v0.8.0
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/trace v0.20.0
go.uber.org/goleak v1.1.10
go.uber.org/zap v1.16.0
)
529 changes: 410 additions & 119 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion iface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
coreapi "github.com/ipfs/interface-go-ipfs-core"
p2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

Expand Down
4 changes: 2 additions & 2 deletions pubsub/pubsubcoreapi/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
options "github.com/ipfs/interface-go-ipfs-core/options"
p2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"berty.tech/go-orbit-db/events"
Expand Down Expand Up @@ -173,7 +173,7 @@ func NewPubSub(api coreapi.CoreAPI, id peer.ID, pollInterval time.Duration, logg
}

if tracer == nil {
tracer = trace.NoopTracer{}
tracer = trace.NewNoopTracerProvider().Tracer("")
}

return &coreAPIPubSub{
Expand Down
4 changes: 2 additions & 2 deletions pubsub/pubsubraw/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
p2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/peer"
p2ppubsub "github.com/libp2p/go-libp2p-pubsub"
"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"berty.tech/go-orbit-db/events"
Expand Down Expand Up @@ -135,7 +135,7 @@ func NewPubSub(ps *p2ppubsub.PubSub, id peer.ID, logger *zap.Logger, tracer trac
}

if tracer == nil {
tracer = trace.NoopTracer{}
tracer = trace.NewNoopTracerProvider().Tracer("")
}

return &rawPubSub{
Expand Down
56 changes: 28 additions & 28 deletions stores/basestore/base_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
coreapi "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/path"
"github.com/pkg/errors"
otkv "go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/trace"
otkv "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -131,7 +131,7 @@ func (b *BaseStore) InitBaseStore(ctx context.Context, ipfs coreapi.CoreAPI, ide
}

if options.Tracer == nil {
options.Tracer = trace.NoopTracer{}
options.Tracer = trace.NewNoopTracerProvider().Tracer("")
}

if identity == nil {
Expand Down Expand Up @@ -218,17 +218,17 @@ func (b *BaseStore) InitBaseStore(ctx context.Context, ipfs coreapi.CoreAPI, ide
for e := range sub {
switch evt := e.(type) {
case *replicator.EventLoadAdded:
span.AddEvent(ctx, "replicator-load-added", otkv.String("hash", evt.Hash.String()))
span.AddEvent("replicator-load-added", trace.WithAttributes(otkv.String("hash", evt.Hash.String())))
b.ReplicationStatus().IncQueued()
b.recalculateReplicationMax(0)
b.Emit(ctx, stores.NewEventReplicate(b.Address(), evt.Hash))

case *replicator.EventLoadEnd:
span.AddEvent(ctx, "replicator-load-end")
span.AddEvent("replicator-load-end")
b.replicationLoadComplete(ctx, evt.Logs)

case *replicator.EventLoadProgress:
span.AddEvent(ctx, "replicator-load-progress")
span.AddEvent("replicator-load-progress")
if b.ReplicationStatus().GetBuffered() > evt.BufferLength {
b.recalculateReplicationProgress(b.ReplicationStatus().GetProgress() + evt.BufferLength)
} else {
Expand Down Expand Up @@ -336,38 +336,38 @@ func (b *BaseStore) Load(ctx context.Context, amount int) error {
var localHeads, remoteHeads []*entry.Entry
localHeadsBytes, err := b.Cache().Get(datastore.NewKey("_localHeads"))
if err != nil && err != datastore.ErrNotFound {
span.AddEvent(ctx, "local-heads-load-failed")
span.AddEvent("local-heads-load-failed")
return errors.Wrap(err, "unable to get local heads from cache")
}

err = nil

if localHeadsBytes != nil {
span.AddEvent(ctx, "local-heads-unmarshall")
span.AddEvent("local-heads-unmarshall")
err = json.Unmarshal(localHeadsBytes, &localHeads)
if err != nil {
span.AddEvent(ctx, "local-heads-unmarshall-failed")
span.AddEvent("local-heads-unmarshall-failed")
b.logger.Warn("unable to unmarshal cached local heads", zap.Error(err))
}
span.AddEvent(ctx, "local-heads-unmarshalled")
span.AddEvent("local-heads-unmarshalled")
}

remoteHeadsBytes, err := b.Cache().Get(datastore.NewKey("_remoteHeads"))
if err != nil && err != datastore.ErrNotFound {
span.AddEvent(ctx, "remote-heads-load-failed")
span.AddEvent("remote-heads-load-failed")
return errors.Wrap(err, "unable to get data from cache")
}

err = nil

if remoteHeadsBytes != nil {
span.AddEvent(ctx, "remote-heads-unmarshall")
span.AddEvent("remote-heads-unmarshall")
err = json.Unmarshal(remoteHeadsBytes, &remoteHeads)
if err != nil {
span.AddEvent(ctx, "remote-heads-unmarshall-failed")
span.AddEvent("remote-heads-unmarshall-failed")
return errors.Wrap(err, "unable to unmarshal cached remote heads")
}
span.AddEvent(ctx, "remote-heads-unmarshalled")
span.AddEvent("remote-heads-unmarshalled")
}

heads := append(localHeads, remoteHeads...)
Expand Down Expand Up @@ -395,7 +395,7 @@ func (b *BaseStore) Load(ctx context.Context, amount int) error {

b.recalculateReplicationMax(h.GetClock().GetTime())

span.AddEvent(ctx, "store-head-loading")
span.AddEvent("store-head-loading")
l, inErr := ipfslog.NewFromEntryHash(ctx, b.IPFS(), b.Identity(), h.GetHash(), &ipfslog.LogOptions{
ID: oplog.GetID(),
AccessController: b.AccessController(),
Expand All @@ -408,40 +408,40 @@ func (b *BaseStore) Load(ctx context.Context, amount int) error {
})

if inErr != nil {
span.AddEvent(ctx, "store-head-loading-error")
span.AddEvent("store-head-loading-error")
err = errors.Wrap(inErr, "unable to create log from entry hash")
return
}

span.AddEvent(ctx, "store-head-loaded")
span.AddEvent("store-head-loaded")

span.AddEvent(ctx, "store-heads-joining")
span.AddEvent("store-heads-joining")
if _, inErr = oplog.Join(l, amount); inErr != nil {
span.AddEvent(ctx, "store-heads-joining-failed")
span.AddEvent("store-heads-joining-failed")
// err = errors.Wrap(err, "unable to join log")
// TODO: log
_ = inErr
} else {
span.AddEvent(ctx, "store-heads-joined")
span.AddEvent("store-heads-joined")
}
}(h)
}

wg.Wait()

if err != nil {
span.AddEvent(ctx, "store-handling-head-error", otkv.String("error", err.Error()))
span.AddEvent("store-handling-head-error", trace.WithAttributes(otkv.String("error", err.Error())))
return err
}

// Update the index
if len(heads) > 0 {
span.AddEvent(ctx, "store-index-updating")
span.AddEvent("store-index-updating")
if err := b.updateIndex(ctx); err != nil {
span.AddEvent(ctx, "store-index-updating-error", otkv.String("error", err.Error()))
span.AddEvent("store-index-updating-error", trace.WithAttributes(otkv.String("error", err.Error())))
return errors.Wrap(err, "unable to update index")
}
span.AddEvent(ctx, "store-index-updated")
span.AddEvent("store-index-updated")
}

b.Emit(ctx, stores.NewEventReady(b.Address(), b.OpLog().Heads().Slice()))
Expand Down Expand Up @@ -481,24 +481,24 @@ func (b *BaseStore) Sync(ctx context.Context, heads []ipfslog.Entry) error {

canAppend := b.AccessController().CanAppend(h, identityProvider, &CanAppendContext{log: b.OpLog()})
if canAppend != nil {
span.AddEvent(ctx, "store-sync-cant-append", otkv.String("error", canAppend.Error()))
span.AddEvent("store-sync-cant-append", trace.WithAttributes(otkv.String("error", canAppend.Error())))
b.Logger().Debug("warning: Given input entry is not allowed in this log and was discarded (no write access)", zap.Error(canAppend))
continue
}

hash, err := b.IO().Write(ctx, b.IPFS(), h, nil)
if err != nil {
span.AddEvent(ctx, "store-sync-cant-write", otkv.String("error", err.Error()))
span.AddEvent("store-sync-cant-write", trace.WithAttributes(otkv.String("error", err.Error())))
return errors.Wrap(err, "unable to write entry on dag")
}

if hash.String() != h.GetHash().String() {
span.AddEvent(ctx, "store-sync-cant-verify-hash")
span.AddEvent("store-sync-cant-verify-hash")
return errors.New("WARNING! Head hash didn't match the contents")
}

savedEntriesCIDs = append(savedEntriesCIDs, hash)
span.AddEvent(ctx, "store-sync-head-verified")
span.AddEvent("store-sync-head-verified")
}

b.Replicator().Load(ctx, savedEntriesCIDs)
Expand Down
8 changes: 4 additions & 4 deletions stores/replicator/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"berty.tech/go-orbit-db/events"
cid "github.com/ipfs/go-cid"
"github.com/pkg/errors"
otkv "go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/trace"
otkv "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -108,7 +108,7 @@ func NewReplicator(ctx context.Context, store storeInterface, concurrency uint,
}

if opts.Tracer == nil {
opts.Tracer = trace.NoopTracer{}
opts.Tracer = trace.NewNoopTracerProvider().Tracer("")
}

ctx, cancelFunc := context.WithCancel(ctx)
Expand Down Expand Up @@ -293,7 +293,7 @@ func (r *replicator) processQueue(ctx context.Context) {
}

func (r *replicator) addToQueue(ctx context.Context, span trace.Span, h cid.Cid) {
span.AddEvent(ctx, "replicator-add-to-queue", otkv.String("cid", h.String()))
span.AddEvent("replicator-add-to-queue", trace.WithAttributes(otkv.String("cid", h.String())))

r.lock.Lock()
defer r.lock.Unlock()
Expand Down

0 comments on commit 5030118

Please sign in to comment.