Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Feat : Implements ConcurrentBatchCounter #34

Merged
merged 6 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions .github/ISSUE_TEMPLATE/bug_report.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ A clear and concise description of what the bug is.

**To Reproduce**
Steps to reproduce the behavior:

1. Go to '...'
2. Click on '....'
3. Scroll down to '....'
Expand All @@ -24,15 +25,17 @@ A clear and concise description of what you expected to happen.
If applicable, add screenshots to help explain your problem.

**Desktop (please complete the following information):**
- OS: [e.g. iOS]
- Browser [e.g. chrome, safari]
- Version [e.g. 22]

- OS: [e.g. iOS]
- Browser [e.g. chrome, safari]
- Version [e.g. 22]

**Smartphone (please complete the following information):**
- Device: [e.g. iPhone6]
- OS: [e.g. iOS8.1]
- Browser [e.g. stock browser, safari]
- Version [e.g. 22]

- Device: [e.g. iPhone6]
- OS: [e.g. iOS8.1]
- Browser [e.g. stock browser, safari]
- Version [e.g. 22]

**Additional context**
Add any other context about the problem here.
Original file line number Diff line number Diff line change
@@ -1,13 +1,81 @@
package com.thread.concurrency;

import com.thread.concurrency.counter.batch.BatchCounter;
import com.thread.concurrency.counter.batch.ConcurrentBatchingCounter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

@SpringBootApplication
public class SpringThreadConcurrencyApplication {

public static void main(String[] args) {
SpringApplication.run(SpringThreadConcurrencyApplication.class, args);

MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage initialMemoryUsage = memoryMXBean.getHeapMemoryUsage();
long initialTime = System.currentTimeMillis();

// Run the test
int totalRequest = Integer.MAX_VALUE;
conditionalMultiThreading(totalRequest);

MemoryUsage finalMemoryUsage = memoryMXBean.getHeapMemoryUsage();
long finalTime = System.currentTimeMillis();
long elapsedTime = finalTime - initialTime;
long usedMemory = finalMemoryUsage.getUsed() - initialMemoryUsage.getUsed();

// request with comma
System.out.println("Total request: " + String.format("%,d", totalRequest));
// seconds
System.out.println("Elapsed time: " + elapsedTime / 1000 + " s");
// megabytes
System.out.println("Used memory: " + usedMemory / 1024 / 1024 + " MB");
}

private static void conditionalMultiThreading(int expected) {
BatchCounter counter = new ConcurrentBatchingCounter();

// given
int numberOfThreads = 128;
List<Integer> iterPerThread = range(numberOfThreads, expected);
Consumer<Integer> task = (Integer number) -> {
for (int i = 0; i < number; i++) {
counter.add(1);
}
counter.flush();
};
// when
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<CompletableFuture<Void>> futures = iterPerThread.stream().map(number -> CompletableFuture.runAsync(() -> task.accept(number), executor)).toList();
futures.forEach(CompletableFuture::join);
}
// then
assert expected == counter.show();
}

private static List<Integer> range(int numberOfThreads, int expected) {
int baseValue = expected / numberOfThreads;
int remainder = expected % numberOfThreads;

List<Integer> result = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
if (i < remainder) {
result.add(baseValue + 1);
} else {
result.add(baseValue);
}
}
return result;
}

}
38 changes: 0 additions & 38 deletions src/main/java/com/thread/concurrency/counter/BatchingCounter.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.thread.concurrency.counter.batch;

import com.thread.concurrency.counter.Counter;

