RRedis Handbook

ORTA

Redis Streams

Persistent, consumer group destekli event stream. Kafka-light.

Kod örneği görünümü Bu sayfadaki eşleşen örnekleri seçilen istemciye göre gösterir.

Ne Zaman Streams Kullan / Kullanma

Kullan Kullanma Gerçek Hayat
Mesaj kaybı kabul edilemez + Redis zaten varsa Günlük milyonlarca event + uzun retention (>7 gün) Sipariş: order_created → payment → shipment event zinciri
Consumer group ile iş dağıtımı (competing consumers) Partition-level ordering + exactly-once gerekli Bildirim: 3 worker arasında push notification işlemlerini paylaştır
Event replay gerekli (yeni consumer eski event'leri alsın) Multi-region replication + schema registry gerekli Audit: Yeni "fraud-detection" servisini deploy et → tüm geçmiş siparişleri replay et
Küçük-orta ölçek (<100K msg/s) Yüksek throughput (>500K msg/s) → Kafka/Pulsar Startup: RabbitMQ/Kafka cluster yönetmeden event-driven mimari kur

Streams ≠ Kafka: Redis Streams tek node memory'sinde yaşar (cluster'da shard başına). 10M+ mesaj birikirse memory patlar. MAXLEN ~10000 veya MINID ile trim zorunlu. Kafka: disk-based, TB'larca retention ucuz.

Gerçek hayat senaryosu — Sipariş işleme pipeline'ı: XADD orders:stream → 3 consumer group: (1) payment-svc ödemeyi başlatır, (2) inventory-svc stok düşer, (3) notification-svc kullanıcıya e-posta atar. Her biri bağımsız ACK eder. Biri çökerse XAUTOCLAIM ile başka worker devralır. Dead letter'a 5 retry sonrası taşınır.

# Produce
XADD orders:stream * event_type "order_created" order_id "5432" user_id "1001"

# Consumer group oluştur
XGROUP CREATE orders:stream payment-svc 0 MKSTREAM

# Consume
XREADGROUP GROUP payment-svc worker-1 COUNT 5 BLOCK 2000 STREAMS orders:stream >

# Acknowledge
XACK orders:stream payment-svc <message-id>

# Pending (ACK edilmemiş)
XPENDING orders:stream payment-svc

# Claim (timeout olan mesajları devral)
XAUTOCLAIM orders:stream payment-svc worker-2 60000 0
public class OrderStreamProducer
{
    private readonly IDatabase _redis;
    private const string StreamKey = "orders:stream";

    public OrderStreamProducer(IConnectionMultiplexer mux)
        => _redis = mux.GetDatabase();

    public async Task<string> PublishOrderEventAsync(string eventType, int orderId, int userId)
    {
        var messageId = await _redis.StreamAddAsync(StreamKey, new NameValueEntry[]
        {
            new("event_type", eventType),
            new("order_id", orderId.ToString()),
            new("user_id", userId.ToString()),
            new("timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString())
        });
        return messageId.ToString();
    }
}

public class OrderStreamConsumer : BackgroundService
{
    private readonly IConnectionMultiplexer _mux;
    private readonly ILogger<OrderStreamConsumer> _logger;
    private const string StreamKey = "orders:stream";
    private const string GroupName = "payment-svc";
    private readonly string _consumerName;

    public OrderStreamConsumer(IConnectionMultiplexer mux, ILogger<OrderStreamConsumer> logger)
    {
        _mux = mux;
        _logger = logger;
        _consumerName = $"worker-{Environment.MachineName}";
    }

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        var db = _mux.GetDatabase();

        // Consumer group oluştur (idempotent)
        try
        {
            await db.StreamCreateConsumerGroupAsync(StreamKey, GroupName, "0", createStream: true);
        }
        catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP"))
        {
            // Zaten var — sorun yok
        }

        while (!ct.IsCancellationRequested)
        {
            try
            {
                var entries = await db.StreamReadGroupAsync(
                    StreamKey, GroupName, _consumerName,
                    position: ">", count: 10);

                if (entries.Length == 0)
                {
                    await Task.Delay(1000, ct);
                    continue;
                }

                foreach (var entry in entries)
                {
                    try
                    {
                        await ProcessMessageAsync(entry);
                        await db.StreamAcknowledgeAsync(StreamKey, GroupName, entry.Id);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Failed to process {Id}", entry.Id);
                        // ACK yapma → XPENDING'de kalır → retry/dead-letter
                    }
                }
            }
            catch (Exception ex) when (!ct.IsCancellationRequested)
            {
                _logger.LogError(ex, "Stream read error");
                await Task.Delay(5000, ct);
            }
        }
    }

