Skip to content

Commit

Permalink
[streams] XREAD tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Iван committed Dec 23, 2023
1 parent 5b4b7ea commit 2145348
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 43 deletions.
85 changes: 43 additions & 42 deletions native_tests/linux/tests/unit/type/stream.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ start_server {
$rd close
}

# TODO move to java
# test "Blocking XREAD for stream that ran dry (issue 5299)" {
# set rd [redis_deferring_client]
#
Expand Down Expand Up @@ -432,34 +431,36 @@ start_server {
# $rd close
# }
#
# test {XREAD with same stream name multiple times should work} {
# r XADD s2 * old abcd1234
# set rd [redis_deferring_client]
# $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
# wait_for_blocked_clients_count 1
# r XADD s2 * new abcd1234
# set res [$rd read]
# assert {[lindex $res 0 0] eq {s2}}
# assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
# $rd close
# }
#
# test {XREAD + multiple XADD inside transaction} {
# r XADD s2 * old abcd1234
# set rd [redis_deferring_client]
# $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
# wait_for_blocked_clients_count 1
# r MULTI
# r XADD s2 * field one
# r XADD s2 * field two
# r XADD s2 * field three
# r EXEC
# set res [$rd read]
# assert {[lindex $res 0 0] eq {s2}}
# assert {[lindex $res 0 1 0 1] eq {field one}}
# assert {[lindex $res 0 1 1 1] eq {field two}}
# $rd close
# }
test {XREAD with same stream name multiple times should work} {
r XADD s2 * old abcd1234
set rd [redis_deferring_client]
$rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
# wait_for_blocked_clients_count 1 -> wait_for_blocked_client
wait_for_blocked_clients_count 1
r XADD s2 * new abcd1234
set res [$rd read]
assert {[lindex $res 0 0] eq {s2}}
assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
$rd close
}

test {XREAD + multiple XADD inside transaction} {
r XADD s2 * old abcd1234
set rd [redis_deferring_client]
$rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
# wait_for_blocked_clients_count 1 -> wait_for_blocked_client
wait_for_blocked_clients_count 1
r MULTI
r XADD s2 * field one
r XADD s2 * field two
r XADD s2 * field three
r EXEC
set res [$rd read]
assert {[lindex $res 0 0] eq {s2}}
assert {[lindex $res 0 1 0 1] eq {field one}}
assert {[lindex $res 0 1 1 1] eq {field two}}
$rd close
}

test {XDEL basic test} {
r del somestream
Expand Down Expand Up @@ -569,19 +570,19 @@ start_server {
assert {[lindex $res 0 1 0] == {2-1 {f v}}}
}

# TODO move to JAVA
# test {XREAD streamID edge (blocking)} {
# r del x
# set rd [redis_deferring_client]
# $rd XREAD BLOCK 0 STREAMS x 1-18446744073709551615
# wait_for_blocked_clients_count 1
# r XADD x 1-1 f v
# r XADD x 1-18446744073709551615 f v
# r XADD x 2-1 f v
# set res [$rd read]
# assert {[lindex $res 0 1 0] == {2-1 {f v}}}
# $rd close
# }
test {XREAD streamID edge (blocking)} {
r del x
set rd [redis_deferring_client]
$rd XREAD BLOCK 0 STREAMS x 1-18446744073709551615
# wait_for_blocked_clients_count 1 -> wait_for_blocked_client
wait_for_blocked_clients_count 1
r XADD x 1-1 f v
r XADD x 1-18446744073709551615 f v
r XADD x 2-1 f v
set res [$rd read]
assert {[lindex $res 0 1 0] == {2-1 {f v}}}
$rd close
}

test {XADD streamID edge} {
r del x
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ && getStreamFromBaseOrCreateEmpty(entry.getKey())
return; // skip
}

if (id.compareTo(map.getTail()) >= 0) {

if (map.getTail() == null || id.compareTo(map.getTail()) >= 0) {
return; // skip
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package com.github.fppt.jedismock.comparisontests.streams;

import com.github.fppt.jedismock.comparisontests.ComparisonBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XReadParams;
import redis.clients.jedis.resps.StreamEntry;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@ExtendWith(ComparisonBase.class)
public class XReadTests {
private ExecutorService blockingThread;
private Jedis blockedClient;

@BeforeEach
public void setUp(Jedis jedis, HostAndPort hostAndPort) {
jedis.flushAll();
blockedClient = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
blockingThread = Executors.newSingleThreadExecutor();
}

@AfterEach
public void tearDown() {
blockedClient.close();
blockingThread.shutdownNow();
}

@TestTemplate
void blockingXREADforStreamThatRanDry(Jedis jedis) throws ExecutionException, InterruptedException {
jedis.xadd("s", XAddParams.xAddParams().id("666"), ImmutableMap.of("a", "b"));
jedis.xdel("s", new StreamEntryID(666));

jedis.xread(XReadParams.xReadParams().block(10),ImmutableMap.of("s", new StreamEntryID(665)));

assertThatThrownBy(
() -> jedis.xadd("s", XAddParams.xAddParams().id("665"), ImmutableMap.of("a", "b"))
)
.isInstanceOf(JedisDataException.class)
.hasMessageMatching("ERR.*equal.*smaller.*");

assertThatThrownBy(
() -> jedis.xadd("s", XAddParams.xAddParams().id("665"), ImmutableMap.of("a", "b"))
)
.isInstanceOf(JedisDataException.class)
.hasMessageMatching("ERR.*equal.*smaller.*");


Future<?> future = blockingThread.submit(() -> {
List<Map.Entry<String, List<StreamEntry>>> data = blockedClient.xread(
XReadParams.xReadParams().block(0),
ImmutableMap.of("s", new StreamEntryID(665))
);

assertThat(data)
.hasSize(1)
.first()
.extracting(Map.Entry::getValue)
.asList()
.hasSize(1)
.first()
.usingRecursiveComparison()
.isEqualTo(
new StreamEntry(
new StreamEntryID(667),
ImmutableMap.of("a", "b")
)
);
});

jedis.xadd("s", XAddParams.xAddParams().id("667"), ImmutableMap.of("a", "b"));

future.get();
}

@TestTemplate
void xaddWithDelShouldNotAwakeClient(Jedis jedis) throws ExecutionException, InterruptedException {
Future<?> future = blockingThread.submit(() -> {
List<Map.Entry<String, List<StreamEntry>>> data = blockedClient.xread(
XReadParams.xReadParams().block(3000),
ImmutableMap.of("s", StreamEntryID.LAST_ENTRY)
);

assertThat(data)
.hasSize(1)
.first()
.extracting(Map.Entry::getValue)
.asList()
.hasSize(1)
.first()
.usingRecursiveComparison()
.isEqualTo(
new StreamEntry(
new StreamEntryID(12),
ImmutableMap.of("new", "123")
)
);
});

jedis.xadd("s", XAddParams.xAddParams().id("11"), ImmutableMap.of("old", "123"));
jedis.del("s");
jedis.xadd("s", XAddParams.xAddParams().id("12"), ImmutableMap.of("new", "123"));

future.get();
}


@TestTemplate
void xaddWithDelAndLpushShouldNotAwakeClient(Jedis jedis) throws ExecutionException, InterruptedException {
Future<?> future = blockingThread.submit(() -> {
List<Map.Entry<String, List<StreamEntry>>> data = blockedClient.xread(
XReadParams.xReadParams().block(7000),
ImmutableMap.of("s", StreamEntryID.LAST_ENTRY)
);

assertThat(data)
.hasSize(1)
.first()
.extracting(Map.Entry::getValue)
.asList()
.hasSize(1)
.first()
.usingRecursiveComparison()
.isEqualTo(
new StreamEntry(
new StreamEntryID(12),
ImmutableMap.of("new", "123")
)
);
});

jedis.xadd("s", XAddParams.xAddParams().id("11"), ImmutableMap.of("old", "123"));
jedis.del("s");
jedis.lpush("s", "foo", "bar");
jedis.del("s");
jedis.xadd("s", XAddParams.xAddParams().id("12"), ImmutableMap.of("new", "123"));

future.get();
}
}

0 comments on commit 2145348

Please sign in to comment.