diff --git a/native_tests/linux/tests/unit/type/stream.tcl b/native_tests/linux/tests/unit/type/stream.tcl index e8e1328eb..053965b64 100644 --- a/native_tests/linux/tests/unit/type/stream.tcl +++ b/native_tests/linux/tests/unit/type/stream.tcl @@ -372,7 +372,6 @@ start_server { $rd close } -# TODO move to java # test "Blocking XREAD for stream that ran dry (issue 5299)" { # set rd [redis_deferring_client] # @@ -432,34 +431,36 @@ start_server { # $rd close # } # -# test {XREAD with same stream name multiple times should work} { -# r XADD s2 * old abcd1234 -# set rd [redis_deferring_client] -# $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $ -# wait_for_blocked_clients_count 1 -# r XADD s2 * new abcd1234 -# set res [$rd read] -# assert {[lindex $res 0 0] eq {s2}} -# assert {[lindex $res 0 1 0 1] eq {new abcd1234}} -# $rd close -# } -# -# test {XREAD + multiple XADD inside transaction} { -# r XADD s2 * old abcd1234 -# set rd [redis_deferring_client] -# $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $ -# wait_for_blocked_clients_count 1 -# r MULTI -# r XADD s2 * field one -# r XADD s2 * field two -# r XADD s2 * field three -# r EXEC -# set res [$rd read] -# assert {[lindex $res 0 0] eq {s2}} -# assert {[lindex $res 0 1 0 1] eq {field one}} -# assert {[lindex $res 0 1 1 1] eq {field two}} -# $rd close -# } + test {XREAD with same stream name multiple times should work} { + r XADD s2 * old abcd1234 + set rd [redis_deferring_client] + $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $ + # wait_for_blocked_clients_count 1 -> wait_for_blocked_client + wait_for_blocked_clients_count 1 + r XADD s2 * new abcd1234 + set res [$rd read] + assert {[lindex $res 0 0] eq {s2}} + assert {[lindex $res 0 1 0 1] eq {new abcd1234}} + $rd close + } + + test {XREAD + multiple XADD inside transaction} { + r XADD s2 * old abcd1234 + set rd [redis_deferring_client] + $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $ + # wait_for_blocked_clients_count 1 -> wait_for_blocked_client + wait_for_blocked_clients_count 1 + r MULTI + r XADD s2 * field one + r XADD s2 * field two + r XADD s2 * field three + r EXEC + set res [$rd read] + assert {[lindex $res 0 0] eq {s2}} + assert {[lindex $res 0 1 0 1] eq {field one}} + assert {[lindex $res 0 1 1 1] eq {field two}} + $rd close + } test {XDEL basic test} { r del somestream @@ -569,19 +570,19 @@ start_server { assert {[lindex $res 0 1 0] == {2-1 {f v}}} } -# TODO move to JAVA -# test {XREAD streamID edge (blocking)} { -# r del x -# set rd [redis_deferring_client] -# $rd XREAD BLOCK 0 STREAMS x 1-18446744073709551615 -# wait_for_blocked_clients_count 1 -# r XADD x 1-1 f v -# r XADD x 1-18446744073709551615 f v -# r XADD x 2-1 f v -# set res [$rd read] -# assert {[lindex $res 0 1 0] == {2-1 {f v}}} -# $rd close -# } + test {XREAD streamID edge (blocking)} { + r del x + set rd [redis_deferring_client] + $rd XREAD BLOCK 0 STREAMS x 1-18446744073709551615 + # wait_for_blocked_clients_count 1 -> wait_for_blocked_client + wait_for_blocked_clients_count 1 + r XADD x 1-1 f v + r XADD x 1-18446744073709551615 f v + r XADD x 2-1 f v + set res [$rd read] + assert {[lindex $res 0 1 0] == {2-1 {f v}}} + $rd close + } test {XADD streamID edge} { r del x diff --git a/src/main/java/com/github/fppt/jedismock/operations/streams/XRead.java b/src/main/java/com/github/fppt/jedismock/operations/streams/XRead.java index 150d37be6..2bc592685 100644 --- a/src/main/java/com/github/fppt/jedismock/operations/streams/XRead.java +++ b/src/main/java/com/github/fppt/jedismock/operations/streams/XRead.java @@ -153,7 +153,8 @@ && getStreamFromBaseOrCreateEmpty(entry.getKey()) return; // skip } - if (id.compareTo(map.getTail()) >= 0) { + + if (map.getTail() == null || id.compareTo(map.getTail()) >= 0) { return; // skip } diff --git a/src/test/java/com/github/fppt/jedismock/comparisontests/streams/XReadTests.java b/src/test/java/com/github/fppt/jedismock/comparisontests/streams/XReadTests.java new file mode 100644 index 000000000..782644b6e --- /dev/null +++ b/src/test/java/com/github/fppt/jedismock/comparisontests/streams/XReadTests.java @@ -0,0 +1,156 @@ +package com.github.fppt.jedismock.comparisontests.streams; + +import com.github.fppt.jedismock.comparisontests.ComparisonBase; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.params.XAddParams; +import redis.clients.jedis.params.XReadParams; +import redis.clients.jedis.resps.StreamEntry; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +@ExtendWith(ComparisonBase.class) +public class XReadTests { + private ExecutorService blockingThread; + private Jedis blockedClient; + + @BeforeEach + public void setUp(Jedis jedis, HostAndPort hostAndPort) { + jedis.flushAll(); + blockedClient = new Jedis(hostAndPort.getHost(), hostAndPort.getPort()); + blockingThread = Executors.newSingleThreadExecutor(); + } + + @AfterEach + public void tearDown() { + blockedClient.close(); + blockingThread.shutdownNow(); + } + + @TestTemplate + void blockingXREADforStreamThatRanDry(Jedis jedis) throws ExecutionException, InterruptedException { + jedis.xadd("s", XAddParams.xAddParams().id("666"), ImmutableMap.of("a", "b")); + jedis.xdel("s", new StreamEntryID(666)); + + jedis.xread(XReadParams.xReadParams().block(10),ImmutableMap.of("s", new StreamEntryID(665))); + + assertThatThrownBy( + () -> jedis.xadd("s", XAddParams.xAddParams().id("665"), ImmutableMap.of("a", "b")) + ) + .isInstanceOf(JedisDataException.class) + .hasMessageMatching("ERR.*equal.*smaller.*"); + + assertThatThrownBy( + () -> jedis.xadd("s", XAddParams.xAddParams().id("665"), ImmutableMap.of("a", "b")) + ) + .isInstanceOf(JedisDataException.class) + .hasMessageMatching("ERR.*equal.*smaller.*"); + + + Future future = blockingThread.submit(() -> { + List>> data = blockedClient.xread( + XReadParams.xReadParams().block(0), + ImmutableMap.of("s", new StreamEntryID(665)) + ); + + assertThat(data) + .hasSize(1) + .first() + .extracting(Map.Entry::getValue) + .asList() + .hasSize(1) + .first() + .usingRecursiveComparison() + .isEqualTo( + new StreamEntry( + new StreamEntryID(667), + ImmutableMap.of("a", "b") + ) + ); + }); + + jedis.xadd("s", XAddParams.xAddParams().id("667"), ImmutableMap.of("a", "b")); + + future.get(); + } + + @TestTemplate + void xaddWithDelShouldNotAwakeClient(Jedis jedis) throws ExecutionException, InterruptedException { + Future future = blockingThread.submit(() -> { + List>> data = blockedClient.xread( + XReadParams.xReadParams().block(3000), + ImmutableMap.of("s", StreamEntryID.LAST_ENTRY) + ); + + assertThat(data) + .hasSize(1) + .first() + .extracting(Map.Entry::getValue) + .asList() + .hasSize(1) + .first() + .usingRecursiveComparison() + .isEqualTo( + new StreamEntry( + new StreamEntryID(12), + ImmutableMap.of("new", "123") + ) + ); + }); + + jedis.xadd("s", XAddParams.xAddParams().id("11"), ImmutableMap.of("old", "123")); + jedis.del("s"); + jedis.xadd("s", XAddParams.xAddParams().id("12"), ImmutableMap.of("new", "123")); + + future.get(); + } + + + @TestTemplate + void xaddWithDelAndLpushShouldNotAwakeClient(Jedis jedis) throws ExecutionException, InterruptedException { + Future future = blockingThread.submit(() -> { + List>> data = blockedClient.xread( + XReadParams.xReadParams().block(7000), + ImmutableMap.of("s", StreamEntryID.LAST_ENTRY) + ); + + assertThat(data) + .hasSize(1) + .first() + .extracting(Map.Entry::getValue) + .asList() + .hasSize(1) + .first() + .usingRecursiveComparison() + .isEqualTo( + new StreamEntry( + new StreamEntryID(12), + ImmutableMap.of("new", "123") + ) + ); + }); + + jedis.xadd("s", XAddParams.xAddParams().id("11"), ImmutableMap.of("old", "123")); + jedis.del("s"); + jedis.lpush("s", "foo", "bar"); + jedis.del("s"); + jedis.xadd("s", XAddParams.xAddParams().id("12"), ImmutableMap.of("new", "123")); + + future.get(); + } +}