    private Task ProcessMessageAsync(StreamEntry entry)
    {
        var values = entry.Values.ToDictionary(v => v.Name.ToString(), v => v.Value.ToString());
        _logger.LogInformation("Processing {EventType} for order {OrderId}",
            values["event_type"], values["order_id"]);
        return Task.CompletedTask;
    }
}
Özellik Pub/Sub Streams
Persistence Yok Evet
Consumer Groups Yok Evet
Replay (geçmiş) Hayır Evet
Delivery At-most-once At-least-once
Use case Notification, invalidation Event sourcing, task queue
Blocking read Evet Evet
Back-pressure Yok XLEN + trim

Dead Letter & Retry Pattern

Consumer mesajı işleyemezse: retry count kontrol et, eşik aşılırsa dead letter queue'ya taşı.

Producer Stream XADD → persistent log Consumer XREADGROUP + ACK ACK ✓ Retry delivery_count++ FAIL 1/3 2/3 3/3 ⚠ retry Dead Letter Queue orders:dead-letter max retry 🔔 Alert: XLEN > 0
# Pending mesajların retry sayısını gör
# XPENDING stream group - + 10 consumer
XPENDING orders:stream payment-svc - + 10
# <id> <consumer> <idle-ms> <delivery-count>

# 3+ kez denenmiş mesajları claim et (XAUTOCLAIM — Redis 6.2+)
XAUTOCLAIM orders:stream payment-svc dlq-worker 60000 0 COUNT 10
# Idle >60s olan mesajları dlq-worker'a ata

# Dead letter'a taşı (Lua ile)
EVAL "
    local pending = redis.call('XPENDING', KEYS[1], ARGV[1], '-', '+', '100')
    for _, msg in ipairs(pending) do
        if tonumber(msg[4]) >= tonumber(ARGV[2]) then
            local data = redis.call('XRANGE', KEYS[1], msg[1], msg[1])
            if #data > 0 then
                redis.call('XADD', KEYS[2], '*', unpack(data[1][2]))
                redis.call('XACK', KEYS[1], ARGV[1], msg[1])
            end
        end
    end
" 2 orders:stream orders:dead-letter payment-svc 3
public class DeadLetterProcessor
{
    private readonly IDatabase _redis;
    private readonly ILogger<DeadLetterProcessor> _logger;
    private const string StreamKey = "orders:stream";
    private const string DeadLetterKey = "orders:dead-letter";
    private const string GroupName = "payment-svc";
    private const int MaxRetries = 3;

    public DeadLetterProcessor(IConnectionMultiplexer mux,
        ILogger<DeadLetterProcessor> logger)
    {
        _redis = mux.GetDatabase();
        _logger = logger;
    }

    // Periyodik: retry count aşılan mesajları dead letter'a taşı
    public async Task ProcessPendingAsync(CancellationToken ct)
    {
        var pending = await _redis.StreamPendingMessagesAsync(
            StreamKey, GroupName, count: 50, RedisValue.Null);

        foreach (var msg in pending)
        {
            if (msg.DeliveryCount >= MaxRetries)
            {
                // Dead letter queue'ya kopyala
                var entries = await _redis.StreamRangeAsync(
                    StreamKey, msg.MessageId, msg.MessageId, count: 1);

                if (entries.Length > 0)
                {
                    await _redis.StreamAddAsync(DeadLetterKey, entries[0].Values);
                    await _redis.StreamAcknowledgeAsync(StreamKey, GroupName, msg.MessageId);
                    _logger.LogWarning(
                        "Message {Id} moved to dead letter after {Count} retries",
                        msg.MessageId, msg.DeliveryCount);
                }
            }
            else if (msg.IdleTimeInMilliseconds > 60_000)
            {
                // 60s+ idle → başka worker'a ata (XCLAIM)
                await _redis.StreamClaimAsync(StreamKey, GroupName,
                    Environment.MachineName, 60_000,
                    new[] { msg.MessageId });
            }
        }
    }
}

Dead letter monitoring: Dead letter queue büyüyorsa alarm kur. XLEN orders:dead-letter metriğini Prometheus'a ekle.