public interface BatchCounter extends Counter {
void flush();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.thread.concurrency.counter.batch;

import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

@Component
public class ConcurrentBatchingCounter implements BatchCounter {

private final AtomicLong counter = new AtomicLong();
private final ConcurrentMap<Long, LongAdder> batch = new ConcurrentHashMap<>();

@Override
public void add(int value) {
var threadId = Thread.currentThread().threadId();
batch.computeIfAbsent(threadId, k -> new LongAdder()).add(value);
}

@Override
public int show() {
return counter.intValue();
}

private void flush(long threadId) {
var value = batch.remove(threadId);
if (value != null) {
counter.addAndGet(value.longValue());
}
}

@Override
public void flush() {
var threadId = Thread.currentThread().threadId();
flush(threadId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.thread.concurrency.counter.batch;

import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

@Component
@Profile("dev")
public class ConcurrentParameterizedBatchingCounter implements BatchCounter {

Check warning on line 14 in src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java#L14

Added line #L14 was not covered by tests

private static final int BATCH_SIZE = 100;

private final AtomicLong counter = new AtomicLong();
private final ConcurrentMap<Long, List<Integer>> batch = new ConcurrentHashMap<>();

Check warning on line 19 in src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java#L18-L19

Added lines #L18 - L19 were not covered by tests

@Override
public void add(int value) {
var threadId = Thread.currentThread().threadId();
batch.computeIfAbsent(threadId, k -> new ArrayList<>()).add(value);

Check warning on line 24 in src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java#L23-L24

Added lines #L23 - L24 were not covered by tests
if (batch.get(threadId).size() >= BATCH_SIZE) {
flush(threadId);

Check warning on line 26 in src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java#L26

Added line #L26 was not covered by tests
}
}

Check warning on line 28 in src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java#L28

Added line #L28 was not covered by tests

@Override
public int show() {
return counter.intValue();

Check warning on line 32 in src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java#L32

Added line #L32 was not covered by tests
}

private void flush(long threadId) {
var list = batch.getOrDefault(threadId, null);

Check warning on line 36 in src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java#L36

Added line #L36 was not covered by tests
if (list != null && !list.isEmpty()) {
counter.addAndGet(list.stream().mapToLong(Integer::longValue).sum());
batch.remove(threadId);

Check warning on line 39 in src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java#L38-L39

Added lines #L38 - L39 were not covered by tests
}
}

Check warning on line 41 in src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java#L41

Added line #L41 was not covered by tests

@Override
public void flush() {
var threadId = Thread.currentThread().threadId();
flush(threadId);
}

Check warning on line 47 in src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java#L45-L47

Added lines #L45 - L47 were not covered by tests
}
1 change: 1 addition & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
spring.application.name=spring-thread-concurrency
spring.profiles.active=default
62 changes: 23 additions & 39 deletions src/test/java/com/thread/concurrency/counter/CounterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,60 +5,44 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

import static java.lang.Thread.sleep;

@SpringBootTest
public class CounterTest {

public static Stream<Counter> counterProvider() {
return Stream.of(new BatchingCounter(), new LockCounter(), new PollingCounter(), new BasicCounter());
return Stream.of(new LockCounter(), new PollingCounter());
}

private static void assertThen(Counter counter, int expectedValue, int actualValue) {
System.out.println("Expected value: " + expectedValue);
System.out.println("Actual value: " + actualValue);
if (counter instanceof BasicCounter) {
System.out.println("BasicCounter is not thread-safe");
Assertions.assertNotEquals(expectedValue, actualValue);
} else {
System.out.println("Counter is thread-safe");
Assertions.assertEquals(expectedValue, actualValue);
private static void whenAdd(Counter counter, int nThreads, int addPerThread) {
try (ExecutorService executor = Executors.newFixedThreadPool(nThreads)) {
for (int i = 0; i < nThreads; i++) {
executor.submit(() -> {
for (int j = 0; j < addPerThread; j++) {
counter.add(1);
}
});
}
}
}

@ParameterizedTest
@MethodSource("counterProvider")
public void stressTest(Counter counter) throws InterruptedException {
int initialValue = counter.show();
public void stressTest(Counter counter) {
// given
int nThreads = 100;
int nAddsPerThread = 1000;
int valueToAdd = 1;
int expectedValue = initialValue + nThreads * nAddsPerThread * valueToAdd;


// define runnable job
CountDownLatch latch = new CountDownLatch(nThreads);
Runnable job = () -> {
try {
latch.countDown(); // decrease the count
latch.await(); // wait until the count reaches 0
for (int i = 0; i < nAddsPerThread; i++) {
counter.add(valueToAdd);
}
} catch (InterruptedException ignored) {
}
};

// start nThreads threads
for (int i = 0; i < nThreads; i++) {
Thread.ofVirtual().start(job);
}
int addPerThread = 1000;
int expectedValue = counter.show() + nThreads * addPerThread;

sleep(300); // wait for all threads to finish
// when
long start = System.currentTimeMillis();
whenAdd(counter, nThreads, addPerThread);
long end = System.currentTimeMillis();

assertThen(counter, expectedValue, counter.show());
// then
Assertions.assertEquals(expectedValue, counter.show());
System.out.println("Time elapsed: " + (end - start) + "ms");
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.thread.concurrency;
package com.thread.concurrency.counter;

import com.thread.concurrency.SpringThreadConcurrencyApplication;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

@SpringBootTest
class SpringThreadConcurrencyApplicationTests {

@Test
void contextLoads() {
assertDoesNotThrow(() -> SpringThreadConcurrencyApplication.main(new String[]{}));
}

}
Loading