Skip to content

Commit

Permalink
fix(event-processor): add handling for non text messages
Browse files Browse the repository at this point in the history
  • Loading branch information
vincejv committed Nov 12, 2022
1 parent 769f634 commit ca7dc81
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public Uni<Void> process(MetaMsgEvtDto evt) {
})
.chain(() -> msgrApi.toggleTyping(evt.getSender(), false)).replaceWithVoid()
.onFailure().invoke(this::handleMsgEx);
} else {
Log.warn("Skipping unsupported event");
}
return Uni.createFrom().voidItem();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import com.abavilla.fpi.msgr.ext.dto.MsgrMsgReqDto;
import com.abavilla.fpi.msgr.ext.rest.TelegramReqApi;
import com.abavilla.fpi.telco.ext.enums.BotSource;
import com.mongodb.ErrorCategory;
import com.mongodb.MongoWriteException;
import com.pengrad.telegrambot.model.Update;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.quarkus.logging.Log;
Expand Down Expand Up @@ -62,22 +64,35 @@ public class TgMsgEvtPcsr {
public Uni<Void> process(Update evt) {
Log.info("Received telegram update event: " + evt);
var log = mapToEntity(evt);
return repo.persist(log).chain(savedLog -> {
Log.info("Logged to db: " + savedLog);
return telegramApi.toggleTyping(String.valueOf(savedLog.getSenderId())).chain(() -> {
var login = new WebhookLoginDto();
login.setUsername(String.valueOf(savedLog.getSenderId()));
login.setBotSource(BotSource.TELEGRAM.getValue());
Log.info("Authenticating user: " + login);
return loginApi.webhookAuthenticate(login)
// process load
.chain(session -> processLoadQuery(login, session, evt)
.onFailure().recoverWithUni(ex -> handleApiEx(evt, session.getResp().getUsername(), ex)))
// login failures/query exceptions
.onFailure().recoverWithUni(ex -> handleApiEx(evt, ex));
});
}
);
if (StringUtils.isNotBlank(log.getContent())) {
return repo.persist(log).chain(savedLog -> {
Log.info("Logged to db: " + savedLog);
return telegramApi.toggleTyping(String.valueOf(savedLog.getSenderId())).chain(() -> {
var login = new WebhookLoginDto();
login.setUsername(String.valueOf(savedLog.getSenderId()));
login.setBotSource(BotSource.TELEGRAM.getValue());
Log.info("Authenticating user: " + login);
return loginApi.webhookAuthenticate(login)
// process load
.chain(session -> processLoadQuery(login, session, evt)
.onFailure().recoverWithUni(ex -> handleApiEx(evt, session.getResp().getUsername(), ex)))
// login failures/query exceptions
.onFailure().recoverWithUni(ex -> handleApiEx(evt, ex));
});
}
)
.onFailure(ex -> ex instanceof MongoWriteException wEx &&
wEx.getError().getCategory().equals(ErrorCategory.DUPLICATE_KEY))
.recoverWithItem(() -> {
Log.warn("Received duplicate updateId: %d, senderId: %d, messageId: %d"
.formatted(log.getUpdateId(), log.getSenderId(), log.getMessageId()));
return null;
})
.onFailure().invoke(this::handleTgMsgFailed);
} else {
Log.warn("Skipping unsupported event");
}
return Uni.createFrom().voidItem();
}

private Uni<Void> processLoadQuery(WebhookLoginDto login, RespDto<SessionDto> session, Update evt) {
Expand All @@ -96,6 +111,10 @@ private Uni<Void> processLoadQuery(WebhookLoginDto login, RespDto<SessionDto> se
return sendMsgrMsg(evt, session.getResp().getUsername(), session.getStatus());
}

private void handleTgMsgFailed(Throwable sendMsgEx) {
Log.error("Telegram message sending failed: " + sendMsgEx.getMessage(), sendMsgEx);
}

private Uni<Void> handleApiEx(Update evt, Throwable ex) {
return handleApiEx(evt, null, ex);
}
Expand Down

0 comments on commit ca7dc81

Please sign in to comment.