Skip to content
This repository has been archived by the owner on Mar 28, 2023. It is now read-only.

Ethereum into Mobile #2044

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ipfs/pointers.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func PutPointerToPeer(dht *routing.IpfsDHT, ctx context.Context, peer peer.ID, p

func GetPointersFromPeer(dht *routing.IpfsDHT, ctx context.Context, p peer.ID, key *cid.Cid) ([]*ps.PeerInfo, error) {
pmes := dhtpb.NewMessage(dhtpb.Message_GET_PROVIDERS, key.Bytes(), 0)
log.Debugf("Fetching pointers from: %v\n", p.Pretty())
resp, err := dht.SendRequest(ctx, p, pmes)
if err != nil {
return []*ps.PeerInfo{}, err
Expand Down
3 changes: 2 additions & 1 deletion mobile/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,12 +480,13 @@ func (n *Node) start() error {
SendAck: n.OpenBazaarNode.SendOfflineAck,
SendError: n.OpenBazaarNode.SendError,
})
go MR.ResetPointerList()
go MR.Run()
n.OpenBazaarNode.MessageRetriever = MR
PR := rep.NewPointerRepublisher(n.OpenBazaarNode.DHT, n.OpenBazaarNode.Datastore, n.OpenBazaarNode.PushNodes, n.OpenBazaarNode.IsModerator)
go PR.Run()
n.OpenBazaarNode.PointerRepublisher = PR
MR.Wait()
// MR.Wait()

n.OpenBazaarNode.PublishLock.Unlock()
publishUnlocked = true
Expand Down
31 changes: 27 additions & 4 deletions net/retriever/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ import (

const DefaultPointerPrefixLength = 14

var log = logging.MustGetLogger("retriever")
var (
// Initialize a clear pointerList for the DHT on start
pointerList = []string{}
log = logging.MustGetLogger("retriever")
)

type MRConfig struct {
Db repo.Datastore
Expand Down Expand Up @@ -66,6 +70,20 @@ type offlineMessage struct {
env pb.Envelope
}

func stringInSlice(str string, list []string) bool {
for _, v := range list {
if v == str {
return true
}
}
return false
}

// Reset on startup
func (m *MessageRetriever) ResetPointerList() {
pointerList = []string{}
}

func NewMessageRetriever(cfg MRConfig) *MessageRetriever {
var client *http.Client
if cfg.Dialer != nil {
Expand Down Expand Up @@ -100,8 +118,8 @@ func (m *MessageRetriever) Run() {
peers := time.NewTicker(time.Minute)
defer dht.Stop()
defer peers.Stop()
go m.fetchPointersFromDHT()
go m.fetchPointersFromPushNodes()
go m.fetchPointersFromDHT()
for {
select {
case <-dht.C:
Expand Down Expand Up @@ -159,7 +177,9 @@ func (m *MessageRetriever) downloadMessages(peerOut chan ps.PeerInfo) {
inFlight := make(map[string]bool)
// Iterate over the pointers, adding 1 to the waitgroup for each pointer found
for p := range peerOut {
if len(p.Addrs) > 0 && !m.db.OfflineMessages().Has(p.Addrs[0].String()) && !inFlight[p.Addrs[0].String()] {
if len(p.Addrs) > 0 && !m.db.OfflineMessages().Has(p.Addrs[0].String()) && !stringInSlice(p.Addrs[0].String(), pointerList) && !inFlight[p.Addrs[0].String()] {
pointerList = append(pointerList, p.Addrs[0].String())
log.Debugf("Looking for pointer [%v] at %v\n", p.ID.Pretty(), p.Addrs)
inFlight[p.Addrs[0].String()] = true
log.Debugf("Found pointer with location %s", p.Addrs[0].String())
// IPFS
Expand Down Expand Up @@ -215,12 +235,15 @@ func (m *MessageRetriever) getPointersFromDataPeersRoutine(peerOut chan ps.PeerI
wg.Add(1)
go func(pid peer.ID) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*35)
defer cancel()
time.Sleep(time.Second * 15)
provs, err := ipfs.GetPointersFromPeer(m.routing, ctx, pid, &k)
if err != nil {
log.Errorf("Could not get pointers from push node because: %v", err)
return
}
log.Debugf("Successfully queried %s for pointers", pid.Pretty())
for _, pi := range provs {
peerOut <- *pi
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

kb "gx/ipfs/QmSNE1XryoCMnZCbRaj1D23k6YKCaTQ386eJciu1pAfu8M/go-libp2p-kbucket"
pb "gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht/pb"
Expand Down Expand Up @@ -65,13 +66,15 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
// since the query doesnt actually pass our context down
// we have to hack this here. whyrusleeping isnt a huge fan of goprocess
parent := ctx
ctx, _ = context.WithTimeout(ctx, time.Second*3)
query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
// For DHT query command
notif.PublishQueryEvent(parent, &notif.QueryEvent{
Type: notif.SendingQuery,
ID: p,
})

ctx, _ = context.WithTimeout(ctx, time.Second*3)
pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
if err != nil {
logger.Debugf("error getting closer peers: %s", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

kb "gx/ipfs/QmSNE1XryoCMnZCbRaj1D23k6YKCaTQ386eJciu1pAfu8M/go-libp2p-kbucket"
cid "gx/ipfs/QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN/go-cid"
Expand Down Expand Up @@ -65,13 +66,15 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
// since the query doesnt actually pass our context down
// we have to hack this here. whyrusleeping isnt a huge fan of goprocess
parent := ctx
ctx, _ = context.WithTimeout(ctx, time.Second*3)
query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
// For DHT query command
notif.PublishQueryEvent(parent, &notif.QueryEvent{
Type: notif.SendingQuery,
ID: p,
})

ctx, _ = context.WithTimeout(ctx, time.Second*3)
pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
if err != nil {
logger.Debugf("error getting closer peers: %s", err)
Expand Down