Skip to content

Commit

Permalink
Merge pull request #126 from bittcrafter/dev/0.10.0
Browse files Browse the repository at this point in the history
At startup, if the Pulsar service is unavailable, it will retry every…
  • Loading branch information
bittcrafter authored Nov 7, 2024
2 parents 05bf3c3 + d576a52 commit 648e3b4
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
11 changes: 10 additions & 1 deletion rmqtt-plugins/rmqtt-bridge-egress-pulsar/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashSet;
use std::collections::{BTreeMap, HashMap};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use pulsar::{
authentication::oauth2::OAuth2Authentication, producer, Authentication, Error as PulsarError,
Expand Down Expand Up @@ -248,7 +249,15 @@ impl BridgeManager {
Ok(pulsar)
}

pub async fn start(&mut self) -> Result<()> {
pub async fn start(&mut self) {
while let Err(e) = self._start().await {
log::error!("start bridge-egress-pulsar error, {:?}", e);
self.stop().await;
tokio::time::sleep(Duration::from_millis(3000)).await;
}
}

async fn _start(&mut self) -> Result<()> {
let mut topics = self.topics.write().await;
let bridges = self.cfg.read().await.bridges.clone();
let mut bridge_names: HashSet<&str> = HashSet::default();
Expand Down
7 changes: 2 additions & 5 deletions rmqtt-plugins/rmqtt-bridge-egress-pulsar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,8 @@ impl BridgePulsarEgressPlugin {
while let Some(cmd) = bridge_mgr_cmd_rx.recv().await {
match cmd {
Command::Start => {
if let Err(e) = bridge_mgr.start().await {
log::error!("start bridge-egress-pulsar error, {:?}", e);
} else {
log::info!("start bridge-egress-pulsar ok.");
}
bridge_mgr.start().await;
log::info!("start bridge-egress-pulsar ok.");
}
Command::Close => {
bridge_mgr.stop().await;
Expand Down

0 comments on commit 648e3b4

Please sign in to comment.