Kafka

Kafka #

Apache Kafka adalah platform streaming terdistribusi yang dirancang untuk menangani jutaan event per detik dengan latensi rendah dan durabilitas tinggi. Berbeda dari message queue tradisional seperti RabbitMQ yang menghapus pesan setelah dikonsumsi, Kafka menyimpan semua pesan dalam log yang tidak berubah — consumer bisa membaca ulang pesan dari titik manapun. Ini membuatnya ideal untuk event sourcing, audit trail, real-time analytics, dan menghubungkan banyak microservice secara loose-coupled. Di Ruby, ada tiga pilihan utama: ruby-kafka (gem matang dari Zendesk), rdkafka-ruby (wrapper librdkafka berbasis C yang lebih performan), dan Karafka (framework lengkap untuk aplikasi Kafka di Ruby/Rails). Artikel ini membahas semua lapisan ini dari konsep dasar hingga pola produksi.

Konsep Dasar Kafka #

flowchart LR
    P1[Producer\nAplikasi Ruby] --> T
    P2[Producer\nAplikasi Rails] --> T

    subgraph Kafka Cluster
        T[Topic: pesanan]
        T --> Par0["Partition 0\nmsg 0,3,6,9..."]
        T --> Par1["Partition 1\nmsg 1,4,7,10..."]
        T --> Par2["Partition 2\nmsg 2,5,8,11..."]
    end

    Par0 --> CG1[Consumer Group A\nLayanan Notifikasi]
    Par1 --> CG1
    Par2 --> CG1

    Par0 --> CG2[Consumer Group B\nLayanan Analitik]
    Par1 --> CG2
    Par2 --> CG2
Konsep utama Kafka:
  Topic      — kategori atau nama stream pesan (seperti "nama channel")
  Partition  — topic dibagi menjadi N partition untuk parallelism
  Offset     — posisi pesan dalam partition (0, 1, 2, ...)
  Producer   — aplikasi yang menulis pesan ke topic
  Consumer   — aplikasi yang membaca pesan dari topic
  Consumer Group — sekumpulan consumer yang berbagi beban membaca partition
  Broker     — server Kafka (biasanya 3+ untuk HA)
  Replication— setiap partition punya N replica di broker berbeda

Perbedaan kunci Kafka vs queue tradisional:
  ✓ Pesan tidak dihapus setelah dikonsumsi — tersimpan sesuai retention period
  ✓ Banyak consumer group bisa membaca topic yang sama secara independen
  ✓ Consumer bisa rewind/replay pesan dari offset manapun
  ✓ Throughput sangat tinggi — jutaan pesan per detik
  ✗ Lebih kompleks untuk di-setup dan di-operasikan
  ✗ Tidak mendukung routing kompleks seperti RabbitMQ exchange

Instalasi #

# ruby-kafka — gem Zendesk, lebih mudah digunakan
gem install ruby-kafka

# rdkafka-ruby — C extension berbasis librdkafka, lebih cepat
# Perlu install librdkafka terlebih dahulu
sudo apt install librdkafka-dev   # Ubuntu
brew install librdkafka           # macOS
gem install rdkafka

# Karafka — framework lengkap untuk Rails
gem install karafka
# Gemfile
gem 'ruby-kafka',  '~> 1.5'    # pilihan 1: ruby-kafka
gem 'rdkafka',     '~> 0.14'   # pilihan 2: rdkafka (lebih performan)
gem 'karafka',     '~> 2.3'    # pilihan 3: Karafka (framework)
gem 'avro_turf',   '~> 1.7'    # untuk Avro serialization (opsional)
gem 'waterdrop',   '~> 2.6'    # producer standalone dari tim Karafka

Producer — Mengirim Pesan #

Dengan ruby-kafka #

require 'kafka'
require 'json'

# Buat Kafka client
kafka = Kafka.new(
  ["localhost:9092"],           # daftar broker
  client_id:          "toko-app",
  logger:             Logger.new($stdout),
  ssl_ca_cert:        ENV["KAFKA_CA_CERT"],  # opsional, untuk SSL
  sasl_plain_username: ENV["KAFKA_USER"],    # opsional, untuk autentikasi
  sasl_plain_password: ENV["KAFKA_PASSWORD"]
)

