Skip to content

Commit

Permalink
fix: Send problemReport for duplicate message (#157)
Browse files Browse the repository at this point in the history
fix: Sendproblem report for duplicate message

Signed-off-by: mineme0110 <shailesh.patil@iohk.io>
  • Loading branch information
mineme0110 authored Oct 27, 2023
1 parent 40f2fdb commit df522cf
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 10 deletions.
6 changes: 6 additions & 0 deletions mediator/src/main/scala/io/iohk/atala/mediator/Error.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ object StorageThrowable {
def apply(throwable: Throwable) = new StorageThrowable(throwable.getClass.getName() + ":" + throwable.getMessage)
}

final case class DuplicateMessage(val error: String) extends StorageError
object DuplicateMessage {
val code = 11000
def apply(throwable: Throwable) = new DuplicateMessage(throwable.getClass.getName() + ":" + throwable.getMessage)
}

// ProtocolError
sealed trait ProtocolError extends MediatorError {
def piuri: PIURI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ case class MediatorAgent(
maybeSyncReplyMsg <- receiveMessage(msg)
} yield (maybeSyncReplyMsg)

def receiveMessage(msg: EncryptedMessage): ZIO[
private def receiveMessage(msg: EncryptedMessage): ZIO[
Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo & OutboxMessageRepo,
MediatorError | StorageError,
Option[SignedMessage | EncryptedMessage]
Expand All @@ -122,9 +122,7 @@ case class MediatorAgent(
plaintextMessage <- decrypt(msg)
maybeActionStorageError <- messageItemRepo
.insert(msg) // store all message
.map(_ /*WriteResult*/ => None
// TODO messages already on the database -> so this might be a replay attack
)
.map(_ /*WriteResult*/ => None)
.catchSome {
case StorageCollection(error) =>
// This deals with connection errors to the database.
Expand Down Expand Up @@ -163,6 +161,24 @@ case class MediatorAgent(
)
)
)
case DuplicateMessage(error) =>
ZIO.logWarning(s"Error DuplicateMessageError: $error") *>
ZIO
.service[Agent]
.map(agent =>
Some(
Reply(
Problems
.dejavuError(
to = plaintextMessage.from.map(_.asTO).toSet,
from = agent.id,
pthid = plaintextMessage.id,
piuri = plaintextMessage.`type`,
)
.toPlaintextMessage
)
)
)
}
// TODO Store context of the decrypt unwarping
// TODO SreceiveMessagetore context with MsgID and PIURI
Expand All @@ -173,10 +189,11 @@ case class MediatorAgent(
}.tapError(ex => ZIO.logError(s"Error when execute Protocol: $ex"))
} yield ret
}.catchAll {
case ex: MediatorError => ZIO.fail(ex)
case pr: ProblemReport => ActionUtils.packResponse(None, Reply(pr.toPlaintextMessage))
case ex: StorageCollection => ZIO.fail(ex)
case ex: StorageThrowable => ZIO.fail(ex)
case ex: MediatorError => ZIO.fail(ex)
case pr: ProblemReport => ActionUtils.packResponse(None, Reply(pr.toPlaintextMessage))
case ex: StorageCollection => ZIO.fail(ex)
case ex: StorageThrowable => ZIO.fail(ex)
case ex: DuplicateMessage => ZIO.fail(ex)
}
} yield maybeSyncReplyMsg
}
Expand Down Expand Up @@ -267,6 +284,9 @@ object MediatorAgent {
case StorageThrowable(error) =>
ZIO.logError(s"Error StorageThrowable: $error") *>
ZIO.succeed(Response.status(Status.BadRequest))
case DuplicateMessage(error) =>
ZIO.logError(s"Error DuplicateKeyError: $error") *>
ZIO.succeed(Response.status(Status.BadRequest))
case MissingProtocolError(piuri) =>
ZIO.logError(s"MissingProtocolError ('$piuri')") *>
ZIO.succeed(Response.status(Status.BadRequest)) // TODO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package io.iohk.atala.mediator.db

import fmgp.did.*
import fmgp.did.comm.EncryptedMessage
import io.iohk.atala.mediator.{StorageCollection, StorageError, StorageThrowable}
import io.iohk.atala.mediator.{DuplicateMessage, StorageCollection, StorageError, StorageThrowable}
import reactivemongo.api.bson.*
import reactivemongo.api.bson.collection.BSONCollection
import reactivemongo.api.commands.WriteResult
import reactivemongo.api.{Cursor, CursorProducer}
import zio.*
import reactivemongo.core.errors.DatabaseException

import scala.concurrent.ExecutionContext
object MessageItemRepo {
Expand All @@ -34,7 +35,10 @@ class MessageItemRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon
result <- ZIO
.fromFuture(implicit ec => coll.insert.one(MessageItem(msg, xRequestId)))
.tapError(err => ZIO.logError(s"insert : ${err.getMessage}"))
.mapError(ex => StorageThrowable(ex))
.mapError {
case ex: DatabaseException if (ex.code.contains(DuplicateMessage.code)) => DuplicateMessage(ex)
case ex => StorageThrowable(ex)
}
} yield result
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,23 @@ object Problems {
escalate_to = email,
)

def dejavuError(
to: Set[TO],
from: FROM,
pthid: MsgID,
piuri: PIURI,
) = ProblemReport(
// id: MsgID = MsgID(),
to = to,
from = from,
pthid = pthid,
ack = None,
code = ProblemCode.ErroFail("crypto", "message", "dejavu"),
comment = None,
args = None,
escalate_to = email,
)

def notEnroledError(
to: Option[TO],
from: FROM,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package io.iohk.atala.mediator.db

import fmgp.did.comm.EncryptedMessage
import io.iohk.atala.mediator.{DuplicateMessage, StorageError, StorageThrowable}
import zio.*
import zio.ExecutionStrategy.Sequential
import zio.json.*
import zio.test.*
import zio.test.Assertion.*
import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.*
import reactivemongo.core.errors.DatabaseException

object MessageItemRepoSpec extends ZIOSpecDefault with DidAccountStubSetup {

Expand All @@ -23,7 +25,18 @@ object MessageItemRepoSpec extends ZIOSpecDefault with DidAccountStubSetup {
assertTrue(result.n == 1)
}
}
},
test("insert same message again") {
ZIO.logAnnotate(XRequestId.value, "b373423c-c78f-4cbc-a3fe-89cbc1351835") {
for {
messageItem <- ZIO.service[MessageItemRepo]

msg <- ZIO.fromEither(encryptedMessageAlice)
result <- messageItem.insert(msg).exit
} yield {
assert(result)(fails(isSubtype[DuplicateMessage](anything)))
}
}
},
test("findById message") {
for {
Expand Down

0 comments on commit df522cf

Please sign in to comment.