RabbitMQ #
RabbitMQ adalah message broker yang mengimplementasikan protokol AMQP (Advanced Message Queuing Protocol) — standar terbuka untuk messaging yang didukung banyak bahasa dan platform. Berbeda dari Kafka yang berbasis log stream, RabbitMQ adalah message queue yang sesungguhnya: pesan dihapus setelah berhasil dikonsumsi, mendukung routing yang sangat fleksibel melalui exchange dan binding, serta cocok untuk task queue, notifikasi, dan pola request-reply (RPC). Di Ruby, gem Bunny adalah client AMQP paling matang dan banyak digunakan — ringan, thread-safe, dan punya API yang sangat bersih. Untuk integrasi dengan Rails, Sneakers menyediakan framework worker yang dibangun di atas Bunny.
Konsep Dasar RabbitMQ #
Memahami bagaimana pesan mengalir di RabbitMQ adalah kunci untuk merancang arsitektur messaging yang benar:
flowchart LR
P[Producer] --> EX{Exchange}
EX -->|routing key: pesanan.baru| Q1[Queue: pesanan_baru]
EX -->|routing key: pesanan.bayar| Q2[Queue: pembayaran]
EX -->|routing key: pesanan.*| Q3[Queue: audit_log]
Q1 --> C1[Consumer A\nLayanan Pesanan]
Q2 --> C2[Consumer B\nLayanan Pembayaran]
Q3 --> C3[Consumer C\nLayanan Audit]Komponen utama RabbitMQ:
Producer — aplikasi yang mengirim pesan ke exchange
Exchange — terima pesan dari producer, routing ke queue berdasarkan aturan
Binding — aturan yang menghubungkan exchange ke queue
Queue — buffer yang menyimpan pesan sampai dikonsumsi
Consumer — aplikasi yang membaca dan memproses pesan dari queue
Routing Key — label yang digunakan exchange untuk menentukan tujuan
Tipe Exchange:
direct — kirim ke queue yang binding key-nya cocok persis dengan routing key
fanout — broadcast ke SEMUA queue yang terikat, abaikan routing key
topic — routing key dengan wildcard (* = satu kata, # = banyak kata)
headers — routing berdasarkan header pesan, bukan routing key
Instalasi #
gem install bunny
# Untuk Sneakers (worker framework untuk Rails)
gem install sneakers
# Gemfile
gem 'bunny', '~> 2.22' # AMQP client
gem 'sneakers', '~> 2.12' # worker framework (opsional, untuk Rails)
Koneksi dan Channel #
require 'bunny'
# Koneksi dasar
koneksi = Bunny.new(
host: "localhost",
port: 5672,
vhost: "/",
username: "guest",
password: "guest"
)
# Koneksi via URL (lebih ringkas)
koneksi = Bunny.new(
ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672")
)
# Koneksi dengan TLS (production)
koneksi = Bunny.new(
host: "rabbitmq.example.com",
port: 5671, # port TLS
username: ENV["RABBITMQ_USER"],
password: ENV["RABBITMQ_PASSWORD"],
tls: true,
tls_ca_certificates: ["/path/to/ca-cert.pem"]
)
# Opsi tambahan
koneksi = Bunny.new(
"amqp://localhost",
heartbeat: 10, # heartbeat setiap 10 detik
connection_timeout: 5, # timeout koneksi
read_timeout: 30, # timeout baca
automatically_recover: true, # reconnect otomatis jika terputus
recovery_attempts: 10, # maksimal 10 percobaan reconnect
recovery_delay: 2 # jeda 2 detik antar percobaan
)
koneksi.start
# Channel — unit komunikasi AMQP, satu koneksi bisa punya banyak channel
# Satu channel per thread adalah praktik yang direkomendasikan
channel = koneksi.create_channel
puts "Terhubung ke RabbitMQ #{koneksi.server_properties['version']}"
# Tutup koneksi
koneksi.close
Exchange Types dan Routing #
Direct Exchange — Routing Persis #
channel = koneksi.create_channel
# Deklarasi exchange
exchange = channel.direct(
"pesanan_exchange",
durable: true, # bertahan meski RabbitMQ restart
auto_delete: false
)
# Deklarasi queue dan binding
queue_baru = channel.queue(
"pesanan_baru",
durable: true,
arguments: { "x-message-ttl" => 86_400_000 } # TTL 24 jam
)
# Bind queue ke exchange dengan routing key
queue_baru.bind(exchange, routing_key: "pesanan.baru")
# Kirim pesan ke exchange dengan routing key
exchange.publish(
JSON.generate({ pesanan_id: 123, total: 150_000 }),
routing_key: "pesanan.baru",
persistent: true, # pesan tersimpan di disk (tidak hilang meski restart)
content_type: "application/json"
)
Fanout Exchange — Broadcast ke Semua Queue #
# Fanout — kirim ke semua queue yang terikat, abaikan routing key
fanout = channel.fanout(
"notifikasi_broadcast",
durable: true
)
# Buat beberapa queue dan bind ke fanout
["email_notifikasi", "sms_notifikasi", "push_notifikasi"].each do |nama_queue|
q = channel.queue(nama_queue, durable: true)
q.bind(fanout) # tidak perlu routing key untuk fanout
end
# Satu publish → semua queue menerima
fanout.publish(
JSON.generate({ event: "flash_sale", diskon: 50, berakhir_pada: "2024-08-15T23:59:59" }),
persistent: true
)
Topic Exchange — Routing dengan Wildcard #
# Topic exchange — routing key dengan pattern
# * = tepat satu kata
# # = nol atau lebih kata
topic = channel.topic("event_bus", durable: true)
# Binding dengan pattern
channel.queue("semua_pesanan", durable: true)
.bind(topic, routing_key: "pesanan.#") # semua event pesanan
channel.queue("pesanan_baru_dan_bayar", durable: true)
.bind(topic, routing_key: "pesanan.baru")
.bind(topic, routing_key: "pesanan.bayar")
channel.queue("semua_notifikasi_email", durable: true)
.bind(topic, routing_key: "*.email.#") # email dari domain apapun
# Publish dengan routing key hierarkis
topic.publish(
JSON.generate({ pesanan_id: 456 }),
routing_key: "pesanan.baru",
persistent: true
)
topic.publish(
JSON.generate({ user_id: 99, template: "selamat_datang" }),
routing_key: "user.email.selamat_datang",
persistent: true
)
Producer — Mengirim Pesan #
require 'bunny'
require 'json'
class PesananProducer
def initialize
@koneksi = Bunny.new(ENV["RABBITMQ_URL"])
@koneksi.start
@channel = @koneksi.create_channel
# Aktifkan publisher confirms — konfirmasi dari RabbitMQ bahwa pesan diterima
@channel.confirm_select
@exchange = @channel.topic("event_bus", durable: true)
end
def kirim_pesanan_dibuat(pesanan)
payload = JSON.generate({
event: "pesanan_dibuat",
pesanan_id: pesanan.id,
pengguna_id: pesanan.pengguna_id,
total: pesanan.total,
item: pesanan.item.map { |i| { produk_id: i.produk_id, jumlah: i.jumlah } },
timestamp: Time.now.iso8601
})
@exchange.publish(
payload,
routing_key: "pesanan.baru",
persistent: true,
content_type: "application/json",
message_id: SecureRandom.uuid, # ID unik untuk idempotency
timestamp: Time.now.to_i
)
# Tunggu konfirmasi dari RabbitMQ (publisher confirms)
unless @channel.wait_for_confirms
raise "RabbitMQ tidak mengkonfirmasi pengiriman pesan!"
end
true
rescue Bunny::Exception => e
Rails.logger.error "Gagal kirim pesan ke RabbitMQ: #{e.message}"
false
end
def tutup
@koneksi.close
end
end
Consumer — Membaca Pesan #
Consumer Dasar #
require 'bunny'
require 'json'
koneksi = Bunny.new(ENV["RABBITMQ_URL"])
koneksi.start
channel = koneksi.create_channel
# Prefetch count — berapa pesan yang dikirim ke consumer sebelum ada ack
# Nilai 1 = consumer hanya menerima 1 pesan, proses, ack, baru terima berikutnya
# Penting untuk distribusi beban yang adil antar consumer
channel.prefetch(1)
queue = channel.queue("pesanan_baru", durable: true)
puts "Menunggu pesan di queue '#{queue.name}'..."
# Subscribe — consumer loop (blocking)
queue.subscribe(manual_ack: true, block: true) do |delivery_info, properties, body|
begin
data = JSON.parse(body, symbolize_names: true)
puts "Memproses pesanan #{data[:pesanan_id]}"
# Proses pesan
proses_pesanan(data)
# ACK — beritahu RabbitMQ pesan berhasil diproses, hapus dari queue
channel.ack(delivery_info.delivery_tag)
puts "ACK: pesanan #{data[:pesanan_id]} selesai"
rescue JSON::ParserError => e
puts "Pesan tidak valid: #{e.message}"
# NACK tanpa requeue — kirim ke Dead Letter Exchange jika ada
channel.nack(delivery_info.delivery_tag, false, false)
rescue => e
puts "Error: #{e.message}"
# NACK dengan requeue — kembalikan ke queue untuk dicoba lagi
channel.nack(delivery_info.delivery_tag, false, true)
end
end
Consumer Multi-Thread #
require 'bunny'
koneksi = Bunny.new(ENV["RABBITMQ_URL"])
koneksi.start
# Buat N thread worker, masing-masing dengan channel sendiri
JUMLAH_WORKER = 5
workers = JUMLAH_WORKER.times.map do |i|
Thread.new do
ch = koneksi.create_channel
ch.prefetch(1)
q = ch.queue("pesanan_baru", durable: true)
q.subscribe(manual_ack: true) do |delivery_info, _properties, body|
begin
data = JSON.parse(body, symbolize_names: true)
puts "[Worker #{i}] Proses pesanan #{data[:pesanan_id]}"
proses_pesanan(data)
ch.ack(delivery_info.delivery_tag)
rescue => e
puts "[Worker #{i}] Error: #{e.message}"
ch.nack(delivery_info.delivery_tag, false, false)
end
end
# Thread berjalan terus sampai sinyal berhenti
loop { sleep 1 }
end
end
# Tunggu semua worker
trap("SIGTERM") { koneksi.close; exit }
trap("SIGINT") { koneksi.close; exit }
workers.each(&:join)
Acknowledgment — ACK, NACK, Reject #
Acknowledgment adalah mekanisme yang memastikan pesan tidak hilang saat consumer crash:
# Tiga pilihan setelah memproses pesan:
# 1. ACK — berhasil diproses, hapus dari queue
channel.ack(delivery_info.delivery_tag)
# 2. NACK dengan requeue: true — kembalikan ke queue (consumer lain bisa ambil)
channel.nack(delivery_info.delivery_tag, false, true)
# HATI-HATI: bisa menyebabkan infinite loop jika pesan selalu gagal!
# 3. NACK dengan requeue: false — hapus dari queue
# Jika ada Dead Letter Exchange → masuk ke DLX
# Jika tidak ada → pesan hilang
channel.nack(delivery_info.delivery_tag, false, false)
# 4. Reject — sama seperti NACK tapi hanya untuk satu pesan
channel.reject(delivery_info.delivery_tag, false) # false = tidak requeue
# Strategi yang direkomendasikan:
# - Berhasil → ACK
# - Error sementara (network, timeout) → NACK requeue: true (dengan backoff)
# - Error permanen (data tidak valid) → NACK requeue: false → masuk DLX
Dead Letter Exchange — Tangani Pesan Gagal #
DLX (Dead Letter Exchange) adalah exchange tempat pesan “mati” dikirim — baik karena di-NACK, TTL habis, atau queue penuh:
channel = koneksi.create_channel
# 1. Buat Dead Letter Exchange dan Queue
dlx = channel.direct("pesanan_dlx", durable: true)
dlq = channel.queue("pesanan_dead_letter", durable: true)
dlq.bind(dlx, routing_key: "pesanan_baru")
# 2. Buat queue utama dengan konfigurasi DLX
queue_utama = channel.queue(
"pesanan_baru",
durable: true,
arguments: {
"x-dead-letter-exchange" => "pesanan_dlx", # nama DLX
"x-dead-letter-routing-key" => "pesanan_baru", # routing key di DLX
"x-message-ttl" => 3_600_000, # TTL 1 jam (opsional)
"x-max-retries" => 3 # custom header untuk tracking
}
)
# 3. Bind queue utama ke exchange utama
exchange = channel.topic("event_bus", durable: true)
queue_utama.bind(exchange, routing_key: "pesanan.baru")
# 4. Consumer dengan logika retry dan DLX
queue_utama.subscribe(manual_ack: true, block: true) do |delivery_info, properties, body|
headers = properties.headers || {}
jumlah_kematian = headers["x-death"]&.first&.dig("count") || 0
begin
proses_pesanan(JSON.parse(body, symbolize_names: true))
channel.ack(delivery_info.delivery_tag)
rescue => e
puts "Error (percobaan #{jumlah_kematian + 1}): #{e.message}"
if jumlah_kematian >= 2 # sudah 3x gagal (0, 1, 2)
puts "Menyerah setelah 3 percobaan, kirim ke DLX"
channel.nack(delivery_info.delivery_tag, false, false) # → DLX
else
# Kembali ke queue dengan delay (implementasi simple)
sleep(2 ** jumlah_kematian) # 1s, 2s, 4s
channel.nack(delivery_info.delivery_tag, false, true) # requeue
end
end
end
# 5. Consumer DLQ — monitor dan proses pesan yang benar-benar gagal
dlq.subscribe(manual_ack: true) do |delivery_info, properties, body|
puts "Pesan di DLQ: #{body}"
# Log ke sistem monitoring
Monitoring.alert("Pesan di DLQ", body: body, headers: properties.headers)
# Opsional: notifikasi ke developer atau simpan ke database untuk analisis
PesananGagal.create!(payload: body, gagal_pada: Time.now)
channel.ack(delivery_info.delivery_tag)
end
Priority Queue #
RabbitMQ mendukung queue dengan prioritas — pesan prioritas tinggi diproses lebih dulu:
# Buat priority queue dengan maksimal 10 level prioritas
priority_queue = channel.queue(
"notifikasi_prioritas",
durable: true,
arguments: { "x-max-priority" => 10 }
)
exchange = channel.direct("notifikasi_exchange", durable: true)
priority_queue.bind(exchange, routing_key: "notifikasi")
# Kirim pesan dengan prioritas berbeda
# Prioritas 0 (terendah) sampai 10 (tertinggi)
exchange.publish(
JSON.generate({ pesan: "Info biasa", tipe: "info" }),
routing_key: "notifikasi",
priority: 1,
persistent: true
)
exchange.publish(
JSON.generate({ pesan: "Server down!", tipe: "kritis" }),
routing_key: "notifikasi",
priority: 10, # tertinggi — diproses paling dahulu
persistent: true
)
exchange.publish(
JSON.generate({ pesan: "Stok menipis", tipe: "peringatan" }),
routing_key: "notifikasi",
priority: 5,
persistent: true
)
Request-Reply Pattern (RPC) #
RabbitMQ mendukung pola RPC di mana producer menunggu respons dari consumer:
# CLIENT — kirim request dan tunggu reply
class KalkulatorRPCClient
def initialize
@koneksi = Bunny.new(ENV["RABBITMQ_URL"])
@koneksi.start
@channel = @koneksi.create_channel
# Queue eksklusif untuk menerima reply (otomatis dihapus saat disconnected)
@reply_queue = @channel.queue("", exclusive: true)
@exchange = @channel.default_exchange
# Hash untuk menyimpan respons yang menunggu
@lock = Mutex.new
@kondisi = ConditionVariable.new
@respons = {}
# Subscribe ke reply queue
@reply_queue.subscribe do |_di, properties, body|
correlation_id = properties.correlation_id
@lock.synchronize do
@respons[correlation_id] = body
@kondisi.signal
end
end
end
def faktorial(n)
correlation_id = SecureRandom.uuid
@exchange.publish(
n.to_s,
routing_key: "rpc_faktorial",
reply_to: @reply_queue.name,
correlation_id: correlation_id
)
# Tunggu respons dengan timeout 5 detik
@lock.synchronize do
@kondisi.wait(@lock, 5)
hasil = @respons.delete(correlation_id)
raise "RPC timeout!" unless hasil
JSON.parse(hasil)
end
end
def tutup
@koneksi.close
end
end
# SERVER — terima request, proses, kirim reply
def jalankan_rpc_server
koneksi = Bunny.new(ENV["RABBITMQ_URL"])
koneksi.start
channel = koneksi.create_channel
channel.prefetch(1)
queue = channel.queue("rpc_faktorial", durable: false)
queue.subscribe(manual_ack: true, block: true) do |di, properties, body|
n = body.to_i
hasil = faktorial_rekursif(n)
# Kirim reply ke reply_to queue dengan correlation_id yang sama
channel.default_exchange.publish(
JSON.generate({ hasil: hasil, input: n }),
routing_key: properties.reply_to,
correlation_id: properties.correlation_id
)
channel.ack(di.delivery_tag)
end
end
# Penggunaan RPC
klien = KalkulatorRPCClient.new
puts klien.faktorial(10) # => {"hasil"=>3628800, "input"=>10}
klien.tutup
Sneakers — Background Job untuk Rails #
Sneakers adalah framework worker RabbitMQ yang dibangun di atas Bunny, mirip Sidekiq tapi untuk RabbitMQ:
# Gemfile
# gem 'sneakers', '~> 2.12'
# config/initializers/sneakers.rb
Sneakers.configure(
amqp: ENV.fetch("RABBITMQ_URL", "amqp://localhost"),
vhost: "/",
heartbeat: 10,
prefetch: 10,
workers: 4,
threads: 4,
log: Rails.root.join("log", "sneakers.log"),
pid_path: Rails.root.join("tmp", "pids", "sneakers.pid"),
daemonize: false,
metrics: Sneakers::Metrics::LoggingMetrics.new
)
# app/workers/pesanan_worker.rb
class PesananWorker
include Sneakers::Worker
from_queue "pesanan_baru",
durable: true,
ack: :manual, # manual acknowledgment
timeout_job_after: 60, # 60 detik timeout per job
exchange: "event_bus",
exchange_type: :topic,
routing_key: "pesanan.baru"
def work(pesan)
data = JSON.parse(pesan, symbolize_names: true)
Rails.logger.info "Memproses pesanan: #{data[:pesanan_id]}"
ActiveRecord::Base.transaction do
pesanan = Pesanan.find(data[:pesanan_id])
pesanan.update!(status: :diproses)
KirimEmailPesanan.call(pesanan)
PerbaruhiStok.call(pesanan)
end
ack! # berhasil → ACK
rescue ActiveRecord::RecordNotFound => e
Rails.logger.error "Pesanan tidak ditemukan: #{e.message}"
reject! # data tidak valid → jangan requeue, kirim ke DLX
rescue => e
Rails.logger.error "Error: #{e.message}"
requeue! # error sementara → kembalikan ke queue
end
end
# app/workers/notifikasi_worker.rb
class NotifikasiWorker
include Sneakers::Worker
from_queue "notifikasi_email",
durable: true,
ack: :manual
def work(pesan)
data = JSON.parse(pesan, symbolize_names: true)
NotifikasiMailer.kirim(data[:email], data[:subjek], data[:isi]).deliver_now
ack!
rescue => e
Rails.logger.error "Gagal kirim notifikasi: #{e.message}"
reject!
end
end
# Jalankan semua worker yang terdaftar
bundle exec rake sneakers:run
# Atau jalankan worker tertentu
WORKERS=PesananWorker,NotifikasiWorker bundle exec rake sneakers:run
# Sebagai daemon
bundle exec rake sneakers:run DAEMONIZE=true
Graceful Shutdown #
# Tangani SIGTERM untuk shutdown yang bersih
require 'bunny'
koneksi = Bunny.new(ENV["RABBITMQ_URL"])
koneksi.start
channel = koneksi.create_channel
channel.prefetch(1)
queue = channel.queue("pesanan_baru", durable: true)
sedang_berjalan = true
pesan_dalam_proses = 0
trap("SIGTERM") do
puts "SIGTERM diterima, menunggu pesan selesai diproses..."
sedang_berjalan = false
end
trap("SIGINT") do
sedang_berjalan = false
end
queue.subscribe(manual_ack: true) do |delivery_info, properties, body|
break unless sedang_berjalan
pesan_dalam_proses += 1
begin
proses_pesanan(JSON.parse(body))
channel.ack(delivery_info.delivery_tag)
rescue => e
channel.nack(delivery_info.delivery_tag, false, true)
ensure
pesan_dalam_proses -= 1
end
end
# Tunggu semua pesan selesai sebelum tutup
sleep(0.1) while pesan_dalam_proses > 0
koneksi.close
puts "Shutdown selesai"
Monitoring via Management API #
RabbitMQ menyediakan HTTP API untuk monitoring:
require 'net/http'
require 'json'
class RabbitMQMonitor
BASE_URL = "http://localhost:15672/api"
def initialize(user: "guest", password: "guest")
@user = user
@password = password
end
def statistik_queue(nama_queue, vhost: "%2F")
respons = get("/queues/#{vhost}/#{nama_queue}")
{
pesan_menunggu: respons["messages"],
pesan_diproses: respons["messages_unacknowledged"],
consumer_aktif: respons["consumers"],
publish_rate: respons.dig("message_stats", "publish_details", "rate")&.round(2)
}
end
def semua_queue(vhost: "%2F")
get("/queues/#{vhost}").map do |q|
{ nama: q["name"], pesan: q["messages"], consumer: q["consumers"] }
end
end
def health_check
get("/healthchecks/node")["status"] == "ok"
end
private
def get(path)
uri = URI("#{BASE_URL}#{path}")
req = Net::HTTP::Get.new(uri)
req.basic_auth(@user, @password)
resp = Net::HTTP.start(uri.host, uri.port) { |http| http.request(req) }
JSON.parse(resp.body)
end
end
monitor = RabbitMQMonitor.new
stats = monitor.statistik_queue("pesanan_baru")
puts "Pesan menunggu: #{stats[:pesan_menunggu]}"
puts "Consumer aktif: #{stats[:consumer_aktif]}"
if stats[:pesan_menunggu] > 1000
Monitoring.alert("Queue penuh: pesanan_baru memiliki #{stats[:pesan_menunggu]} pesan!")
end
Ringkasan #
- Pilih exchange type yang tepat —
directuntuk routing spesifik,fanoutuntuk broadcast,topicuntuk routing fleksibel dengan wildcard; salah pilih exchange type menyebabkan pesan tidak sampai tujuan.manual_ack: truedanprefetch(1)selalu — auto-ack bisa menyebabkan kehilangan pesan jika consumer crash; prefetch(1) memastikan distribusi beban yang adil antar consumer.- Pesan
persistent: trueuntuk durabilitas — tanpa ini, pesan hilang jika RabbitMQ restart; kombinasikan dengan queue yangdurable: true.- Publisher confirms untuk keandalan producer —
channel.confirm_selectdanchannel.wait_for_confirmsmemastikan RabbitMQ benar-benar menerima pesan sebelum lanjut.- Dead Letter Exchange untuk penanganan error — selalu konfigurasi DLX pada queue produksi; pesan yang gagal masuk DLX untuk analisis daripada hilang begitu saja.
- Jangan NACK requeue: true tanpa backoff — requeue langsung bisa menyebabkan consumer memproses pesan yang sama ribuan kali per detik; implementasikan exponential backoff.
- Satu channel per thread — Channel RabbitMQ tidak thread-safe; buat channel baru untuk setiap thread worker, jangan share channel antar thread.
automatically_recover: trueuntuk reconnect otomatis — koneksi ke RabbitMQ bisa terputus; Bunny bisa reconnect otomatis dan resubscribe queue secara transparan.- Sneakers untuk integrasi Rails — framework ini menangani worker lifecycle, prefork, dan integrasi ActiveRecord; lebih mudah dari mengelola Bunny secara manual di Rails.
- Monitor queue depth sebagai metrik utama — queue yang terus bertambah tanpa berkurang menandakan consumer kewalahan; scale out consumer atau optimasi processing time.