# Producer sederhana — satu pesan
kafka.deliver_message(
  JSON.generate({
    event:     "pesanan_dibuat",
    pesanan_id: 12345,
    pengguna_id: 99,
    total:      150_000,
    timestamp:  Time.now.iso8601
  }),
  topic: "pesanan",
  key:   "pesanan-12345"   # key menentukan partition yang digunakan
)

# Producer dengan batching — lebih efisien untuk throughput tinggi
producer = kafka.producer(
  required_acks:      :all,        # tunggu semua replica acknowledge
  ack_timeout:        5,           # timeout dalam detik
  max_buffer_size:    1000,        # maksimal pesan dalam buffer
  max_buffer_bytesize: 1_000_000,  # maksimal ukuran buffer dalam byte
  compression_codec:  :snappy      # kompres dengan snappy (hemat bandwidth)
)

begin
  # Tambahkan pesan ke buffer
  100.times do |i|
    producer.produce(
      JSON.generate({ event: "event_#{i}", data: "payload" }),
      topic: "events",
      key:   "key-#{i % 10}",          # distribute ke 10 partisi
      partition_key: "user-#{i % 5}"   # atau gunakan partition_key
    )
  end

  # Kirim semua pesan dalam buffer ke Kafka
  producer.deliver_messages
ensure
  producer.shutdown
end

Dengan rdkafka — Performa Lebih Tinggi #

require 'rdkafka'
require 'json'

config = Rdkafka::Config.new(
  "bootstrap.servers"        => "localhost:9092",
  "client.id"                => "toko-app",
  "acks"                     => "all",
  "enable.idempotence"       => true,        # idempotent producer
  "compression.type"         => "snappy",
  "batch.size"               => 65_536,      # 64KB per batch
  "linger.ms"                => 5,           # tunggu 5ms sebelum kirim
  "message.max.bytes"        => 1_048_576    # maksimal 1MB per pesan
)

producer = config.producer

# Kirim pesan secara asinkron
delivery_handle = producer.produce(
  topic:   "pesanan",
  key:     "pesanan-12345",
  payload: JSON.generate({
    event:      "pesanan_dibuat",
    pesanan_id: 12345,
    total:      150_000,
    timestamp:  Time.now.to_i
  })
)

# Wait untuk konfirmasi (opsional — bisa juga fire-and-forget)
delivery_report = delivery_handle.wait(max_wait_timeout: 10)
puts "Offset: #{delivery_report.offset}, Partition: #{delivery_report.partition}"

# Kirim banyak pesan
handles = []
1000.times do |i|
  handles << producer.produce(
    topic:   "events",
    key:     "user-#{i % 100}",
    payload: JSON.generate({ seq: i, ts: Time.now.to_i })
  )
end

# Flush — tunggu semua terkirim
producer.flush(10_000)
producer.close

Consumer — Membaca Pesan #

Dengan ruby-kafka #

require 'kafka'
require 'json'

kafka = Kafka.new(["localhost:9092"], client_id: "layanan-notifikasi")

# Consumer group — load balancing otomatis antar instance consumer
consumer = kafka.consumer(
  group_id:          "layanan-notifikasi",
  offset_commit_interval: 5,    # commit offset setiap 5 detik
  offset_commit_threshold: 100, # atau setiap 100 pesan
  heartbeat_interval: 10,       # heartbeat ke Kafka setiap 10 detik
  session_timeout:    30        # timeout session jika tidak heartbeat
)

# Subscribe ke satu atau banyak topic
consumer.subscribe("pesanan", start_from_beginning: false)
consumer.subscribe("pembayaran")

# Trap signal untuk graceful shutdown
trap("SIGTERM") { consumer.stop }
trap("SIGINT")  { consumer.stop }

