RRabbitMQ Handbook

UZMAN

Testing & Verification

RabbitMQ entegrasyonlarını test etmek "çalışıyor gibi görünüyor" ile "production'da kesin çalışır" arasındaki farktır. Testcontainers ile gerçek RabbitMQ instance'ı üzerinde integration test yapmak, mock'lardan çok daha güvenilirdir.

Seviye: Uzman — Integration test altyapısı ve CI/CD pipeline deneyimi gerektirir.

Test Piramidi — RabbitMQ Unit Tests (serialization, routing logic) Integration Tests (Testcontainers — gerçek RabbitMQ) E2E / Contract Tests (publish → consume → assert state)

Ne Zaman Hangi Test

Test Tipi Ne Test Eder Araç Hız
Unit Serialization, routing key generation, retry logic xUnit + Moq ⚡ ms
Integration Publish → queue → consume → ack/nack akışı Testcontainers + gerçek RabbitMQ 🐢 2-5s (container startup)
DLX Flow Rejected mesajın DLX'e düşmesi, retry count Testcontainers 🐢 3-10s
Idempotency Aynı mesajı 2x gönder, 1x işlendiğini doğrula Testcontainers + DB 🐢 5s
Load/Stress Throughput limitleri, prefetch etkileri k6 / bombardier + gerçek cluster 🐌 dakikalar
.NET — Testcontainers ile Integration Test
// NuGet: Testcontainers.RabbitMq, xUnit
public class RabbitMqIntegrationTests : IAsyncLifetime
{
    private readonly RabbitMqContainer _container = new RabbitMqBuilder()
        .WithImage("rabbitmq:4.3-management")
        .Build();

    private IConnection _connection = null!;
    private IChannel _channel = null!;

    public async Task InitializeAsync()
    {
        await _container.StartAsync();
        var factory = new ConnectionFactory
        {
            Uri = new Uri(_container.GetConnectionString())
        };
        _connection = await factory.CreateConnectionAsync();
        _channel = await _connection.CreateChannelAsync();
    }

    [Fact]
    public async Task Publish_And_Consume_ShouldDeliverMessage()
    {
        // Arrange
        await _channel.QueueDeclareAsync("test-queue", durable: false,
            exclusive: false, autoDelete: true);

        var receivedMessage = new TaskCompletionSource<string>();
        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.ReceivedAsync += async (_, ea) =>
        {
            var body = Encoding.UTF8.GetString(ea.Body.ToArray());
            receivedMessage.SetResult(body);
            await _channel.BasicAckAsync(ea.DeliveryTag, false);
        };
        await _channel.BasicConsumeAsync("test-queue", false, consumer);

        // Act
        var body = Encoding.UTF8.GetBytes("hello-rabbitmq");
        await _channel.BasicPublishAsync("", "test-queue", body: body);

        // Assert
        var result = await receivedMessage.Task.WaitAsync(TimeSpan.FromSeconds(5));
        Assert.Equal("hello-rabbitmq", result);
    }

    [Fact]
    public async Task Nack_WithoutRequeue_ShouldRouteToDeadLetter()
    {
        // Arrange: Main queue with DLX
        await _channel.ExchangeDeclareAsync("test-dlx", ExchangeType.Direct);
        await _channel.QueueDeclareAsync("test-dead-letters", durable: false,
            exclusive: false, autoDelete: true);
        await _channel.QueueBindAsync("test-dead-letters", "test-dlx", "rejected");

        var args = new Dictionary<string, object?>
        {
            ["x-dead-letter-exchange"] = "test-dlx",
            ["x-dead-letter-routing-key"] = "rejected"
        };
        await _channel.QueueDeclareAsync("test-main", durable: false,
            exclusive: false, autoDelete: true, arguments: args);

        // DLX consumer
        var deadLetterReceived = new TaskCompletionSource<string>();
        var dlxConsumer = new AsyncEventingBasicConsumer(_channel);
        dlxConsumer.ReceivedAsync += async (_, ea) =>
        {
            deadLetterReceived.SetResult(Encoding.UTF8.GetString(ea.Body.ToArray()));
            await _channel.BasicAckAsync(ea.DeliveryTag, false);
        };
        await _channel.BasicConsumeAsync("test-dead-letters", false, dlxConsumer);

        // Main consumer: reject
        var mainConsumer = new AsyncEventingBasicConsumer(_channel);
        mainConsumer.ReceivedAsync += async (_, ea) =>
        {
            await _channel.BasicNackAsync(ea.DeliveryTag, false, requeue: false);
        };
        await _channel.BasicConsumeAsync("test-main", false, mainConsumer);

        // Act
        await _channel.BasicPublishAsync("", "test-main",
            body: Encoding.UTF8.GetBytes("fail-me"));

        // Assert: mesaj DLX queue'da olmalı
        var dlxResult = await deadLetterReceived.Task.WaitAsync(TimeSpan.FromSeconds(5));
        Assert.Equal("fail-me", dlxResult);
    }

