Not içeriği yükleniyor...
// Markdown dosyası okunuyor
// İçerik işleniyor
// Syntax highlighting hazırlanıyor
// Markdown dosyası okunuyor
// İçerik işleniyor
// Syntax highlighting hazırlanıyor
Redis Streams ile real-time data processing, message passing ve event sourcing. XADD, XREAD komutları ile stream-based architectures.
Özellik | Açıklama | Örnek |
---|---|---|
Append-only structure | Immutable, sadece ekleme yapılabilen log yapısı | Events sonradan değiştirilemez |
Unique message IDs | Her message otomatik timestamp-sequence ID alır | 1642678800000-0 (timestamp-sequence) |
Consumer groups | Distributed message processing ve load balancing | XGROUP CREATE stream group $ MKSTREAM |
Range queries | Zaman bazlı message range'leri okuyabilme | XRANGE events 1640995200000 1640998800000 |
Blocking operations | Real-time event listening ve streaming | XREAD BLOCK 0 STREAMS events $ |
Message acknowledgment | Delivered message tracking ve redelivery | XACK group consumer message-id |
Persistent storage | Message'lar disk'te persist edilir | Restart sonrası messages korunur |
Memory optimization | Configurable trimming ve automatic cleanup | XTRIM events MAXLEN ~ 1000 |
Kullanım Alanı | Açıklama | Redis Komutu Örneği |
---|---|---|
Event sourcing | Domain events'lerin sequential saklanması | XADD events * event_type "user_created" |
Message queues | Asynchronous task processing | XADD tasks * task_type "send_email" |
Real-time analytics | Live data streaming ve processing | XADD metrics * cpu_usage "75.5" |
IoT data collection | Sensor verilerinin continuous collection | XADD sensors * temperature "23.5" |
Audit trails | System activity logging ve compliance | XADD audit * action "user_login" |
Chat applications | Real-time messaging systems | XADD chat:room123 * message "Hello" |
Financial transactions | Transaction log ve reconciliation | XADD transactions * amount "100.50" |
Game events | Player actions ve game state changes | XADD game:events * action "level_up" |
Log aggregation | Centralized logging from multiple services | XADD logs * service "api" level "error" |
CDC (Change Data Capture) | Database change tracking | XADD changes * table "users" op "UPDATE" |
XRANGE
ve XREVRANGE
komutları ile belirli zaman aralıklarındaki message'ları millisecond precision ile çekebilirsiniz. Bu özellik, time-series analytics, historical data analysis ve event replay senaryoları için muazzam avantaj sağlar. Örneğin, "son 1 saatteki tüm error eventleri" veya "belirli user'ın bugünkü tüm aktiviteleri" gibi sorguları nanosecond seviyesinde hızla çözebilirsiniz.Stream
, time-ordered, append-only log yapısıdır. Her entry (message) bir unique ID ve field-value çiftlerinden oluşur. Message ID'ler otomatik olarak timestamp-sequence
formatında generate edilir ve chronological ordering garanti edilir. Stream'ler immutable'dır, yani eklenen message'lar sonradan değiştirilemez.Komut | Açıklama | Örnek |
---|---|---|
XADD | Stream'e yeni message ekler | XADD events * type "login" |
XREAD | Stream'den message'ları okur | XREAD STREAMS events 0 |
XRANGE | Belirli ID range'indeki message'ları getirir | XRANGE events - + |
XREVRANGE | Reverse chronological order'da message'lar | XREVRANGE events + - COUNT 10 |
XLEN | Stream'deki message sayısını verir | XLEN events |
XGROUP | Consumer group'ları yönetir | XGROUP CREATE events group1 $ |
XREADGROUP | Consumer group ile message okur | XREADGROUP GROUP group1 c1 |
XACK | Message'ı acknowledge eder | XACK events group1 msg-id |
XPENDING | Acknowledge bekleyen message'ları listeler | XPENDING events group1 |
XTRIM | Stream'i belirli boyuta kırpar | XTRIM events MAXLEN 1000 |
XINFO | Stream hakkında bilgi verir | XINFO STREAM events |
XDEL | Message'ları siler | XDEL events message-id |
text127.0.0.1:6379> XADD events * event_type "user_login" user_id "123" timestamp "2024-01-20T10:30:00Z" // Events stream'ine login eventi ekliyoruz. "1642678200000-0" // Otomatik generate edilen unique message ID. 127.0.0.1:6379> XADD events * event_type "page_view" user_id "456" page "/homepage" // Page view eventi ekliyoruz. "1642678201000-0" // Yeni message ID (timestamp arttı). 127.0.0.1:6379> XADD events 1642678202000-0 event_type "purchase" user_id "123" amount "99.99" // Manuel ID ile message ekliyoruz. "1642678202000-0" // Belirtilen ID kullanıldı. 127.0.0.1:6379> XADD metrics * cpu_usage "75.5" memory_usage "60.2" disk_usage "45.8" // Metrics stream'ine sistem bilgileri. "1642678203000-0" // Metrics için yeni message ID. 127.0.0.1:6379> XADD chat:room1 * user "alice" message "Hello everyone!" timestamp "2024-01-20T10:35:00Z" // Chat message'ı. "1642678500000-0" // Chat stream'inde message ID.
*
kullandığınızda Redis otomatik olarak current timestamp ile unique ID generate eder.text127.0.0.1:6379> 127.0.0.1:6379> XADD events 1000000000000-0 type "test" // Manuel olarak ID belirtiyoruz. 127.0.0.1:6379>
millisecond_timestamp-sequence_number
text127.0.0.1:6379> 127.0.0.1:6379> XADD orders * customer_id "C123" product "laptop" quantity "2" total "2999.98" 127.0.0.1:6379>
XADD
komutu ile stream otomatik olarak oluşturulur.bash127.0.0.1:6379> XREAD STREAMS events 0 // events stream'ini baştan okuyoruz. 1) 1) "events" // Stream adı. 2) 1) 1) "1642678200000-0" // Message ID. 2) 1) "event_type" // Field 1. 2) "user_login" // Value 1. 3) "user_id" // Field 2. 4) "123" // Value 2. 5) "timestamp" // Field 3. 6) "2024-01-20T10:30:00Z" // Value 3. 2) 1) "1642678201000-0" // İkinci message ID. 2) 1) "event_type" 2) "page_view" 3) "user_id" 4) "456" 5) "page" 6) "/homepage" 127.0.0.1:6379> XREAD COUNT 1 STREAMS events 0 // Sadece 1 message okuyoruz. 1) 1) "events" 2) 1) 1) "1642678200000-0" 2) 1) "event_type" 2) "user_login" 3) "user_id" 4) "123" 5) "timestamp" 6) "2024-01-20T10:30:00Z" 127.0.0.1:6379> XREAD BLOCK 5000 STREAMS events $ // 5 saniye boyunca yeni message'ları bekle. (nil) // 5 saniyede yeni message gelmedi. 127.0.0.1:6379> XREAD STREAMS events metrics 0 0 // Birden fazla stream'i aynı anda okuyoruz. 1) 1) "events" 2) 1) 1) "1642678200000-0" 2) 1) "event_type" 2) "user_login" ... (events stream messages) 2) 1) "metrics" 2) 1) 1) "1642678203000-0" 2) 1) "cpu_usage" 2) "75.5" ... (metrics stream messages)
0
= en baştan oku, $
= en son message'dan sonra oku (yeni message'ları bekle).bash127.0.0.1:6379> 127.0.0.1:6379> XREAD BLOCK 0 STREAMS events $ // Sonsuza kadar yeni message bekle. 127.0.0.1:6379>
text127.0.0.1:6379> 127.0.0.1:6379> XREAD COUNT 5 STREAMS events 0 // Maksimum 5 message oku. 127.0.0.1:6379>
text127.0.0.1:6379> XRANGE events - + // Tüm message'ları kronolojik sırada getir. 1) 1) "1642678200000-0" 2) 1) "event_type" 2) "user_login" 3) "user_id" 4) "123" 5) "timestamp" 6) "2024-01-20T10:30:00Z" 2) 1) "1642678201000-0" 2) 1) "event_type" 2) "page_view" 3) "user_id" 4) "456" 5) "page" 6) "/homepage" 127.0.0.1:6379> XRANGE events 1642678200000 1642678201000 // Belirli timestamp aralığı. 1) 1) "1642678200000-0" 2) 1) "event_type" 2) "user_login" ... (message details) 2) 1) "1642678201000-0" 2) 1) "event_type" 2) "page_view" ... (message details) 127.0.0.1:6379> XRANGE events - + COUNT 2 // İlk 2 message'ı getir. 1) 1) "1642678200000-0" ... (first message) 2) 1) "1642678201000-0" ... (second message) 127.0.0.1:6379> XREVRANGE events + - COUNT 1 // En son message'ı getir (reverse order). 1) 1) "1642678202000-0" 2) 1) "event_type" 2) "purchase" 3) "user_id" 4) "123" 5) "amount" 6) "99.99"
-
= stream'in başlangıcı, +
= stream'in sonu.text127.0.0.1:6379> 127.0.0.1:6379> XRANGE events (1642678200000 1642678202000 // Exclusive range (1642678200000 hariç). 127.0.0.1:6379>
bash127.0.0.1:6379> XGROUP CREATE events processors $ MKSTREAM // "processors" consumer group'u oluşturuyoruz. OK // Group başarıyla oluşturuldu. 127.0.0.1:6379> XADD events * task_type "send_email" recipient "user@example.com" priority "high" // Yeni task ekliyoruz. "1642678300000-0" 127.0.0.1:6379> XREADGROUP GROUP processors worker1 COUNT 1 STREAMS events > // worker1 olarak message okuyoruz. 1) 1) "events" 2) 1) 1) "1642678300000-0" 2) 1) "task_type" 2) "send_email" 3) "recipient" 4) "user@example.com" 5) "priority" 6) "high" 127.0.0.1:6379> XADD events * task_type "generate_report" user_id "789" format "pdf" // Başka bir task. "1642678301000-0" 127.0.0.1:6379> XREADGROUP GROUP processors worker2 COUNT 1 STREAMS events > // worker2 olarak okuyoruz. 1) 1) "events" 2) 1) 1) "1642678301000-0" 2) 1) "task_type" 2) "generate_report" 3) "user_id" 4) "789" 5) "format" 6) "pdf" 127.0.0.1:6379> XACK events processors 1642678300000-0 // worker1'in task'ını acknowledge ediyoruz. (integer) 1 // 1 message acknowledge edildi. 127.0.0.1:6379> XPENDING events processors // Pending (acknowledge edilmemiş) message'ları kontrol ediyoruz. 1) (integer) 1 // 1 pending message var. 2) "1642678301000-0" // En eski pending message ID. 3) "1642678301000-0" // En yeni pending message ID. 4) 1) 1) "worker2" // Pending message'ı alan consumer. 2) "1" // Bu consumer'ın pending message sayısı.
$
ile group'u stream'in en sonundan başlatıyoruz (sadece yeni message'ları okur).XACK
ile message'ın işlendiğini belirtmek zorundasınız.XPENDING
ile takip edilir.text127.0.0.1:6379> 127.0.0.1:6379> XCLAIM events processors worker1 300000 1642678301000-0 // 5 dakika sonra message'ı başka worker'a transfer et. 127.0.0.1:6379>
text127.0.0.1:6379> XLEN events // events stream'indeki message sayısı. (integer) 4 // 4 message var. 127.0.0.1:6379> XADD events * test_type "load_test" // Load test için çok sayıda message ekliyoruz. 127.0.0.1:6379> XADD events * test_type "load_test" 127.0.0.1:6379> XADD events * test_type "load_test" 127.0.0.1:6379> XLEN events // Güncel message sayısı. (integer) 7 127.0.0.1:6379> XTRIM events MAXLEN 5 // Stream'i 5 message ile sınırla. (integer) 2 // 2 eski message silindi. 127.0.0.1:6379> XLEN events // Trimming sonrası message sayısı. (integer) 5 127.0.0.1:6379> XTRIM events MAXLEN ~ 3 // Approximately 3 message kalsın (~: yaklaşık). (integer) 2 // 2 message daha silindi. 127.0.0.1:6379> XLEN events (integer) 3
text127.0.0.1:6379> 127.0.0.1:6379> XADD events MAXLEN ~ 1000 * type "auto_trim" // Add sırasında otomatik trim. 127.0.0.1:6379>
bash127.0.0.1:6379> XGROUP CREATE orders order_processors $ MKSTREAM // Order processing group'u oluşturuyoruz. OK 127.0.0.1:6379> XADD orders * order_id "ORD001" customer_id "C123" total "299.99" status "pending" // Yeni order. "1642678400000-0" 127.0.0.1:6379> XADD orders * order_id "ORD002" customer_id "C456" total "149.50" status "pending" "1642678401000-0" 127.0.0.1:6379> XREADGROUP GROUP order_processors payment_service COUNT 1 STREAMS orders > // Payment service order alıyor. 1) 1) "orders" 2) 1) 1) "1642678400000-0" 2) 1) "order_id" 2) "ORD001" 3) "customer_id" 4) "C123" 5) "total" 6) "299.99" 7) "status" 8) "pending" # Payment service order'ı işledikten sonra: 127.0.0.1:6379> XACK orders order_processors 1642678400000-0 // Order acknowledge ediliyor. (integer) 1 127.0.0.1:6379> XADD orders * order_id "ORD001" customer_id "C123" total "299.99" status "paid" // Payment completed eventi. "1642678402000-0" 127.0.0.1:6379> XREADGROUP GROUP order_processors inventory_service COUNT 1 STREAMS orders > // Inventory service sıradaki order'ı alıyor. 1) 1) "orders" 2) 1) 1) "1642678401000-0" 2) 1) "order_id" 2) "ORD002" ... (order details) 127.0.0.1:6379> XINFO GROUPS orders // Consumer group bilgilerini kontrol ediyoruz. 1) 1) "name" 2) "order_processors" 3) "consumers" 4) (integer) 2 // 2 aktif consumer. 5) "pending" 6) (integer) 1 // 1 pending message.
bash127.0.0.1:6379> XGROUP CREATE sensor_data data_processors $ MKSTREAM // Sensor data processing group. OK 127.0.0.1:6379> XADD sensor_data * sensor_id "TEMP001" location "warehouse_a" temperature "23.5" humidity "65.2" // Sıcaklık sensörü. "1642678500000-0" 127.0.0.1:6379> XADD sensor_data * sensor_id "PRES001" location "tank_1" pressure "145.7" unit "psi" // Basınç sensörü. "1642678501000-0" 127.0.0.1:6379> XADD sensor_data * sensor_id "TEMP001" location "warehouse_a" temperature "24.1" humidity "66.8" // Temp update. "1642678502000-0" # Real-time monitoring: 127.0.0.1:6379> XREAD BLOCK 1000 STREAMS sensor_data $ // 1 saniye boyunca yeni sensor data bekle. # Analytics service specific sensor'ları okur: 127.0.0.1:6379> XREADGROUP GROUP data_processors analytics_worker COUNT 2 STREAMS sensor_data > 1) 1) "sensor_data" 2) 1) 1) "1642678500000-0" 2) 1) "sensor_id" 2) "TEMP001" ... (sensor data) 2) 1) "1642678501000-0" 2) 1) "sensor_id" 2) "PRES001" ... (pressure data) # Time-range analytics: 127.0.0.1:6379> XRANGE sensor_data 1642678500000 1642678502000 // Son 2 saniyedeki tüm sensor readings. 1) 1) "1642678500000-0" ... (temp data 1) 2) 1) "1642678501000-0" ... (pressure data) 3) 1) "1642678502000-0" ... (temp data 2) 127.0.0.1:6379> XACK sensor_data data_processors 1642678500000-0 1642678501000-0 // Batch acknowledge. (integer) 2
markdown127.0.0.1:6379> XADD chat:room1 * user_id "alice" username "Alice" message "Hello everyone!" timestamp "2024-01-20T11:00:00Z" // Chat message. "1642680000000-0" 127.0.0.1:6379> XADD chat:room1 * user_id "bob" username "Bob" message "Hi Alice!" timestamp "2024-01-20T11:00:15Z" "1642680015000-0" 127.0.0.1:6379> XADD chat:room1 * user_id "charlie" username "Charlie" message "Good morning!" timestamp "2024-01-20T11:00:30Z" "1642680030000-0" # Real-time message listening: 127.0.0.1:6379> XREAD BLOCK 0 STREAMS chat:room1 1642680030000-0 // Son message'dan sonraki yeni message'ları bekle. # Message history for new user: 127.0.0.1:6379> XREVRANGE chat:room1 + - COUNT 10 // Son 10 message'ı getir (history). 1) 1) "1642680030000-0" 2) 1) "user_id" 2) "charlie" 3) "username" 4) "Charlie" 5) "message" 6) "Good morning!" 7) "timestamp" 8) "2024-01-20T11:00:30Z" 2) 1) "1642680015000-0" ... (Bob's message) 3) 1) "1642680000000-0" ... (Alice's message) # Message count: 127.0.0.1:6379> XLEN chat:room1 // Toplam message sayısı. (integer) 3 # Auto-cleanup (retain last 100 messages): 127.0.0.1:6379> XTRIM chat:room1 MAXLEN ~ 100 // Son 100 message'ı tut. (integer) 0 // Silinecek message yok (henüz 3 tane var).
bash127.0.0.1:6379> XGROUP CREATE transactions audit_processors $ MKSTREAM // Audit processing group. OK 127.0.0.1:6379> XADD transactions * transaction_id "TXN001" account_from "ACC123" account_to "ACC456" amount "1000.00" type "transfer" // Transfer transaction. "1642680100000-0" 127.0.0.1:6379> XADD transactions * transaction_id "TXN002" account "ACC789" amount "500.00" type "deposit" source "external" "1642680101000-0" 127.0.0.1:6379> XADD transactions * transaction_id "TXN003" account "ACC123" amount "150.00" type "withdrawal" location "ATM_001" "1642680102000-0" # Fraud detection service: 127.0.0.1:6379> XREADGROUP GROUP audit_processors fraud_detector COUNT 5 STREAMS transactions > 1) 1) "transactions" 2) 1) 1) "1642680100000-0" 2) 1) "transaction_id" 2) "TXN001" ... (transaction details) 2) 1) "1642680101000-0" ... (deposit transaction) 3) 1) "1642680102000-0" ... (withdrawal transaction) # Account-specific transaction history: 127.0.0.1:6379> XRANGE transactions - + // Tüm transactions (audit için). # Compliance reporting (daily transactions): 127.0.0.1:6379> XRANGE transactions 1642636800000 1642723199999 // Belirli gün aralığındaki transactions. 127.0.0.1:6379> XACK transactions audit_processors 1642680100000-0 1642680101000-0 1642680102000-0 // Audit complete. (integer) 3 127.0.0.1:6379> XINFO STREAM transactions // Transaction stream bilgileri. 1) "length" 2) (integer) 3 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1642680102000-0" 9) "groups" 10) (integer) 1 // 1 consumer group.
XADD
, XREAD
, XRANGE
, XGROUP
, XREADGROUP
gibi temel komutları ve gerçek dünya senaryolarında Stream kullanımını detaylı örneklerle öğrendik.
Stream'in append-only structure, consumer groups, time-based queries ve message acknowledgment özellikleri özellikle:
1642678200000-0
formatındauser_id: "123", action: "login", time: "10:30"
<timestamp>-<sequence>
XACK
komutu ile "işledim" der