Kafka #
Apache Kafka adalah platform streaming terdistribusi yang dirancang untuk menangani jutaan event per detik dengan latensi rendah dan durabilitas tinggi. Berbeda dari message queue tradisional seperti RabbitMQ yang menghapus pesan setelah dikonsumsi, Kafka menyimpan semua pesan dalam log yang tidak berubah — consumer bisa membaca ulang pesan dari titik manapun. Ini membuatnya ideal untuk event sourcing, audit trail, real-time analytics, dan menghubungkan banyak microservice secara loose-coupled. Di Ruby, ada tiga pilihan utama: ruby-kafka (gem matang dari Zendesk), rdkafka-ruby (wrapper librdkafka berbasis C yang lebih performan), dan Karafka (framework lengkap untuk aplikasi Kafka di Ruby/Rails). Artikel ini membahas semua lapisan ini dari konsep dasar hingga pola produksi.
Konsep Dasar Kafka #
flowchart LR
P1[Producer\nAplikasi Ruby] --> T
P2[Producer\nAplikasi Rails] --> T
subgraph Kafka Cluster
T[Topic: pesanan]
T --> Par0["Partition 0\nmsg 0,3,6,9..."]
T --> Par1["Partition 1\nmsg 1,4,7,10..."]
T --> Par2["Partition 2\nmsg 2,5,8,11..."]
end
Par0 --> CG1[Consumer Group A\nLayanan Notifikasi]
Par1 --> CG1
Par2 --> CG1
Par0 --> CG2[Consumer Group B\nLayanan Analitik]
Par1 --> CG2
Par2 --> CG2Konsep utama Kafka:
Topic — kategori atau nama stream pesan (seperti "nama channel")
Partition — topic dibagi menjadi N partition untuk parallelism
Offset — posisi pesan dalam partition (0, 1, 2, ...)
Producer — aplikasi yang menulis pesan ke topic
Consumer — aplikasi yang membaca pesan dari topic
Consumer Group — sekumpulan consumer yang berbagi beban membaca partition
Broker — server Kafka (biasanya 3+ untuk HA)
Replication— setiap partition punya N replica di broker berbeda
Perbedaan kunci Kafka vs queue tradisional:
✓ Pesan tidak dihapus setelah dikonsumsi — tersimpan sesuai retention period
✓ Banyak consumer group bisa membaca topic yang sama secara independen
✓ Consumer bisa rewind/replay pesan dari offset manapun
✓ Throughput sangat tinggi — jutaan pesan per detik
✗ Lebih kompleks untuk di-setup dan di-operasikan
✗ Tidak mendukung routing kompleks seperti RabbitMQ exchange
Instalasi #
# ruby-kafka — gem Zendesk, lebih mudah digunakan
gem install ruby-kafka
# rdkafka-ruby — C extension berbasis librdkafka, lebih cepat
# Perlu install librdkafka terlebih dahulu
sudo apt install librdkafka-dev # Ubuntu
brew install librdkafka # macOS
gem install rdkafka
# Karafka — framework lengkap untuk Rails
gem install karafka
# Gemfile
gem 'ruby-kafka', '~> 1.5' # pilihan 1: ruby-kafka
gem 'rdkafka', '~> 0.14' # pilihan 2: rdkafka (lebih performan)
gem 'karafka', '~> 2.3' # pilihan 3: Karafka (framework)
gem 'avro_turf', '~> 1.7' # untuk Avro serialization (opsional)
gem 'waterdrop', '~> 2.6' # producer standalone dari tim Karafka
Producer — Mengirim Pesan #
Dengan ruby-kafka #
require 'kafka'
require 'json'
# Buat Kafka client
kafka = Kafka.new(
["localhost:9092"], # daftar broker
client_id: "toko-app",
logger: Logger.new($stdout),
ssl_ca_cert: ENV["KAFKA_CA_CERT"], # opsional, untuk SSL
sasl_plain_username: ENV["KAFKA_USER"], # opsional, untuk autentikasi
sasl_plain_password: ENV["KAFKA_PASSWORD"]
)
# Producer sederhana — satu pesan
kafka.deliver_message(
JSON.generate({
event: "pesanan_dibuat",
pesanan_id: 12345,
pengguna_id: 99,
total: 150_000,
timestamp: Time.now.iso8601
}),
topic: "pesanan",
key: "pesanan-12345" # key menentukan partition yang digunakan
)
# Producer dengan batching — lebih efisien untuk throughput tinggi
producer = kafka.producer(
required_acks: :all, # tunggu semua replica acknowledge
ack_timeout: 5, # timeout dalam detik
max_buffer_size: 1000, # maksimal pesan dalam buffer
max_buffer_bytesize: 1_000_000, # maksimal ukuran buffer dalam byte
compression_codec: :snappy # kompres dengan snappy (hemat bandwidth)
)
begin
# Tambahkan pesan ke buffer
100.times do |i|
producer.produce(
JSON.generate({ event: "event_#{i}", data: "payload" }),
topic: "events",
key: "key-#{i % 10}", # distribute ke 10 partisi
partition_key: "user-#{i % 5}" # atau gunakan partition_key
)
end
# Kirim semua pesan dalam buffer ke Kafka
producer.deliver_messages
ensure
producer.shutdown
end
Dengan rdkafka — Performa Lebih Tinggi #
require 'rdkafka'
require 'json'
config = Rdkafka::Config.new(
"bootstrap.servers" => "localhost:9092",
"client.id" => "toko-app",
"acks" => "all",
"enable.idempotence" => true, # idempotent producer
"compression.type" => "snappy",
"batch.size" => 65_536, # 64KB per batch
"linger.ms" => 5, # tunggu 5ms sebelum kirim
"message.max.bytes" => 1_048_576 # maksimal 1MB per pesan
)
producer = config.producer
# Kirim pesan secara asinkron
delivery_handle = producer.produce(
topic: "pesanan",
key: "pesanan-12345",
payload: JSON.generate({
event: "pesanan_dibuat",
pesanan_id: 12345,
total: 150_000,
timestamp: Time.now.to_i
})
)
# Wait untuk konfirmasi (opsional — bisa juga fire-and-forget)
delivery_report = delivery_handle.wait(max_wait_timeout: 10)
puts "Offset: #{delivery_report.offset}, Partition: #{delivery_report.partition}"
# Kirim banyak pesan
handles = []
1000.times do |i|
handles << producer.produce(
topic: "events",
key: "user-#{i % 100}",
payload: JSON.generate({ seq: i, ts: Time.now.to_i })
)
end
# Flush — tunggu semua terkirim
producer.flush(10_000)
producer.close
Consumer — Membaca Pesan #
Dengan ruby-kafka #
require 'kafka'
require 'json'
kafka = Kafka.new(["localhost:9092"], client_id: "layanan-notifikasi")
# Consumer group — load balancing otomatis antar instance consumer
consumer = kafka.consumer(
group_id: "layanan-notifikasi",
offset_commit_interval: 5, # commit offset setiap 5 detik
offset_commit_threshold: 100, # atau setiap 100 pesan
heartbeat_interval: 10, # heartbeat ke Kafka setiap 10 detik
session_timeout: 30 # timeout session jika tidak heartbeat
)
# Subscribe ke satu atau banyak topic
consumer.subscribe("pesanan", start_from_beginning: false)
consumer.subscribe("pembayaran")
# Trap signal untuk graceful shutdown
trap("SIGTERM") { consumer.stop }
trap("SIGINT") { consumer.stop }
# Loop membaca pesan
consumer.each_message do |message|
puts "Topic: #{message.topic}"
puts "Partition: #{message.partition}"
puts "Offset: #{message.offset}"
puts "Key: #{message.key}"
begin
data = JSON.parse(message.value, symbolize_names: true)
proses_event(data)
rescue JSON::ParserError => e
puts "JSON tidak valid: #{e.message}"
# Kirim ke dead letter queue atau log untuk analisis
rescue => e
puts "Error memproses pesan: #{e.message}"
# Jangan raise — akan menyebabkan consumer berhenti
# Kirim ke DLQ jika perlu
end
end
Dengan rdkafka — Manual Commit #
require 'rdkafka'
require 'json'
config = Rdkafka::Config.new(
"bootstrap.servers" => "localhost:9092",
"group.id" => "layanan-analitik",
"auto.offset.reset" => "earliest", # mulai dari awal jika group baru
"enable.auto.commit" => false, # manual commit untuk kontrol penuh
"max.poll.interval.ms" => 300_000, # maksimal 5 menit per poll
"session.timeout.ms" => 30_000 # 30 detik session timeout
)
consumer = config.consumer
consumer.subscribe("pesanan")
# Graceful shutdown
running = true
trap("SIGTERM") { running = false }
trap("SIGINT") { running = false }
begin
while running
# Poll dengan timeout 1 detik
message = consumer.poll(1000)
next unless message
begin
data = JSON.parse(message.payload, symbolize_names: true)
proses_event(data)
# Commit offset SETELAH berhasil diproses
# Ini memastikan pesan tidak hilang jika terjadi crash sebelum diproses
consumer.commit
rescue => e
puts "Error: #{e.message}"
# Jangan commit — pesan akan dibaca ulang saat consumer restart
kirim_ke_dlq(message) if percobaan_ke?(message) >= 3
end
end
ensure
consumer.close
end
Error Handling dan Dead Letter Queue #
Pesan yang gagal diproses berulang kali perlu dikirim ke Dead Letter Queue (DLQ) agar tidak memblokir consumer:
require 'kafka'
require 'json'
class KafkaConsumerDenganDLQ
MAX_RETRY = 3
DLQ_TOPIC = "pesanan.dlq"
def initialize(kafka)
@kafka = kafka
@consumer = kafka.consumer(group_id: "layanan-pesanan")
@producer = kafka.producer
@retry_counts = {}
end
def jalankan
@consumer.subscribe("pesanan")
@consumer.each_message do |message|
proses_dengan_retry(message)
end
ensure
@producer.shutdown
end
private
def proses_dengan_retry(message)
kunci = "#{message.topic}-#{message.partition}-#{message.offset}"
@retry_counts[kunci] ||= 0
begin
data = JSON.parse(message.value, symbolize_names: true)
proses_pesanan(data)
@retry_counts.delete(kunci)
rescue => e
@retry_counts[kunci] += 1
if @retry_counts[kunci] >= MAX_RETRY
kirim_ke_dlq(message, e)
@retry_counts.delete(kunci)
else
puts "Percobaan #{@retry_counts[kunci]}/#{MAX_RETRY}: #{e.message}"
sleep(2 ** @retry_counts[kunci]) # exponential backoff: 2s, 4s, 8s
retry
end
end
end
def kirim_ke_dlq(pesan_asli, error)
@producer.produce(
JSON.generate({
original_topic: pesan_asli.topic,
original_partition: pesan_asli.partition,
original_offset: pesan_asli.offset,
original_key: pesan_asli.key,
original_payload: pesan_asli.value,
error_class: error.class.name,
error_message: error.message,
failed_at: Time.now.iso8601
}),
topic: DLQ_TOPIC,
key: pesan_asli.key
)
@producer.deliver_messages
puts "Pesan dikirim ke DLQ: #{DLQ_TOPIC}"
end
end
Karafka — Framework Kafka untuk Rails #
Karafka adalah framework yang menyederhanakan penggunaan Kafka di aplikasi Ruby/Rails dengan DSL yang ekspresif:
# Inisialisasi Karafka di proyek Rails
bundle exec karafka install
# karafka.rb — konfigurasi utama
class KarafkaApp < Karafka::App
setup do |config|
config.kafka = {
"bootstrap.servers" => ENV.fetch("KAFKA_BROKERS", "localhost:9092"),
"client.id" => "toko-app",
"group.id" => "toko-consumers",
"enable.idempotence" => true,
"compression.type" => "snappy",
"session.timeout.ms" => 30_000,
"max.poll.interval.ms" => 300_000,
"enable.auto.commit" => false # Karafka manage commit sendiri
}
config.consumer_persistence = !Rails.env.test?
config.logger = Rails.logger
end
# Routing — mendefinisikan consumer untuk setiap topic
routes.draw do
topic :pesanan do
consumer PesananConsumer
end
topic :pembayaran do
consumer PembayaranConsumer
max_messages 100 # proses maksimal 100 pesan per batch
max_wait_time 5_000 # tunggu maksimal 5 detik untuk batch
end
topic :notifikasi do
consumer NotifikasiConsumer
manual_offset_management true # commit manual
end
# Consumer group berbeda untuk topic yang sama
consumer_group :analitik do
topic :pesanan do
consumer PesananAnalitikConsumer
end
end
end
end
# app/consumers/pesanan_consumer.rb
class PesananConsumer < ApplicationConsumer
# messages — array of Karafka::Messages::Message
def consume
messages.each do |message|
data = message.payload # Karafka otomatis parse JSON
Rails.logger.info "Proses pesanan: #{data['pesanan_id']}"
Pesanan.transaction do
pesanan = Pesanan.find_or_create_by!(id: data['pesanan_id'])
pesanan.update!(
status: data['status'],
total: data['total'],
diproses_pada: Time.now
)
# Kirim notifikasi
NotifikasiMailer.pesanan_dikonfirmasi(pesanan).deliver_later
end
end
# Karafka otomatis commit setelah consume sukses
end
# Dipanggil jika ada exception di consume
def handle_exception(exception)
Rails.logger.error "Error consumer pesanan: #{exception.message}"
# Karafka akan retry otomatis sesuai konfigurasi
end
end
# app/consumers/pembayaran_consumer.rb
class PembayaranConsumer < ApplicationConsumer
def consume
# Batch processing — semua pesan diproses sekaligus
pembayaran_ids = messages.payloads.map { |p| p['pembayaran_id'] }
Rails.logger.info "Memproses #{messages.count} pembayaran: #{pembayaran_ids}"
Pembayaran.where(id: pembayaran_ids).find_each do |pembayaran|
pembayaran.konfirmasi!
end
end
end
Producer dengan WaterDrop (Karafka Producer) #
# config/initializers/waterdrop.rb
WaterDrop.setup do |config|
config.logger = Rails.logger
config.deliver = !Rails.env.test?
config.kafka = {
"bootstrap.servers": ENV.fetch("KAFKA_BROKERS", "localhost:9092"),
"acks": "all",
"enable.idempotence": true,
"compression.type": "snappy"
}
end
# Kirim pesan dari mana saja di Rails
class PesananService
def buat_pesanan(pengguna, item)
pesanan = Pesanan.create!(pengguna: pengguna, item: item)
# Kirim event ke Kafka
WaterDrop::Producer.new.call do |producer|
producer.produce_sync(
topic: "pesanan",
key: pesanan.id.to_s,
payload: {
event: "pesanan_dibuat",
pesanan_id: pesanan.id,
pengguna_id: pengguna.id,
total: pesanan.total,
timestamp: Time.now.iso8601
}.to_json
)
end
pesanan
end
end
# Atau menggunakan producer yang dibuat sekali (lebih efisien)
KAFKA_PRODUCER = WaterDrop::Producer.new
KAFKA_PRODUCER.setup do |config|
config.kafka = { "bootstrap.servers": "localhost:9092" }
end
# Singleton producer — reuse di seluruh aplikasi
KAFKA_PRODUCER.produce_async(
topic: "events",
payload: { event: "user_login", user_id: 1 }.to_json
)
# Batch produce — lebih efisien
messages = 1000.times.map do |i|
{ topic: "events", payload: { seq: i }.to_json }
end
KAFKA_PRODUCER.produce_many_async(messages)
Serialisasi dengan Avro #
Avro adalah format serialisasi binary yang efisien dan mendukung schema evolution — nilai yang sangat penting ketika producer dan consumer diupdate secara independen:
require 'avro_turf'
require 'avro_turf/messaging'
# Schema Registry — server yang menyimpan schema Avro
avro = AvroTurf::Messaging.new(
registry_url: ENV.fetch("SCHEMA_REGISTRY_URL", "http://localhost:8081")
)
# Schema Avro (biasanya didefinisikan di Schema Registry)
# {
# "type": "record",
# "name": "Pesanan",
# "fields": [
# {"name": "pesanan_id", "type": "int"},
# {"name": "total", "type": "double"},
# {"name": "status", "type": "string"}
# ]
# }
# Encode pesan dengan Avro
payload_avro = avro.encode(
{ "pesanan_id" => 12345, "total" => 150_000.0, "status" => "pending" },
subject: "pesanan-value",
version: :latest
)
# Kirim payload yang sudah di-encode
producer.produce(topic: "pesanan", payload: payload_avro, key: "12345")
# Decode di sisi consumer
message = consumer.poll(1000)
data = avro.decode(message.payload)
puts data["pesanan_id"] # => 12345
Idempotent Producer dan Exactly-Once Semantics #
# Idempotent producer — Kafka menjamin tidak ada duplikat meski producer retry
config = Rdkafka::Config.new(
"bootstrap.servers" => "localhost:9092",
"enable.idempotence" => true, # aktifkan idempotence
# enable.idempotence otomatis set:
# acks = all
# max.in.flight.requests.per.connection = 5
# retries = INT_MAX
)
# Exactly-once semantics — baca dari Kafka, proses, tulis ke Kafka
# hanya sekali (tidak duplikat, tidak hilang)
config_eos = Rdkafka::Config.new(
"bootstrap.servers" => "localhost:9092",
"enable.idempotence" => true,
"transactional.id" => "toko-processor-1", # ID unik per instance
"transaction.timeout.ms" => 60_000
)
producer_eos = config_eos.producer
producer_eos.init_transactions
consumer_input = config.consumer
consumer_input.subscribe("pesanan_mentah")
loop do
message = consumer_input.poll(1000)
next unless message
begin
producer_eos.begin_transaction
# Proses dan transform pesan
data_diproses = transformasi(JSON.parse(message.payload))
# Tulis hasil ke topic lain dalam transaksi yang sama
producer_eos.produce(
topic: "pesanan_diproses",
payload: data_diproses.to_json,
key: message.key
)
# Commit offset consumer sebagai bagian dari transaksi
producer_eos.send_offsets_to_transaction(
consumer_input,
"processor-group"
)
producer_eos.commit_transaction
rescue => e
producer_eos.abort_transaction
puts "Transaksi dibatalkan: #{e.message}"
end
end
Monitoring dan Observabilitas #
# Statistik dari rdkafka
config = Rdkafka::Config.new(
"bootstrap.servers" => "localhost:9092",
"statistics.interval.ms" => 5_000 # laporan setiap 5 detik
)
config.statistics_callback = proc do |stats|
stats_hash = JSON.parse(stats)
# Log lag consumer — seberapa jauh consumer tertinggal
stats_hash["topics"]&.each do |topic, topic_data|
topic_data["partitions"]&.each do |partition, part_data|
lag = part_data["consumer_lag"]
next if lag < 0 # -1 berarti belum diketahui
puts "Lag #{topic}[#{partition}]: #{lag} pesan"
# Alert jika lag terlalu besar
if lag > 10_000
Monitoring.alert("Kafka consumer lag tinggi: #{topic}[#{partition}] = #{lag}")
end
end
end
end
# Metric yang penting untuk dipantau:
# consumer_lag — jumlah pesan yang belum diproses
# messages_per_sec — throughput producer
# bytes_per_sec — bandwidth
# error_count — jumlah error
Perbandingan Kafka vs RabbitMQ vs Amazon SQS #
| Aspek | Kafka | RabbitMQ | Amazon SQS |
|---|---|---|---|
| Model | Log/Stream | Queue/Exchange | Queue |
| Retensi pesan | Berdasarkan waktu/ukuran | Dihapus setelah dikonsumsi | Max 14 hari |
| Replay pesan | Ya — bisa rewind ke offset manapun | Tidak | Tidak |
| Throughput | Sangat tinggi (jutaan/detik) | Tinggi (ratusan ribu/detik) | Tinggi (managed) |
| Ordering | Per partition | Per queue | Best-effort (FIFO: per group) |
| Routing | Per topic | Flexible exchange | Simple |
| Kompleksitas | Tinggi | Sedang | Rendah (managed) |
| Setup | Self-managed / Confluent | Self-managed / CloudAMQP | Fully managed AWS |
| Cocok untuk | Event streaming, audit log, analytics | Task queue, RPC, routing kompleks | Simple queue AWS |
Pilih Kafka jika:
✓ Throughput sangat tinggi (> 100K pesan/detik)
✓ Perlu replay dan audit log
✓ Banyak consumer group baca topic yang sama
✓ Event sourcing dan CQRS
✓ Real-time stream processing
Pilih RabbitMQ jika:
✓ Routing pesan yang kompleks (exchange, binding)
✓ Request-reply pattern (RPC)
✓ Prioritas pesan
✓ Tim sudah familiar dengan AMQP
Pilih Amazon SQS jika:
✓ Sudah di AWS dan ingin managed service
✓ Kesederhanaan lebih penting dari fitur
✓ Tidak mau manage infra sendiri
Ringkasan #
- Kafka menyimpan semua pesan — tidak seperti queue tradisional yang menghapus pesan setelah dikonsumsi; konsep ini memungkinkan replay, audit, dan multiple consumer group membaca data yang sama secara independen.
- Key menentukan partition — semua pesan dengan key yang sama masuk ke partition yang sama, menjamin urutan per key; distribusikan key secara merata untuk menghindari hot partition.
- Manual commit untuk kontrol penuh —
enable.auto.commit = falsedan commit setelah pesan berhasil diproses; ini mencegah kehilangan pesan jika consumer crash sebelum selesai memproses.- Idempotent producer wajib — aktifkan
enable.idempotence: trueuntuk mencegah duplikat pesan saat retry; ini tidak berpengaruh pada performa tapi sangat penting untuk correctness.- Dead Letter Queue untuk pesan gagal — jangan biarkan pesan yang gagal memblokir seluruh consumer; setelah N percobaan, kirim ke DLQ untuk analisis dan reprocessing manual.
- Exponential backoff untuk retry — jangan retry langsung; tunggu 2s, 4s, 8s dst untuk menghindari membebani sistem yang sedang bermasalah.
- Karafka untuk proyek Rails — framework ini menangani lifecycle management, error handling, dan routing secara otomatis; gunakan
ruby-kafkaataurdkafkahanya untuk kebutuhan yang sangat spesifik.- Monitor consumer lag —
consumer_lagadalah metric paling penting; lag tinggi berarti consumer tidak bisa mengikuti producer dan perlu di-scale atau dioptimasi.- Avro untuk schema evolution — saat producer dan consumer diupdate secara independen, Avro dengan Schema Registry memastikan kompatibilitas backward/forward.
- Exactly-once memerlukan transaksi — kombinasi idempotent producer +
transactional.id+send_offsets_to_transactionmemberikan jaminan exactly-once end-to-end.