    public async Task DisposeAsync()
    {
        await _channel.DisposeAsync();
        await _connection.DisposeAsync();
        await _container.DisposeAsync();
    }
}
.NET — Idempotency Integration Test
[Fact]
public async Task DuplicateMessage_ShouldBeProcessedOnlyOnce()
{
    // Arrange
    var messageId = Guid.NewGuid().ToString();
    var processCount = 0;

    var consumer = new AsyncEventingBasicConsumer(_channel);
    consumer.ReceivedAsync += async (_, ea) =>
    {
        var msgId = ea.BasicProperties.MessageId;
        // Simulate idempotency check
        if (!_processedIds.Contains(msgId))
        {
            _processedIds.Add(msgId);
            Interlocked.Increment(ref processCount);
        }
        await _channel.BasicAckAsync(ea.DeliveryTag, false);
    };
    await _channel.BasicConsumeAsync("idempotency-test", false, consumer);

    // Act: Aynı mesajı 3 kez gönder
    var props = new BasicProperties { MessageId = messageId };
    for (int i = 0; i < 3; i++)
    {
        await _channel.BasicPublishAsync("", "idempotency-test",
            basicProperties: props, body: Encoding.UTF8.GetBytes("payment"));
    }

    await Task.Delay(1000); // Consumer'ın işlemesini bekle

    // Assert: Sadece 1 kez işlendi
    Assert.Equal(1, processCount);
}

Testcontainers Docker gerektirir: CI/CD pipeline'ında Docker-in-Docker veya Docker socket mount gerekir. GitHub Actions'da services: docker ile çalışır. Testler gerçek RabbitMQ başlattığı için unit test'lerden yavaştır (~2-5s container startup).

Edge Case Test Senaryoları

Aşağıdaki senaryolar "happy path çalışıyor ama production'da patladı" durumlarını yakalar:

Senaryo Ne Test Eder Nasıl Simüle Edersin
Broker restart mid-publish Publisher confirm timeout handling _container.StopAsync() → publish → assert retry
Consumer crash mid-processing Unacked mesaj requeue davranışı Exception throw in handler → mesajın tekrar geldiğini doğrula
Oversized message Max message size rejection 128MB+ body gönder → channel exception assert
TTL expiry → DLX Zamanında işlenmeyen mesaj DLX'e düşer x-message-ttl=100ms + slow consumer → DLX'te bul
Network partition (split-brain) pause_minority davranışı Testcontainers network disconnect → connection recovery doğrula
Duplicate delivery At-least-once altında idempotency BasicNack(requeue: true) → aynı mesajın 2x geldiğini doğrula, 1x işlendiğini assert
Queue full (max-length) Overflow policy davranışı x-max-length=5, 10 mesaj gönder → reject-publish veya drop-head doğrula
.NET — Broker Restart Recovery Test
[Fact]
public async Task PublisherConfirm_AfterBrokerRestart_ShouldRecover()
{
    // Arrange
    await _channel.ConfirmSelectAsync();
    await _channel.QueueDeclareAsync("recovery-test", durable: true,
        exclusive: false, autoDelete: false);

    // Act: Publish bir mesaj (confirm al)
    await _channel.BasicPublishAsync("", "recovery-test",
        mandatory: true, body: Encoding.UTF8.GetBytes("before-restart"));
    await _channel.WaitForConfirmsOrDieAsync(TimeSpan.FromSeconds(5));

    // Broker'ı restart et
    await _container.StopAsync();
    await _container.StartAsync();

    // Connection recovery bekle (AutomaticRecoveryEnabled=true)
    await Task.Delay(TimeSpan.FromSeconds(10));

    // Assert: Yeni connection ile durable queue'daki mesaj hâlâ orada
    var factory = new ConnectionFactory { Uri = new Uri(_container.GetConnectionString()) };
    await using var newConn = await factory.CreateConnectionAsync();
    await using var newChannel = await newConn.CreateChannelAsync();

    var result = await newChannel.BasicGetAsync("recovery-test", autoAck: true);
    Assert.NotNull(result);
    Assert.Equal("before-restart", Encoding.UTF8.GetString(result.Body.ToArray()));
}

