diff --git a/pom.xml b/pom.xml index 7f8edc3cda..d1ed03a3b6 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 4.0.0-SNAPSHOT + 4.0.x-GH-2046-SNAPSHOT Spring Data Redis Spring Data module for Redis diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java index 8dc6292220..754c2b8b29 100644 --- a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java @@ -81,6 +81,7 @@ * @author ihaohong * @author Dennis Neufeld * @author Shyngys Sapraliyev + * @author Jeonggyu Choi */ @NullUnmarked @SuppressWarnings({ "ConstantConditions", "deprecation" }) @@ -2921,12 +2922,26 @@ public PendingMessages xPending(String key, String groupName, String consumer, Converters.identityConverter()); } + @Override + public PendingMessages xPending(String key, String groupName, String consumerName, + org.springframework.data.domain.Range range, Long count, Duration minIdleTime) { + return convertAndReturn(delegate.xPending(serialize(key), groupName, consumerName, range, count, minIdleTime), + Converters.identityConverter()); + } + @Override public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, Long count) { return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count), Converters.identityConverter()); } + @Override + public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, + Long count, Duration minIdleTime) { + return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, minIdleTime), + Converters.identityConverter()); + } + @Override public PendingMessages xPending(String key, String groupName, XPendingOptions options) { return convertAndReturn(delegate.xPending(serialize(key), groupName, options), Converters.identityConverter()); diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java index 2af6cb326a..9a013e408a 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java @@ -49,7 +49,6 @@ import org.springframework.data.redis.connection.stream.StreamReadOptions; import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.util.Assert; -import org.springframework.util.StringUtils; /** * Stream-specific Redis commands executed using reactive infrastructure. @@ -60,6 +59,7 @@ * @author Dengliming * @author Mark John Moreno * @author jinkshower + * @author Jeonggyu Choi * @since 2.2 */ public interface ReactiveStreamCommands { @@ -743,6 +743,27 @@ default Mono xPending(ByteBuffer key, String groupName, RangeRedis Documentation: xpending + * @since 3.5 + */ + default Mono xPending(ByteBuffer key, String groupName, Range range, Long count, + Duration minIdleTime) { + return xPending( + Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count).minIdleTime(minIdleTime))).next() + .map(CommandResponse::getOutput); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@link Consumer} within a {@literal consumer group}. @@ -759,6 +780,24 @@ default Mono xPending(ByteBuffer key, Consumer consumer, Range< return xPending(key, consumer.getGroup(), consumer.getName(), range, count); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + default Mono xPending(ByteBuffer key, Consumer consumer, Range range, Long count, + Duration minIdleTime) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@literal consumer} within a {@literal consumer group}. @@ -779,6 +818,27 @@ default Mono xPending(ByteBuffer key, String groupName, String .next().map(CommandResponse::getOutput); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null} + * when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + default Mono xPending(ByteBuffer key, String groupName, String consumerName, Range range, + Long count, Duration minIdleTime) { + return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count) + .minIdleTime(minIdleTime))).next().map(CommandResponse::getOutput); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions * options}. @@ -794,24 +854,26 @@ default Mono xPending(ByteBuffer key, String groupName, String * Value Object holding parameters for obtaining pending messages. * * @author Christoph Strobl + * @author Jeonggyu Choi * @since 2.3 */ class PendingRecordsCommand extends KeyCommand { private final String groupName; - private final @Nullable String consumerName; - private final Range range; - private final @Nullable Long count; + private final XPendingOptions options; private PendingRecordsCommand(@Nullable ByteBuffer key, String groupName, @Nullable String consumerName, Range range, - @Nullable Long count) { + @Nullable Long count, @Nullable Duration minIdleTime) { + + this(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(minIdleTime)); + } + + private PendingRecordsCommand(ByteBuffer key, String groupName, XPendingOptions options) { super(key); this.groupName = groupName; - this.consumerName = consumerName; - this.range = range; - this.count = count; + this.options = options; } /** @@ -822,7 +884,7 @@ private PendingRecordsCommand(@Nullable ByteBuffer key, String groupName, @Nulla * @return new instance of {@link PendingRecordsCommand}. */ static PendingRecordsCommand pending(ByteBuffer key, String groupName) { - return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null); + return new PendingRecordsCommand(key, groupName, XPendingOptions.unbounded()); } /** @@ -837,7 +899,7 @@ public PendingRecordsCommand range(Range range, Long count) { Assert.notNull(range, "Range must not be null"); Assert.isTrue(count > -1, "Count must not be negative"); - return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count); + return new PendingRecordsCommand(getKey(), groupName, options.withRange(range, count)); } /** @@ -847,7 +909,21 @@ public PendingRecordsCommand range(Range range, Long count) { * @return new instance of {@link PendingRecordsCommand}. */ public PendingRecordsCommand consumer(String consumerName) { - return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count); + return new PendingRecordsCommand(getKey(), groupName, options.consumer(consumerName)); + } + + /** + * Append given minimum idle time. + * + * @param minIdleTime must not be {@literal null}. + * @return new instance of {@link PendingRecordsCommand}. + * @since 3.5 + */ + public PendingRecordsCommand minIdleTime(Duration minIdleTime) { + + Assert.notNull(minIdleTime, "Idle must not be null"); + + return new PendingRecordsCommand(getKey(), groupName, options.minIdleTime(minIdleTime)); } public String getGroupName() { @@ -858,35 +934,50 @@ public String getGroupName() { * @return can be {@literal null}. */ public @Nullable String getConsumerName() { - return consumerName; + return options.getConsumerName(); } /** * @return never {@literal null}. */ public Range getRange() { - return range; + return options.getRange(); } /** * @return can be {@literal null}. */ public @Nullable Long getCount() { - return count; + return options.getCount(); + } + + /** + * @return can be {@literal null}. + */ + @Nullable + public Duration getMinIdleTime() { + return options.getMinIdleTime(); } /** * @return {@literal true} if a consumer name is present. */ public boolean hasConsumer() { - return StringUtils.hasText(consumerName); + return options.hasConsumer(); } /** * @return {@literal true} count is set. */ public boolean isLimited() { - return count != null; + return options.isLimited(); + } + + /** + * @return {@literal true} if idle is set. + */ + public boolean hasMinIdleTime() { + return options.hasMinIdleTime(); } } diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index f7f1fdd065..00fed96de2 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -45,6 +45,7 @@ * @author Tugdual Grall * @author Dengliming * @author Mark John Moreno + * @author Jeonggyu Choi * @since 2.2 * @see RedisCommands * @see Redis Documentation - Streams @@ -692,6 +693,25 @@ default PendingMessages xPending(byte @NonNull [] key, @NonNull String groupName return xPending(key, groupName, XPendingOptions.range(range, count)); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a + * {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / + * transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + default PendingMessages xPending(byte[] key, String groupName, Range range, Long count, Duration idle) { + return xPending(key, groupName, XPendingOptions.range(range, count).minIdleTime(idle)); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@link Consumer} within a {@literal consumer group}. @@ -709,6 +729,24 @@ default PendingMessages xPending(byte @NonNull [] key, @NonNull Consumer consume return xPending(key, consumer.getGroup(), consumer.getName(), range, count); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + default PendingMessages xPending(byte[] key, Consumer consumer, Range range, Long count, Duration minIdleTime) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@literal consumer} within a {@literal consumer group}. @@ -728,6 +766,27 @@ default PendingMessages xPending(byte @NonNull [] key, @NonNull String groupName return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName)); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null} + * when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range range, Long count, + Duration idle) { + return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(idle)); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions * options}. @@ -746,6 +805,7 @@ default PendingMessages xPending(byte @NonNull [] key, @NonNull String groupName * Value Object holding parameters for obtaining pending messages. * * @author Christoph Strobl + * @author Jeonggyu Choi * @since 2.3 */ @NullMarked @@ -754,12 +814,15 @@ class XPendingOptions { private final @Nullable String consumerName; private final Range range; private final @Nullable Long count; + private final @Nullable Duration minIdleTime; - private XPendingOptions(@Nullable String consumerName, Range range, @Nullable Long count) { + private XPendingOptions(@Nullable String consumerName, Range range, @Nullable Long count, + @Nullable Duration minIdleTime) { this.range = range; this.count = count; this.consumerName = consumerName; + this.minIdleTime = minIdleTime; } /** @@ -768,7 +831,7 @@ private XPendingOptions(@Nullable String consumerName, Range range, @Nullable * @return new instance of {@link XPendingOptions}. */ public static XPendingOptions unbounded() { - return new XPendingOptions(null, Range.unbounded(), null); + return new XPendingOptions(null, Range.unbounded(), null, null); } /** @@ -781,7 +844,7 @@ public static XPendingOptions unbounded(Long count) { Assert.isTrue(count > -1, "Count must not be negative"); - return new XPendingOptions(null, Range.unbounded(), count); + return new XPendingOptions(null, Range.unbounded(), count, null); } /** @@ -796,7 +859,7 @@ public static XPendingOptions range(Range range, Long count) { Assert.notNull(range, "Range must not be null"); Assert.isTrue(count > -1, "Count must not be negative"); - return new XPendingOptions(null, range, count); + return new XPendingOptions(null, range, count, null); } /** @@ -806,7 +869,27 @@ public static XPendingOptions range(Range range, Long count) { * @return new instance of {@link XPendingOptions}. */ public XPendingOptions consumer(String consumerName) { - return new XPendingOptions(consumerName, range, count); + + Assert.notNull(consumerName, "Consumer name must not be null"); + + return new XPendingOptions(consumerName, range, count, minIdleTime); + } + + /** + * Append given minimum idle time. + * + * @param minIdleTime must not be {@literal null}. + * @return new instance of {@link XPendingOptions}. + */ + public XPendingOptions minIdleTime(Duration minIdleTime) { + + Assert.notNull(minIdleTime, "Idle must not be null"); + + return new XPendingOptions(consumerName, range, count, minIdleTime); + } + + XPendingOptions withRange(Range range, Long count) { + return new XPendingOptions(consumerName, range, count, minIdleTime); } /** @@ -830,6 +913,26 @@ public Range getRange() { return consumerName; } + /** + * @return can be {@literal null}. + */ + @Nullable + public Duration getMinIdleTime() { + return minIdleTime; + } + + /** + * @return can be {@literal null}. + */ + @Nullable + public Long getMinIdleTimeMillis() { + if (minIdleTime == null) { + return null; + } + + return minIdleTime.toMillis(); + } + /** * @return {@literal true} if a consumer name is present. */ @@ -843,6 +946,13 @@ public boolean hasConsumer() { public boolean isLimited() { return count != null; } + + /** + * @return {@literal true} if idle time is set. + */ + public boolean hasMinIdleTime() { + return minIdleTime != null; + } } /** diff --git a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java index c29b9c1671..8c46ad52ae 100644 --- a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java @@ -73,6 +73,7 @@ * @author Andrey Shlykov * @author ihaohong * @author Shyngys Sapraliyev + * @author Jeonggyu Choi * @see RedisCallback * @see RedisSerializer * @see StringRedisTemplate @@ -3140,6 +3141,35 @@ default Long xDel(@NonNull String key, @NonNull String @NonNull... entryIds) { */ PendingMessagesSummary xPending(@NonNull String key, @NonNull String groupName); + // /** + // * Obtained detailed information about all pending messages for a given {@link Consumer}. + // * + // * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + // * @param consumer the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}. + // * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + // * @see Redis Documentation: xpending + // * @since 3.5 + // */ + // @Nullable + // default PendingMessages xPending(String key, Consumer consumer) { + // return xPending(key, consumer.getGroup(), consumer.getName()); + // } + + // /** + // * Obtained detailed information about all pending messages for a given {@literal consumer}. + // * + // * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + // * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + // * @param consumerName the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}. + // * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + // * @see Redis Documentation: xpending + // * @since 3.5 + // */ + // @Nullable + // default PendingMessages xPending(String key, String groupName, String consumerName) { + // return xPending(key, groupName, XPendingOptions.unbounded().consumer(consumerName)); + // } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given * {@link org.springframework.data.domain.Range} within a {@literal consumer group}. @@ -3157,6 +3187,64 @@ default Long xDel(@NonNull String key, @NonNull String @NonNull... entryIds) { PendingMessages xPending(@NonNull String key, @NonNull String groupName, @NonNull String consumerName, org.springframework.data.domain.@NonNull Range range, @NonNull Long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} and {@literal consumer} within a {@literal consumer group} and over a + * given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null} + * when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + PendingMessages xPending(String key, String groupName, String consumerName, + org.springframework.data.domain.Range range, Long count, Duration minIdleTime); + + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} and {@link Consumer} within a {@literal consumer group}. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + default PendingMessages xPending(String key, Consumer consumer, org.springframework.data.domain.Range range, + Long count) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count); + } + + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} and {@link Consumer} within a {@literal consumer group} and over a + * given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + default PendingMessages xPending(String key, Consumer consumer, org.springframework.data.domain.Range range, + Long count, Duration minIdleTime) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given * {@link org.springframework.data.domain.Range} within a {@literal consumer group}. @@ -3173,6 +3261,25 @@ PendingMessages xPending(@NonNull String key, @NonNull String groupName, @NonNul PendingMessages xPending(@NonNull String key, @NonNull String groupName, org.springframework.data.domain.@NonNull Range range, @NonNull Long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} within a {@literal consumer group} and over a given {@link Duration} + * of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / + * transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, + Long count, Duration minIdleTime); + /** * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions * options}. diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java index 9d26a6cd8d..5e6ab929d6 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java @@ -48,6 +48,7 @@ /** * @author Dengliming + * @author Jeonggyu Choi * @since 2.3 */ class JedisClusterStreamCommands implements RedisStreamCommands { @@ -283,6 +284,10 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op pendingParams = pendingParams.consumer(consumerName); } + if (options.hasMinIdleTime()) { + pendingParams = pendingParams.idle(options.getMinIdleTimeMillis()); + } + List response = connection.getCluster().xpending(key, group, pendingParams); return StreamConverters.toPendingMessages(groupName, range, diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java index f0f03accbc..7cfc1e28b1 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java @@ -56,6 +56,7 @@ * * @author dengliming * @author Mark Paluch + * @author Jeonggyu Choi * @since 2.3 */ class StreamConverters { @@ -312,6 +313,9 @@ public static XPendingParams toXPendingParams(RedisStreamCommands.XPendingOption if (options.hasConsumer()) { xPendingParams.consumer(options.getConsumerName()); } + if (options.hasMinIdleTime()) { + xPendingParams.idle(options.getMinIdleTimeMillis()); + } return xPendingParams; } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java index 37b331a9e5..857bf8d129 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java @@ -18,6 +18,7 @@ import io.lettuce.core.XAddArgs; import io.lettuce.core.XClaimArgs; import io.lettuce.core.XGroupCreateArgs; +import io.lettuce.core.XPendingArgs; import io.lettuce.core.XReadArgs; import io.lettuce.core.XReadArgs.StreamOffset; import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands; @@ -56,6 +57,7 @@ * @author Tugdual Grall * @author Dengliming * @author Mark John Moreno + * @author Jeonggyu Choi * @since 2.2 */ class LettuceReactiveStreamCommands implements ReactiveStreamCommands { @@ -238,9 +240,17 @@ public Flux> xPending( io.lettuce.core.Limit limit = command.isLimited() ? io.lettuce.core.Limit.from(command.getCount()) : io.lettuce.core.Limit.unlimited(); - Flux publisher = command.hasConsumer() ? cmd.xpending(command.getKey(), - io.lettuce.core.Consumer.from(groupName, ByteUtils.getByteBuffer(command.getConsumerName())), range, limit) - : cmd.xpending(command.getKey(), groupName, range, limit); + XPendingArgs xPendingArgs = XPendingArgs.Builder.xpending(groupName, range, limit); + if (command.hasConsumer()) { + io.lettuce.core.Consumer consumer = io.lettuce.core.Consumer.from(groupName, + ByteUtils.getByteBuffer(command.getConsumerName())); + xPendingArgs.consumer(consumer); + } + if (command.hasMinIdleTime()) { + xPendingArgs.idle(command.getMinIdleTime()); + } + + Flux publisher = cmd.xpending(command.getKey(), xPendingArgs); return publisher.collectList().map(it -> { @@ -334,7 +344,7 @@ public Flux>> xInfoConsumers(P Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getGroupName(), "Command.getGroupName() must not be null"); - + ByteBuffer groupName = ByteUtils.getByteBuffer(command.getGroupName()); return new CommandResponse<>(command, cmd.xinfoConsumers(command.getKey(), groupName) .map(it -> new XInfoConsumer(command.getGroupName(), (List) it))); diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java index c80610b58d..d663f17d86 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java @@ -18,6 +18,7 @@ import io.lettuce.core.XAddArgs; import io.lettuce.core.XClaimArgs; import io.lettuce.core.XGroupCreateArgs; +import io.lettuce.core.XPendingArgs; import io.lettuce.core.XReadArgs; import io.lettuce.core.api.async.RedisStreamAsyncCommands; import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; @@ -52,6 +53,7 @@ * @author Dejan Jankov * @author Dengliming * @author Mark John Moreno + * @author Jeonggyu Choi * @since 2.2 */ @NullUnmarked @@ -224,15 +226,17 @@ public PendingMessages xPending(byte @NonNull [] key, @NonNull String groupName, io.lettuce.core.Limit limit = options.isLimited() ? io.lettuce.core.Limit.from(options.getCount()) : io.lettuce.core.Limit.unlimited(); + XPendingArgs xPendingArgs = XPendingArgs.Builder.xpending(group, range, limit); if (options.hasConsumer()) { - - return connection.invoke() - .from(RedisStreamAsyncCommands::xpending, key, - io.lettuce.core.Consumer.from(group, LettuceConverters.toBytes(options.getConsumerName())), range, limit) - .get(it -> StreamConverters.toPendingMessages(groupName, options.getRange(), it)); + io.lettuce.core.Consumer consumer = io.lettuce.core.Consumer.from(group, + LettuceConverters.toBytes(options.getConsumerName())); + xPendingArgs.consumer(consumer); + } + if (options.hasMinIdleTime()) { + xPendingArgs.idle(options.getMinIdleTime()); } - return connection.invoke().from(RedisStreamAsyncCommands::xpending, key, group, range, limit) + return connection.invoke().from(RedisStreamAsyncCommands::xpending, key, xPendingArgs) .get(it -> StreamConverters.toPendingMessages(groupName, options.getRange(), it)); } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java index 9d1c998e59..4354e7ab54 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java @@ -16,6 +16,7 @@ package org.springframework.data.redis.core; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -58,6 +59,7 @@ * @author Marcin Zielinski * @author John Blum * @author jinkshower + * @author Jeonggyu Choi * @since 2.2 */ @NullUnmarked @@ -224,6 +226,13 @@ public PendingMessages pending(@NonNull K key, @NonNull String group, @NonNull R return execute(connection -> connection.xPending(rawKey, group, range, count)); } + @Override + public PendingMessages pending(K key, String group, Range range, long count, Duration minIdleTime) { + + byte[] rawKey = rawKey(key); + return execute(connection -> connection.xPending(rawKey, group, range, count, minIdleTime)); + } + @Override public PendingMessages pending(@NonNull K key, @NonNull Consumer consumer, @NonNull Range range, long count) { @@ -231,6 +240,13 @@ public PendingMessages pending(@NonNull K key, @NonNull Consumer consumer, @NonN return execute(connection -> connection.xPending(rawKey, consumer, range, count)); } + @Override + public PendingMessages pending(K key, Consumer consumer, Range range, long count, Duration minIdleTime) { + + byte[] rawKey = rawKey(key); + return execute(connection -> connection.xPending(rawKey, consumer, range, count, minIdleTime)); + } + @Override public PendingMessagesSummary pending(@NonNull K key, @NonNull String group) { diff --git a/src/main/java/org/springframework/data/redis/core/StreamOperations.java b/src/main/java/org/springframework/data/redis/core/StreamOperations.java index 35c81c9c48..d0c09fccf3 100644 --- a/src/main/java/org/springframework/data/redis/core/StreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/StreamOperations.java @@ -46,6 +46,7 @@ * @author Marcin Zielinski * @author John Blum * @author jinkshower + * @author Jeonggyu Choi * @since 2.2 */ @NullUnmarked @@ -350,6 +351,22 @@ default PendingMessages pending(@NonNull K key, @NonNull Consumer consumer) { */ PendingMessages pending(@NonNull K key, @NonNull String group, @NonNull Range range, long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a + * {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param group the name of the {@literal consumer group}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / + * transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + PendingMessages pending(K key, String group, Range range, long count, Duration minIdleTime); + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@link Consumer} within a {@literal consumer group}. @@ -364,6 +381,21 @@ default PendingMessages pending(@NonNull K key, @NonNull Consumer consumer) { */ PendingMessages pending(@NonNull K key, @NonNull Consumer consumer, @NonNull Range range, long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + PendingMessages pending(K key, Consumer consumer, Range range, long count, Duration minIdleTime); + /** * Get the length of a stream. * diff --git a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java index 288aa202b0..2402767239 100644 --- a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java @@ -116,6 +116,7 @@ * @author Shyngys Sapraliyev * @author Roman Osadchuk * @author Tihomir Mateev + * @author Jeonggyu Choi */ public abstract class AbstractConnectionIntegrationTests { @@ -4114,16 +4115,16 @@ void xPendingShouldLoadEmptyOverviewCorrectly() { assertThat(info.getPendingMessagesPerConsumer()).isEmpty(); } - @Test // DATAREDIS-1084 + @Test // GH-2046 @EnabledOnCommand("XADD") - public void xPendingShouldLoadPendingMessages() { + void xPendingShouldLoadPendingMessagesForConsumer() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); - actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L)); + actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"), Range.unbounded(), 10L)); List results = getResults(); assertThat(results).hasSize(4); @@ -4136,16 +4137,34 @@ public void xPendingShouldLoadPendingMessages() { assertThat(pending.get(0).getIdAsString()).isNotNull(); } - @Test // DATAREDIS-1207 + @Test // GH-2046 @EnabledOnCommand("XADD") - public void xPendingShouldWorkWithBoundedRange() { + void xPendingShouldLoadEmptyPendingMessagesForNotExistingConsumer() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); - actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.closed("0-0", "+"), 10L)); + actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer2"), Range.unbounded(), 10L)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isZero(); + } + + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadPendingMessagesForGroupNameAndConsumerName() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, "my-group", "my-consumer", Range.unbounded(), 10L)); List results = getResults(); assertThat(results).hasSize(4); @@ -4158,9 +4177,27 @@ public void xPendingShouldWorkWithBoundedRange() { assertThat(pending.get(0).getIdAsString()).isNotNull(); } + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadEmptyPendingMessagesForNonExistingConsumerName() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2", Range.unbounded(), 10L)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isZero(); + } + @Test // DATAREDIS-1084 @EnabledOnCommand("XADD") - public void xPendingShouldLoadPendingMessagesForConsumer() { + public void xPendingShouldLoadPendingMessagesForConsumerNameWithRange() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); @@ -4183,7 +4220,7 @@ public void xPendingShouldLoadPendingMessagesForConsumer() { @Test // DATAREDIS-1084 @EnabledOnCommand("XADD") - public void xPendingShouldLoadPendingMessagesForNonExistingConsumer() { + public void xPendingShouldLoadEmptyPendingMessagesForNonExistingConsumerNameWithRange() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); @@ -4200,9 +4237,185 @@ public void xPendingShouldLoadPendingMessagesForNonExistingConsumer() { assertThat(pending.size()).isZero(); } + @Test // GH-2046 + @EnabledOnCommand("XADD") + public void xPendingShouldLoadPendingMessagesForIdle() throws InterruptedException { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + Thread.sleep(50); + + actual.add(connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(), + 10L, Duration.ofMillis(1))); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isOne(); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + assertThat(pending.get(0).getIdAsString()).isNotNull(); + } + + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadEmptyPendingMessagesForNonOverIdle() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + Duration nonOverIdle = Duration.ofSeconds(10); + actual.add(connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(), + 10L, nonOverIdle)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isZero(); + } + + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadPendingMessagesForConsumerWithRange() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"), + org.springframework.data.domain.Range.unbounded(), 10L)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isOne(); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + assertThat(pending.get(0).getIdAsString()).isNotNull(); + } + + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadEmptyPendingMessagesForNonExistingConsumerWithRange() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer-2"), + org.springframework.data.domain.Range.unbounded(), 10L)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isZero(); + } + + @Test // GH-2046 + @EnabledOnCommand("XADD") + public void xPendingShouldLoadPendingMessagesForIdleWithConsumer() throws InterruptedException { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + Thread.sleep(50); + + actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"), + org.springframework.data.domain.Range.unbounded(), 10L, Duration.ofMillis(1))); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isOne(); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + assertThat(pending.get(0).getIdAsString()).isNotNull(); + } + + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadEmptyPendingMessagesForNonOverIdleWithConsumer() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + Duration nonOverIdle = Duration.ofSeconds(10); + actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"), + org.springframework.data.domain.Range.unbounded(), 10L, nonOverIdle)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isZero(); + } + + @Test // DATAREDIS-1207 + @EnabledOnCommand("XADD") + public void xPendingShouldWorkWithBoundedRange() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.closed("0-0", "+"), 10L)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isOne(); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + assertThat(pending.get(0).getIdAsString()).isNotNull(); + } + + @Test // DATAREDIS-1084 + @EnabledOnCommand("XADD") + public void xPendingShouldLoadPendingMessagesForGroupNameWithRange() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isOne(); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + assertThat(pending.get(0).getIdAsString()).isNotNull(); + } + @Test // DATAREDIS-1084 @EnabledOnCommand("XADD") - void xPendingShouldLoadEmptyPendingMessages() { + void xPendingShouldLoadEmptyPendingMessagesForGroupNameWithRange() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); @@ -4216,6 +4429,49 @@ void xPendingShouldLoadEmptyPendingMessages() { assertThat(pending.size()).isZero(); } + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadPendingMessagesForIdleWithGroupName() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add( + connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L, Duration.ZERO)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isOne(); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + assertThat(pending.get(0).getIdAsString()).isNotNull(); + } + + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadEmptyPendingMessagesForNonOverIdleWithGroupName() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + Duration nonOverIdle = Duration.ofSeconds(10); + actual.add( + connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L, nonOverIdle)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isZero(); + } + @Test // DATAREDIS-1084 @EnabledOnCommand("XADD") public void xClaim() throws InterruptedException { diff --git a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionPipelineIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionPipelineIntegrationTests.java index f1205d2848..14a1249a17 100644 --- a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionPipelineIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionPipelineIntegrationTests.java @@ -160,6 +160,16 @@ public void xClaim() throws InterruptedException { super.xClaim(); } + @Test + @Override + @Disabled + public void xPendingShouldLoadPendingMessagesForIdle() {} + + @Test + @Override + @Disabled + public void xPendingShouldLoadPendingMessagesForIdleWithConsumer() {} + @Override protected void initConnection() { connection.openPipeline(); diff --git a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionTransactionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionTransactionIntegrationTests.java index 52e8836655..c643ae7fc8 100644 --- a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionTransactionIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionTransactionIntegrationTests.java @@ -148,6 +148,16 @@ public void xClaim() throws InterruptedException { super.xClaim(); } + @Test + @Override + @Disabled + public void xPendingShouldLoadPendingMessagesForIdle() {} + + @Test + @Override + @Disabled + public void xPendingShouldLoadPendingMessagesForIdleWithConsumer() {} + @Override protected void initConnection() { connection.multi(); diff --git a/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java index dedaf0c84d..b5445247ef 100644 --- a/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java @@ -28,6 +28,7 @@ * Unit tests for {@link ReactiveStreamCommands}. * * @author jinkshower + * @author Jeonggyu Choi */ class ReactiveStreamCommandsUnitTests { @@ -53,4 +54,14 @@ void pendingRecordsCommandRangeShouldThrowExceptionWhenCountIsNegative() { assertThatIllegalArgumentException().isThrownBy(() -> command.range(range, -1L)); } + + @Test // GH-2046 + void pendingRecordsCommandIdleShouldThrowExceptionWhenIdleIsNull() { + ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes()); + String groupName = "my-group"; + + PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName); + + assertThatIllegalArgumentException().isThrownBy(() -> command.minIdleTime(null)); + } } diff --git a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java index 74579d57a6..dbda89b861 100644 --- a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java @@ -26,6 +26,7 @@ * Unit tests for {@link RedisStreamCommands}. * * @author jinkshower + * @author Jeonggyu Choi */ class RedisStreamCommandsUnitTests { @@ -46,4 +47,11 @@ void xPendingOptionsRangeShouldThrowExceptionWhenCountIsNegative() { assertThatIllegalArgumentException().isThrownBy(() -> XPendingOptions.range(range, -1L)); } + + @Test // GH-2046 + void xPendingOptionsIdleShouldThrowExceptionWhenIdleIsNull() { + XPendingOptions xPendingOptions = XPendingOptions.unbounded(); + + assertThatIllegalArgumentException().isThrownBy(() -> xPendingOptions.minIdleTime(null)); + } } diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionPipelineIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionPipelineIntegrationTests.java index 2e34011f4f..c452e312f6 100644 --- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionPipelineIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionPipelineIntegrationTests.java @@ -173,5 +173,4 @@ public void testGetTimeShouldRequestServerTimeAsMicros() {} @Override @Disabled public void testDbSize() {} - } diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java b/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java new file mode 100644 index 0000000000..5d75615fee --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java @@ -0,0 +1,30 @@ +package org.springframework.data.redis.connection.jedis; + +import static org.assertj.core.api.Assertions.*; + +import redis.clients.jedis.params.XPendingParams; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.time.temporal.ChronoUnit; + +import org.junit.jupiter.api.Test; +import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions; + +/** + * @author Jeonggyu Choi + */ +class StreamConvertersUnitTest { + + @Test // GH-2046 + void shouldConvertIdle() throws NoSuchFieldException, IllegalAccessException { + XPendingOptions options = XPendingOptions.unbounded(5L).minIdleTime(Duration.of(1, ChronoUnit.HOURS)); + + XPendingParams xPendingParams = StreamConverters.toXPendingParams(options); + + Field idle = XPendingParams.class.getDeclaredField("idle"); + idle.setAccessible(true); + Long idleValue = (Long) idle.get(xPendingParams); + assertThat(idleValue).isEqualTo(Duration.of(1, ChronoUnit.HOURS).toMillis()); + } +} diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java index 939061be37..955cda07bd 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java @@ -18,16 +18,16 @@ import static org.assertj.core.api.Assertions.*; import io.lettuce.core.XReadArgs; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedClass; import reactor.test.StepVerifier; import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Collections; import org.assertj.core.data.Offset; import org.junit.Ignore; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedClass; - import org.springframework.data.domain.Range; import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.Limit; @@ -45,6 +45,7 @@ * @author Christoph Strobl * @author Tugdual Grall * @author Dengliming + * @author Jeonggyu Choi */ @ParameterizedClass @EnabledOnCommand("XADD") @@ -342,6 +343,156 @@ void xPendingShouldLoadPendingMessagesForNonExistingConsumer() { }).verifyComplete(); } + @Test // GH-2046 + void xPendingShouldLoadPendingMessagesForGroupAndIdle() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .delayElements(Duration.ofSeconds(1)).then().as(StepVerifier::create).verifyComplete(); + + Duration exceededIdle = Duration.of(1, ChronoUnit.MILLIS); + + connection.streamCommands().xPending(KEY_1_BBUFFER, "my-group", Range.open("-", "+"), 10L, exceededIdle) + .delaySubscription(Duration.ofMillis(100)).as(StepVerifier::create).assertNext(it -> { + assertThat(it.size()).isOne(); + assertThat(it.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(it.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(it.get(0).getTotalDeliveryCount()).isOne(); + assertThat(it.get(0).getIdAsString()).isNotNull(); + }).verifyComplete(); + } + + @Test // GH-2046 + void xPendingShouldLoadEmptyPendingMessagesForGroupAndIdleWhenDurationNotExceeded() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration notExceededIdle = Duration.ofMinutes(10); + + connection.streamCommands().xPending(KEY_1_BBUFFER, "my-group", Range.open("-", "+"), 10L, notExceededIdle) + .delaySubscription(Duration.ofMillis(100)) + + .as(StepVerifier::create).assertNext(it -> { + assertThat(it.isEmpty()).isTrue(); + }).verifyComplete(); + } + + @Test // GH-2046 + void xPendingShouldLoadPendingMessagesForGroupNameAndConsumerNameAndIdle() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration exceededIdle = Duration.ofMillis(1); + + connection.streamCommands() + .xPending(KEY_1_BBUFFER, "my-group", "my-consumer", Range.open("-", "+"), 10L, exceededIdle) + .delaySubscription(Duration.ofMillis(100)) + + .as(StepVerifier::create).assertNext(it -> { + assertThat(it.size()).isOne(); + assertThat(it.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(it.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(it.get(0).getTotalDeliveryCount()).isOne(); + assertThat(it.get(0).getIdAsString()).isNotNull(); + }).verifyComplete(); + } + + @Test // GH-2046 + void xPendingShouldLoadEmptyPendingMessagesForGroupNameAndConsumerNameAndIdleWhenDurationNotExceeded() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration notExceededIdle = Duration.ofMinutes(10); + + connection.streamCommands() + .xPending(KEY_1_BBUFFER, "my-group", "my-consumer", Range.open("-", "+"), 10L, notExceededIdle) + .delaySubscription(Duration.ofMillis(100)) + + .as(StepVerifier::create).assertNext(it -> { + assertThat(it.isEmpty()).isTrue(); + }).verifyComplete(); + } + + @Test // GH-2046 + void xPendingShouldLoadPendingMessageesForConsumerAndIdle() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration exceededIdle = Duration.ofMillis(1); + + connection.streamCommands() + .xPending(KEY_1_BBUFFER, Consumer.from("my-group", "my-consumer"), Range.open("-", "+"), 10L, exceededIdle) + .delaySubscription(Duration.ofMillis(100)) + + .as(StepVerifier::create).assertNext(it -> { + assertThat(it.size()).isOne(); + assertThat(it.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(it.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(it.get(0).getTotalDeliveryCount()).isOne(); + assertThat(it.get(0).getIdAsString()).isNotNull(); + }).verifyComplete(); + } + + @Test // GH-2046 + void xPendingShouldLoadEmptyPendingMessagesForConsumerAndIdleWhenDurationNotExceeded() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration notExceededIdle = Duration.ofMinutes(10); + + connection.streamCommands() + .xPending(KEY_1_BBUFFER, Consumer.from("my-group", "my-consumer"), Range.open("-", "+"), 10L, notExceededIdle) + .delaySubscription(Duration.ofMillis(100)).as(StepVerifier::create).assertNext(it -> { + assertThat(it.isEmpty()).isTrue(); + }).verifyComplete(); + } + @Test // DATAREDIS-1084 void xPendingShouldLoadEmptyPendingMessages() { @@ -502,8 +653,8 @@ void xClaimJustId() { StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) // .delayElements(Duration.ofMillis(5)).next() // .flatMapMany(record -> connection.streamCommands().xClaimJustId(KEY_1_BBUFFER, "my-group", "my-consumer", - XClaimOptions.minIdle(Duration.ofMillis(1)).ids(record.getId())) - ).as(StepVerifier::create) // + XClaimOptions.minIdle(Duration.ofMillis(1)).ids(record.getId()))) + .as(StepVerifier::create) // .assertNext(it -> assertThat(it.getValue()).isEqualTo(expected)) // .verifyComplete(); }