Skip to content

Commit

Permalink
Merge pull request #2719 from ManfredKarrer/forward-broadast-msg
Browse files Browse the repository at this point in the history
Rebroadcast new blocks at lite nodes
  • Loading branch information
ripcurlx authored Apr 16, 2019
2 parents 6cceed2 + 51f9394 commit 652b5ab
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void requestStateHashes(int fromHeight) {
TIMEOUT);
}

log.info("We send to peer {} a {}.", nodeAddress, getStateHashesRequest);
log.debug("We send to peer {} a {}.", nodeAddress, getStateHashesRequest);
networkNode.addMessageListener(this);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getStateHashesRequest);
Futures.addCallback(future, new FutureCallback<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

import bisq.core.dao.node.messages.GetBlocksResponse;
import bisq.core.dao.node.messages.NewBlockBroadcastMessage;
import bisq.core.dao.state.model.blockchain.BaseTx;

import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.CloseConnectionReason;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.ConnectionListener;
import bisq.network.p2p.network.MessageListener;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.peers.Broadcaster;
import bisq.network.p2p.peers.PeerManager;
import bisq.network.p2p.seed.SeedNodeRepository;

Expand All @@ -44,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -87,6 +90,7 @@ public interface Listener {

private final NetworkNode networkNode;
private final PeerManager peerManager;
private final Broadcaster broadcaster;
private final Collection<NodeAddress> seedNodeAddresses;

private final List<Listener> listeners = new CopyOnWriteArrayList<>();
Expand All @@ -95,6 +99,7 @@ public interface Listener {
private final Map<Tuple2<NodeAddress, Integer>, RequestBlocksHandler> requestBlocksHandlerMap = new HashMap<>();
private Timer retryTimer;
private boolean stopped;
private Set<String> receivedBlocks = new HashSet<>();


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -104,9 +109,11 @@ public interface Listener {
@Inject
public LiteNodeNetworkService(NetworkNode networkNode,
PeerManager peerManager,
Broadcaster broadcaster,
SeedNodeRepository seedNodesRepository) {
this.networkNode = networkNode;
this.peerManager = peerManager;
this.broadcaster = broadcaster;
// seedNodeAddresses can be empty (in case there is only 1 seed node, the seed node starting up has no other seed nodes)
this.seedNodeAddresses = new HashSet<>(seedNodesRepository.getSeedNodeAddresses());
}
Expand Down Expand Up @@ -219,11 +226,23 @@ public void onAwakeFromStandby() {
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof NewBlockBroadcastMessage) {
log.info("We received blocks from peer {}", connection.getPeersNodeAddressOptional());
listeners.forEach(listener -> listener.onNewBlockReceived((NewBlockBroadcastMessage) networkEnvelope));
NewBlockBroadcastMessage newBlockBroadcastMessage = (NewBlockBroadcastMessage) networkEnvelope;
// We combine blockHash and txId list in case we receive blocks with different transactions.
List<String> txIds = newBlockBroadcastMessage.getBlock().getRawTxs().stream().map(BaseTx::getId).collect(Collectors.toList());
String extBlockId = newBlockBroadcastMessage.getBlock().getHash() + ":" + txIds;
if (!receivedBlocks.contains(extBlockId)) {
log.info("We received a new message from peer {} and broadcast it to our peers. extBlockId={}",
connection.getPeersNodeAddressOptional(), extBlockId);
receivedBlocks.add(extBlockId);
broadcaster.broadcast(newBlockBroadcastMessage, networkNode.getNodeAddress(), null, false);
listeners.forEach(listener -> listener.onNewBlockReceived(newBlockBroadcastMessage));
} else {
log.debug("We had that message already and do not further broadcast it. extBlockId={}", extBlockId);
}
}
}


///////////////////////////////////////////////////////////////////////////////////////////
// RequestData
///////////////////////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit 652b5ab

Please sign in to comment.