# Loop membaca pesan
consumer.each_message do |message|
  puts "Topic: #{message.topic}"
  puts "Partition: #{message.partition}"
  puts "Offset: #{message.offset}"
  puts "Key: #{message.key}"

  begin
    data = JSON.parse(message.value, symbolize_names: true)
    proses_event(data)
  rescue JSON::ParserError => e
    puts "JSON tidak valid: #{e.message}"
    # Kirim ke dead letter queue atau log untuk analisis
  rescue => e
    puts "Error memproses pesan: #{e.message}"
    # Jangan raise — akan menyebabkan consumer berhenti
    # Kirim ke DLQ jika perlu
  end
end

Dengan rdkafka — Manual Commit #

require 'rdkafka'
require 'json'

config = Rdkafka::Config.new(
  "bootstrap.servers"  => "localhost:9092",
  "group.id"           => "layanan-analitik",
  "auto.offset.reset"  => "earliest",      # mulai dari awal jika group baru
  "enable.auto.commit" => false,           # manual commit untuk kontrol penuh
  "max.poll.interval.ms" => 300_000,       # maksimal 5 menit per poll
  "session.timeout.ms"   => 30_000         # 30 detik session timeout
)

consumer = config.consumer
consumer.subscribe("pesanan")

# Graceful shutdown
running = true
trap("SIGTERM") { running = false }
trap("SIGINT")  { running = false }

begin
  while running
    # Poll dengan timeout 1 detik
    message = consumer.poll(1000)
    next unless message

    begin
      data = JSON.parse(message.payload, symbolize_names: true)
      proses_event(data)

      # Commit offset SETELAH berhasil diproses
      # Ini memastikan pesan tidak hilang jika terjadi crash sebelum diproses
      consumer.commit
    rescue => e
      puts "Error: #{e.message}"
      # Jangan commit — pesan akan dibaca ulang saat consumer restart
      kirim_ke_dlq(message) if percobaan_ke?(message) >= 3
    end
  end
ensure
  consumer.close
end

Error Handling dan Dead Letter Queue #

Pesan yang gagal diproses berulang kali perlu dikirim ke Dead Letter Queue (DLQ) agar tidak memblokir consumer:

require 'kafka'
require 'json'

class KafkaConsumerDenganDLQ
  MAX_RETRY = 3
  DLQ_TOPIC = "pesanan.dlq"

  def initialize(kafka)
    @kafka    = kafka
    @consumer = kafka.consumer(group_id: "layanan-pesanan")
    @producer = kafka.producer
    @retry_counts = {}
  end

  def jalankan
    @consumer.subscribe("pesanan")

    @consumer.each_message do |message|
      proses_dengan_retry(message)
    end
  ensure
    @producer.shutdown
  end

  private

  def proses_dengan_retry(message)
    kunci = "#{message.topic}-#{message.partition}-#{message.offset}"
    @retry_counts[kunci] ||= 0

    begin
      data = JSON.parse(message.value, symbolize_names: true)
      proses_pesanan(data)
      @retry_counts.delete(kunci)

    rescue => e
      @retry_counts[kunci] += 1

      if @retry_counts[kunci] >= MAX_RETRY
        kirim_ke_dlq(message, e)
        @retry_counts.delete(kunci)
      else
        puts "Percobaan #{@retry_counts[kunci]}/#{MAX_RETRY}: #{e.message}"
        sleep(2 ** @retry_counts[kunci])  # exponential backoff: 2s, 4s, 8s
        retry
      end
    end
  end

  def kirim_ke_dlq(pesan_asli, error)
    @producer.produce(
      JSON.generate({
        original_topic:     pesan_asli.topic,
        original_partition: pesan_asli.partition,
        original_offset:    pesan_asli.offset,
        original_key:       pesan_asli.key,
        original_payload:   pesan_asli.value,
        error_class:        error.class.name,
        error_message:      error.message,
        failed_at:          Time.now.iso8601
      }),
      topic: DLQ_TOPIC,
      key:   pesan_asli.key
    )
    @producer.deliver_messages
    puts "Pesan dikirim ke DLQ: #{DLQ_TOPIC}"
  end
end

Karafka — Framework Kafka untuk Rails #

Karafka adalah framework yang menyederhanakan penggunaan Kafka di aplikasi Ruby/Rails dengan DSL yang ekspresif:

