Skip to content

Commit

Permalink
use long values instead of int values (#101)
Browse files Browse the repository at this point in the history
* use long values instead of int values

* scalafmt
  • Loading branch information
pjfanning authored Mar 24, 2024
1 parent ec7c1ef commit 583e576
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,28 +56,27 @@ import reactor.core.publisher.Mono
}
}

def updateOneInTx(stmt: Statement)(implicit ec: ExecutionContext): Future[Int] =
def updateOneInTx(stmt: Statement)(implicit ec: ExecutionContext): Future[Long] =
stmt.execute().asFuture().flatMap { result =>
result.getRowsUpdated.asFuture().map(_.intValue())(ExecutionContexts.parasitic)
result.getRowsUpdated.asFuture().map(_.longValue())(ExecutionContexts.parasitic)
}

def updateBatchInTx(stmt: Statement)(implicit ec: ExecutionContext): Future[Int] = {
val consumer: BiConsumer[Long, java.lang.Long] = (acc, elem) => acc + elem.intValue()
def updateBatchInTx(stmt: Statement)(implicit ec: ExecutionContext): Future[Long] = {
val consumer: BiConsumer[Long, java.lang.Long] = (acc, elem) => acc + elem
Flux
.from[Result](stmt.execute())
.concatMap(_.getRowsUpdated)
.collect(() => 0L, consumer)
.asFuture()
.map(_.intValue())(ExecutionContexts.parasitic)
}

def updateInTx(statements: immutable.IndexedSeq[Statement])(implicit
ec: ExecutionContext): Future[immutable.IndexedSeq[Int]] =
ec: ExecutionContext): Future[immutable.IndexedSeq[Long]] =
// connection not intended for concurrent calls, make sure statements are executed one at a time
statements.foldLeft(Future.successful(immutable.IndexedSeq.empty[Int])) { (acc, stmt) =>
statements.foldLeft(Future.successful(immutable.IndexedSeq.empty[Long])) { (acc, stmt) =>
acc.flatMap { seq =>
stmt.execute().asFuture().flatMap { res =>
res.getRowsUpdated.asFuture().map(seq :+ _.intValue())(ExecutionContexts.parasitic)
res.getRowsUpdated.asFuture().map(seq :+ _.longValue())(ExecutionContexts.parasitic)
}
}
}
Expand Down Expand Up @@ -165,15 +164,15 @@ class R2dbcExecutor(val connectionFactory: ConnectionFactory, log: Logger, logDb
/**
* One update statement with auto commit.
*/
def updateOne(logPrefix: String)(statementFactory: Connection => Statement): Future[Int] =
def updateOne(logPrefix: String)(statementFactory: Connection => Statement): Future[Long] =
withAutoCommitConnection(logPrefix) { connection =>
updateOneInTx(statementFactory(connection))
}

/**
* Update statement that is constructed by several statements combined with `add()`.
*/
def updateInBatch(logPrefix: String)(statementFactory: Connection => Statement): Future[Int] =
def updateInBatch(logPrefix: String)(statementFactory: Connection => Statement): Future[Long] =
withConnection(logPrefix) { connection =>
updateBatchInTx(statementFactory(connection))
}
Expand All @@ -182,7 +181,7 @@ class R2dbcExecutor(val connectionFactory: ConnectionFactory, log: Logger, logDb
* Several update statements in the same transaction.
*/
def update(logPrefix: String)(
statementsFactory: Connection => immutable.IndexedSeq[Statement]): Future[immutable.IndexedSeq[Int]] =
statementsFactory: Connection => immutable.IndexedSeq[Statement]): Future[immutable.IndexedSeq[Long]] =
withConnection(logPrefix) { connection =>
updateInTx(statementsFactory(connection))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ private[projection] class R2dbcOffsetStore(
}
}

private def insertTimestampOffsetInTx(conn: Connection, records: immutable.IndexedSeq[Record]): Future[Int] = {
private def insertTimestampOffsetInTx(conn: Connection, records: immutable.IndexedSeq[Record]): Future[Long] = {
def bindRecord(stmt: Statement, record: Record): Statement = {
val slice = persistenceExt.sliceForPersistenceId(record.pid)
val minSlice = timestampOffsetBySlicesSourceProvider.minSlice
Expand Down Expand Up @@ -796,7 +796,7 @@ private[projection] class R2dbcOffsetStore(
}
}

def deleteOldTimestampOffsets(): Future[Int] = {
def deleteOldTimestampOffsets(): Future[Long] = {
if (idle.getAndSet(true)) {
// no new offsets stored since previous delete
Future.successful(0)
Expand Down Expand Up @@ -833,7 +833,7 @@ private[projection] class R2dbcOffsetStore(
result.foreach { rows =>
logger.debug(
"Deleted [{}] timestamp offset rows until [{}] for projection [{}].",
rows: java.lang.Integer,
rows: java.lang.Long,
until,
projectionId.id)
}
Expand Down Expand Up @@ -873,7 +873,7 @@ private[projection] class R2dbcOffsetStore(
}
}

private def deleteNewTimestampOffsetsInTx(conn: Connection, timestamp: Instant): Future[Int] = {
private def deleteNewTimestampOffsetsInTx(conn: Connection, timestamp: Instant): Future[Long] = {
val currentState = getState()
if (timestamp.isAfter(currentState.latestTimestamp)) {
// nothing to delete
Expand All @@ -896,7 +896,7 @@ private[projection] class R2dbcOffsetStore(
result.foreach { rows =>
logger.debug(
"Deleted [{}] timestamp offset rows >= [{}] for projection [{}].",
rows: java.lang.Integer,
rows: java.lang.Long,
timestamp,
projectionId.id)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ final class R2dbcSession(connection: Connection)(implicit ec: ExecutionContext,
def createStatement(sql: String): Statement =
connection.createStatement(sql)

def updateOne(statement: Statement): CompletionStage[Integer] =
R2dbcExecutor.updateOneInTx(statement).map(Integer.valueOf)(ExecutionContexts.parasitic).asJava
def updateOne(statement: Statement): CompletionStage[java.lang.Long] =
R2dbcExecutor.updateOneInTx(statement).map(java.lang.Long.valueOf)(ExecutionContexts.parasitic).asJava

def update(statements: java.util.List[Statement]): CompletionStage[java.util.List[Integer]] =
R2dbcExecutor.updateInTx(statements.asScala.toVector).map(results => results.map(Integer.valueOf).asJava).asJava
def update(statements: java.util.List[Statement]): CompletionStage[java.util.List[java.lang.Long]] =
R2dbcExecutor.updateInTx(statements.asScala.toVector).map(results =>
results.map(java.lang.Long.valueOf).asJava).asJava

def selectOne[A](statement: Statement)(mapRow: Row => A): CompletionStage[Optional[A]] =
R2dbcExecutor.selectOneInTx(statement, mapRow).map(_.toJava)(ExecutionContexts.parasitic).asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ final class R2dbcSession(val connection: Connection)(implicit val ec: ExecutionC
def createStatement(sql: String): Statement =
connection.createStatement(sql)

def updateOne(statement: Statement): Future[Int] =
def updateOne(statement: Statement): Future[Long] =
R2dbcExecutor.updateOneInTx(statement)

def update(statements: immutable.IndexedSeq[Statement]): Future[immutable.IndexedSeq[Int]] =
def update(statements: immutable.IndexedSeq[Statement]): Future[immutable.IndexedSeq[Long]] =
R2dbcExecutor.updateInTx(statements)

def selectOne[A](statement: Statement)(mapRow: Row => A): Future[Option[A]] =
Expand Down

0 comments on commit 583e576

Please sign in to comment.