diff --git a/pom.xml b/pom.xml
index 7f8edc3cda..d1ed03a3b6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.dataspring-data-redis
- 4.0.0-SNAPSHOT
+ 4.0.x-GH-2046-SNAPSHOTSpring Data RedisSpring Data module for Redis
diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java
index 8dc6292220..754c2b8b29 100644
--- a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java
+++ b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java
@@ -81,6 +81,7 @@
* @author ihaohong
* @author Dennis Neufeld
* @author Shyngys Sapraliyev
+ * @author Jeonggyu Choi
*/
@NullUnmarked
@SuppressWarnings({ "ConstantConditions", "deprecation" })
@@ -2921,12 +2922,26 @@ public PendingMessages xPending(String key, String groupName, String consumer,
Converters.identityConverter());
}
+ @Override
+ public PendingMessages xPending(String key, String groupName, String consumerName,
+ org.springframework.data.domain.Range range, Long count, Duration minIdleTime) {
+ return convertAndReturn(delegate.xPending(serialize(key), groupName, consumerName, range, count, minIdleTime),
+ Converters.identityConverter());
+ }
+
@Override
public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range,
Long count) {
return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count), Converters.identityConverter());
}
+ @Override
+ public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range,
+ Long count, Duration minIdleTime) {
+ return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, minIdleTime),
+ Converters.identityConverter());
+ }
+
@Override
public PendingMessages xPending(String key, String groupName, XPendingOptions options) {
return convertAndReturn(delegate.xPending(serialize(key), groupName, options), Converters.identityConverter());
diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java
index 2af6cb326a..9a013e408a 100644
--- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java
@@ -49,7 +49,6 @@
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.util.Assert;
-import org.springframework.util.StringUtils;
/**
* Stream-specific Redis commands executed using reactive infrastructure.
@@ -60,6 +59,7 @@
* @author Dengliming
* @author Mark John Moreno
* @author jinkshower
+ * @author Jeonggyu Choi
* @since 2.2
*/
public interface ReactiveStreamCommands {
@@ -743,6 +743,27 @@ default Mono xPending(ByteBuffer key, String groupName, Range
.map(CommandResponse::getOutput);
}
+ /**
+ * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
+ * {@literal consumer group} and over a given {@link Duration} of idle time.
+ *
+ * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+ * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
+ * @param range the range of messages ids to search within. Must not be {@literal null}.
+ * @param count limit the number of results. Must not be {@literal null}.
+ * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+ * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
+ * transaction.
+ * @see Redis Documentation: xpending
+ * @since 3.5
+ */
+ default Mono xPending(ByteBuffer key, String groupName, Range> range, Long count,
+ Duration minIdleTime) {
+ return xPending(
+ Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count).minIdleTime(minIdleTime))).next()
+ .map(CommandResponse::getOutput);
+ }
+
/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
* {@link Consumer} within a {@literal consumer group}.
@@ -759,6 +780,24 @@ default Mono xPending(ByteBuffer key, Consumer consumer, Range<
return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
}
+ /**
+ * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
+ * {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
+ *
+ * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+ * @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
+ * @param range the range of messages ids to search within. Must not be {@literal null}.
+ * @param count limit the number of results. Must not be {@literal null}.
+ * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+ * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
+ * @see Redis Documentation: xpending
+ * @since 3.5
+ */
+ default Mono xPending(ByteBuffer key, Consumer consumer, Range> range, Long count,
+ Duration minIdleTime) {
+ return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime);
+ }
+
/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
* {@literal consumer} within a {@literal consumer group}.
@@ -779,6 +818,27 @@ default Mono xPending(ByteBuffer key, String groupName, String
.next().map(CommandResponse::getOutput);
}
+ /**
+ * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
+ * {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
+ *
+ * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+ * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
+ * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
+ * @param range the range of messages ids to search within. Must not be {@literal null}.
+ * @param count limit the number of results. Must not be {@literal null}.
+ * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+ * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
+ * when used in pipeline / transaction.
+ * @see Redis Documentation: xpending
+ * @since 3.5
+ */
+ default Mono xPending(ByteBuffer key, String groupName, String consumerName, Range> range,
+ Long count, Duration minIdleTime) {
+ return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count)
+ .minIdleTime(minIdleTime))).next().map(CommandResponse::getOutput);
+ }
+
/**
* Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
* options}.
@@ -794,24 +854,26 @@ default Mono xPending(ByteBuffer key, String groupName, String
* Value Object holding parameters for obtaining pending messages.
*
* @author Christoph Strobl
+ * @author Jeonggyu Choi
* @since 2.3
*/
class PendingRecordsCommand extends KeyCommand {
private final String groupName;
- private final @Nullable String consumerName;
- private final Range> range;
- private final @Nullable Long count;
+ private final XPendingOptions options;
private PendingRecordsCommand(@Nullable ByteBuffer key, String groupName, @Nullable String consumerName, Range> range,
- @Nullable Long count) {
+ @Nullable Long count, @Nullable Duration minIdleTime) {
+
+ this(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(minIdleTime));
+ }
+
+ private PendingRecordsCommand(ByteBuffer key, String groupName, XPendingOptions options) {
super(key);
this.groupName = groupName;
- this.consumerName = consumerName;
- this.range = range;
- this.count = count;
+ this.options = options;
}
/**
@@ -822,7 +884,7 @@ private PendingRecordsCommand(@Nullable ByteBuffer key, String groupName, @Nulla
* @return new instance of {@link PendingRecordsCommand}.
*/
static PendingRecordsCommand pending(ByteBuffer key, String groupName) {
- return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null);
+ return new PendingRecordsCommand(key, groupName, XPendingOptions.unbounded());
}
/**
@@ -837,7 +899,7 @@ public PendingRecordsCommand range(Range> range, Long count) {
Assert.notNull(range, "Range must not be null");
Assert.isTrue(count > -1, "Count must not be negative");
- return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
+ return new PendingRecordsCommand(getKey(), groupName, options.withRange(range, count));
}
/**
@@ -847,7 +909,21 @@ public PendingRecordsCommand range(Range> range, Long count) {
* @return new instance of {@link PendingRecordsCommand}.
*/
public PendingRecordsCommand consumer(String consumerName) {
- return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
+ return new PendingRecordsCommand(getKey(), groupName, options.consumer(consumerName));
+ }
+
+ /**
+ * Append given minimum idle time.
+ *
+ * @param minIdleTime must not be {@literal null}.
+ * @return new instance of {@link PendingRecordsCommand}.
+ * @since 3.5
+ */
+ public PendingRecordsCommand minIdleTime(Duration minIdleTime) {
+
+ Assert.notNull(minIdleTime, "Idle must not be null");
+
+ return new PendingRecordsCommand(getKey(), groupName, options.minIdleTime(minIdleTime));
}
public String getGroupName() {
@@ -858,35 +934,50 @@ public String getGroupName() {
* @return can be {@literal null}.
*/
public @Nullable String getConsumerName() {
- return consumerName;
+ return options.getConsumerName();
}
/**
* @return never {@literal null}.
*/
public Range> getRange() {
- return range;
+ return options.getRange();
}
/**
* @return can be {@literal null}.
*/
public @Nullable Long getCount() {
- return count;
+ return options.getCount();
+ }
+
+ /**
+ * @return can be {@literal null}.
+ */
+ @Nullable
+ public Duration getMinIdleTime() {
+ return options.getMinIdleTime();
}
/**
* @return {@literal true} if a consumer name is present.
*/
public boolean hasConsumer() {
- return StringUtils.hasText(consumerName);
+ return options.hasConsumer();
}
/**
* @return {@literal true} count is set.
*/
public boolean isLimited() {
- return count != null;
+ return options.isLimited();
+ }
+
+ /**
+ * @return {@literal true} if idle is set.
+ */
+ public boolean hasMinIdleTime() {
+ return options.hasMinIdleTime();
}
}
diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java
index f7f1fdd065..00fed96de2 100644
--- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java
@@ -45,6 +45,7 @@
* @author Tugdual Grall
* @author Dengliming
* @author Mark John Moreno
+ * @author Jeonggyu Choi
* @since 2.2
* @see RedisCommands
* @see Redis Documentation - Streams
@@ -692,6 +693,25 @@ default PendingMessages xPending(byte @NonNull [] key, @NonNull String groupName
return xPending(key, groupName, XPendingOptions.range(range, count));
}
+ /**
+ * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
+ * {@literal consumer group} and over a given {@link Duration} of idle time.
+ *
+ * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+ * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
+ * @param range the range of messages ids to search within. Must not be {@literal null}.
+ * @param count limit the number of results. Must not be {@literal null}.
+ * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+ * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
+ * transaction.
+ * @see Redis Documentation: xpending
+ * @since 3.5
+ */
+ @Nullable
+ default PendingMessages xPending(byte[] key, String groupName, Range> range, Long count, Duration idle) {
+ return xPending(key, groupName, XPendingOptions.range(range, count).minIdleTime(idle));
+ }
+
/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
* {@link Consumer} within a {@literal consumer group}.
@@ -709,6 +729,24 @@ default PendingMessages xPending(byte @NonNull [] key, @NonNull Consumer consume
return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
}
+ /**
+ * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
+ * {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
+ *
+ * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+ * @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
+ * @param range the range of messages ids to search within. Must not be {@literal null}.
+ * @param count limit the number of results. Must not be {@literal null}.
+ * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+ * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
+ * @see Redis Documentation: xpending
+ * @since 3.5
+ */
+ @Nullable
+ default PendingMessages xPending(byte[] key, Consumer consumer, Range> range, Long count, Duration minIdleTime) {
+ return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime);
+ }
+
/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
* {@literal consumer} within a {@literal consumer group}.
@@ -728,6 +766,27 @@ default PendingMessages xPending(byte @NonNull [] key, @NonNull String groupName
return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName));
}
+ /**
+ * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
+ * {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
+ *
+ * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+ * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
+ * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
+ * @param range the range of messages ids to search within. Must not be {@literal null}.
+ * @param count limit the number of results. Must not be {@literal null}.
+ * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+ * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
+ * when used in pipeline / transaction.
+ * @see Redis Documentation: xpending
+ * @since 3.5
+ */
+ @Nullable
+ default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range> range, Long count,
+ Duration idle) {
+ return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(idle));
+ }
+
/**
* Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
* options}.
@@ -746,6 +805,7 @@ default PendingMessages xPending(byte @NonNull [] key, @NonNull String groupName
* Value Object holding parameters for obtaining pending messages.
*
* @author Christoph Strobl
+ * @author Jeonggyu Choi
* @since 2.3
*/
@NullMarked
@@ -754,12 +814,15 @@ class XPendingOptions {
private final @Nullable String consumerName;
private final Range> range;
private final @Nullable Long count;
+ private final @Nullable Duration minIdleTime;
- private XPendingOptions(@Nullable String consumerName, Range> range, @Nullable Long count) {
+ private XPendingOptions(@Nullable String consumerName, Range> range, @Nullable Long count,
+ @Nullable Duration minIdleTime) {
this.range = range;
this.count = count;
this.consumerName = consumerName;
+ this.minIdleTime = minIdleTime;
}
/**
@@ -768,7 +831,7 @@ private XPendingOptions(@Nullable String consumerName, Range> range, @Nullable
* @return new instance of {@link XPendingOptions}.
*/
public static XPendingOptions unbounded() {
- return new XPendingOptions(null, Range.unbounded(), null);
+ return new XPendingOptions(null, Range.unbounded(), null, null);
}
/**
@@ -781,7 +844,7 @@ public static XPendingOptions unbounded(Long count) {
Assert.isTrue(count > -1, "Count must not be negative");
- return new XPendingOptions(null, Range.unbounded(), count);
+ return new XPendingOptions(null, Range.unbounded(), count, null);
}
/**
@@ -796,7 +859,7 @@ public static XPendingOptions range(Range> range, Long count) {
Assert.notNull(range, "Range must not be null");
Assert.isTrue(count > -1, "Count must not be negative");
- return new XPendingOptions(null, range, count);
+ return new XPendingOptions(null, range, count, null);
}
/**
@@ -806,7 +869,27 @@ public static XPendingOptions range(Range> range, Long count) {
* @return new instance of {@link XPendingOptions}.
*/
public XPendingOptions consumer(String consumerName) {
- return new XPendingOptions(consumerName, range, count);
+
+ Assert.notNull(consumerName, "Consumer name must not be null");
+
+ return new XPendingOptions(consumerName, range, count, minIdleTime);
+ }
+
+ /**
+ * Append given minimum idle time.
+ *
+ * @param minIdleTime must not be {@literal null}.
+ * @return new instance of {@link XPendingOptions}.
+ */
+ public XPendingOptions minIdleTime(Duration minIdleTime) {
+
+ Assert.notNull(minIdleTime, "Idle must not be null");
+
+ return new XPendingOptions(consumerName, range, count, minIdleTime);
+ }
+
+ XPendingOptions withRange(Range> range, Long count) {
+ return new XPendingOptions(consumerName, range, count, minIdleTime);
}
/**
@@ -830,6 +913,26 @@ public Range> getRange() {
return consumerName;
}
+ /**
+ * @return can be {@literal null}.
+ */
+ @Nullable
+ public Duration getMinIdleTime() {
+ return minIdleTime;
+ }
+
+ /**
+ * @return can be {@literal null}.
+ */
+ @Nullable
+ public Long getMinIdleTimeMillis() {
+ if (minIdleTime == null) {
+ return null;
+ }
+
+ return minIdleTime.toMillis();
+ }
+
/**
* @return {@literal true} if a consumer name is present.
*/
@@ -843,6 +946,13 @@ public boolean hasConsumer() {
public boolean isLimited() {
return count != null;
}
+
+ /**
+ * @return {@literal true} if idle time is set.
+ */
+ public boolean hasMinIdleTime() {
+ return minIdleTime != null;
+ }
}
/**
diff --git a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java
index c29b9c1671..8c46ad52ae 100644
--- a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java
+++ b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java
@@ -73,6 +73,7 @@
* @author Andrey Shlykov
* @author ihaohong
* @author Shyngys Sapraliyev
+ * @author Jeonggyu Choi
* @see RedisCallback
* @see RedisSerializer
* @see StringRedisTemplate
@@ -3140,6 +3141,35 @@ default Long xDel(@NonNull String key, @NonNull String @NonNull... entryIds) {
*/
PendingMessagesSummary xPending(@NonNull String key, @NonNull String groupName);
+ // /**
+ // * Obtained detailed information about all pending messages for a given {@link Consumer}.
+ // *
+ // * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+ // * @param consumer the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}.
+ // * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
+ // * @see Redis Documentation: xpending
+ // * @since 3.5
+ // */
+ // @Nullable
+ // default PendingMessages xPending(String key, Consumer consumer) {
+ // return xPending(key, consumer.getGroup(), consumer.getName());
+ // }
+
+ // /**
+ // * Obtained detailed information about all pending messages for a given {@literal consumer}.
+ // *
+ // * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+ // * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
+ // * @param consumerName the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}.
+ // * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
+ // * @see Redis Documentation: xpending
+ // * @since 3.5
+ // */
+ // @Nullable
+ // default PendingMessages xPending(String key, String groupName, String consumerName) {
+ // return xPending(key, groupName, XPendingOptions.unbounded().consumer(consumerName));
+ // }
+
/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given
* {@link org.springframework.data.domain.Range} within a {@literal consumer group}.
@@ -3157,6 +3187,64 @@ default Long xDel(@NonNull String key, @NonNull String @NonNull... entryIds) {
PendingMessages xPending(@NonNull String key, @NonNull String groupName, @NonNull String consumerName,
org.springframework.data.domain.@NonNull Range range, @NonNull Long count);
+ /**
+ * Obtain detailed information about pending {@link PendingMessage messages} for a given
+ * {@link org.springframework.data.domain.Range} and {@literal consumer} within a {@literal consumer group} and over a
+ * given {@link Duration} of idle time.
+ *
+ * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+ * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
+ * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
+ * @param range the range of messages ids to search within. Must not be {@literal null}.
+ * @param count limit the number of results. Must not be {@literal null}.
+ * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+ * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
+ * when used in pipeline / transaction.
+ * @see Redis Documentation: xpending
+ * @since 3.5
+ */
+ @Nullable
+ PendingMessages xPending(String key, String groupName, String consumerName,
+ org.springframework.data.domain.Range range, Long count, Duration minIdleTime);
+
+ /**
+ * Obtain detailed information about pending {@link PendingMessage messages} for a given
+ * {@link org.springframework.data.domain.Range} and {@link Consumer} within a {@literal consumer group}.
+ *
+ * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+ * @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
+ * @param range the range of messages ids to search within. Must not be {@literal null}.
+ * @param count limit the number of results. Must not be {@literal null}.
+ * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
+ * @see Redis Documentation: xpending
+ * @since 3.5
+ */
+ @Nullable
+ default PendingMessages xPending(String key, Consumer consumer, org.springframework.data.domain.Range range,
+ Long count) {
+ return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
+ }
+
+ /**
+ * Obtain detailed information about pending {@link PendingMessage messages} for a given
+ * {@link org.springframework.data.domain.Range} and {@link Consumer} within a {@literal consumer group} and over a
+ * given {@link Duration} of idle time.
+ *
+ * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+ * @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
+ * @param range the range of messages ids to search within. Must not be {@literal null}.
+ * @param count limit the number of results. Must not be {@literal null}.
+ * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+ * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
+ * @see Redis Documentation: xpending
+ * @since 3.5
+ */
+ @Nullable
+ default PendingMessages xPending(String key, Consumer consumer, org.springframework.data.domain.Range range,
+ Long count, Duration minIdleTime) {
+ return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime);
+ }
+
/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given
* {@link org.springframework.data.domain.Range} within a {@literal consumer group}.
@@ -3173,6 +3261,25 @@ PendingMessages xPending(@NonNull String key, @NonNull String groupName, @NonNul
PendingMessages xPending(@NonNull String key, @NonNull String groupName,
org.springframework.data.domain.@NonNull Range range, @NonNull Long count);
+ /**
+ * Obtain detailed information about pending {@link PendingMessage messages} for a given
+ * {@link org.springframework.data.domain.Range} within a {@literal consumer group} and over a given {@link Duration}
+ * of idle time.
+ *
+ * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+ * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
+ * @param range the range of messages ids to search within. Must not be {@literal null}.
+ * @param count limit the number of results. Must not be {@literal null}.
+ * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+ * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
+ * transaction.
+ * @see Redis Documentation: xpending
+ * @since 3.5
+ */
+ @Nullable
+ PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range,
+ Long count, Duration minIdleTime);
+
/**
* Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
* options}.
diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java
index 9d26a6cd8d..5e6ab929d6 100644
--- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java
@@ -48,6 +48,7 @@
/**
* @author Dengliming
+ * @author Jeonggyu Choi
* @since 2.3
*/
class JedisClusterStreamCommands implements RedisStreamCommands {
@@ -283,6 +284,10 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op
pendingParams = pendingParams.consumer(consumerName);
}
+ if (options.hasMinIdleTime()) {
+ pendingParams = pendingParams.idle(options.getMinIdleTimeMillis());
+ }
+
List