# Inisialisasi Karafka di proyek Rails
bundle exec karafka install
# karafka.rb — konfigurasi utama
class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = {
      "bootstrap.servers"       => ENV.fetch("KAFKA_BROKERS", "localhost:9092"),
      "client.id"               => "toko-app",
      "group.id"                => "toko-consumers",
      "enable.idempotence"      => true,
      "compression.type"        => "snappy",
      "session.timeout.ms"      => 30_000,
      "max.poll.interval.ms"    => 300_000,
      "enable.auto.commit"      => false   # Karafka manage commit sendiri
    }

    config.consumer_persistence = !Rails.env.test?
    config.logger = Rails.logger
  end

  # Routing — mendefinisikan consumer untuk setiap topic
  routes.draw do
    topic :pesanan do
      consumer PesananConsumer
    end

    topic :pembayaran do
      consumer PembayaranConsumer
      max_messages  100          # proses maksimal 100 pesan per batch
      max_wait_time 5_000        # tunggu maksimal 5 detik untuk batch
    end

    topic :notifikasi do
      consumer NotifikasiConsumer
      manual_offset_management true  # commit manual
    end

    # Consumer group berbeda untuk topic yang sama
    consumer_group :analitik do
      topic :pesanan do
        consumer PesananAnalitikConsumer
      end
    end
  end
end
# app/consumers/pesanan_consumer.rb
class PesananConsumer < ApplicationConsumer
  # messages — array of Karafka::Messages::Message
  def consume
    messages.each do |message|
      data = message.payload   # Karafka otomatis parse JSON

      Rails.logger.info "Proses pesanan: #{data['pesanan_id']}"

      Pesanan.transaction do
        pesanan = Pesanan.find_or_create_by!(id: data['pesanan_id'])
        pesanan.update!(
          status:     data['status'],
          total:      data['total'],
          diproses_pada: Time.now
        )

        # Kirim notifikasi
        NotifikasiMailer.pesanan_dikonfirmasi(pesanan).deliver_later
      end
    end

    # Karafka otomatis commit setelah consume sukses
  end

  # Dipanggil jika ada exception di consume
  def handle_exception(exception)
    Rails.logger.error "Error consumer pesanan: #{exception.message}"
    # Karafka akan retry otomatis sesuai konfigurasi
  end
end

# app/consumers/pembayaran_consumer.rb
class PembayaranConsumer < ApplicationConsumer
  def consume
    # Batch processing — semua pesan diproses sekaligus
    pembayaran_ids = messages.payloads.map { |p| p['pembayaran_id'] }
    Rails.logger.info "Memproses #{messages.count} pembayaran: #{pembayaran_ids}"

    Pembayaran.where(id: pembayaran_ids).find_each do |pembayaran|
      pembayaran.konfirmasi!
    end
  end
end

Producer dengan WaterDrop (Karafka Producer) #

# config/initializers/waterdrop.rb
WaterDrop.setup do |config|
  config.logger = Rails.logger
  config.deliver = !Rails.env.test?

  config.kafka = {
    "bootstrap.servers": ENV.fetch("KAFKA_BROKERS", "localhost:9092"),
    "acks":              "all",
    "enable.idempotence": true,
    "compression.type":  "snappy"
  }
end

# Kirim pesan dari mana saja di Rails
class PesananService
  def buat_pesanan(pengguna, item)
    pesanan = Pesanan.create!(pengguna: pengguna, item: item)

    # Kirim event ke Kafka
    WaterDrop::Producer.new.call do |producer|
      producer.produce_sync(
        topic:   "pesanan",
        key:     pesanan.id.to_s,
        payload: {
          event:      "pesanan_dibuat",
          pesanan_id: pesanan.id,
          pengguna_id: pengguna.id,
          total:      pesanan.total,
          timestamp:  Time.now.iso8601
        }.to_json
      )
    end

    pesanan
  end
end

# Atau menggunakan producer yang dibuat sekali (lebih efisien)
KAFKA_PRODUCER = WaterDrop::Producer.new
KAFKA_PRODUCER.setup do |config|
  config.kafka = { "bootstrap.servers": "localhost:9092" }
