Skip to content

Commit

Permalink
adjusted history writing
Browse files Browse the repository at this point in the history
  • Loading branch information
sterlp committed Jan 8, 2025
1 parent 628e83d commit f70458e
Show file tree
Hide file tree
Showing 13 changed files with 117 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Direction;
import org.sterl.spring.persistent_tasks.api.TriggerKey;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryDetailEntity;
Expand All @@ -23,18 +24,18 @@
@TransactionalService
@RequiredArgsConstructor
public class HistoryService {
private final TriggerHistoryDetailRepository triggerHistoryDetailRepository;
private final TriggerHistoryLastStateRepository triggerHistoryLastStateRepository;
private final TriggerHistoryDetailRepository triggerHistoryDetailRepository;
private final TriggerService triggerService;

public Optional<TriggerHistoryLastStateEntity> findStatus(long triggerId) {
return triggerHistoryDetailRepository.findById(triggerId);
return triggerHistoryLastStateRepository.findById(triggerId);
}

public Optional<TriggerHistoryLastStateEntity> findLastKnownStatus(TriggerKey triggerKey) {
PageRequest page = PageRequest.of(0, 1).withSort(Direction.DESC, "e.data.createdTime", "id");
var result = triggerHistoryDetailRepository.listKnownStatusFor(triggerKey, page);
return result.isEmpty() ? Optional.empty() : Optional.of(result.get(0));
final var page = PageRequest.of(0, 1).withSort(Direction.DESC, "data.createdTime", "id");
final var result = triggerHistoryLastStateRepository.listKnownStatusFor(triggerKey, page);
return result.isEmpty() ? Optional.empty() : Optional.of(result.getContent().get(0));
}

public void deleteAll() {
Expand All @@ -54,14 +55,22 @@ public long countTriggers(TriggerStatus status) {
return triggerHistoryDetailRepository.countByStatus(status);
}

public List<TriggerHistoryDetailEntity> findAllForInstance(long instanceId) {
return triggerHistoryLastStateRepository.findAllByInstanceId(instanceId);
public List<TriggerHistoryDetailEntity> findAllDetailsForInstance(long instanceId) {
return triggerHistoryDetailRepository.findAllByInstanceId(instanceId);
}

public Page<TriggerHistoryDetailEntity> findAllDetailsForKey(TriggerKey key) {
return findAllDetailsForKey(key, PageRequest.of(0, 100));
}
public Page<TriggerHistoryDetailEntity> findAllDetailsForKey(TriggerKey key, Pageable page) {
page = sortByIdIfNeeded(page);
return triggerHistoryDetailRepository.listKnownStatusFor(key, page);
}

public Optional<TriggerEntity> reQueue(Long id, OffsetDateTime runAt) {
final var lastState = triggerHistoryDetailRepository.findById(id);
final var lastState = triggerHistoryLastStateRepository.findById(id);
if (lastState.isEmpty()) return Optional.empty();

final var data = lastState.get().getData();
final var trigger = lastState.get().newTaskId().newTrigger()
.state(data.getState())
Expand All @@ -74,19 +83,29 @@ public Optional<TriggerEntity> reQueue(Long id, OffsetDateTime runAt) {
}

public long countTriggers(TriggerKey key) {
return triggerHistoryDetailRepository.countByKey(key);
return triggerHistoryLastStateRepository.countByKey(key);
}

public Page<TriggerHistoryLastStateEntity> findTriggerState(
TriggerKey key, Pageable page) {
if (key == null) return triggerHistoryDetailRepository.findAll(page);
if (key.getId() == null && key.getTaskName() == null) return triggerHistoryDetailRepository.findAll(page);

page = sortByIdIfNeeded(page);
if (key == null) return triggerHistoryLastStateRepository.findAll(page);
if (key.getId() == null && key.getTaskName() == null) return triggerHistoryLastStateRepository.findAll(page);
if (key.getId() == null && key.getTaskName() != null) {
return triggerHistoryDetailRepository.findAll(key.getTaskName(), page);
return triggerHistoryLastStateRepository.findAll(key.getTaskName(), page);
}
return triggerHistoryDetailRepository.findAll(
return triggerHistoryLastStateRepository.findAll(
key.getId(),
key.getTaskName(),
page);
}

private Pageable sortByIdIfNeeded(Pageable page) {
if (page.getSort() == Sort.unsorted()) {
return PageRequest.of(page.getPageNumber(), page.getPageSize(),
Sort.by(Direction.DESC, "id"));
}
return page;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class TriggerHistoryResource {
@GetMapping("history/instance/{instanceId}")
public List<Trigger> listInstances(@PathVariable("instanceId") long instanceId) {
return FromTriggerStateDetailEntity.INSTANCE.convert( //
historyService.findAllForInstance(instanceId));
historyService.findAllDetailsForInstance(instanceId));
}

@GetMapping("history")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import java.time.OffsetDateTime;

import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import org.springframework.context.event.EventListener;
import org.springframework.transaction.annotation.Transactional;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryDetailEntity;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryLastStateEntity;
import org.sterl.spring.persistent_tasks.history.repository.TriggerHistoryDetailRepository;
Expand All @@ -18,25 +18,26 @@
@RequiredArgsConstructor
public class TriggerHistoryComponent {

private final TriggerHistoryDetailRepository triggerHistoryDetailRepository;
private final TriggerHistoryLastStateRepository triggerHistoryLastStateRepository;
private final TriggerHistoryDetailRepository triggerHistoryDetailRepository;

public void write(TriggerEntity e) {
var state = new TriggerHistoryLastStateEntity();
state.setId(e.getId());
state.setData(e.getData().toBuilder().build());
triggerHistoryDetailRepository.save(state);
triggerHistoryLastStateRepository.save(state);

var detail = new TriggerHistoryDetailEntity();
detail.setInstanceId(e.getId());
detail.setData(e.getData().toBuilder()
.state(null)
.build());
detail.getData().setCreatedTime(OffsetDateTime.now());
triggerHistoryLastStateRepository.save(detail);
triggerHistoryDetailRepository.save(detail);
}

@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
@Transactional
@EventListener
public void onPersistentTaskEvent(TriggerLifeCycleEvent triggerLifeCycleEvent) {
write(triggerLifeCycleEvent.trigger());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.sterl.spring.persistent_tasks.history.repository;

import java.util.List;

import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.NoRepositoryBean;
Expand All @@ -17,5 +16,5 @@ public interface HistoryTriggerRepository<T extends HasTriggerData> extends Trig
SELECT e FROM #{#entityName} e
WHERE e.data.key = :key
""")
List<T> listKnownStatusFor(@Param("key") TriggerKey key, Pageable page);
Page<T> listKnownStatusFor(@Param("key") TriggerKey key, Pageable page);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,40 @@
package org.sterl.spring.persistent_tasks.history.repository;

import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryLastStateEntity;
import java.util.List;

public interface TriggerHistoryDetailRepository extends HistoryTriggerRepository<TriggerHistoryLastStateEntity> {
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.sterl.spring.persistent_tasks.api.HistoryOverview;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryDetailEntity;

public interface TriggerHistoryDetailRepository extends HistoryTriggerRepository<TriggerHistoryDetailEntity> {

@Query("""
SELECT new org.sterl.spring.persistent_tasks.api.HistoryOverview(
e.instanceId,
e.data.key.taskName,
count(1) as entryCount,
MIN(e.data.start) as start,
MAX(e.data.end) as end,
MIN(e.data.createdTime) as createdTime,
MAX(e.data.executionCount) as executionCount,
AVG(e.data.runningDurationInMs) as runningDurationInMs
)
FROM #{#entityName} e
GROUP BY
e.instanceId,
e.data.key.taskName
ORDER BY end DESC, createdTime DESC
""")
Page<HistoryOverview> listHistoryOverview(Pageable page);

@Query("""
SELECT e
FROM #{#entityName} e
WHERE e.instanceId = :instanceId
ORDER BY e.id DESC
""")
List<TriggerHistoryDetailEntity> findAllByInstanceId(@Param("instanceId") long instanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,9 @@
import org.springframework.data.repository.query.Param;
import org.sterl.spring.persistent_tasks.api.HistoryOverview;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryDetailEntity;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryLastStateEntity;

public interface TriggerHistoryLastStateRepository extends HistoryTriggerRepository<TriggerHistoryDetailEntity> {
public interface TriggerHistoryLastStateRepository extends HistoryTriggerRepository<TriggerHistoryLastStateEntity> {

@Query("""
SELECT new org.sterl.spring.persistent_tasks.api.HistoryOverview(
e.instanceId,
e.data.key.taskName,
count(1) as entryCount,
MIN(e.data.start) as start,
MAX(e.data.end) as end,
MIN(e.data.createdTime) as createdTime,
MAX(e.data.executionCount) as executionCount,
AVG(e.data.runningDurationInMs) as runningDurationInMs
)
FROM #{#entityName} e
GROUP BY
e.instanceId,
e.data.key.taskName
ORDER BY end DESC, createdTime DESC
""")
Page<HistoryOverview> listHistoryOverview(Pageable page);

@Query("""
SELECT e
FROM #{#entityName} e
WHERE e.instanceId = :instanceId
ORDER BY e.data.createdTime ASC
""")
List<TriggerHistoryDetailEntity> findAllByInstanceId(@Param("instanceId") long instanceId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ SELECT COUNT(e.data.key)
long countByKey(@Param("key") TriggerKey key);

@Query("""
SELECT COUNT(e.data.key)
SELECT COUNT(e.id)
FROM #{#entityName} e
WHERE e.data.status = :status
""")
long countByStatus(@Param("status") TriggerStatus status);

@Query("""
SELECT COUNT(e.data.key)
SELECT COUNT(e.id)
FROM #{#entityName} e
WHERE e.data.status IN ( :status )
""")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
@RequiredArgsConstructor
public class ReadTriggerComponent {
private final TriggerRepository triggerRepository;


public long countByTaskName(@NotNull String name) {
return triggerRepository.countByTaskName(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ protected Optional<TriggerEntity> runNextTrigger() {

protected void awaitRunningTasks() throws TimeoutException, InterruptedException {
final long start = System.currentTimeMillis();
if (triggerService.countTriggers(TriggerStatus.RUNNING) > 0) {
while (triggerService.countTriggers(TriggerStatus.RUNNING) > 0) {
if (System.currentTimeMillis() - start > 2000) {
throw new TimeoutException("Still running after 2s");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

import java.time.OffsetDateTime;
import java.util.Optional;
import java.util.concurrent.TimeoutException;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.sterl.spring.persistent_tasks.AbstractSpringTest;
import org.sterl.spring.persistent_tasks.AbstractSpringTest.TaskConfig.Task3;
import org.sterl.spring.persistent_tasks.api.AddTriggerRequest;
import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus;
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;

class HistoryServiceTest extends AbstractSpringTest {
Expand All @@ -35,4 +38,20 @@ void testReQueueTrigger() {
// AND
assertThat(subject.countTriggers(trigger.getKey())).isEqualTo(2);
}

@Test
void testTriggerHistory() throws TimeoutException, InterruptedException {
// GIVEN
final var trigger = Task3.ID.newUniqueTrigger("Hallo");
triggerService.queue(trigger);
persistentTaskService.executeTriggersAndWait();
// WHEN
var triggers = subject.findAllDetailsForKey(trigger.key(), PageRequest.of(0, 100)).getContent();

// AND
assertThat(triggers).hasSize(3);
assertThat(triggers.get(0).getData().getStatus()).isEqualTo(TriggerStatus.SUCCESS);
assertThat(triggers.get(1).getData().getStatus()).isEqualTo(TriggerStatus.RUNNING);
assertThat(triggers.get(2).getData().getStatus()).isEqualTo(TriggerStatus.WAITING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryDetailEntity;
import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus;

class TriggerHistoryLastStateRepositoryTest extends AbstractSpringTest {
class TriggerHistoryDetailRepositoryTest extends AbstractSpringTest {

@Autowired private TriggerHistoryLastStateRepository subject;
@Autowired private TriggerHistoryDetailRepository subject;

@Test
void testGrouping() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ TransactionalTask<String> savePersonInTrx(PersonRepository personRepository) {
return new TransactionalTask<String>() {
@Override
public void accept(String name) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
personRepository.save(new PersonBE(name));
if (sendError.get()) {
throw new RuntimeException("Error requested for " + name);
Expand Down Expand Up @@ -85,7 +91,7 @@ void testSaveTransactions() throws Exception {

// THEN
// AND one the service, one the event and one more status update
hibernateAsserts.assertTrxCount(3);
hibernateAsserts.assertTrxCount(4);
assertThat(personRepository.count()).isOne();
}

Expand Down Expand Up @@ -152,8 +158,10 @@ void testRollbackAndRetry() throws Exception {

// THEN
awaitRunningTasks();
assertThat(persistentTaskService.getLastTriggerData(key).get().getStatus())
.isEqualTo(TriggerStatus.WAITING);
// AND the last status before we are back to running should be FAILED
assertThat(historyService.findAllDetailsForKey(key)
.getContent().get(0).getData().getStatus())
.isEqualTo(TriggerStatus.FAILED);

// WHEN
sendError.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void testRequiresNewHasOwnTransaction() {
triggerService.run(t).get();

// THEN
hibernateAsserts.assertTrxCount(3);
hibernateAsserts.assertTrxCount(4);
assertThat(personRepository.count()).isEqualTo(1);
}

Expand Down

0 comments on commit f70458e

Please sign in to comment.