Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use long values instead of int values #101

Merged
merged 2 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,28 +56,27 @@ import reactor.core.publisher.Mono
}
}

def updateOneInTx(stmt: Statement)(implicit ec: ExecutionContext): Future[Int] =
def updateOneInTx(stmt: Statement)(implicit ec: ExecutionContext): Future[Long] =
stmt.execute().asFuture().flatMap { result =>
result.getRowsUpdated.asFuture().map(_.intValue())(ExecutionContexts.parasitic)
result.getRowsUpdated.asFuture().map(_.longValue())(ExecutionContexts.parasitic)
}

def updateBatchInTx(stmt: Statement)(implicit ec: ExecutionContext): Future[Int] = {
val consumer: BiConsumer[Long, java.lang.Long] = (acc, elem) => acc + elem.intValue()
def updateBatchInTx(stmt: Statement)(implicit ec: ExecutionContext): Future[Long] = {
val consumer: BiConsumer[Long, java.lang.Long] = (acc, elem) => acc + elem
Flux
.from[Result](stmt.execute())
.concatMap(_.getRowsUpdated)
.collect(() => 0L, consumer)
.asFuture()
.map(_.intValue())(ExecutionContexts.parasitic)
}

def updateInTx(statements: immutable.IndexedSeq[Statement])(implicit
ec: ExecutionContext): Future[immutable.IndexedSeq[Int]] =
ec: ExecutionContext): Future[immutable.IndexedSeq[Long]] =
// connection not intended for concurrent calls, make sure statements are executed one at a time
statements.foldLeft(Future.successful(immutable.IndexedSeq.empty[Int])) { (acc, stmt) =>
statements.foldLeft(Future.successful(immutable.IndexedSeq.empty[Long])) { (acc, stmt) =>
acc.flatMap { seq =>
stmt.execute().asFuture().flatMap { res =>
res.getRowsUpdated.asFuture().map(seq :+ _.intValue())(ExecutionContexts.parasitic)
res.getRowsUpdated.asFuture().map(seq :+ _.longValue())(ExecutionContexts.parasitic)
}
}
}
Expand Down Expand Up @@ -165,15 +164,15 @@ class R2dbcExecutor(val connectionFactory: ConnectionFactory, log: Logger, logDb
/**
* One update statement with auto commit.
*/
def updateOne(logPrefix: String)(statementFactory: Connection => Statement): Future[Int] =
def updateOne(logPrefix: String)(statementFactory: Connection => Statement): Future[Long] =
withAutoCommitConnection(logPrefix) { connection =>
updateOneInTx(statementFactory(connection))
}

/**
* Update statement that is constructed by several statements combined with `add()`.
*/
def updateInBatch(logPrefix: String)(statementFactory: Connection => Statement): Future[Int] =
def updateInBatch(logPrefix: String)(statementFactory: Connection => Statement): Future[Long] =
withConnection(logPrefix) { connection =>
updateBatchInTx(statementFactory(connection))
}
Expand All @@ -182,7 +181,7 @@ class R2dbcExecutor(val connectionFactory: ConnectionFactory, log: Logger, logDb
* Several update statements in the same transaction.
*/
def update(logPrefix: String)(
statementsFactory: Connection => immutable.IndexedSeq[Statement]): Future[immutable.IndexedSeq[Int]] =
statementsFactory: Connection => immutable.IndexedSeq[Statement]): Future[immutable.IndexedSeq[Long]] =
withConnection(logPrefix) { connection =>
updateInTx(statementsFactory(connection))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ private[projection] class R2dbcOffsetStore(
}
}

private def insertTimestampOffsetInTx(conn: Connection, records: immutable.IndexedSeq[Record]): Future[Int] = {
private def insertTimestampOffsetInTx(conn: Connection, records: immutable.IndexedSeq[Record]): Future[Long] = {
def bindRecord(stmt: Statement, record: Record): Statement = {
val slice = persistenceExt.sliceForPersistenceId(record.pid)
val minSlice = timestampOffsetBySlicesSourceProvider.minSlice
Expand Down Expand Up @@ -796,7 +796,7 @@ private[projection] class R2dbcOffsetStore(
}
}

def deleteOldTimestampOffsets(): Future[Int] = {
def deleteOldTimestampOffsets(): Future[Long] = {
if (idle.getAndSet(true)) {
// no new offsets stored since previous delete
Future.successful(0)
Expand Down Expand Up @@ -833,7 +833,7 @@ private[projection] class R2dbcOffsetStore(
result.foreach { rows =>
logger.debug(
"Deleted [{}] timestamp offset rows until [{}] for projection [{}].",
rows: java.lang.Integer,
rows: java.lang.Long,
until,
projectionId.id)
}
Expand Down Expand Up @@ -873,7 +873,7 @@ private[projection] class R2dbcOffsetStore(
}
}

private def deleteNewTimestampOffsetsInTx(conn: Connection, timestamp: Instant): Future[Int] = {
private def deleteNewTimestampOffsetsInTx(conn: Connection, timestamp: Instant): Future[Long] = {
val currentState = getState()
if (timestamp.isAfter(currentState.latestTimestamp)) {
// nothing to delete
Expand All @@ -896,7 +896,7 @@ private[projection] class R2dbcOffsetStore(
result.foreach { rows =>
logger.debug(
"Deleted [{}] timestamp offset rows >= [{}] for projection [{}].",
rows: java.lang.Integer,
rows: java.lang.Long,
timestamp,
projectionId.id)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ final class R2dbcSession(connection: Connection)(implicit ec: ExecutionContext,
def createStatement(sql: String): Statement =
connection.createStatement(sql)

def updateOne(statement: Statement): CompletionStage[Integer] =
R2dbcExecutor.updateOneInTx(statement).map(Integer.valueOf)(ExecutionContexts.parasitic).asJava
def updateOne(statement: Statement): CompletionStage[java.lang.Long] =
R2dbcExecutor.updateOneInTx(statement).map(java.lang.Long.valueOf)(ExecutionContexts.parasitic).asJava

def update(statements: java.util.List[Statement]): CompletionStage[java.util.List[Integer]] =
R2dbcExecutor.updateInTx(statements.asScala.toVector).map(results => results.map(Integer.valueOf).asJava).asJava
def update(statements: java.util.List[Statement]): CompletionStage[java.util.List[java.lang.Long]] =
R2dbcExecutor.updateInTx(statements.asScala.toVector).map(results =>
results.map(java.lang.Long.valueOf).asJava).asJava

def selectOne[A](statement: Statement)(mapRow: Row => A): CompletionStage[Optional[A]] =
R2dbcExecutor.selectOneInTx(statement, mapRow).map(_.toJava)(ExecutionContexts.parasitic).asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ final class R2dbcSession(val connection: Connection)(implicit val ec: ExecutionC
def createStatement(sql: String): Statement =
connection.createStatement(sql)

def updateOne(statement: Statement): Future[Int] =
def updateOne(statement: Statement): Future[Long] =
R2dbcExecutor.updateOneInTx(statement)

def update(statements: immutable.IndexedSeq[Statement]): Future[immutable.IndexedSeq[Int]] =
def update(statements: immutable.IndexedSeq[Statement]): Future[immutable.IndexedSeq[Long]] =
R2dbcExecutor.updateInTx(statements)

def selectOne[A](statement: Statement)(mapRow: Row => A): Future[Option[A]] =
Expand Down
Loading