Amazon SQS #
Amazon Simple Queue Service (SQS) adalah layanan message queue fully-managed dari AWS yang tidak memerlukan instalasi atau pengelolaan infra apapun. Berbeda dari RabbitMQ atau Kafka yang harus di-setup dan di-maintain sendiri, SQS cukup dibuat melalui console AWS atau Terraform dan langsung siap digunakan — tidak ada broker server, tidak ada connection pool, tidak ada patching. SQS sangat cocok untuk aplikasi yang sudah berjalan di ekosistem AWS dan ingin decoupling antar komponen dengan cara yang paling sederhana. Di Ruby, AWS SDK resmi (aws-sdk-sqs) menyediakan client yang lengkap, dan Shoryuken memberikan framework worker yang mirip Sidekiq tapi untuk SQS.
Standard Queue vs FIFO Queue #
SQS punya dua jenis queue yang berbeda karakteristiknya:
Standard Queue:
✓ Throughput nyaris tak terbatas (unlimited TPS)
✓ At-least-once delivery — pesan dijamin terkirim, tapi bisa duplikat
✗ Ordering best-effort — urutan tidak dijamin
✓ Harga lebih murah
Cocok untuk: task yang toleran duplikat, high-volume processing
FIFO Queue (First-In-First-Out):
✓ Exactly-once processing — tidak ada duplikat
✓ Ordering dijamin — pesan diproses sesuai urutan masuk
✗ Throughput terbatas: 300 TPS (atau 3000 dengan batching)
✗ Harga lebih mahal
Cocok untuk: pemrosesan order, financial transaction, event sourcing
Nama file FIFO harus berakhir dengan ".fifo": "pesanan.fifo"
flowchart LR
P[Producer\nAplikasi Ruby] -->|send_message| SQS
subgraph AWS SQS
SQS[Queue: pesanan-baru]
DLQ[DLQ: pesanan-baru-dlq]
SQS -->|max_receive_count = 3| DLQ
end
SQS -->|receive_message| C1[Consumer 1\nEC2 / Lambda]
SQS -->|receive_message| C2[Consumer 2\nEC2 / Lambda]
SQS -->|receive_message| C3[Consumer 3\nEC2 / Lambda]
DLQ --> Alert[CloudWatch Alarm\n→ Notifikasi]Instalasi dan Konfigurasi #
gem install aws-sdk-sqs
# Atau di Gemfile
# Gemfile
gem 'aws-sdk-sqs', '~> 1.70' # SQS client
gem 'shoryuken', '~> 6.2' # worker framework (opsional)
Autentikasi AWS #
Ada beberapa cara mengautentikasi ke AWS — pilih yang paling sesuai dengan environment:
require 'aws-sdk-sqs'
# Cara 1: Environment variables (paling umum untuk development)
# AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
# AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
# AWS_REGION=ap-southeast-1
sqs = Aws::SQS::Client.new # otomatis baca dari ENV
# Cara 2: Eksplisit di kode (hindari di production)
sqs = Aws::SQS::Client.new(
region: "ap-southeast-1",
access_key_id: ENV["AWS_ACCESS_KEY_ID"],
secret_access_key: ENV["AWS_SECRET_ACCESS_KEY"]
)
# Cara 3: IAM Role (terbaik untuk EC2, ECS, Lambda)
# Tidak perlu set credential sama sekali — AWS SDK otomatis ambil dari instance metadata
sqs = Aws::SQS::Client.new(region: "ap-southeast-1")
# Cara 4: AWS Profile dari ~/.aws/credentials
sqs = Aws::SQS::Client.new(
region: "ap-southeast-1",
profile: "production" # nama profile di ~/.aws/credentials
)
# Cara 5: LocalStack untuk development lokal (tanpa koneksi ke AWS sungguhan)
sqs = Aws::SQS::Client.new(
region: "us-east-1",
endpoint: "http://localhost:4566",
access_key_id: "test",
secret_access_key: "test"
)
Manajemen Queue #
# Buat Standard Queue
resp = sqs.create_queue(
queue_name: "pesanan-baru",
attributes: {
"VisibilityTimeout" => "30", # detik
"MessageRetentionPeriod" => "86400", # 1 hari
"ReceiveMessageWaitTimeSeconds" => "20", # long polling
"RedrivePolicy" => JSON.generate({
deadLetterTargetArn: "arn:aws:sqs:ap-southeast-1:123456789:pesanan-baru-dlq",
maxReceiveCount: 3 # setelah 3x gagal → DLQ
})
}
)
queue_url = resp.queue_url
# Buat FIFO Queue (nama harus berakhir .fifo)
sqs.create_queue(
queue_name: "pembayaran.fifo",
attributes: {
"FifoQueue" => "true",
"ContentBasedDeduplication" => "true", # deduplication otomatis berdasarkan konten
"VisibilityTimeout" => "60",
"MessageRetentionPeriod" => "86400",
"ReceiveMessageWaitTimeSeconds" => "20"
}
)
# Dapatkan URL queue yang sudah ada
resp = sqs.get_queue_url(queue_name: "pesanan-baru")
queue_url = resp.queue_url
# => "https://sqs.ap-southeast-1.amazonaws.com/123456789/pesanan-baru"
# Dapatkan atribut queue (termasuk jumlah pesan)
attrs = sqs.get_queue_attributes(
queue_url: queue_url,
attribute_names: ["All"]
).attributes
puts "Pesan menunggu: #{attrs['ApproximateNumberOfMessages']}"
puts "Pesan tidak terlihat: #{attrs['ApproximateNumberOfMessagesNotVisible']}"
puts "Pesan tertunda: #{attrs['ApproximateNumberOfMessagesDelayed']}"
# Hapus queue (hati-hati!)
sqs.delete_queue(queue_url: queue_url)
# Daftar semua queue
sqs.list_queues(queue_name_prefix: "pesanan").queue_urls.each { |url| puts url }
Mengirim Pesan #
queue_url = sqs.get_queue_url(queue_name: "pesanan-baru").queue_url
# Kirim pesan sederhana
sqs.send_message(
queue_url: queue_url,
message_body: JSON.generate({
event: "pesanan_dibuat",
pesanan_id: 12345,
total: 150_000,
timestamp: Time.now.iso8601
})
)
# Kirim pesan dengan opsi tambahan
resp = sqs.send_message(
queue_url: queue_url,
message_body: JSON.generate({ pesanan_id: 12346, total: 250_000 }),
delay_seconds: 5, # tunda pengiriman 5 detik
message_attributes: {
"tipe_event" => {
data_type: "String",
string_value: "pesanan_baru"
},
"prioritas" => {
data_type: "Number",
string_value: "10"
},
"versi_schema" => {
data_type: "String",
string_value: "v2"
}
}
)
puts "Message ID: #{resp.message_id}"
# Untuk FIFO Queue — wajib sertakan MessageGroupId dan MessageDeduplicationId
sqs.send_message(
queue_url: "https://sqs.ap-southeast-1.amazonaws.com/123/pembayaran.fifo",
message_body: JSON.generate({ pembayaran_id: 9999, jumlah: 500_000 }),
message_group_id: "pengguna-#{pengguna_id}", # semua pesan grup ini diproses berurutan
message_deduplication_id: SecureRandom.uuid # ID unik untuk mencegah duplikat
)
Batch Send — Kirim Banyak Sekaligus #
# Batch send — maksimal 10 pesan per batch, hemat biaya dan latency
entries = pesanan_list.each_with_index.map do |pesanan, i|
{
id: "msg-#{i}", # ID unik dalam batch (bukan Message ID SQS)
message_body: JSON.generate({
pesanan_id: pesanan.id,
total: pesanan.total,
timestamp: Time.now.iso8601
})
}
end
# Kirim dalam batch 10
entries.each_slice(10) do |batch|
resp = sqs.send_message_batch(
queue_url: queue_url,
entries: batch
)
# Cek pesan yang gagal
resp.failed.each do |failure|
puts "Gagal kirim #{failure.id}: #{failure.message} (#{failure.code})"
end
puts "Berhasil kirim #{resp.successful.length} pesan"
end
Menerima dan Memproses Pesan #
Long Polling — Cara yang Efisien #
# Long polling — tunggu sampai 20 detik untuk pesan datang
# Lebih hemat biaya dari short polling (tidak terus-menerus polling)
loop do
resp = sqs.receive_message(
queue_url: queue_url,
max_number_of_messages: 10, # maks 10 pesan per receive
wait_time_seconds: 20, # long polling: tunggu maks 20 detik
visibility_timeout: 30, # sembunyikan pesan 30 detik dari consumer lain
message_attribute_names: ["All"], # ambil semua message attributes
attribute_names: ["All"] # ambil semua system attributes
)
resp.messages.each do |pesan|
puts "Message ID: #{pesan.message_id}"
puts "Body: #{pesan.body}"
puts "Atribut: #{pesan.message_attributes}"
puts "Diterima ke-#{pesan.attributes['ApproximateReceiveCount']} kali"
begin
data = JSON.parse(pesan.body, symbolize_names: true)
proses_pesanan(data)
# Hapus pesan setelah berhasil diproses
sqs.delete_message(
queue_url: queue_url,
receipt_handle: pesan.receipt_handle
)
puts "Pesan #{pesan.message_id} berhasil diproses dan dihapus"
rescue JSON::ParserError => e
puts "JSON tidak valid: #{e.message}"
# Hapus pesan tidak valid — jangan biarkan di queue
sqs.delete_message(queue_url: queue_url, receipt_handle: pesan.receipt_handle)
rescue => e
puts "Error: #{e.message}"
# Jangan hapus — visibility timeout akan habis dan pesan kembali ke queue
# Setelah maxReceiveCount kali → masuk DLQ
end
end
end
Ubah Visibility Timeout Secara Dinamis #
# Jika proses butuh waktu lebih lama dari visibility timeout,
# perpanjang sebelum habis
pesan = sqs.receive_message(
queue_url: queue_url,
visibility_timeout: 30
).messages.first
Thread.new do
loop do
sleep 20 # perpanjang setiap 20 detik (sebelum 30 detik habis)
sqs.change_message_visibility(
queue_url: queue_url,
receipt_handle: pesan.receipt_handle,
visibility_timeout: 30 # tambah 30 detik lagi
)
end
end
# Proses yang butuh waktu lama
proses_berat(JSON.parse(pesan.body))
sqs.delete_message(queue_url: queue_url, receipt_handle: pesan.receipt_handle)
Batch Delete — Hapus Banyak Sekaligus #
# Batch delete — lebih efisien dari delete satu per satu
pesan_list = sqs.receive_message(
queue_url: queue_url,
max_number_of_messages: 10,
wait_time_seconds: 20
).messages
# Proses semua pesan
diproses = []
pesan_list.each do |pesan|
begin
proses_pesanan(JSON.parse(pesan.body))
diproses << { id: pesan.message_id, receipt_handle: pesan.receipt_handle }
rescue => e
puts "Gagal proses #{pesan.message_id}: #{e.message}"
# Jangan tambahkan ke daftar yang akan dihapus
end
end
# Hapus semua yang berhasil dalam satu batch
unless diproses.empty?
resp = sqs.delete_message_batch(
queue_url: queue_url,
entries: diproses
)
resp.failed.each do |f|
puts "Gagal hapus #{f.id}: #{f.message}"
end
end
Dead Letter Queue (DLQ) #
DLQ menampung pesan yang gagal diproses setelah maxReceiveCount kali:
# 1. Buat DLQ terlebih dahulu
dlq_resp = sqs.create_queue(queue_name: "pesanan-baru-dlq")
dlq_url = dlq_resp.queue_url
# Dapatkan ARN dari DLQ
dlq_arn = sqs.get_queue_attributes(
queue_url: dlq_url,
attribute_names: ["QueueArn"]
).attributes["QueueArn"]
# 2. Buat queue utama dengan konfigurasi DLQ
sqs.create_queue(
queue_name: "pesanan-baru",
attributes: {
"RedrivePolicy" => JSON.generate({
deadLetterTargetArn: dlq_arn,
maxReceiveCount: 3 # setelah 3x receive tanpa delete → DLQ
})
}
)
# 3. Pantau dan proses pesan di DLQ
def monitor_dlq(sqs, dlq_url)
loop do
resp = sqs.receive_message(
queue_url: dlq_url,
max_number_of_messages: 10,
wait_time_seconds: 20,
attribute_names: ["All"]
)
resp.messages.each do |pesan|
terima_count = pesan.attributes["ApproximateReceiveCount"]
pertama_kali = pesan.attributes["SentTimestamp"].to_i / 1000
puts "DLQ: #{pesan.message_id} (diterima #{terima_count}x)"
puts "Dikirim pada: #{Time.at(pertama_kali)}"
puts "Body: #{pesan.body}"
# Simpan ke database untuk analisis
PesananGagal.create!(
message_id: pesan.message_id,
payload: pesan.body,
receive_count: terima_count.to_i,
gagal_pada: Time.now
)
# Notifikasi tim
Monitoring.alert("Pesan di DLQ: #{pesan.message_id}")
# Hapus dari DLQ setelah diproses
sqs.delete_message(queue_url: dlq_url, receipt_handle: pesan.receipt_handle)
end
sleep 60 # cek DLQ setiap 1 menit
end
end
Integrasi dengan SNS untuk Fan-out #
Pola SNS → SQS sangat umum di AWS — satu publish ke SNS topic di-forward ke banyak SQS queue:
require 'aws-sdk-sns'
require 'aws-sdk-sqs'
sns = Aws::SNS::Client.new(region: "ap-southeast-1")
sqs = Aws::SQS::Client.new(region: "ap-southeast-1")
# 1. Buat SNS Topic
topic_arn = sns.create_topic(name: "pesanan_events").topic_arn
# 2. Buat beberapa SQS Queue
["pesanan_notifikasi", "pesanan_analitik", "pesanan_inventory"].each do |nama|
url = sqs.create_queue(queue_name: nama).queue_url
arn = sqs.get_queue_attributes(
queue_url: url, attribute_names: ["QueueArn"]
).attributes["QueueArn"]
# 3. Subscribe queue ke SNS topic
sns.subscribe(
topic_arn: topic_arn,
protocol: "sqs",
endpoint: arn
)
# 4. Set queue policy — izinkan SNS mengirim ke queue ini
sqs.set_queue_attributes(
queue_url: url,
attributes: {
"Policy" => JSON.generate({
Version: "2012-10-17",
Statement: [{
Effect: "Allow",
Principal: { Service: "sns.amazonaws.com" },
Action: "SQS:SendMessage",
Resource: arn,
Condition: { ArnEquals: { "aws:SourceArn" => topic_arn } }
}]
})
}
)
end
# 5. Publish ke SNS — otomatis di-forward ke semua queue yang subscribe
sns.publish(
topic_arn: topic_arn,
message: JSON.generate({
event: "pesanan_dibuat",
pesanan_id: 12345,
total: 150_000
}),
subject: "pesanan_dibuat",
message_attributes: {
"tipe_event" => { data_type: "String", string_value: "pesanan_dibuat" }
}
)
Shoryuken — Background Job untuk Rails #
Shoryuken adalah worker framework SQS yang terintegrasi dengan ActiveJob Rails:
# Gemfile
# gem 'shoryuken', '~> 6.2'
# config/shoryuken.yml
aws:
region: ap-southeast-1
# Kredensial dari IAM Role atau environment variables
concurrency: 25 # jumlah thread worker concurrent
delay: 0 # delay default antara poll
queues:
- [pesanan_baru, 6] # prioritas tinggi (weight 6)
- [pembayaran, 4] # prioritas sedang
- [notifikasi, 2] # prioritas rendah
- [laporan, 1]
polling_strategy: WeightedRoundRobin # distribusi berdasarkan weight
# app/workers/pesanan_shoryuken_worker.rb
class PesananShoryukenWorker
include Shoryuken::Worker
shoryuken_options(
queue: "pesanan_baru",
auto_delete: true, # hapus otomatis setelah sukses
body_parser: :json # parse body sebagai JSON otomatis
)
def perform(sqs_msg, body)
pesanan_id = body["pesanan_id"]
Rails.logger.info "Memproses pesanan #{pesanan_id}"
# body sudah di-parse sebagai Hash (karena body_parser: :json)
pesanan = Pesanan.find(pesanan_id)
pesanan.update!(status: :diproses)
NotifikasiMailer.pesanan_dikonfirmasi(pesanan).deliver_later
rescue ActiveRecord::RecordNotFound => e
Rails.logger.error "Pesanan tidak ditemukan: #{e.message}"
# auto_delete: true → pesan tetap dihapus meski ada error
# Untuk retry: raise error agar pesan tidak terhapus dan kembali ke queue
end
end
# Enqueue pesan ke SQS
PesananShoryukenWorker.perform_async(
JSON.generate({ pesanan_id: 123, total: 150_000 })
)
# Atau dengan delay
PesananShoryukenWorker.perform_in(5.minutes, JSON.generate({ pesanan_id: 456 }))
# Integrasi dengan ActiveJob (tanpa Shoryuken worker class)
# config/application.rb
config.active_job.queue_adapter = :shoryuken
# app/jobs/kirim_notifikasi_job.rb
class KirimNotifikasiJob < ApplicationJob
queue_as :notifikasi
def perform(user_id, pesan)
user = User.find(user_id)
NotifikasiService.kirim(user, pesan)
end
end
# Enqueue
KirimNotifikasiJob.perform_later(user.id, "Pesanan dikonfirmasi!")
KirimNotifikasiJob.set(wait: 5.minutes).perform_later(user.id, "Reminder")
# Jalankan Shoryuken worker
bundle exec shoryuken -r ./config/environment.rb -C config/shoryuken.yml
# Untuk Rails
bundle exec shoryuken -r ./config/environment.rb -C config/shoryuken.yml -R
Pola At-Least-Once vs Exactly-Once #
SQS Standard tidak menjamin exactly-once — pesan bisa diterima lebih dari sekali:
# AT-LEAST-ONCE — Standard Queue
# Pesan bisa duplikat, pastikan processing idempoten
class ProsesanPembayaran
def proses(pesan_id, data)
# ANTI-PATTERN: tidak idempoten — bisa charge 2x
kartu.charge(data[:jumlah])
Pembayaran.create!(amount: data[:jumlah])
# BENAR: idempoten — cek apakah sudah diproses
return if Pembayaran.exists?(message_id: pesan_id)
ActiveRecord::Base.transaction do
kartu.charge(data[:jumlah])
Pembayaran.create!(
message_id: pesan_id, # simpan SQS Message ID untuk idempotency
amount: data[:jumlah],
status: "sukses"
)
end
end
end
# EXACTLY-ONCE — FIFO Queue
# Gunakan ContentBasedDeduplication atau MessageDeduplicationId
sqs.send_message(
queue_url: "https://sqs.../pembayaran.fifo",
message_body: JSON.generate({ pembayaran_id: 999 }),
message_group_id: "pembayaran-#{pengguna_id}",
message_deduplication_id: "pembayaran-#{pembayaran_id}"
# SQS akan tolak pesan dengan deduplication_id yang sama dalam 5 menit
)
Monitoring dengan CloudWatch #
require 'aws-sdk-cloudwatch'
cloudwatch = Aws::CloudWatch::Client.new(region: "ap-southeast-1")
# Buat alarm untuk depth queue yang tinggi
cloudwatch.put_metric_alarm(
alarm_name: "sqs-pesanan-baru-depth-tinggi",
alarm_description: "Queue pesanan-baru memiliki terlalu banyak pesan",
actions_enabled: true,
alarm_actions: ["arn:aws:sns:ap-southeast-1:123:tim-ops"],
metric_name: "ApproximateNumberOfMessagesVisible",
namespace: "AWS/SQS",
statistic: "Average",
dimensions: [{
name: "QueueName",
value: "pesanan-baru"
}],
period: 300, # evaluasi setiap 5 menit
evaluation_periods: 1,
threshold: 1000.0, # alert jika > 1000 pesan
comparison_operator: "GreaterThanThreshold",
treat_missing_data: "notBreaching"
)
# Buat alarm untuk DLQ
cloudwatch.put_metric_alarm(
alarm_name: "sqs-pesanan-dlq-tidak-kosong",
alarm_description: "Ada pesan di DLQ pesanan!",
actions_enabled: true,
alarm_actions: ["arn:aws:sns:ap-southeast-1:123:tim-ops"],
metric_name: "ApproximateNumberOfMessagesVisible",
namespace: "AWS/SQS",
statistic: "Sum",
dimensions: [{ name: "QueueName", value: "pesanan-baru-dlq" }],
period: 60,
evaluation_periods: 1,
threshold: 0,
comparison_operator: "GreaterThanThreshold",
treat_missing_data: "notBreaching"
)
Keamanan — IAM Policy yang Tepat #
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ProducerPermissions",
"Effect": "Allow",
"Action": [
"sqs:SendMessage",
"sqs:SendMessageBatch",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes"
],
"Resource": "arn:aws:sqs:ap-southeast-1:123456789:pesanan-*"
},
{
"Sid": "ConsumerPermissions",
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:DeleteMessageBatch",
"sqs:ChangeMessageVisibility",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes"
],
"Resource": "arn:aws:sqs:ap-southeast-1:123456789:pesanan-*"
}
]
}
# Best practice keamanan SQS:
# 1. Gunakan IAM Role, bukan Access Key untuk EC2/ECS/Lambda
# 2. Principle of least privilege — producer tidak perlu receive, consumer tidak perlu create
# 3. Enkripsi sisi server dengan KMS untuk data sensitif
# 4. VPC Endpoint agar traffic tidak keluar internet
# 5. Audit dengan CloudTrail
# Buat queue dengan enkripsi KMS
sqs.create_queue(
queue_name: "pembayaran-terenkripsi",
attributes: {
"KmsMasterKeyId" => "arn:aws:kms:ap-southeast-1:123:key/xxx",
"KmsDataKeyReusePeriodSeconds" => "300"
}
)
Ringkasan #
- Standard Queue untuk throughput tinggi, FIFO untuk ordering — Standard toleran duplikat tapi hampir unlimited throughput; FIFO tidak duplikat tapi dibatasi 300 TPS (3000 dengan batching).
- Long polling hemat biaya — set
wait_time_seconds: 20untuk mengurangi jumlah request poll yang kosong; tanpa ini biaya SQS bisa naik drastis untuk queue sepi.- Visibility timeout harus lebih lama dari proses — jika proses butuh 60 detik tapi timeout 30 detik, pesan akan muncul kembali dan diproses dua kali; set timeout 2-3x estimasi waktu proses.
- Selalu hapus pesan setelah berhasil — SQS tidak hapus otomatis setelah di-receive; panggil
delete_messagesetelah sukses, atau biarkan visibility timeout habis untuk retry.- DLQ wajib untuk queue produksi — konfigurasi
RedrivePolicydenganmaxReceiveCount: 3; pesan yang terus gagal tidak akan memblokir queue dan bisa dianalisis di DLQ.- Processing harus idempoten untuk Standard Queue — simpan
message_idSQS ke database dan cek sebelum proses untuk mencegah efek duplikat yang tidak diinginkan.- Batch operations menghemat biaya dan latency —
send_message_batchdandelete_message_batchmengirim/menghapus hingga 10 pesan dalam satu API call.- IAM Role, bukan Access Key untuk production — EC2, ECS, dan Lambda bisa menggunakan IAM Role secara otomatis tanpa perlu menyimpan credential di kode atau environment.
- SNS → SQS untuk fan-out — publish sekali ke SNS, otomatis terkirim ke banyak queue SQS; ideal untuk event yang perlu diproses oleh beberapa sistem berbeda.
- CloudWatch alarm untuk DLQ — buat alarm yang trigger saat DLQ tidak kosong; pesan di DLQ selalu berarti ada bug atau data tidak valid yang butuh perhatian segera.