end

# Singleton producer — reuse di seluruh aplikasi
KAFKA_PRODUCER.produce_async(
  topic:   "events",
  payload: { event: "user_login", user_id: 1 }.to_json
)

# Batch produce — lebih efisien
messages = 1000.times.map do |i|
  { topic: "events", payload: { seq: i }.to_json }
end
KAFKA_PRODUCER.produce_many_async(messages)

Serialisasi dengan Avro #

Avro adalah format serialisasi binary yang efisien dan mendukung schema evolution — nilai yang sangat penting ketika producer dan consumer diupdate secara independen:

require 'avro_turf'
require 'avro_turf/messaging'

# Schema Registry — server yang menyimpan schema Avro
avro = AvroTurf::Messaging.new(
  registry_url: ENV.fetch("SCHEMA_REGISTRY_URL", "http://localhost:8081")
)

# Schema Avro (biasanya didefinisikan di Schema Registry)
# {
#   "type": "record",
#   "name": "Pesanan",
#   "fields": [
#     {"name": "pesanan_id", "type": "int"},
#     {"name": "total", "type": "double"},
#     {"name": "status", "type": "string"}
#   ]
# }

# Encode pesan dengan Avro
payload_avro = avro.encode(
  { "pesanan_id" => 12345, "total" => 150_000.0, "status" => "pending" },
  subject: "pesanan-value",
  version: :latest
)

# Kirim payload yang sudah di-encode
producer.produce(topic: "pesanan", payload: payload_avro, key: "12345")

# Decode di sisi consumer
message = consumer.poll(1000)
data = avro.decode(message.payload)
puts data["pesanan_id"]   # => 12345

Idempotent Producer dan Exactly-Once Semantics #

# Idempotent producer — Kafka menjamin tidak ada duplikat meski producer retry
config = Rdkafka::Config.new(
  "bootstrap.servers" => "localhost:9092",
  "enable.idempotence" => true,   # aktifkan idempotence
  # enable.idempotence otomatis set:
  # acks = all
  # max.in.flight.requests.per.connection = 5
  # retries = INT_MAX
)

# Exactly-once semantics — baca dari Kafka, proses, tulis ke Kafka
# hanya sekali (tidak duplikat, tidak hilang)
config_eos = Rdkafka::Config.new(
  "bootstrap.servers"            => "localhost:9092",
  "enable.idempotence"           => true,
  "transactional.id"             => "toko-processor-1",  # ID unik per instance
  "transaction.timeout.ms"       => 60_000
)

producer_eos = config_eos.producer
producer_eos.init_transactions

consumer_input  = config.consumer
consumer_input.subscribe("pesanan_mentah")

loop do
  message = consumer_input.poll(1000)
  next unless message

  begin
    producer_eos.begin_transaction

    # Proses dan transform pesan
    data_diproses = transformasi(JSON.parse(message.payload))

    # Tulis hasil ke topic lain dalam transaksi yang sama
    producer_eos.produce(
      topic:   "pesanan_diproses",
      payload: data_diproses.to_json,
      key:     message.key
    )

    # Commit offset consumer sebagai bagian dari transaksi
    producer_eos.send_offsets_to_transaction(
      consumer_input,
      "processor-group"
    )

    producer_eos.commit_transaction

  rescue => e
    producer_eos.abort_transaction
    puts "Transaksi dibatalkan: #{e.message}"
  end
end

Monitoring dan Observabilitas #

# Statistik dari rdkafka
config = Rdkafka::Config.new(
  "bootstrap.servers"  => "localhost:9092",
  "statistics.interval.ms" => 5_000  # laporan setiap 5 detik
)

config.statistics_callback = proc do |stats|
  stats_hash = JSON.parse(stats)

  # Log lag consumer — seberapa jauh consumer tertinggal
  stats_hash["topics"]&.each do |topic, topic_data|
    topic_data["partitions"]&.each do |partition, part_data|
      lag = part_data["consumer_lag"]
      next if lag < 0   # -1 berarti belum diketahui

      puts "Lag #{topic}[#{partition}]: #{lag} pesan"

      # Alert jika lag terlalu besar
      if lag > 10_000
        Monitoring.alert("Kafka consumer lag tinggi: #{topic}[#{partition}] = #{lag}")
      end
    end
  end
