Google Pub/Sub #
Google Cloud Pub/Sub adalah layanan messaging asinkron fully-managed dari Google Cloud yang dirancang untuk skalabilitas global dan latensi rendah. Ia menggabungkan keunggulan Kafka (retensi pesan, replay dengan snapshot) dan SQS (fully-managed, tanpa infra) dalam satu layanan. Pub/Sub bisa menangani jutaan pesan per detik dengan delivery at-least-once, mendukung push subscription untuk serverless (Cloud Functions, Cloud Run), dan punya filter subscription untuk routing tanpa consumer group yang kompleks. Di Ruby, gem resmi google-cloud-pubsub menyediakan API yang bersih dan idiomatik dengan dukungan penuh untuk semua fitur Pub/Sub modern.
Konsep Dasar Google Pub/Sub #
flowchart LR
P[Publisher\nAplikasi Ruby] -->|publish| T[Topic:\npesanan]
T --> S1[Subscription:\npesanan-notifikasi\nPull]
T --> S2[Subscription:\npesanan-analitik\nPull]
T --> S3[Subscription:\npesanan-webhook\nPush → Cloud Run]
S1 --> C1[Consumer\nLayanan Notifikasi]
S2 --> C2[Consumer\nLayanan Analitik]
S3 --> C3[Cloud Run\nEndpoint]
T --> DLT[Dead Letter Topic:\npesanan-dlq]Konsep utama Pub/Sub:
Topic — saluran pesan, publisher mengirim ke sini
Subscription — consumer membuat subscription ke topic
Satu topic bisa punya banyak subscription
Setiap subscription menerima SEMUA pesan (bukan round-robin)
Message — unit pesan dengan data, attributes, dan message_id
Ack — konfirmasi pesan berhasil diproses (hapus dari subscription)
Nack — tolak pesan, kembalikan untuk delivery ulang
Perbedaan kunci dengan SQS:
✓ Setiap subscription menerima salinan SEMUA pesan (bukan dibagi)
✓ Snapshot untuk replay pesan (seperti Kafka offset)
✓ Filter di level subscription — tidak perlu routing di aplikasi
✓ Push subscription — Pub/Sub yang kirim HTTP request ke endpoint
✗ Tidak ada FIFO bawaan (gunakan ordering key per-partition)
Instalasi dan Setup #
gem install google-cloud-pubsub
# Atau di Gemfile
# Gemfile
gem 'google-cloud-pubsub', '~> 2.18'
Autentikasi #
require 'google/cloud/pubsub'
# Cara 1: Application Default Credentials (ADC) — otomatis dari environment
# Di lokal: jalankan `gcloud auth application-default login`
# Di GCE/GKE/Cloud Run: otomatis dari service account yang di-attach
pubsub = Google::Cloud::PubSub.new(project_id: "my-project-id")
# Cara 2: Service Account JSON file
pubsub = Google::Cloud::PubSub.new(
project_id: "my-project-id",
credentials: "/path/to/service-account.json"
)
# Cara 3: Environment variable (paling umum di production)
# GOOGLE_CLOUD_PROJECT=my-project-id
# GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
pubsub = Google::Cloud::PubSub.new
# Cara 4: Hash credentials langsung (hindari di production)
pubsub = Google::Cloud::PubSub.new(
project_id: "my-project-id",
credentials: {
type: "service_account",
project_id: "my-project-id",
private_key_id: ENV["GCP_PRIVATE_KEY_ID"],
private_key: ENV["GCP_PRIVATE_KEY"],
client_email: ENV["GCP_CLIENT_EMAIL"],
client_id: ENV["GCP_CLIENT_ID"],
auth_uri: "https://accounts.google.com/o/oauth2/auth",
token_uri: "https://oauth2.googleapis.com/token"
}
)
# Cara 5: Emulator untuk development lokal (tanpa koneksi ke GCP)
# Jalankan: gcloud beta emulators pubsub start
ENV["PUBSUB_EMULATOR_HOST"] = "localhost:8085"
pubsub = Google::Cloud::PubSub.new(project_id: "test-project")
Manajemen Topic dan Subscription #
pubsub = Google::Cloud::PubSub.new(project_id: "my-project")
# Buat topic
topic = pubsub.create_topic("pesanan")
# Atau ambil topic yang sudah ada
topic = pubsub.topic("pesanan")
# Buat topic dengan konfigurasi
topic = pubsub.create_topic(
"transaksi",
message_retention_duration: "604800s", # retensi pesan 7 hari
kms_key_name: "projects/my-project/locations/global/keyRings/my-ring/cryptoKeys/my-key"
)
# Daftar semua topic
pubsub.topics.each { |t| puts t.name }
# Hapus topic
topic.delete
# ===== Subscription =====
# Pull Subscription — consumer yang minta pesan secara aktif
sub = topic.subscribe(
"pesanan-notifikasi",
deadline: 60, # ack deadline dalam detik (max 600)
retain_acked_messages: false, # true untuk replay capability
message_retention_duration: "604800s", # 7 hari
enable_message_ordering: false # true untuk ordered delivery
)
# Push Subscription — Pub/Sub kirim HTTP POST ke endpoint
sub_push = topic.subscribe(
"pesanan-webhook",
endpoint: "https://api.example.com/webhook/pubsub"
)
# Subscription dengan filter — hanya terima pesan dengan atribut tertentu
sub_filter = topic.subscribe(
"pesanan-premium",
filter: 'attributes.tipe_pelanggan = "premium"'
)
# Subscription dengan Dead Letter Topic
dlq_topic = pubsub.create_topic("pesanan-dlq")
sub_dengan_dlq = topic.subscribe(
"pesanan-dengan-dlq",
dead_letter_topic: dlq_topic,
max_delivery_attempts: 5 # setelah 5x gagal → DLQ
)
# Subscription dengan retry policy
sub_dengan_retry = topic.subscribe(
"pesanan-retry",
minimum_backoff: 10, # minimal 10 detik sebelum retry
maximum_backoff: 600 # maksimal 600 detik (10 menit)
)
# Ambil subscription yang sudah ada
sub = pubsub.subscription("pesanan-notifikasi")
# Daftar subscription
topic.subscriptions.each { |s| puts s.name }
Publisher — Mengirim Pesan #
topic = pubsub.topic("pesanan")
# Publish pesan sederhana
message_id = topic.publish(
JSON.generate({
event: "pesanan_dibuat",
pesanan_id: 12345,
total: 150_000,
timestamp: Time.now.iso8601
})
)
puts "Message ID: #{message_id}"
# Publish dengan attributes — bisa digunakan untuk filter subscription
message_id = topic.publish(
JSON.generate({ pesanan_id: 12346, total: 250_000 }),
"tipe_event" => "pesanan_baru",
"tipe_pelanggan" => "premium",
"versi_schema" => "v2",
"lingkungan" => Rails.env
)
# Publish dengan ordering key — pesan dengan key yang sama dikirim berurutan
# (subscription harus enable_message_ordering: true)
topic.publish(
JSON.generate({ pesanan_id: 100, status: "dibuat" }),
ordering_key: "pengguna-#{pengguna_id}"
)
topic.publish(
JSON.generate({ pesanan_id: 100, status: "dibayar" }),
ordering_key: "pengguna-#{pengguna_id}"
# Pesan ini dijamin dikirim SETELAH pesan di atas untuk key yang sama
)
Batch Publish — Efisiensi Throughput Tinggi #
# publish dengan blok — Pub/Sub akan batch otomatis
topic.publish do |batch|
pesanan_list.each do |pesanan|
batch.publish(
JSON.generate({
pesanan_id: pesanan.id,
total: pesanan.total,
status: pesanan.status
}),
"tipe_event" => "pesanan_sync"
)
end
end
# Async publisher — publish di background thread, tidak blocking
publisher = topic.async_publisher(
max_bytes: 1_000_000, # flush jika buffer mencapai 1MB
max_messages: 100, # flush jika buffer punya 100 pesan
interval: 0.25 # flush setiap 250ms meski belum penuh
)
# Callback untuk konfirmasi atau error
publisher.on_error do |exception|
Rails.logger.error "Error publish: #{exception.message}"
end
# Publish asinkron — langsung return tanpa tunggu konfirmasi
1000.times do |i|
publisher.publish(
JSON.generate({ seq: i, ts: Time.now.to_i }),
"batch_id" => "batch-#{Time.now.to_i}"
)
end
# Flush semua pesan yang masih di buffer
publisher.flush
publisher.stop.wait! # tunggu semua selesai sebelum shutdown
Subscriber — Menerima Pesan #
Pull Subscriber — Streaming (Direkomendasikan) #
sub = pubsub.subscription("pesanan-notifikasi")
# Streaming pull — subscriber berjalan di background thread
# Jauh lebih efisien dari polling manual
subscriber = sub.listen(
streams: 4, # jumlah streaming connection paralel
inventory: 1000, # jumlah pesan yang di-buffer
threads: {
callback: 8, # thread untuk menjalankan callback
push: 4 # thread untuk push ack/nack ke Pub/Sub
}
) do |received_message|
begin
data = JSON.parse(received_message.data, symbolize_names: true)
attrs = received_message.attributes
puts "Pesan: #{received_message.message_id}"
puts "Publish time: #{received_message.published_at}"
puts "Delivery attempt: #{received_message.delivery_attempt}"
puts "Data: #{data}"
puts "Attrs: #{attrs}"
# Proses pesan
proses_pesanan(data)
# ACK — konfirmasi berhasil diproses
received_message.acknowledge!
puts "ACK: #{received_message.message_id}"
rescue JSON::ParserError => e
Rails.logger.error "JSON tidak valid: #{e.message}"
received_message.acknowledge! # hapus pesan tidak valid agar tidak diulangi
rescue => e
Rails.logger.error "Error: #{e.message}"
# NACK — tolak, kembalikan untuk delivery ulang
received_message.nack!
# Pub/Sub akan retry sesuai retry policy (dengan backoff)
end
end
# Mulai subscriber
subscriber.start
# Graceful shutdown
trap("SIGTERM") do
subscriber.stop
subscriber.wait!
puts "Subscriber berhenti"
end
# Blokir thread utama
subscriber.wait!
Pull Manual — Satu per Satu #
sub = pubsub.subscription("pesanan-analitik")
# Pull sejumlah pesan secara sinkron
loop do
received_messages = sub.pull(
max_messages: 10, # maks pesan per pull
return_immediately: false # tunggu jika tidak ada pesan
)
received_messages.each do |pesan|
begin
data = JSON.parse(pesan.data, symbolize_names: true)
proses_analitik(data)
# ACK via subscription
sub.acknowledge(pesan)
rescue => e
puts "Error: #{e.message}"
sub.nack(pesan)
end
end
sleep 1 if received_messages.empty?
end
Acknowledgment dan Nack #
# Tiga pilihan setelah menerima pesan:
# 1. ACK — berhasil diproses
received_message.acknowledge!
# atau: sub.acknowledge(received_message)
# 2. NACK — gagal, minta delivery ulang
received_message.nack!
# Pub/Sub akan retry sesuai backoff policy
# 3. Modify Ack Deadline — perpanjang waktu sebelum retry
# Berguna jika proses butuh waktu lebih dari ack deadline default
received_message.modify_ack_deadline!(120) # tambah 120 detik lagi
# Perpanjang secara berkala dalam thread terpisah
Thread.new do
loop do
sleep 50 # perpanjang sebelum deadline 60 detik habis
received_message.modify_ack_deadline!(60)
end
end
# Proses yang butuh waktu lama
proses_berat(JSON.parse(received_message.data))
received_message.acknowledge!
Dead Letter Topic #
# Setup Dead Letter Topic
dlq_topic = pubsub.create_topic("pesanan-dlq")
# Buat subscription DLQ untuk memantau pesan gagal
dlq_sub = dlq_topic.subscribe("pesanan-dlq-monitor")
# Subscription utama dengan DLQ
sub = topic.subscribe(
"pesanan-dengan-dlq",
dead_letter_topic: dlq_topic,
max_delivery_attempts: 5 # setelah 5x → DLQ
)
# Monitor DLQ
dlq_subscriber = dlq_sub.listen do |pesan|
attempts = pesan.delivery_attempt
puts "DLQ: #{pesan.message_id} setelah #{attempts} percobaan"
data = JSON.parse(pesan.data) rescue pesan.data
# Log dan simpan untuk analisis
PesananGagal.create!(
message_id: pesan.message_id,
payload: pesan.data,
delivery_attempt: attempts,
attributes: pesan.attributes,
gagal_pada: Time.now
)
# Kirim notifikasi ke tim
Monitoring.alert("Pesan di DLQ", message_id: pesan.message_id, data: data)
pesan.acknowledge!
end
dlq_subscriber.start
Snapshot dan Replay #
Snapshot memungkinkan consumer membaca ulang pesan dari titik tertentu di masa lalu — mirip dengan Kafka offset:
sub = pubsub.subscription("pesanan-analitik")
# Buat snapshot — simpan posisi saat ini
snapshot = sub.create_snapshot("sebelum-migrasi-#{Time.now.strftime('%Y%m%d%H%M%S')}")
puts "Snapshot: #{snapshot.name}"
puts "Berlaku sampai: #{snapshot.expiration_time}"
# Jalankan proses migrasi atau eksperimen
# ...
# Jika ada masalah, rollback ke snapshot
sub.seek(snapshot) # mundur ke posisi saat snapshot dibuat
puts "Seek ke snapshot berhasil"
# Atau seek ke waktu tertentu (replay pesan dari 2 jam lalu)
sub.seek(2.hours.ago)
# Daftar semua snapshot
pubsub.snapshots.each { |s| puts "#{s.name}: #{s.expiration_time}" }
# Hapus snapshot
snapshot.delete
Filter Subscription #
Filter subscription memungkinkan routing berdasarkan attribute pesan tanpa perlu kode routing di consumer:
# Topic utama yang menerima semua event
topic = pubsub.topic("semua_event")
# Subscription yang hanya menerima pesanan premium
sub_premium = topic.subscribe(
"event-pesanan-premium",
filter: 'attributes.tipe_pelanggan = "premium" AND attributes.tipe_event = "pesanan_baru"'
)
# Subscription yang hanya menerima pembayaran di atas 1 juta
sub_besar = topic.subscribe(
"event-pembayaran-besar",
filter: 'attributes.tipe_event = "pembayaran" AND attributes.jumlah > "1000000"'
)
# Subscription untuk semua error
sub_error = topic.subscribe(
"event-error-semua",
filter: 'attributes.severity = "ERROR" OR attributes.severity = "CRITICAL"'
)
# Publisher cukup publish ke satu topic dengan attributes
topic.publish(
JSON.generate({ pesanan_id: 999, total: 5_000_000 }),
"tipe_event" => "pesanan_baru",
"tipe_pelanggan" => "premium",
"jumlah" => "5000000"
)
# Hanya sub_premium dan sub_besar yang akan menerima pesan ini
Push Subscription — Serverless Receiver #
Push subscription membalik arah — Pub/Sub yang aktif mengirim HTTP POST ke endpoint kamu:
# Buat push subscription yang mengirim ke Cloud Run endpoint
sub = topic.subscribe(
"pesanan-push",
endpoint: "https://api.example.com/pubsub/pesanan",
authentication: {
service_account_email: "[email protected]",
audience: "https://api.example.com"
}
)
# Di sisi server Rails yang menerima push
# app/controllers/pubsub_controller.rb
class PubsubController < ApplicationController
skip_before_action :verify_authenticity_token
def pesanan
# Pub/Sub mengirim base64-encoded data dalam format JSON
envelope = JSON.parse(request.body.read)
payload = Base64.decode64(envelope["message"]["data"])
data = JSON.parse(payload, symbolize_names: true)
attrs = envelope["message"]["attributes"] || {}
Rails.logger.info "Pub/Sub message: #{envelope['message']['messageId']}"
proses_pesanan(data)
# HTTP 200 = ACK. Non-2xx = NACK (Pub/Sub akan retry)
head :ok
rescue => e
Rails.logger.error "Error: #{e.message}"
head :internal_server_error # trigger retry
end
end
Integrasi dengan Rails #
# config/initializers/pubsub.rb
module Pubsub
def self.client
@client ||= Google::Cloud::PubSub.new(
project_id: ENV.fetch("GOOGLE_CLOUD_PROJECT"),
credentials: ENV["GOOGLE_APPLICATION_CREDENTIALS"]
)
end
def self.topic(nama)
@topics ||= {}
@topics[nama] ||= client.topic(nama) || client.create_topic(nama)
end
def self.publish(nama_topic, data, **attributes)
topic(nama_topic).publish(
JSON.generate(data),
**attributes.transform_keys(&:to_s),
"lingkungan" => Rails.env,
"versi_schema" => "v1",
"timestamp" => Time.now.iso8601
)
end
end
# Gunakan dari mana saja di Rails
class PesananService
def self.buat(pengguna, params)
pesanan = Pesanan.create!(pengguna: pengguna, **params)
Pubsub.publish(
"pesanan",
{ pesanan_id: pesanan.id, total: pesanan.total, status: pesanan.status },
tipe_event: "pesanan_dibuat",
tipe_pelanggan: pengguna.tier
)
pesanan
end
end
# app/workers/pubsub_worker.rb — worker untuk pull subscription
class PubsubWorker
SUBSCRIPTION_NAME = ENV.fetch("PUBSUB_SUBSCRIPTION", "pesanan-worker")
def self.jalankan
sub = Pubsub.client.subscription(SUBSCRIPTION_NAME)
subscriber = sub.listen(streams: 4, threads: { callback: 8 }) do |msg|
begin
data = JSON.parse(msg.data, symbolize_names: true)
case data[:event]
when "pesanan_dibuat"
ProsesanPesanan.call(data)
when "pembayaran_sukses"
ProsesanPembayaran.call(data)
else
Rails.logger.warn "Event tidak dikenal: #{data[:event]}"
end
msg.acknowledge!
rescue => e
Rails.logger.error "PubsubWorker error: #{e.message}\n#{e.backtrace.first(5).join("\n")}"
msg.nack!
end
end
subscriber.start
trap("SIGTERM") { subscriber.stop.wait! }
subscriber.wait!
end
end
Pub/Sub Lite — Untuk Throughput Ultra-Tinggi #
Pub/Sub Lite adalah varian yang lebih murah untuk throughput sangat tinggi, dengan trade-off: tidak globally replicated dan perlu manage kapasitas:
require 'google/cloud/pubsub/v1'
# Pub/Sub Lite menggunakan API berbeda
lite_client = Google::Cloud::PubSub::V1::AdminService::Client.new
# Buat Lite Topic dengan provisioned kapasitas
lite_topic = lite_client.create_topic(
parent: "projects/my-project/locations/us-central1-a",
topic_id: "transaksi-lite",
topic: {
partition_config: {
count: 8, # 8 partisi
capacity: {
publish_mib_per_sec: 16, # 16 MB/s publish capacity
subscribe_mib_per_sec: 32 # 32 MB/s subscribe capacity
}
},
retention_config: {
per_partition_bytes: 30.gigabytes,
period: { seconds: 604_800 } # 7 hari
}
}
)
Monitoring dengan Google Cloud Monitoring #
# Metric penting Pub/Sub yang perlu dipantau:
# subscription/num_undelivered_messages → jumlah pesan belum diproses
# subscription/oldest_unacked_message_age → usia pesan tertua yang belum di-ack
# topic/send_message_operation_count → throughput publisher
# subscription/pull_message_operation_count → throughput subscriber
# Buat alert via gcloud CLI:
# gcloud alpha monitoring policies create \
# --notification-channels=... \
# --condition-filter='resource.type="pubsub_subscription"
# AND metric.type="pubsub.googleapis.com/subscription/num_undelivered_messages"
# AND metric.labels.subscription_id="pesanan-notifikasi"' \
# --condition-threshold-value=1000 \
# --condition-threshold-comparison=COMPARISON_GT \
# --display-name="Pub/Sub pesanan-notifikasi lag tinggi"
Perbandingan Google Pub/Sub vs Amazon SQS #
| Aspek | Google Pub/Sub | Amazon SQS |
|---|---|---|
| Model delivery | At-least-once | At-least-once (Standard) / Exactly-once (FIFO) |
| Fan-out | Native — setiap subscription terima semua | Perlu SNS + SQS |
| Ordering | Dengan ordering key (per-partition) | FIFO queue saja |
| Replay | Snapshot + seek ke waktu/snapshot | Tidak ada (SQS), retensi di Kafka |
| Push | Native push subscription | Tidak ada (perlu Lambda trigger) |
| Filter | Filter di level subscription | Tidak ada native filter |
| Retensi pesan | Maks 7 hari | Maks 14 hari |
| Global replicated | Ya (default) | Ya (dalam region) |
| Harga model | Per operasi | Per request + data transfer |
| Ekosistem | GCP native | AWS native |
Ringkasan #
- Setiap subscription menerima semua pesan — berbeda dari SQS yang membagi pesan antar consumer; di Pub/Sub, satu topic + dua subscription = setiap subscription menerima salinan semua pesan secara independen.
- Streaming pull lebih efisien dari polling manual —
sub.listen { |msg| ... }mengelola koneksi, thread pool, dan batching secara otomatis; gunakan ini untuk consumer production.- ACK deadline harus realistis — set deadline lebih lama dari estimasi waktu proses; jika pesan sering timeout sebelum di-ack, Pub/Sub akan terus mengirim ulang dan menyebabkan duplikat.
- Filter subscription untuk routing — daripada membuat logic routing di kode consumer, gunakan filter di level subscription; jauh lebih bersih dan tidak perlu deployment ulang saat menambah filter.
- Snapshot untuk replay dan rollback — buat snapshot sebelum deployment besar atau migrasi data; jika ada masalah, bisa seek kembali ke snapshot untuk replay semua pesan yang sudah diproses.
- Dead Letter Topic wajib untuk subscription produksi — konfigurasi
max_delivery_attempts: 5dan buat consumer untuk DLQ; pesan yang terus gagal tidak akan memblokir pesan lain.- Async publisher untuk throughput tinggi —
topic.async_publishermen-batch pesan di buffer dan mengirim secara asinkron; jauh lebih efisien dari publish sinkron satu per satu.- Push subscription untuk serverless — Cloud Run dan Cloud Functions bisa menerima pesan tanpa perlu keep-alive connection; Pub/Sub yang aktif mengirim HTTP POST saat ada pesan.
- Ordering key untuk urutan terjamin — aktifkan
enable_message_ordering: truepada subscription dan kirim denganordering_keyyang sama untuk memastikan urutan per-key.- ADC (Application Default Credentials) untuk autentikasi — di Cloud Run/GKE gunakan service account yang di-attach tanpa credential file; di lokal gunakan
gcloud auth application-default login.