EElasticsearch Handbook

TEMEL

CRUD & Bulk İşlemleri

Elasticsearch'te tek doküman işlemleri basittir ama production'da Bulk API kritik öneme sahiptir — binlerce dokümanı tek HTTP request'te indexleyebilirsiniz.

Kod örneği tercihiBu sayfadaki istemci örneklerini birlikte değiştirir.
Single Document POST _doc/ Index (create/replace) GET _doc/{id} Get by ID POST _update/{id} Partial update Bulk API (Production) POST _bulk NDJSON format 1000-5000 docs/batch 5-15 MB per request _reindex Index to Index Mapping migration Data transformation _update_by_query In-place bulk update Performans refresh_interval = -1 (bulk sırasında) replica = 0 (initial load) 10x hız artışı

Karar Rehberi

DurumÖneriÖrnek veya gerekçe
Single index Uygun: Gerçek zamanlı event Kullanıcı arama log'u
Bulk API Uygun: Toplu veri yükleme (>100 doc) Ürün kataloğu sync
_update_by_query Uygun: Koşullu toplu güncelleme Fiyat artışı: category=X
_reindex Uygun: Mapping değişikliği, index migration v1 → v2 geçişi
_delete_by_query Uygun: Koşullu toplu silme GDPR silme
refresh: wait_for Uygun: Write sonrası okuma gerekli Sipariş → arama

Bulk API (Production Pattern)

# Bulk indexleme (NDJSON format)
curl -X POST "http://localhost:9200/_bulk" -H "Content-Type: application/x-ndjson" -d'
{"index":{"_index":"products","_id":"1"}}
{"name":"Nike Air Max","category":"spor-ayakkabi","price":3499.99,"stock":42}
{"index":{"_index":"products","_id":"2"}}
{"name":"Adidas Ultraboost","category":"spor-ayakkabi","price":4299.99,"stock":28}
{"index":{"_index":"products","_id":"3"}}
{"name":"Puma RS-X","category":"sneaker","price":2799.99,"stock":55}
{"delete":{"_index":"products","_id":"old-1"}}
{"update":{"_index":"products","_id":"2"}}
{"doc":{"stock":25}}
'

# Bulk performans optimizasyonu (initial load)
curl -X PUT "http://localhost:9200/products/_settings" -H "Content-Type: application/json" -d'
{
  "index": {
    "refresh_interval": "-1",
    "number_of_replicas": 0
  }
}'

# ... bulk import ...

# Ayarları geri al
curl -X PUT "http://localhost:9200/products/_settings" -H "Content-Type: application/json" -d'
{
  "index": {
    "refresh_interval": "1s",
    "number_of_replicas": 1
  }
}'

# Force merge (initial load sonrası)
curl -X POST "http://localhost:9200/products/_forcemerge?max_num_segments=1"
public class BulkIndexingService
{
    private readonly ElasticsearchClient _client;
    private const int BatchSize = 2000;

    public BulkIndexingService(ElasticsearchClient client) => _client = client;

    public async Task<BulkIndexResult> BulkIndexAsync(IEnumerable<Product> products)
    {
        var allProducts = products.ToList();
        var totalIndexed = 0;
        var errors = new List<string>();

        // Disable refresh during bulk
        await _client.Indices.PutSettingsAsync("products", s => s
            .IndexSettings(i => i.RefreshInterval(-1)));

        try
        {
            foreach (var batch in allProducts.Chunk(BatchSize))
            {
                var response = await _client.BulkAsync(b =>
                {
                    foreach (var product in batch)
                    {
                        b.Index<Product>(op => op
                            .Index("products")
                            .Id(product.Id)
                            .Document(product));
                    }
                    return b;
                });

                if (response.Errors)
                {
                    foreach (var item in response.ItemsWithErrors)
                        errors.Add(item.Id + ": " + item.Error?.Reason);
                }
                totalIndexed += batch.Length;
            }
        }
        finally
        {
            await _client.Indices.PutSettingsAsync("products", s => s
                .IndexSettings(i => i.RefreshInterval(TimeSpan.FromSeconds(1))));
            await _client.Indices.RefreshAsync("products");
        }

        return new BulkIndexResult(totalIndexed, errors);
    }

    public async Task UpdatePricesByCategoryAsync(string category, decimal multiplier)
    {
        await _client.UpdateByQueryAsync("products", u => u
            .Query(q => q
                .Term(t => t
                    .Field(f => f.Category)
                    .Value(category)))
            .Script(s => s
                .Source("ctx._source.price *= " + multiplier)
                .Lang("painless")));
    }

    public async Task ReindexAsync(string sourceIndex, string targetIndex)
    {
        var response = await _client.ReindexAsync(r => r
            .Source(s => s.Index(sourceIndex))
            .Dest(d => d.Index(targetIndex))
            .WaitForCompletion(false));
        var taskId = response.Task;
    }
}

public record BulkIndexResult(int TotalIndexed, List<string> Errors);

Bulk boyut limiti: Tek bir bulk request 5-15 MB arasında olmalıdır. Çok büyük batch'ler (>50MB) memory pressure yaratır ve 429 Too Many Requests alırsınız. Çok küçük batch'ler (<100 doc) ise HTTP overhead'den dolayı yavaştır.

Örnek: Bir e-ticaret platformunda gece 02:00'de ürün kataloğu ERP'den sync edilir: 500K ürün bulk API ile indexlenir. refresh_interval=-1 yapılır, 2000'lik batch'ler halinde gönderilir, sonunda replica'lar açılır ve force merge yapılır. Toplam süre: ~3 dakika (vs tek tek indexleme: ~45 dakika).