end

# Metric yang penting untuk dipantau:
# consumer_lag      — jumlah pesan yang belum diproses
# messages_per_sec  — throughput producer
# bytes_per_sec     — bandwidth
# error_count       — jumlah error

Perbandingan Kafka vs RabbitMQ vs Amazon SQS #

AspekKafkaRabbitMQAmazon SQS
ModelLog/StreamQueue/ExchangeQueue
Retensi pesanBerdasarkan waktu/ukuranDihapus setelah dikonsumsiMax 14 hari
Replay pesanYa — bisa rewind ke offset manapunTidakTidak
ThroughputSangat tinggi (jutaan/detik)Tinggi (ratusan ribu/detik)Tinggi (managed)
OrderingPer partitionPer queueBest-effort (FIFO: per group)
RoutingPer topicFlexible exchangeSimple
KompleksitasTinggiSedangRendah (managed)
SetupSelf-managed / ConfluentSelf-managed / CloudAMQPFully managed AWS
Cocok untukEvent streaming, audit log, analyticsTask queue, RPC, routing kompleksSimple queue AWS
Pilih Kafka jika:
  ✓ Throughput sangat tinggi (> 100K pesan/detik)
  ✓ Perlu replay dan audit log
  ✓ Banyak consumer group baca topic yang sama
  ✓ Event sourcing dan CQRS
  ✓ Real-time stream processing

Pilih RabbitMQ jika:
  ✓ Routing pesan yang kompleks (exchange, binding)
  ✓ Request-reply pattern (RPC)
  ✓ Prioritas pesan
  ✓ Tim sudah familiar dengan AMQP

Pilih Amazon SQS jika:
  ✓ Sudah di AWS dan ingin managed service
  ✓ Kesederhanaan lebih penting dari fitur
  ✓ Tidak mau manage infra sendiri

Ringkasan #

  • Kafka menyimpan semua pesan — tidak seperti queue tradisional yang menghapus pesan setelah dikonsumsi; konsep ini memungkinkan replay, audit, dan multiple consumer group membaca data yang sama secara independen.
  • Key menentukan partition — semua pesan dengan key yang sama masuk ke partition yang sama, menjamin urutan per key; distribusikan key secara merata untuk menghindari hot partition.
  • Manual commit untuk kontrol penuhenable.auto.commit = false dan commit setelah pesan berhasil diproses; ini mencegah kehilangan pesan jika consumer crash sebelum selesai memproses.
  • Idempotent producer wajib — aktifkan enable.idempotence: true untuk mencegah duplikat pesan saat retry; ini tidak berpengaruh pada performa tapi sangat penting untuk correctness.
  • Dead Letter Queue untuk pesan gagal — jangan biarkan pesan yang gagal memblokir seluruh consumer; setelah N percobaan, kirim ke DLQ untuk analisis dan reprocessing manual.
  • Exponential backoff untuk retry — jangan retry langsung; tunggu 2s, 4s, 8s dst untuk menghindari membebani sistem yang sedang bermasalah.
  • Karafka untuk proyek Rails — framework ini menangani lifecycle management, error handling, dan routing secara otomatis; gunakan ruby-kafka atau rdkafka hanya untuk kebutuhan yang sangat spesifik.
  • Monitor consumer lagconsumer_lag adalah metric paling penting; lag tinggi berarti consumer tidak bisa mengikuti producer dan perlu di-scale atau dioptimasi.
  • Avro untuk schema evolution — saat producer dan consumer diupdate secara independen, Avro dengan Schema Registry memastikan kompatibilitas backward/forward.
  • Exactly-once memerlukan transaksi — kombinasi idempotent producer + transactional.id + send_offsets_to_transaction memberikan jaminan exactly-once end-to-end.

← Sebelumnya: Elasticsearch   Berikutnya: RabbitMQ →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact