Redis Streams 通过充当类似日志的结构来支持数据流式传输,该结构实时记录和排序事件。 Redis Stream 中的每个条目都是一个键值对,具有唯一的、自动生成的时间戳 ID,确保严格的排序。生产者使用 XADD
命令将消息附加到流,消费者按顺序读取消息或通过查询一系列 ID 来读取消息。例如,传感器应用程序可以使用 XADD temperatures:stream * sensor_id 1234 temp 98.6
来记录温度数据,其中 *
自动生成条目的 ID。 Redis 在内存中维护流,从而实现快速写入和读取,并支持通过快照或仅附加文件 (AOF) 进行持久化以实现耐用性。
Redis Streams 支持消费者组来管理并行处理。消费者组允许多个消费者通过声明流的不同部分来分配工作负载。例如,一组三个处理支付交易的工作人员可以使用 XGROUP CREATE payments:stream mygroup $
创建一个组,然后使用 XREADGROUP GROUP mygroup consumer1
读取消息。每个消费者都使用 XACK
确认已处理的消息,确保除非显式重试,否则不会重新处理消息。如果消费者失败,则可以将挂起的消息重新分配给组中的其他消费者。这种设计可防止数据丢失,并能可扩展地处理高吞吐量流,而无需手动协调。
Redis Streams 还为不同的访问模式提供了灵活性。消费者可以使用阻塞操作实时读取新消息(例如,XREAD BLOCK 5000 STREAMS mystream $
最多等待 5 秒钟以获取新数据)或使用 ID 重播历史数据(例如,XRANGE mystream 1526569415632-0 +
)。流的上限长度(可以通过 XTRIM
或 XADD
中的 MAXLEN
配置)可防止无限制的内存使用。由于 Redis 是单线程的,因此流操作是原子的,避免了竞争条件。这些功能使 Redis Streams 适用于事件溯源、实时分析和活动源等用例,在这些用例中,有序、可靠和低延迟的消息处理至关重要。