From df522cf57e550827dbfa3bff665c00b63051018e Mon Sep 17 00:00:00 2001 From: Shailesh Patil <53746241+mineme0110@users.noreply.github.com> Date: Fri, 27 Oct 2023 12:27:48 +0100 Subject: [PATCH] fix: Send problemReport for duplicate message (#157) fix: Sendproblem report for duplicate message Signed-off-by: mineme0110 --- .../scala/io/iohk/atala/mediator/Error.scala | 6 ++++ .../atala/mediator/app/MediatorAgent.scala | 36 ++++++++++++++----- .../atala/mediator/db/MessageItemRepo.scala | 8 +++-- .../atala/mediator/protocols/Problems.scala | 17 +++++++++ .../mediator/db/MessageItemRepoSpec.scala | 13 +++++++ 5 files changed, 70 insertions(+), 10 deletions(-) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala b/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala index a75f023b..c4aa84a0 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala @@ -36,6 +36,12 @@ object StorageThrowable { def apply(throwable: Throwable) = new StorageThrowable(throwable.getClass.getName() + ":" + throwable.getMessage) } +final case class DuplicateMessage(val error: String) extends StorageError +object DuplicateMessage { + val code = 11000 + def apply(throwable: Throwable) = new DuplicateMessage(throwable.getClass.getName() + ":" + throwable.getMessage) +} + // ProtocolError sealed trait ProtocolError extends MediatorError { def piuri: PIURI diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala index ccaeaeab..dba3230f 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala @@ -102,7 +102,7 @@ case class MediatorAgent( maybeSyncReplyMsg <- receiveMessage(msg) } yield (maybeSyncReplyMsg) - def receiveMessage(msg: EncryptedMessage): ZIO[ + private def receiveMessage(msg: EncryptedMessage): ZIO[ Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo & OutboxMessageRepo, MediatorError | StorageError, Option[SignedMessage | EncryptedMessage] @@ -122,9 +122,7 @@ case class MediatorAgent( plaintextMessage <- decrypt(msg) maybeActionStorageError <- messageItemRepo .insert(msg) // store all message - .map(_ /*WriteResult*/ => None - // TODO messages already on the database -> so this might be a replay attack - ) + .map(_ /*WriteResult*/ => None) .catchSome { case StorageCollection(error) => // This deals with connection errors to the database. @@ -163,6 +161,24 @@ case class MediatorAgent( ) ) ) + case DuplicateMessage(error) => + ZIO.logWarning(s"Error DuplicateMessageError: $error") *> + ZIO + .service[Agent] + .map(agent => + Some( + Reply( + Problems + .dejavuError( + to = plaintextMessage.from.map(_.asTO).toSet, + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + .toPlaintextMessage + ) + ) + ) } // TODO Store context of the decrypt unwarping // TODO SreceiveMessagetore context with MsgID and PIURI @@ -173,10 +189,11 @@ case class MediatorAgent( }.tapError(ex => ZIO.logError(s"Error when execute Protocol: $ex")) } yield ret }.catchAll { - case ex: MediatorError => ZIO.fail(ex) - case pr: ProblemReport => ActionUtils.packResponse(None, Reply(pr.toPlaintextMessage)) - case ex: StorageCollection => ZIO.fail(ex) - case ex: StorageThrowable => ZIO.fail(ex) + case ex: MediatorError => ZIO.fail(ex) + case pr: ProblemReport => ActionUtils.packResponse(None, Reply(pr.toPlaintextMessage)) + case ex: StorageCollection => ZIO.fail(ex) + case ex: StorageThrowable => ZIO.fail(ex) + case ex: DuplicateMessage => ZIO.fail(ex) } } yield maybeSyncReplyMsg } @@ -267,6 +284,9 @@ object MediatorAgent { case StorageThrowable(error) => ZIO.logError(s"Error StorageThrowable: $error") *> ZIO.succeed(Response.status(Status.BadRequest)) + case DuplicateMessage(error) => + ZIO.logError(s"Error DuplicateKeyError: $error") *> + ZIO.succeed(Response.status(Status.BadRequest)) case MissingProtocolError(piuri) => ZIO.logError(s"MissingProtocolError ('$piuri')") *> ZIO.succeed(Response.status(Status.BadRequest)) // TODO diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/db/MessageItemRepo.scala b/mediator/src/main/scala/io/iohk/atala/mediator/db/MessageItemRepo.scala index f5143477..023e3ab7 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/db/MessageItemRepo.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/db/MessageItemRepo.scala @@ -2,12 +2,13 @@ package io.iohk.atala.mediator.db import fmgp.did.* import fmgp.did.comm.EncryptedMessage -import io.iohk.atala.mediator.{StorageCollection, StorageError, StorageThrowable} +import io.iohk.atala.mediator.{DuplicateMessage, StorageCollection, StorageError, StorageThrowable} import reactivemongo.api.bson.* import reactivemongo.api.bson.collection.BSONCollection import reactivemongo.api.commands.WriteResult import reactivemongo.api.{Cursor, CursorProducer} import zio.* +import reactivemongo.core.errors.DatabaseException import scala.concurrent.ExecutionContext object MessageItemRepo { @@ -34,7 +35,10 @@ class MessageItemRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon result <- ZIO .fromFuture(implicit ec => coll.insert.one(MessageItem(msg, xRequestId))) .tapError(err => ZIO.logError(s"insert : ${err.getMessage}")) - .mapError(ex => StorageThrowable(ex)) + .mapError { + case ex: DatabaseException if (ex.code.contains(DuplicateMessage.code)) => DuplicateMessage(ex) + case ex => StorageThrowable(ex) + } } yield result } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala index 7536834e..479356a9 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala @@ -94,6 +94,23 @@ object Problems { escalate_to = email, ) + def dejavuError( + to: Set[TO], + from: FROM, + pthid: MsgID, + piuri: PIURI, + ) = ProblemReport( + // id: MsgID = MsgID(), + to = to, + from = from, + pthid = pthid, + ack = None, + code = ProblemCode.ErroFail("crypto", "message", "dejavu"), + comment = None, + args = None, + escalate_to = email, + ) + def notEnroledError( to: Option[TO], from: FROM, diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/db/MessageItemRepoSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/db/MessageItemRepoSpec.scala index a40611b2..0840452c 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/db/MessageItemRepoSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/db/MessageItemRepoSpec.scala @@ -1,12 +1,14 @@ package io.iohk.atala.mediator.db import fmgp.did.comm.EncryptedMessage +import io.iohk.atala.mediator.{DuplicateMessage, StorageError, StorageThrowable} import zio.* import zio.ExecutionStrategy.Sequential import zio.json.* import zio.test.* import zio.test.Assertion.* import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.* +import reactivemongo.core.errors.DatabaseException object MessageItemRepoSpec extends ZIOSpecDefault with DidAccountStubSetup { @@ -23,7 +25,18 @@ object MessageItemRepoSpec extends ZIOSpecDefault with DidAccountStubSetup { assertTrue(result.n == 1) } } + }, + test("insert same message again") { + ZIO.logAnnotate(XRequestId.value, "b373423c-c78f-4cbc-a3fe-89cbc1351835") { + for { + messageItem <- ZIO.service[MessageItemRepo] + msg <- ZIO.fromEither(encryptedMessageAlice) + result <- messageItem.insert(msg).exit + } yield { + assert(result)(fails(isSubtype[DuplicateMessage](anything))) + } + } }, test("findById message") { for {