From 7871aae3d8b846a51e0c3ea063bb328c664380bc Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Mon, 15 Apr 2019 21:41:47 -0500 Subject: [PATCH 1/3] Rebroadcast new blocks at lite nodes --- .../lite/network/LiteNodeNetworkService.java | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java b/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java index 25e85eb3159..7a3513d8233 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java +++ b/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java @@ -19,6 +19,7 @@ import bisq.core.dao.node.messages.GetBlocksResponse; import bisq.core.dao.node.messages.NewBlockBroadcastMessage; +import bisq.core.dao.state.model.blockchain.BaseTx; import bisq.network.p2p.NodeAddress; import bisq.network.p2p.network.CloseConnectionReason; @@ -26,6 +27,7 @@ import bisq.network.p2p.network.ConnectionListener; import bisq.network.p2p.network.MessageListener; import bisq.network.p2p.network.NetworkNode; +import bisq.network.p2p.peers.Broadcaster; import bisq.network.p2p.peers.PeerManager; import bisq.network.p2p.seed.SeedNodeRepository; @@ -37,6 +39,8 @@ import javax.inject.Inject; +import com.google.common.base.Joiner; + import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -44,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; @@ -87,6 +92,7 @@ public interface Listener { private final NetworkNode networkNode; private final PeerManager peerManager; + private final Broadcaster broadcaster; private final Collection seedNodeAddresses; private final List listeners = new CopyOnWriteArrayList<>(); @@ -95,6 +101,7 @@ public interface Listener { private final Map, RequestBlocksHandler> requestBlocksHandlerMap = new HashMap<>(); private Timer retryTimer; private boolean stopped; + private Set receivedBlocks = new HashSet<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -104,9 +111,11 @@ public interface Listener { @Inject public LiteNodeNetworkService(NetworkNode networkNode, PeerManager peerManager, + Broadcaster broadcaster, SeedNodeRepository seedNodesRepository) { this.networkNode = networkNode; this.peerManager = peerManager; + this.broadcaster = broadcaster; // seedNodeAddresses can be empty (in case there is only 1 seed node, the seed node starting up has no other seed nodes) this.seedNodeAddresses = new HashSet<>(seedNodesRepository.getSeedNodeAddresses()); } @@ -220,10 +229,23 @@ public void onAwakeFromStandby() { public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { if (networkEnvelope instanceof NewBlockBroadcastMessage) { log.info("We received blocks from peer {}", connection.getPeersNodeAddressOptional()); - listeners.forEach(listener -> listener.onNewBlockReceived((NewBlockBroadcastMessage) networkEnvelope)); + NewBlockBroadcastMessage newBlockBroadcastMessage = (NewBlockBroadcastMessage) networkEnvelope; + + // We combine blockHash and txId list in case we receive blocks with different transactions. + List txIds = newBlockBroadcastMessage.getBlock().getRawTxs().stream().map(BaseTx::getId).collect(Collectors.toList()); + String extBlockId = newBlockBroadcastMessage.getBlock().getHash() + "_" + Joiner.on(", ").join(txIds); + if (!receivedBlocks.contains(extBlockId)) { + log.error("We received a new message and broadcast it to our peers. extBlockId={}", extBlockId); + receivedBlocks.add(extBlockId); + broadcaster.broadcast(newBlockBroadcastMessage, networkNode.getNodeAddress(), null, false); + listeners.forEach(listener -> listener.onNewBlockReceived(newBlockBroadcastMessage)); + } else { + log.error("We had that message already and do not further broadcast it. extBlockId={}", extBlockId); + } } } + /////////////////////////////////////////////////////////////////////////////////////////// // RequestData /////////////////////////////////////////////////////////////////////////////////////////// From 2fedc51a87194be1a0d893c3e707bddc01eb0723 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Mon, 15 Apr 2019 21:55:43 -0500 Subject: [PATCH 2/3] Change log levels --- .../dao/node/lite/network/LiteNodeNetworkService.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java b/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java index 7a3513d8233..f19412a3e1e 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java +++ b/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java @@ -39,8 +39,6 @@ import javax.inject.Inject; -import com.google.common.base.Joiner; - import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -228,19 +226,18 @@ public void onAwakeFromStandby() { @Override public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { if (networkEnvelope instanceof NewBlockBroadcastMessage) { - log.info("We received blocks from peer {}", connection.getPeersNodeAddressOptional()); NewBlockBroadcastMessage newBlockBroadcastMessage = (NewBlockBroadcastMessage) networkEnvelope; - // We combine blockHash and txId list in case we receive blocks with different transactions. List txIds = newBlockBroadcastMessage.getBlock().getRawTxs().stream().map(BaseTx::getId).collect(Collectors.toList()); - String extBlockId = newBlockBroadcastMessage.getBlock().getHash() + "_" + Joiner.on(", ").join(txIds); + String extBlockId = newBlockBroadcastMessage.getBlock().getHash() + ":" + txIds; if (!receivedBlocks.contains(extBlockId)) { - log.error("We received a new message and broadcast it to our peers. extBlockId={}", extBlockId); + log.info("We received a new message from peer {} and broadcast it to our peers. extBlockId={}", + connection.getPeersNodeAddressOptional(), extBlockId); receivedBlocks.add(extBlockId); broadcaster.broadcast(newBlockBroadcastMessage, networkNode.getNodeAddress(), null, false); listeners.forEach(listener -> listener.onNewBlockReceived(newBlockBroadcastMessage)); } else { - log.error("We had that message already and do not further broadcast it. extBlockId={}", extBlockId); + log.debug("We had that message already and do not further broadcast it. extBlockId={}", extBlockId); } } } From 188f3eee6d612cfe74bba9c35d8ee9e73e4f24d9 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Mon, 15 Apr 2019 22:02:41 -0500 Subject: [PATCH 3/3] Change log levels --- .../core/dao/monitoring/network/RequestStateHashesHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/bisq/core/dao/monitoring/network/RequestStateHashesHandler.java b/core/src/main/java/bisq/core/dao/monitoring/network/RequestStateHashesHandler.java index ea59281d22f..fc7f23d176a 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/network/RequestStateHashesHandler.java +++ b/core/src/main/java/bisq/core/dao/monitoring/network/RequestStateHashesHandler.java @@ -121,7 +121,7 @@ public void requestStateHashes(int fromHeight) { TIMEOUT); } - log.info("We send to peer {} a {}.", nodeAddress, getStateHashesRequest); + log.debug("We send to peer {} a {}.", nodeAddress, getStateHashesRequest); networkNode.addMessageListener(this); SettableFuture future = networkNode.sendMessage(nodeAddress, getStateHashesRequest); Futures.addCallback(future, new FutureCallback<>() {