diff --git a/dht.go b/dht.go index 2ee2894eb..c2c3f9abd 100644 --- a/dht.go +++ b/dht.go @@ -24,7 +24,6 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/providers" "github.com/gogo/protobuf/proto" - "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log" "github.com/jbenet/goprocess" @@ -33,6 +32,7 @@ import ( record "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" "github.com/multiformats/go-base32" + "github.com/multiformats/go-multihash" ) var logger = logging.Logger("dht") @@ -374,11 +374,11 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) ( } } -func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) { - eip := logger.EventBegin(ctx, "findProvidersSingle", p, key) +func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) { + eip := logger.EventBegin(ctx, "findProvidersSingle", p, multihashLoggableKey(key)) defer eip.Done() - pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key.Bytes(), 0) + pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0) resp, err := dht.sendRequest(ctx, p, pmes) switch err { case nil: diff --git a/dht_test.go b/dht_test.go index 87be440ec..7f447b194 100644 --- a/dht_test.go +++ b/dht_test.go @@ -3,6 +3,7 @@ package dht import ( "bytes" "context" + "encoding/binary" "errors" "fmt" "math/rand" @@ -15,6 +16,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" + "github.com/multiformats/go-multihash" "github.com/multiformats/go-multistream" "golang.org/x/xerrors" @@ -42,8 +44,26 @@ func init() { for i := 0; i < 100; i++ { v := fmt.Sprintf("%d -- value", i) - mhv := u.Hash([]byte(v)) - testCaseCids = append(testCaseCids, cid.NewCidV0(mhv)) + var newCid cid.Cid + switch i % 3 { + case 0: + mhv := u.Hash([]byte(v)) + newCid = cid.NewCidV0(mhv) + case 1: + mhv := u.Hash([]byte(v)) + newCid = cid.NewCidV1(cid.DagCBOR, mhv) + case 2: + rawMh := make([]byte, 12) + binary.PutUvarint(rawMh, cid.Raw) + binary.PutUvarint(rawMh[1:], 10) + copy(rawMh[2:], []byte(v)[:10]) + _, mhv, err := multihash.MHFromBytes(rawMh) + if err != nil { + panic(err) + } + newCid = cid.NewCidV1(cid.Raw, mhv) + } + testCaseCids = append(testCaseCids, newCid) } } @@ -633,7 +653,7 @@ func TestLocalProvides(t *testing.T) { for _, c := range testCaseCids { for i := 0; i < 3; i++ { - provs := dhts[i].providers.GetProviders(ctx, c) + provs := dhts[i].providers.GetProviders(ctx, c.Hash()) if len(provs) > 0 { t.Fatal("shouldnt know this") } @@ -1374,7 +1394,7 @@ func TestClientModeConnect(t *testing.T) { c := testCaseCids[0] p := peer.ID("TestPeer") - a.providers.AddProvider(ctx, c, p) + a.providers.AddProvider(ctx, c.Hash(), p) time.Sleep(time.Millisecond * 5) // just in case... provs, err := b.FindProviders(ctx, c) @@ -1548,6 +1568,7 @@ func TestFindClosestPeers(t *testing.T) { func TestProvideDisabled(t *testing.T) { k := testCaseCids[0] + kHash := k.Hash() for i := 0; i < 3; i++ { enabledA := (i & 0x1) > 0 enabledB := (i & 0x2) > 0 @@ -1588,7 +1609,7 @@ func TestProvideDisabled(t *testing.T) { if err != routing.ErrNotSupported { t.Fatal("get should have failed on node B") } - provs := dhtB.providers.GetProviders(ctx, k) + provs := dhtB.providers.GetProviders(ctx, kHash) if len(provs) != 0 { t.Fatal("node B should not have found local providers") } @@ -1604,7 +1625,7 @@ func TestProvideDisabled(t *testing.T) { t.Fatal("node A should not have found providers") } } - provAddrs := dhtA.providers.GetProviders(ctx, k) + provAddrs := dhtA.providers.GetProviders(ctx, kHash) if len(provAddrs) != 0 { t.Fatal("node A should not have found local providers") } diff --git a/go.mod b/go.mod index 48b801e7b..80df33a74 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/multiformats/go-base32 v0.0.3 github.com/multiformats/go-multiaddr v0.2.0 github.com/multiformats/go-multiaddr-dns v0.2.0 + github.com/multiformats/go-multihash v0.0.10 github.com/multiformats/go-multistream v0.1.0 github.com/stretchr/testify v1.4.0 go.opencensus.io v0.22.2 diff --git a/go.sum b/go.sum index c2f35654f..d5492936c 100644 --- a/go.sum +++ b/go.sum @@ -163,8 +163,6 @@ github.com/libp2p/go-flow-metrics v0.0.2 h1:U5TvqfoyR6GVRM+bC15Ux1ltar1kbj6Zw6xO github.com/libp2p/go-flow-metrics v0.0.2/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM= github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= -github.com/libp2p/go-libp2p v0.4.2 h1:p0cthB0jDNHO4gH2HzS8/nAMMXbfUlFHs0jwZ4U+F2g= -github.com/libp2p/go-libp2p v0.4.2/go.mod h1:MNmgUxUw5pMsdOzMlT0EE7oKjRasl+WyVwM0IBlpKgQ= github.com/libp2p/go-libp2p v0.5.0 h1:/nnb5mc2TK6TwknECsWIkfCwMTHv0AXbvzxlnVivfeg= github.com/libp2p/go-libp2p v0.5.0/go.mod h1:Os7a5Z3B+ErF4v7zgIJ7nBHNu2LYt8ZMLkTQUB3G/wA= github.com/libp2p/go-libp2p-autonat v0.1.1 h1:WLBZcIRsjZlWdAZj9CiBSvU2wQXoUOiS1Zk1tM7DTJI= @@ -189,8 +187,6 @@ github.com/libp2p/go-libp2p-core v0.3.0 h1:F7PqduvrztDtFsAa/bcheQ3azmNo+Nq7m8hQY github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw= github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ= github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI= -github.com/libp2p/go-libp2p-discovery v0.1.0 h1:j+R6cokKcGbnZLf4kcNwpx6mDEUPF3N6SrqMymQhmvs= -github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g= github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J6rWBEfz/aiY= github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= github.com/libp2p/go-libp2p-kbucket v0.2.3 h1:XtNfN4WUy0cfeJoJgWCf1lor4Pp3kBkFJ9vQ+Zs+VUM= @@ -268,8 +264,6 @@ github.com/libp2p/go-tcp-transport v0.1.0 h1:IGhowvEqyMFknOar4FWCKSWE0zL36UFKQti github.com/libp2p/go-tcp-transport v0.1.0/go.mod h1:oJ8I5VXryj493DEJ7OsBieu8fcg2nHGctwtInJVpipc= github.com/libp2p/go-tcp-transport v0.1.1 h1:yGlqURmqgNA2fvzjSgZNlHcsd/IulAnKM8Ncu+vlqnw= github.com/libp2p/go-tcp-transport v0.1.1/go.mod h1:3HzGvLbx6etZjnFlERyakbaYPdfjg2pWP97dFZworkY= -github.com/libp2p/go-ws-transport v0.1.2 h1:VnxQcLfSGtqupqPpBNu8fUiCv+IN1RJ2BcVqQEM+z8E= -github.com/libp2p/go-ws-transport v0.1.2/go.mod h1:dsh2Ld8F+XNmzpkaAijmg5Is+e9l6/1tK/6VFOdN69Y= github.com/libp2p/go-ws-transport v0.2.0 h1:MJCw2OrPA9+76YNRvdo1wMnSOxb9Bivj6sVFY1Xrj6w= github.com/libp2p/go-ws-transport v0.2.0/go.mod h1:9BHJz/4Q5A9ludYWKoGCFC5gUElzlHoKzu0yY9p/klM= github.com/libp2p/go-yamux v1.2.2 h1:s6J6o7+ajoQMjHe7BEnq+EynOj5D2EoG8CuQgL3F2vg= diff --git a/handlers.go b/handlers.go index 23292ce80..fec98c51e 100644 --- a/handlers.go +++ b/handlers.go @@ -13,7 +13,6 @@ import ( pstore "github.com/libp2p/go-libp2p-peerstore" "github.com/gogo/protobuf/proto" - "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" u "github.com/ipfs/go-ipfs-util" pb "github.com/libp2p/go-libp2p-kad-dht/pb" @@ -317,26 +316,26 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb. logger.SetTag(ctx, "peer", p) resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) - c, err := cid.Cast([]byte(pmes.GetKey())) - if err != nil { - return nil, err + key := pmes.GetKey() + if len(key) > 80 { + return nil, fmt.Errorf("handleGetProviders key size too large") } - logger.SetTag(ctx, "key", c) + logger.SetTag(ctx, "key", key) // debug logging niceness. - reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, c) + reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, key) logger.Debugf("%s begin", reqDesc) defer logger.Debugf("%s end", reqDesc) // check if we have this value, to add ourselves as provider. - has, err := dht.datastore.Has(convertToDsKey(c.Bytes())) + has, err := dht.datastore.Has(convertToDsKey(key)) if err != nil && err != ds.ErrNotFound { logger.Debugf("unexpected datastore error: %v\n", err) has = false } // setup providers - providers := dht.providers.GetProviders(ctx, c) + providers := dht.providers.GetProviders(ctx, key) if has { providers = append(providers, dht.self) logger.Debugf("%s have the value. added self as provider", reqDesc) @@ -366,13 +365,13 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M defer func() { logger.FinishWithErr(ctx, _err) }() logger.SetTag(ctx, "peer", p) - c, err := cid.Cast([]byte(pmes.GetKey())) - if err != nil { - return nil, err + key := pmes.GetKey() + if len(key) > 80 { + return nil, fmt.Errorf("handleAddProviders key size too large") } - logger.SetTag(ctx, "key", c) + logger.SetTag(ctx, "key", key) - logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, c) + logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key) // add provider should use the address given in the message pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers()) @@ -389,12 +388,12 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M continue } - logger.Debugf("received provider %s for %s (addrs: %s)", p, c, pi.Addrs) + logger.Debugf("received provider %s for %s (addrs: %s)", p, key, pi.Addrs) if pi.ID != dht.self { // don't add own addrs. // add the received addresses to our peerstore. dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL) } - dht.providers.AddProvider(ctx, c, p) + dht.providers.AddProvider(ctx, key, p) } return nil, nil diff --git a/lookup.go b/lookup.go index ee1552360..48c23e205 100644 --- a/lookup.go +++ b/lookup.go @@ -13,6 +13,8 @@ import ( pb "github.com/libp2p/go-libp2p-kad-dht/pb" kb "github.com/libp2p/go-libp2p-kbucket" notif "github.com/libp2p/go-libp2p-routing/notifications" + "github.com/multiformats/go-base32" + "github.com/multiformats/go-multihash" ) func tryFormatLoggableKey(k string) (string, error) { @@ -33,11 +35,15 @@ func tryFormatLoggableKey(k string) (string, error) { cstr = k } + var encStr string c, err := cid.Cast([]byte(cstr)) - if err != nil { - return "", fmt.Errorf("loggableKey could not cast key to a CID: %x %v", k, err) + if err == nil { + encStr = c.String() + } else { + encStr = base32.RawStdEncoding.EncodeToString([]byte(cstr)) } - return fmt.Sprintf("/%s/%s", proto, c.String()), nil + + return fmt.Sprintf("/%s/%s", proto, encStr), nil } func loggableKey(k string) logging.LoggableMap { @@ -53,6 +59,12 @@ func loggableKey(k string) logging.LoggableMap { } } +func multihashLoggableKey(mh multihash.Multihash) logging.LoggableMap { + return logging.LoggableMap{ + "multihash": base32.RawStdEncoding.EncodeToString(mh), + } +} + // Kademlia 'node lookup' operation. Returns a channel of the K closest peers // to the given key func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) { diff --git a/lookup_test.go b/lookup_test.go index 5dc5f29fa..6e29a8ea2 100644 --- a/lookup_test.go +++ b/lookup_test.go @@ -28,9 +28,15 @@ func TestLoggableKey(t *testing.T) { t.Error("expected cid to be formatted as a loggable key") } - for _, s := range []string{"bla bla", "/bla", "/bla/asdf", ""} { + for _, s := range []string{"/bla", ""} { if _, err := tryFormatLoggableKey(s); err == nil { t.Errorf("expected to fail formatting: %s", s) } } + + for _, s := range []string{"bla bla", "/bla/asdf"} { + if _, err := tryFormatLoggableKey(s); err != nil { + t.Errorf("expected to be formatable: %s", s) + } + } } diff --git a/providers/providers.go b/providers/providers.go index 2f98fc144..a446f87be 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -10,7 +10,6 @@ import ( "github.com/libp2p/go-libp2p-core/peer" lru "github.com/hashicorp/golang-lru/simplelru" - cid "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" autobatch "github.com/ipfs/go-datastore/autobatch" dsq "github.com/ipfs/go-datastore/query" @@ -47,12 +46,12 @@ type providerSet struct { } type addProv struct { - k cid.Cid + k []byte val peer.ID } type getProv struct { - k cid.Cid + k []byte resp chan []peer.ID } @@ -76,15 +75,15 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching) const providersKeyPrefix = "/providers/" -func mkProvKey(k cid.Cid) string { - return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes()) +func mkProvKey(k []byte) string { + return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k) } func (pm *ProviderManager) Process() goprocess.Process { return pm.proc } -func (pm *ProviderManager) providersForKey(k cid.Cid) ([]peer.ID, error) { +func (pm *ProviderManager) providersForKey(k []byte) ([]peer.ID, error) { pset, err := pm.getProvSet(k) if err != nil { return nil, err @@ -92,8 +91,8 @@ func (pm *ProviderManager) providersForKey(k cid.Cid) ([]peer.ID, error) { return pset.providers, nil } -func (pm *ProviderManager) getProvSet(k cid.Cid) (*providerSet, error) { - cached, ok := pm.providers.Get(k) +func (pm *ProviderManager) getProvSet(k []byte) (*providerSet, error) { + cached, ok := pm.providers.Get(string(k)) if ok { return cached.(*providerSet), nil } @@ -104,13 +103,13 @@ func (pm *ProviderManager) getProvSet(k cid.Cid) (*providerSet, error) { } if len(pset.providers) > 0 { - pm.providers.Add(k, pset) + pm.providers.Add(string(k), pset) } return pset, nil } -func loadProvSet(dstore ds.Datastore, k cid.Cid) (*providerSet, error) { +func loadProvSet(dstore ds.Datastore, k []byte) (*providerSet, error) { res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k)}) if err != nil { return nil, err @@ -174,20 +173,20 @@ func readTimeValue(data []byte) (time.Time, error) { return time.Unix(0, nsec), nil } -func (pm *ProviderManager) addProv(k cid.Cid, p peer.ID) error { +func (pm *ProviderManager) addProv(k []byte, p peer.ID) error { now := time.Now() - if provs, ok := pm.providers.Get(k); ok { + if provs, ok := pm.providers.Get(string(k)); ok { provs.(*providerSet).setVal(p, now) } // else not cached, just write through return writeProviderEntry(pm.dstore, k, p, now) } -func mkProvKeyFor(k cid.Cid, p peer.ID) string { +func mkProvKeyFor(k []byte, p peer.ID) string { return mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p)) } -func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time) error { +func writeProviderEntry(dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error { dsk := mkProvKeyFor(k, p) buf := make([]byte, 16) @@ -300,7 +299,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) { } // AddProvider adds a provider. -func (pm *ProviderManager) AddProvider(ctx context.Context, k cid.Cid, val peer.ID) { +func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID) { prov := &addProv{ k: k, val: val, @@ -313,7 +312,7 @@ func (pm *ProviderManager) AddProvider(ctx context.Context, k cid.Cid, val peer. // GetProviders returns the set of providers for the given key. // This method _does not_ copy the set. Do not modify it. -func (pm *ProviderManager) GetProviders(ctx context.Context, k cid.Cid) []peer.ID { +func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID { gp := &getProv{ k: k, resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking diff --git a/providers/providers_test.go b/providers/providers_test.go index 58756c55c..1d0d094a8 100644 --- a/providers/providers_test.go +++ b/providers/providers_test.go @@ -10,7 +10,8 @@ import ( "github.com/libp2p/go-libp2p-core/peer" - cid "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" + ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" dssync "github.com/ipfs/go-datastore/sync" @@ -26,7 +27,7 @@ func TestProviderManager(t *testing.T) { mid := peer.ID("testing") p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore())) - a := cid.NewCidV0(u.Hash([]byte("test"))) + a := u.Hash([]byte("test")) p.AddProvider(ctx, a, peer.ID("testingprovider")) // Not cached @@ -56,14 +57,14 @@ func TestProvidersDatastore(t *testing.T) { defer p.proc.Close() friend := peer.ID("friend") - var cids []cid.Cid + var mhs []mh.Multihash for i := 0; i < 100; i++ { - c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i)))) - cids = append(cids, c) - p.AddProvider(ctx, c, friend) + h := u.Hash([]byte(fmt.Sprint(i))) + mhs = append(mhs, h) + p.AddProvider(ctx, h, friend) } - for _, c := range cids { + for _, c := range mhs { resp := p.GetProviders(ctx, c) if len(resp) != 1 { t.Fatal("Could not retrieve provider.") @@ -77,7 +78,7 @@ func TestProvidersDatastore(t *testing.T) { func TestProvidersSerialization(t *testing.T) { dstore := dssync.MutexWrap(ds.NewMapDatastore()) - k := cid.NewCidV0(u.Hash(([]byte("my key!")))) + k := u.Hash(([]byte("my key!"))) p1 := peer.ID("peer one") p2 := peer.ID("peer two") pt1 := time.Now() @@ -135,26 +136,26 @@ func TestProvidesExpire(t *testing.T) { p := NewProviderManager(ctx, mid, ds) peers := []peer.ID{"a", "b"} - var cids []cid.Cid + var mhs []mh.Multihash for i := 0; i < 10; i++ { - c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i)))) - cids = append(cids, c) + h := u.Hash([]byte(fmt.Sprint(i))) + mhs = append(mhs, h) } - for _, c := range cids[:5] { - p.AddProvider(ctx, c, peers[0]) - p.AddProvider(ctx, c, peers[1]) + for _, h := range mhs[:5] { + p.AddProvider(ctx, h, peers[0]) + p.AddProvider(ctx, h, peers[1]) } time.Sleep(time.Second / 4) - for _, c := range cids[5:] { - p.AddProvider(ctx, c, peers[0]) - p.AddProvider(ctx, c, peers[1]) + for _, h := range mhs[5:] { + p.AddProvider(ctx, h, peers[0]) + p.AddProvider(ctx, h, peers[1]) } - for _, c := range cids { - out := p.GetProviders(ctx, c) + for _, h := range mhs { + out := p.GetProviders(ctx, h) if len(out) != 2 { t.Fatal("expected providers to still be there") } @@ -162,15 +163,15 @@ func TestProvidesExpire(t *testing.T) { time.Sleep(3 * time.Second / 8) - for _, c := range cids[:5] { - out := p.GetProviders(ctx, c) + for _, h := range mhs[:5] { + out := p.GetProviders(ctx, h) if len(out) > 0 { t.Fatal("expected providers to be cleaned up, got: ", out) } } - for _, c := range cids[5:] { - out := p.GetProviders(ctx, c) + for _, h := range mhs[5:] { + out := p.GetProviders(ctx, h) if len(out) != 2 { t.Fatal("expected providers to still be there") } @@ -201,30 +202,34 @@ func TestProvidesExpire(t *testing.T) { var _ = ioutil.NopCloser var _ = os.DevNull -/* This can be used for profiling. Keeping it commented out for now to avoid incurring extra CI time +// TestLargeProvidersSet can be used for profiling. +// The datastore can be switched to levelDB by uncommenting the section below and the import above func TestLargeProvidersSet(t *testing.T) { + t.Skip("This can be used for profiling. Skipping it for now to avoid incurring extra CI time") old := lruCacheSize lruCacheSize = 10 defer func() { lruCacheSize = old }() - dirn, err := ioutil.TempDir("", "provtest") - if err != nil { - t.Fatal(err) - } - - opts := &lds.Options{ - NoSync: true, - Compression: 1, - } - lds, err := lds.NewDatastore(dirn, opts) - if err != nil { - t.Fatal(err) - } - _ = lds + dstore := ds.NewMapDatastore() - defer func() { - os.RemoveAll(dirn) - }() + //dirn, err := ioutil.TempDir("", "provtest") + //if err != nil { + // t.Fatal(err) + //} + // + //opts := &lds.Options{ + // NoSync: true, + // Compression: 1, + //} + //lds, err := lds.NewDatastore(dirn, opts) + //if err != nil { + // t.Fatal(err) + //} + //dstore = lds + // + //defer func() { + // os.RemoveAll(dirn) + //}() ctx := context.Background() var peers []peer.ID @@ -233,28 +238,27 @@ func TestLargeProvidersSet(t *testing.T) { } mid := peer.ID("myself") - p := NewProviderManager(ctx, mid, lds) + p := NewProviderManager(ctx, mid, dstore) defer p.proc.Close() - var cids []cid.Cid + var mhs []mh.Multihash for i := 0; i < 1000; i++ { - c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i)))) - cids = append(cids, c) + h := u.Hash([]byte(fmt.Sprint(i))) + mhs = append(mhs, h) for _, pid := range peers { - p.AddProvider(ctx, c, pid) + p.AddProvider(ctx, h, pid) } } for i := 0; i < 5; i++ { start := time.Now() - for _, c := range cids { - _ = p.GetProviders(ctx, c) + for _, h := range mhs { + _ = p.GetProviders(ctx, h) } elapsed := time.Since(start) fmt.Printf("query %f ms\n", elapsed.Seconds()*1000) } } -*/ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) { old := lruCacheSize @@ -264,20 +268,20 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) { defer cancel() p1, p2 := peer.ID("a"), peer.ID("b") - c1 := cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("1"))) - c2 := cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("2"))) + h1 := u.Hash([]byte("1")) + h2 := u.Hash([]byte("2")) pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore())) // add provider - pm.AddProvider(ctx, c1, p1) - // make the cached provider for c1 go to datastore - pm.AddProvider(ctx, c2, p1) + pm.AddProvider(ctx, h1, p1) + // make the cached provider for h1 go to datastore + pm.AddProvider(ctx, h2, p1) // now just offloaded record should be brought back and joined with p2 - pm.AddProvider(ctx, c1, p2) + pm.AddProvider(ctx, h1, p2) - c1Provs := pm.GetProviders(ctx, c1) - if len(c1Provs) != 2 { - t.Fatalf("expected c1 to be provided by 2 peers, is by %d", len(c1Provs)) + h1Provs := pm.GetProviders(ctx, h1) + if len(h1Provs) != 2 { + t.Fatalf("expected h1 to be provided by 2 peers, is by %d", len(h1Provs)) } } @@ -286,18 +290,18 @@ func TestWriteUpdatesCache(t *testing.T) { defer cancel() p1, p2 := peer.ID("a"), peer.ID("b") - c1 := cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("1"))) + h1 := u.Hash([]byte("1")) pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore())) // add provider - pm.AddProvider(ctx, c1, p1) + pm.AddProvider(ctx, h1, p1) // force into the cache - pm.GetProviders(ctx, c1) + pm.GetProviders(ctx, h1) // add a second provider - pm.AddProvider(ctx, c1, p2) + pm.AddProvider(ctx, h1, p2) - c1Provs := pm.GetProviders(ctx, c1) + c1Provs := pm.GetProviders(ctx, h1) if len(c1Provs) != 2 { - t.Fatalf("expected c1 to be provided by 2 peers, is by %d", len(c1Provs)) + t.Fatalf("expected h1 to be provided by 2 peers, is by %d", len(c1Provs)) } } diff --git a/routing.go b/routing.go index 970d52159..69a6f35ef 100644 --- a/routing.go +++ b/routing.go @@ -19,6 +19,7 @@ import ( pb "github.com/libp2p/go-libp2p-kad-dht/pb" kb "github.com/libp2p/go-libp2p-kbucket" record "github.com/libp2p/go-libp2p-record" + "github.com/multiformats/go-multihash" ) // asyncQueryBuffer is the size of buffered channels in async queries. This @@ -415,7 +416,8 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err if !dht.enableProviders { return routing.ErrNotSupported } - eip := logger.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst}) + keyMH := key.Hash() + eip := logger.EventBegin(ctx, "Provide", multihashLoggableKey(keyMH), logging.LoggableMap{"broadcast": brdcst}) defer func() { if err != nil { eip.SetError(err) @@ -424,7 +426,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err }() // add self locally - dht.providers.AddProvider(ctx, key, dht.self) + dht.providers.AddProvider(ctx, keyMH, dht.self) if !brdcst { return nil } @@ -450,12 +452,12 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err defer cancel() } - peers, err := dht.GetClosestPeers(closerCtx, key.KeyString()) + peers, err := dht.GetClosestPeers(closerCtx, string(keyMH)) if err != nil { return err } - mes, err := dht.makeProvRecord(key) + mes, err := dht.makeProvRecord(keyMH) if err != nil { return err } @@ -465,7 +467,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err wg.Add(1) go func(p peer.ID) { defer wg.Done() - logger.Debugf("putProvider(%s, %s)", key, p) + logger.Debugf("putProvider(%s, %s)", keyMH, p) err := dht.sendMessage(ctx, p, mes) if err != nil { logger.Debug(err) @@ -475,7 +477,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err wg.Wait() return nil } -func (dht *IpfsDHT) makeProvRecord(skey cid.Cid) (*pb.Message, error) { +func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) { pi := peer.AddrInfo{ ID: dht.self, Addrs: dht.host.Addrs(), @@ -487,7 +489,7 @@ func (dht *IpfsDHT) makeProvRecord(skey cid.Cid) (*pb.Message, error) { return nil, fmt.Errorf("no known addresses for self. cannot put provider.") } - pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey.Bytes(), 0) + pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, key, 0) pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi}) return pmes, nil } @@ -514,14 +516,15 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i return peerOut } - logger.Event(ctx, "findProviders", key) + keyMH := key.Hash() + logger.Event(ctx, "findProviders", multihashLoggableKey(keyMH)) - go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut) + go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut) return peerOut } -func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, count int, peerOut chan peer.AddrInfo) { - defer logger.EventBegin(ctx, "findProvidersAsync", key).Done() +func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) { + defer logger.EventBegin(ctx, "findProvidersAsync", multihashLoggableKey(key)).Done() defer close(peerOut) ps := peer.NewLimitedSet(count) @@ -544,7 +547,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, } } - peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), AlphaValue) + peers := dht.routingTable.NearestPeers(kb.ConvertKey(string(key)), AlphaValue) if len(peers) == 0 { routing.PublishQueryEvent(ctx, &routing.QueryEvent{ Type: routing.QueryError, @@ -555,7 +558,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, // setup the Query parent := ctx - query := dht.newQuery(key.KeyString(), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { + query := dht.newQuery(string(key), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { routing.PublishQueryEvent(parent, &routing.QueryEvent{ Type: routing.SendingQuery, ID: p, @@ -623,7 +626,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, } // refresh the cpl for this key after the query is run - dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key.KeyString()), time.Now()) + dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(string(key)), time.Now()) } // FindPeer searches for a peer with given ID.