RabbitMQ

RabbitMQ #

RabbitMQ adalah message broker yang mengimplementasikan protokol AMQP (Advanced Message Queuing Protocol) — standar terbuka untuk messaging yang didukung banyak bahasa dan platform. Berbeda dari Kafka yang berbasis log stream, RabbitMQ adalah message queue yang sesungguhnya: pesan dihapus setelah berhasil dikonsumsi, mendukung routing yang sangat fleksibel melalui exchange dan binding, serta cocok untuk task queue, notifikasi, dan pola request-reply (RPC). Di Ruby, gem Bunny adalah client AMQP paling matang dan banyak digunakan — ringan, thread-safe, dan punya API yang sangat bersih. Untuk integrasi dengan Rails, Sneakers menyediakan framework worker yang dibangun di atas Bunny.

Konsep Dasar RabbitMQ #

Memahami bagaimana pesan mengalir di RabbitMQ adalah kunci untuk merancang arsitektur messaging yang benar:

flowchart LR
    P[Producer] --> EX{Exchange}
    EX -->|routing key: pesanan.baru| Q1[Queue: pesanan_baru]
    EX -->|routing key: pesanan.bayar| Q2[Queue: pembayaran]
    EX -->|routing key: pesanan.*| Q3[Queue: audit_log]
    Q1 --> C1[Consumer A\nLayanan Pesanan]
    Q2 --> C2[Consumer B\nLayanan Pembayaran]
    Q3 --> C3[Consumer C\nLayanan Audit]
Komponen utama RabbitMQ:
  Producer    — aplikasi yang mengirim pesan ke exchange
  Exchange    — terima pesan dari producer, routing ke queue berdasarkan aturan
  Binding     — aturan yang menghubungkan exchange ke queue
  Queue       — buffer yang menyimpan pesan sampai dikonsumsi
  Consumer    — aplikasi yang membaca dan memproses pesan dari queue
  Routing Key — label yang digunakan exchange untuk menentukan tujuan