Gerçek hayat senaryosu: Fintech ödeme sistemi CI pipeline: Her PR'da Testcontainers ile 4 integration test çalışır — publish/consume, DLX flow, idempotency, publisher confirms. Test süresi toplam 15s. Bu testler sayesinde "consumer'ı refactor ettik ama ack'ı unuttuk" gibi hatalar production'a çıkmadan yakalanır.

Contract Testing (AsyncAPI & Pact)

Integration test'ler tek servisin doğru çalıştığını doğrular. Ancak servisler arası mesaj kontratı değiştiğinde (field ekleme/silme, type değişikliği) downstream consumer'lar kırılır. Contract testing bunu CI'da yakalar.

Araç Ne Yapar Ne Zaman
AsyncAPI Mesaj şemasını (schema) dokümante eder, schema validation Event-driven API documentation + schema registry
Pact (Message) Consumer-driven contracts: consumer beklentisini tanımlar, producer doğrular Microservice'ler arası mesaj uyumluluğu
Schema Registry Avro/JSON Schema ile runtime validation Büyük ölçekli event-driven sistemler
.NET — Pact Message Contract Test (Consumer Side)
// NuGet: PactNet
// Consumer tarafı: "Ben bu mesajı bu formatta bekliyorum" tanımı
public class OrderCreatedConsumerPactTest
{
    private readonly IMessagePactBuilderV4 _pact;

    public OrderCreatedConsumerPactTest()
    {
        var config = new PactConfig { PactDir = "../../../pacts" };
        _pact = Pact.V4("OrderConsumer", "OrderService", config)
            .WithMessageInteractions();
    }

    [Fact]
    public async Task Expects_OrderCreated_WithRequiredFields()
    {
        await _pact
            .ExpectsToReceive("an OrderCreated event")
            .WithJsonContent(new
            {
                orderId = Match.Type(Guid.NewGuid()),
                amount = Match.Decimal(99.99m),
                currency = Match.Regex("TRY", "^[A-Z]{3}$"),
                timestamp = Match.Type(DateTime.UtcNow),
                items = Match.MinType(new { sku = "ABC-123", quantity = 1 }, 1)
            })
            .VerifyAsync<OrderCreatedEvent>(message =>
            {
                // Consumer handler'ının bu mesajı işleyebildiğini doğrula
                Assert.NotEqual(Guid.Empty, message.OrderId);
                Assert.True(message.Amount > 0);
                Assert.NotEmpty(message.Currency);
            });
    }
}
AsyncAPI — Mesaj Kontratı Tanımı
# asyncapi.yaml — Order Service event kontratı
asyncapi: '2.6.0'
info:
  title: Order Service Events
  version: '1.0.0'
channels:
  order.created:
    subscribe:
      operationId: onOrderCreated
      message:
        name: OrderCreated
        contentType: application/json
        payload:
          type: object
          required: [orderId, amount, currency, timestamp]
          properties:
            orderId:
              type: string
              format: uuid
            amount:
              type: number
              minimum: 0.01
            currency:
              type: string
              pattern: '^[A-Z]{3}$'
            timestamp:
              type: string
              format: date-time
            items:
              type: array
              minItems: 1
              items:
                type: object
                required: [sku, quantity]
                properties:
                  sku:
                    type: string
                  quantity:
                    type: integer
                    minimum: 1

Breaking change detection: Pact broker'a publish edilen contract'lar can-i-deploy komutu ile CI'da kontrol edilir. Producer mesaj şemasını değiştirirse ve consumer contract'ını kırarsa → pipeline FAIL. Bu sayede "field adını değiştirdik, 3 downstream servis kırıldı" senaryosu production'a çıkmaz.

Gerçek hayat senaryosu: E-ticaret platformu: Order Service OrderCreated event'ini 5 farklı consumer'a gönderir (notification, invoice, loyalty, analytics, warehouse). Pact ile her consumer kendi contract'ını tanımlar. Order Service yeni bir field eklediğinde → sorun yok (additive). Mevcut field'ı sildiğinde → Pact broker 3 consumer'ın kırıldığını bildirir → PR merge edilemez.