Tipe Exchange:
  direct  — kirim ke queue yang binding key-nya cocok persis dengan routing key
  fanout  — broadcast ke SEMUA queue yang terikat, abaikan routing key
  topic   — routing key dengan wildcard (* = satu kata, # = banyak kata)
  headers — routing berdasarkan header pesan, bukan routing key

Instalasi #

gem install bunny

# Untuk Sneakers (worker framework untuk Rails)
gem install sneakers
# Gemfile
gem 'bunny',    '~> 2.22'   # AMQP client
gem 'sneakers', '~> 2.12'   # worker framework (opsional, untuk Rails)

Koneksi dan Channel #

require 'bunny'

# Koneksi dasar
koneksi = Bunny.new(
  host:     "localhost",
  port:     5672,
  vhost:    "/",
  username: "guest",
  password: "guest"
)

# Koneksi via URL (lebih ringkas)
koneksi = Bunny.new(
  ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672")
)

# Koneksi dengan TLS (production)
koneksi = Bunny.new(
  host:     "rabbitmq.example.com",
  port:     5671,                    # port TLS
  username: ENV["RABBITMQ_USER"],
  password: ENV["RABBITMQ_PASSWORD"],
  tls:      true,
  tls_ca_certificates: ["/path/to/ca-cert.pem"]
)

# Opsi tambahan
koneksi = Bunny.new(
  "amqp://localhost",
  heartbeat:              10,     # heartbeat setiap 10 detik
  connection_timeout:     5,      # timeout koneksi
  read_timeout:           30,     # timeout baca
  automatically_recover:  true,   # reconnect otomatis jika terputus
  recovery_attempts:      10,     # maksimal 10 percobaan reconnect
  recovery_delay:         2       # jeda 2 detik antar percobaan
)

koneksi.start

# Channel — unit komunikasi AMQP, satu koneksi bisa punya banyak channel
# Satu channel per thread adalah praktik yang direkomendasikan
channel = koneksi.create_channel

puts "Terhubung ke RabbitMQ #{koneksi.server_properties['version']}"

# Tutup koneksi
koneksi.close

Exchange Types dan Routing #

Direct Exchange — Routing Persis #

channel = koneksi.create_channel

# Deklarasi exchange
exchange = channel.direct(
  "pesanan_exchange",
  durable:     true,    # bertahan meski RabbitMQ restart
  auto_delete: false
)

# Deklarasi queue dan binding
queue_baru = channel.queue(
  "pesanan_baru",
  durable:    true,
  arguments: { "x-message-ttl" => 86_400_000 }  # TTL 24 jam
)

# Bind queue ke exchange dengan routing key
queue_baru.bind(exchange, routing_key: "pesanan.baru")

# Kirim pesan ke exchange dengan routing key
exchange.publish(
  JSON.generate({ pesanan_id: 123, total: 150_000 }),
  routing_key:  "pesanan.baru",
  persistent:   true,    # pesan tersimpan di disk (tidak hilang meski restart)
  content_type: "application/json"
)

Fanout Exchange — Broadcast ke Semua Queue #

# Fanout — kirim ke semua queue yang terikat, abaikan routing key
fanout = channel.fanout(
  "notifikasi_broadcast",
  durable: true
)

# Buat beberapa queue dan bind ke fanout
["email_notifikasi", "sms_notifikasi", "push_notifikasi"].each do |nama_queue|
  q = channel.queue(nama_queue, durable: true)
  q.bind(fanout)   # tidak perlu routing key untuk fanout
end

# Satu publish → semua queue menerima
fanout.publish(
  JSON.generate({ event: "flash_sale", diskon: 50, berakhir_pada: "2024-08-15T23:59:59" }),
  persistent: true
)

Topic Exchange — Routing dengan Wildcard #

# Topic exchange — routing key dengan pattern
# * = tepat satu kata
# # = nol atau lebih kata
topic = channel.topic("event_bus", durable: true)

# Binding dengan pattern
channel.queue("semua_pesanan", durable: true)
  .bind(topic, routing_key: "pesanan.#")      # semua event pesanan

channel.queue("pesanan_baru_dan_bayar", durable: true)
  .bind(topic, routing_key: "pesanan.baru")
  .bind(topic, routing_key: "pesanan.bayar")

channel.queue("semua_notifikasi_email", durable: true)
  .bind(topic, routing_key: "*.email.#")      # email dari domain apapun

# Publish dengan routing key hierarkis
topic.publish(
  JSON.generate({ pesanan_id: 456 }),
  routing_key: "pesanan.baru",
  persistent:  true
)

topic.publish(
  JSON.generate({ user_id: 99, template: "selamat_datang" }),
  routing_key: "user.email.selamat_datang",
  persistent:  true
)

Producer — Mengirim Pesan #

require 'bunny'
require 'json'

class PesananProducer
  def initialize
    @koneksi = Bunny.new(ENV["RABBITMQ_URL"])
    @koneksi.start
    @channel  = @koneksi.create_channel

    # Aktifkan publisher confirms — konfirmasi dari RabbitMQ bahwa pesan diterima
    @channel.confirm_select

    @exchange = @channel.topic("event_bus", durable: true)
  end

  def kirim_pesanan_dibuat(pesanan)
    payload = JSON.generate({
      event:       "pesanan_dibuat",
      pesanan_id:  pesanan.id,
      pengguna_id: pesanan.pengguna_id,
      total:       pesanan.total,
      item:        pesanan.item.map { |i| { produk_id: i.produk_id, jumlah: i.jumlah } },
      timestamp:   Time.now.iso8601
    })

    @exchange.publish(
      payload,
      routing_key:  "pesanan.baru",
      persistent:   true,
      content_type: "application/json",
      message_id:   SecureRandom.uuid,   # ID unik untuk idempotency
      timestamp:    Time.now.to_i
    )

    # Tunggu konfirmasi dari RabbitMQ (publisher confirms)
    unless @channel.wait_for_confirms
      raise "RabbitMQ tidak mengkonfirmasi pengiriman pesan!"
    end

    true
  rescue Bunny::Exception => e
    Rails.logger.error "Gagal kirim pesan ke RabbitMQ: #{e.message}"
    false
  end

  def tutup
    @koneksi.close
  end
end

Consumer — Membaca Pesan #

Consumer Dasar #

require 'bunny'
require 'json'

koneksi = Bunny.new(ENV["RABBITMQ_URL"])
koneksi.start

channel = koneksi.create_channel

# Prefetch count — berapa pesan yang dikirim ke consumer sebelum ada ack
# Nilai 1 = consumer hanya menerima 1 pesan, proses, ack, baru terima berikutnya
# Penting untuk distribusi beban yang adil antar consumer
channel.prefetch(1)

queue = channel.queue("pesanan_baru", durable: true)

puts "Menunggu pesan di queue '#{queue.name}'..."

# Subscribe — consumer loop (blocking)
queue.subscribe(manual_ack: true, block: true) do |delivery_info, properties, body|
  begin
    data = JSON.parse(body, symbolize_names: true)
    puts "Memproses pesanan #{data[:pesanan_id]}"

    # Proses pesan
    proses_pesanan(data)

    # ACK — beritahu RabbitMQ pesan berhasil diproses, hapus dari queue
    channel.ack(delivery_info.delivery_tag)
    puts "ACK: pesanan #{data[:pesanan_id]} selesai"

  rescue JSON::ParserError => e
    puts "Pesan tidak valid: #{e.message}"
    # NACK tanpa requeue — kirim ke Dead Letter Exchange jika ada
    channel.nack(delivery_info.delivery_tag, false, false)

  rescue => e
    puts "Error: #{e.message}"
    # NACK dengan requeue — kembalikan ke queue untuk dicoba lagi
    channel.nack(delivery_info.delivery_tag, false, true)
  end
end

Consumer Multi-Thread #

require 'bunny'

koneksi = Bunny.new(ENV["RABBITMQ_URL"])
koneksi.start

# Buat N thread worker, masing-masing dengan channel sendiri
JUMLAH_WORKER = 5

workers = JUMLAH_WORKER.times.map do |i|
  Thread.new do
    ch = koneksi.create_channel
    ch.prefetch(1)
    q  = ch.queue("pesanan_baru", durable: true)

    q.subscribe(manual_ack: true) do |delivery_info, _properties, body|
      begin
        data = JSON.parse(body, symbolize_names: true)
        puts "[Worker #{i}] Proses pesanan #{data[:pesanan_id]}"

        proses_pesanan(data)
        ch.ack(delivery_info.delivery_tag)

      rescue => e
        puts "[Worker #{i}] Error: #{e.message}"
        ch.nack(delivery_info.delivery_tag, false, false)
      end
    end

    # Thread berjalan terus sampai sinyal berhenti
    loop { sleep 1 }
  end
end

# Tunggu semua worker
trap("SIGTERM") { koneksi.close; exit }
trap("SIGINT")  { koneksi.close; exit }
workers.each(&:join)

Acknowledgment — ACK, NACK, Reject #

Acknowledgment adalah mekanisme yang memastikan pesan tidak hilang saat consumer crash:

# Tiga pilihan setelah memproses pesan:

# 1. ACK — berhasil diproses, hapus dari queue
channel.ack(delivery_info.delivery_tag)

# 2. NACK dengan requeue: true — kembalikan ke queue (consumer lain bisa ambil)
channel.nack(delivery_info.delivery_tag, false, true)
# HATI-HATI: bisa menyebabkan infinite loop jika pesan selalu gagal!

# 3. NACK dengan requeue: false — hapus dari queue
#    Jika ada Dead Letter Exchange → masuk ke DLX
#    Jika tidak ada → pesan hilang
channel.nack(delivery_info.delivery_tag, false, false)

# 4. Reject — sama seperti NACK tapi hanya untuk satu pesan
channel.reject(delivery_info.delivery_tag, false)  # false = tidak requeue

# Strategi yang direkomendasikan:
# - Berhasil → ACK
# - Error sementara (network, timeout) → NACK requeue: true (dengan backoff)
# - Error permanen (data tidak valid) → NACK requeue: false → masuk DLX

Dead Letter Exchange — Tangani Pesan Gagal #

DLX (Dead Letter Exchange) adalah exchange tempat pesan “mati” dikirim — baik karena di-NACK, TTL habis, atau queue penuh:

channel = koneksi.create_channel

# 1. Buat Dead Letter Exchange dan Queue
dlx = channel.direct("pesanan_dlx", durable: true)
dlq = channel.queue("pesanan_dead_letter", durable: true)
dlq.bind(dlx, routing_key: "pesanan_baru")

# 2. Buat queue utama dengan konfigurasi DLX
queue_utama = channel.queue(
  "pesanan_baru",
  durable:   true,
  arguments: {
    "x-dead-letter-exchange"    => "pesanan_dlx",   # nama DLX
    "x-dead-letter-routing-key" => "pesanan_baru",  # routing key di DLX
    "x-message-ttl"             => 3_600_000,       # TTL 1 jam (opsional)
    "x-max-retries"             => 3                # custom header untuk tracking
  }
)

# 3. Bind queue utama ke exchange utama
exchange = channel.topic("event_bus", durable: true)
queue_utama.bind(exchange, routing_key: "pesanan.baru")

# 4. Consumer dengan logika retry dan DLX
queue_utama.subscribe(manual_ack: true, block: true) do |delivery_info, properties, body|
  headers        = properties.headers || {}
  jumlah_kematian = headers["x-death"]&.first&.dig("count") || 0

  begin
    proses_pesanan(JSON.parse(body, symbolize_names: true))
    channel.ack(delivery_info.delivery_tag)

  rescue => e
    puts "Error (percobaan #{jumlah_kematian + 1}): #{e.message}"

    if jumlah_kematian >= 2   # sudah 3x gagal (0, 1, 2)
      puts "Menyerah setelah 3 percobaan, kirim ke DLX"
      channel.nack(delivery_info.delivery_tag, false, false)  # → DLX
    else
      # Kembali ke queue dengan delay (implementasi simple)
      sleep(2 ** jumlah_kematian)   # 1s, 2s, 4s
      channel.nack(delivery_info.delivery_tag, false, true)   # requeue
    end
  end
end

# 5. Consumer DLQ — monitor dan proses pesan yang benar-benar gagal
dlq.subscribe(manual_ack: true) do |delivery_info, properties, body|
  puts "Pesan di DLQ: #{body}"

  # Log ke sistem monitoring
  Monitoring.alert("Pesan di DLQ", body: body, headers: properties.headers)

  # Opsional: notifikasi ke developer atau simpan ke database untuk analisis
  PesananGagal.create!(payload: body, gagal_pada: Time.now)

  channel.ack(delivery_info.delivery_tag)
end

Priority Queue #

RabbitMQ mendukung queue dengan prioritas — pesan prioritas tinggi diproses lebih dulu:

# Buat priority queue dengan maksimal 10 level prioritas
priority_queue = channel.queue(
  "notifikasi_prioritas",
  durable:   true,
  arguments: { "x-max-priority" => 10 }
)

exchange = channel.direct("notifikasi_exchange", durable: true)
priority_queue.bind(exchange, routing_key: "notifikasi")

# Kirim pesan dengan prioritas berbeda
# Prioritas 0 (terendah) sampai 10 (tertinggi)
exchange.publish(
  JSON.generate({ pesan: "Info biasa", tipe: "info" }),
  routing_key: "notifikasi",
  priority:    1,
  persistent:  true
)

exchange.publish(
  JSON.generate({ pesan: "Server down!", tipe: "kritis" }),
  routing_key: "notifikasi",
  priority:    10,   # tertinggi — diproses paling dahulu
  persistent:  true
)

exchange.publish(
  JSON.generate({ pesan: "Stok menipis", tipe: "peringatan" }),
  routing_key: "notifikasi",
  priority:    5,
  persistent:  true
)

Request-Reply Pattern (RPC) #

RabbitMQ mendukung pola RPC di mana producer menunggu respons dari consumer:

# CLIENT — kirim request dan tunggu reply
class KalkulatorRPCClient
  def initialize
    @koneksi  = Bunny.new(ENV["RABBITMQ_URL"])
    @koneksi.start
    @channel  = @koneksi.create_channel

    # Queue eksklusif untuk menerima reply (otomatis dihapus saat disconnected)
    @reply_queue = @channel.queue("", exclusive: true)
    @exchange    = @channel.default_exchange

    # Hash untuk menyimpan respons yang menunggu
    @lock      = Mutex.new
    @kondisi   = ConditionVariable.new
    @respons   = {}

    # Subscribe ke reply queue
    @reply_queue.subscribe do |_di, properties, body|
      correlation_id = properties.correlation_id
      @lock.synchronize do
        @respons[correlation_id] = body
        @kondisi.signal
      end
    end
  end

  def faktorial(n)
    correlation_id = SecureRandom.uuid

    @exchange.publish(
      n.to_s,
      routing_key:    "rpc_faktorial",
      reply_to:       @reply_queue.name,
      correlation_id: correlation_id
    )

    # Tunggu respons dengan timeout 5 detik
    @lock.synchronize do
      @kondisi.wait(@lock, 5)
      hasil = @respons.delete(correlation_id)
      raise "RPC timeout!" unless hasil
      JSON.parse(hasil)
    end
  end

  def tutup
    @koneksi.close
  end
end

# SERVER — terima request, proses, kirim reply
def jalankan_rpc_server
  koneksi = Bunny.new(ENV["RABBITMQ_URL"])
  koneksi.start
  channel = koneksi.create_channel
  channel.prefetch(1)

  queue = channel.queue("rpc_faktorial", durable: false)

  queue.subscribe(manual_ack: true, block: true) do |di, properties, body|
    n      = body.to_i
    hasil  = faktorial_rekursif(n)

    # Kirim reply ke reply_to queue dengan correlation_id yang sama
    channel.default_exchange.publish(
      JSON.generate({ hasil: hasil, input: n }),
      routing_key:    properties.reply_to,
      correlation_id: properties.correlation_id
    )

    channel.ack(di.delivery_tag)
  end
end

# Penggunaan RPC
klien = KalkulatorRPCClient.new
puts klien.faktorial(10)   # => {"hasil"=>3628800, "input"=>10}
klien.tutup

Sneakers — Background Job untuk Rails #

Sneakers adalah framework worker RabbitMQ yang dibangun di atas Bunny, mirip Sidekiq tapi untuk RabbitMQ:

# Gemfile
# gem 'sneakers', '~> 2.12'

# config/initializers/sneakers.rb
Sneakers.configure(
  amqp:          ENV.fetch("RABBITMQ_URL", "amqp://localhost"),
  vhost:         "/",
  heartbeat:     10,
  prefetch:      10,
  workers:       4,
  threads:       4,
  log:           Rails.root.join("log", "sneakers.log"),
  pid_path:      Rails.root.join("tmp", "pids", "sneakers.pid"),
  daemonize:     false,
  metrics:       Sneakers::Metrics::LoggingMetrics.new
)
# app/workers/pesanan_worker.rb
class PesananWorker
  include Sneakers::Worker

  from_queue "pesanan_baru",
    durable:     true,
    ack:         :manual,    # manual acknowledgment
    timeout_job_after: 60,   # 60 detik timeout per job
    exchange:    "event_bus",
    exchange_type: :topic,
    routing_key: "pesanan.baru"

  def work(pesan)
    data = JSON.parse(pesan, symbolize_names: true)
    Rails.logger.info "Memproses pesanan: #{data[:pesanan_id]}"

    ActiveRecord::Base.transaction do
      pesanan = Pesanan.find(data[:pesanan_id])
      pesanan.update!(status: :diproses)
      KirimEmailPesanan.call(pesanan)
      PerbaruhiStok.call(pesanan)
    end

    ack!   # berhasil → ACK

  rescue ActiveRecord::RecordNotFound => e
    Rails.logger.error "Pesanan tidak ditemukan: #{e.message}"
    reject!   # data tidak valid → jangan requeue, kirim ke DLX

  rescue => e
    Rails.logger.error "Error: #{e.message}"
    requeue!  # error sementara → kembalikan ke queue
  end
end

# app/workers/notifikasi_worker.rb
class NotifikasiWorker
  include Sneakers::Worker

  from_queue "notifikasi_email",
    durable:    true,
    ack:        :manual

  def work(pesan)
    data = JSON.parse(pesan, symbolize_names: true)
    NotifikasiMailer.kirim(data[:email], data[:subjek], data[:isi]).deliver_now
    ack!
  rescue => e
    Rails.logger.error "Gagal kirim notifikasi: #{e.message}"
    reject!
  end
end
# Jalankan semua worker yang terdaftar
bundle exec rake sneakers:run

# Atau jalankan worker tertentu
WORKERS=PesananWorker,NotifikasiWorker bundle exec rake sneakers:run

# Sebagai daemon
bundle exec rake sneakers:run DAEMONIZE=true

Graceful Shutdown #

# Tangani SIGTERM untuk shutdown yang bersih
require 'bunny'

koneksi = Bunny.new(ENV["RABBITMQ_URL"])
koneksi.start
channel = koneksi.create_channel
channel.prefetch(1)

queue = channel.queue("pesanan_baru", durable: true)
sedang_berjalan = true
pesan_dalam_proses = 0

trap("SIGTERM") do
  puts "SIGTERM diterima, menunggu pesan selesai diproses..."
  sedang_berjalan = false
end

trap("SIGINT") do
  sedang_berjalan = false
end

queue.subscribe(manual_ack: true) do |delivery_info, properties, body|
  break unless sedang_berjalan

  pesan_dalam_proses += 1
  begin
    proses_pesanan(JSON.parse(body))
    channel.ack(delivery_info.delivery_tag)
  rescue => e
    channel.nack(delivery_info.delivery_tag, false, true)
  ensure
    pesan_dalam_proses -= 1
  end
end

# Tunggu semua pesan selesai sebelum tutup
sleep(0.1) while pesan_dalam_proses > 0
koneksi.close
puts "Shutdown selesai"

Monitoring via Management API #

RabbitMQ menyediakan HTTP API untuk monitoring:

require 'net/http'
require 'json'

class RabbitMQMonitor
  BASE_URL = "http://localhost:15672/api"

  def initialize(user: "guest", password: "guest")
    @user     = user
    @password = password
  end

  def statistik_queue(nama_queue, vhost: "%2F")
    respons = get("/queues/#{vhost}/#{nama_queue}")
    {
      pesan_menunggu: respons["messages"],
      pesan_diproses: respons["messages_unacknowledged"],
      consumer_aktif: respons["consumers"],
      publish_rate:   respons.dig("message_stats", "publish_details", "rate")&.round(2)
    }
  end

  def semua_queue(vhost: "%2F")
    get("/queues/#{vhost}").map do |q|
      { nama: q["name"], pesan: q["messages"], consumer: q["consumers"] }
    end
  end

  def health_check
    get("/healthchecks/node")["status"] == "ok"
  end

  private

  def get(path)
    uri  = URI("#{BASE_URL}#{path}")
    req  = Net::HTTP::Get.new(uri)
    req.basic_auth(@user, @password)
    resp = Net::HTTP.start(uri.host, uri.port) { |http| http.request(req) }
    JSON.parse(resp.body)
  end
end

monitor = RabbitMQMonitor.new
stats = monitor.statistik_queue("pesanan_baru")
puts "Pesan menunggu: #{stats[:pesan_menunggu]}"
puts "Consumer aktif: #{stats[:consumer_aktif]}"

if stats[:pesan_menunggu] > 1000
  Monitoring.alert("Queue penuh: pesanan_baru memiliki #{stats[:pesan_menunggu]} pesan!")
end

Ringkasan #

  • Pilih exchange type yang tepatdirect untuk routing spesifik, fanout untuk broadcast, topic untuk routing fleksibel dengan wildcard; salah pilih exchange type menyebabkan pesan tidak sampai tujuan.
  • manual_ack: true dan prefetch(1) selalu — auto-ack bisa menyebabkan kehilangan pesan jika consumer crash; prefetch(1) memastikan distribusi beban yang adil antar consumer.
  • Pesan persistent: true untuk durabilitas — tanpa ini, pesan hilang jika RabbitMQ restart; kombinasikan dengan queue yang durable: true.
  • Publisher confirms untuk keandalan producerchannel.confirm_select dan channel.wait_for_confirms memastikan RabbitMQ benar-benar menerima pesan sebelum lanjut.
  • Dead Letter Exchange untuk penanganan error — selalu konfigurasi DLX pada queue produksi; pesan yang gagal masuk DLX untuk analisis daripada hilang begitu saja.
  • Jangan NACK requeue: true tanpa backoff — requeue langsung bisa menyebabkan consumer memproses pesan yang sama ribuan kali per detik; implementasikan exponential backoff.
  • Satu channel per thread — Channel RabbitMQ tidak thread-safe; buat channel baru untuk setiap thread worker, jangan share channel antar thread.
  • automatically_recover: true untuk reconnect otomatis — koneksi ke RabbitMQ bisa terputus; Bunny bisa reconnect otomatis dan resubscribe queue secara transparan.
  • Sneakers untuk integrasi Rails — framework ini menangani worker lifecycle, prefork, dan integrasi ActiveRecord; lebih mudah dari mengelola Bunny secara manual di Rails.
  • Monitor queue depth sebagai metrik utama — queue yang terus bertambah tanpa berkurang menandakan consumer kewalahan; scale out consumer atau optimasi processing time.

← Sebelumnya: Kafka   Berikutnya: Amazon SQS →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact