Google Pub/Sub

Google Pub/Sub #

Google Cloud Pub/Sub adalah layanan messaging asinkron fully-managed dari Google Cloud yang dirancang untuk skalabilitas global dan latensi rendah. Ia menggabungkan keunggulan Kafka (retensi pesan, replay dengan snapshot) dan SQS (fully-managed, tanpa infra) dalam satu layanan. Pub/Sub bisa menangani jutaan pesan per detik dengan delivery at-least-once, mendukung push subscription untuk serverless (Cloud Functions, Cloud Run), dan punya filter subscription untuk routing tanpa consumer group yang kompleks. Di Ruby, gem resmi google-cloud-pubsub menyediakan API yang bersih dan idiomatik dengan dukungan penuh untuk semua fitur Pub/Sub modern.

Konsep Dasar Google Pub/Sub #

flowchart LR
    P[Publisher\nAplikasi Ruby] -->|publish| T[Topic:\npesanan]

    T --> S1[Subscription:\npesanan-notifikasi\nPull]
    T --> S2[Subscription:\npesanan-analitik\nPull]
    T --> S3[Subscription:\npesanan-webhook\nPush → Cloud Run]

    S1 --> C1[Consumer\nLayanan Notifikasi]
    S2 --> C2[Consumer\nLayanan Analitik]
    S3 --> C3[Cloud Run\nEndpoint]

    T --> DLT[Dead Letter Topic:\npesanan-dlq]
Konsep utama Pub/Sub:
  Topic        — saluran pesan, publisher mengirim ke sini
  Subscription — consumer membuat subscription ke topic
                 Satu topic bisa punya banyak subscription
                 Setiap subscription menerima SEMUA pesan (bukan round-robin)
  Message      — unit pesan dengan data, attributes, dan message_id
  Ack          — konfirmasi pesan berhasil diproses (hapus dari subscription)
  Nack         — tolak pesan, kembalikan untuk delivery ulang

Perbedaan kunci dengan SQS:
  ✓ Setiap subscription menerima salinan SEMUA pesan (bukan dibagi)
  ✓ Snapshot untuk replay pesan (seperti Kafka offset)
  ✓ Filter di level subscription — tidak perlu routing di aplikasi
  ✓ Push subscription — Pub/Sub yang kirim HTTP request ke endpoint
  ✗ Tidak ada FIFO bawaan (gunakan ordering key per-partition)

Instalasi dan Setup #

gem install google-cloud-pubsub

# Atau di Gemfile
# Gemfile
gem 'google-cloud-pubsub', '~> 2.18'

Autentikasi #

require 'google/cloud/pubsub'

# Cara 1: Application Default Credentials (ADC) — otomatis dari environment
# Di lokal: jalankan `gcloud auth application-default login`
# Di GCE/GKE/Cloud Run: otomatis dari service account yang di-attach
pubsub = Google::Cloud::PubSub.new(project_id: "my-project-id")

# Cara 2: Service Account JSON file
pubsub = Google::Cloud::PubSub.new(
  project_id:  "my-project-id",
  credentials: "/path/to/service-account.json"
)

# Cara 3: Environment variable (paling umum di production)
# GOOGLE_CLOUD_PROJECT=my-project-id
# GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
pubsub = Google::Cloud::PubSub.new

# Cara 4: Hash credentials langsung (hindari di production)
pubsub = Google::Cloud::PubSub.new(
  project_id:  "my-project-id",
  credentials: {
    type:                 "service_account",
    project_id:           "my-project-id",
    private_key_id:       ENV["GCP_PRIVATE_KEY_ID"],
    private_key:          ENV["GCP_PRIVATE_KEY"],
    client_email:         ENV["GCP_CLIENT_EMAIL"],
    client_id:            ENV["GCP_CLIENT_ID"],
    auth_uri:             "https://accounts.google.com/o/oauth2/auth",
    token_uri:            "https://oauth2.googleapis.com/token"
  }
)

# Cara 5: Emulator untuk development lokal (tanpa koneksi ke GCP)
# Jalankan: gcloud beta emulators pubsub start
ENV["PUBSUB_EMULATOR_HOST"] = "localhost:8085"
pubsub = Google::Cloud::PubSub.new(project_id: "test-project")

Manajemen Topic dan Subscription #

pubsub = Google::Cloud::PubSub.new(project_id: "my-project")

# Buat topic
topic = pubsub.create_topic("pesanan")

# Atau ambil topic yang sudah ada
topic = pubsub.topic("pesanan")

# Buat topic dengan konfigurasi
topic = pubsub.create_topic(
  "transaksi",
  message_retention_duration: "604800s",  # retensi pesan 7 hari
  kms_key_name: "projects/my-project/locations/global/keyRings/my-ring/cryptoKeys/my-key"
)

# Daftar semua topic
pubsub.topics.each { |t| puts t.name }

# Hapus topic
topic.delete

# ===== Subscription =====

# Pull Subscription — consumer yang minta pesan secara aktif
sub = topic.subscribe(
  "pesanan-notifikasi",
  deadline:                      60,     # ack deadline dalam detik (max 600)
  retain_acked_messages:         false,  # true untuk replay capability
  message_retention_duration:    "604800s",  # 7 hari
  enable_message_ordering:       false   # true untuk ordered delivery
)

# Push Subscription — Pub/Sub kirim HTTP POST ke endpoint
sub_push = topic.subscribe(
  "pesanan-webhook",
  endpoint: "https://api.example.com/webhook/pubsub"
)

# Subscription dengan filter — hanya terima pesan dengan atribut tertentu
sub_filter = topic.subscribe(
  "pesanan-premium",
  filter: 'attributes.tipe_pelanggan = "premium"'
)

# Subscription dengan Dead Letter Topic
dlq_topic = pubsub.create_topic("pesanan-dlq")
sub_dengan_dlq = topic.subscribe(
  "pesanan-dengan-dlq",
  dead_letter_topic:              dlq_topic,
  max_delivery_attempts:         5   # setelah 5x gagal → DLQ
)

# Subscription dengan retry policy
sub_dengan_retry = topic.subscribe(
  "pesanan-retry",
  minimum_backoff: 10,   # minimal 10 detik sebelum retry
  maximum_backoff: 600   # maksimal 600 detik (10 menit)
)

# Ambil subscription yang sudah ada
sub = pubsub.subscription("pesanan-notifikasi")

# Daftar subscription
topic.subscriptions.each { |s| puts s.name }

Publisher — Mengirim Pesan #

topic = pubsub.topic("pesanan")

# Publish pesan sederhana
message_id = topic.publish(
  JSON.generate({
    event:      "pesanan_dibuat",
    pesanan_id: 12345,
    total:      150_000,
    timestamp:  Time.now.iso8601
  })
)
puts "Message ID: #{message_id}"

# Publish dengan attributes — bisa digunakan untuk filter subscription
message_id = topic.publish(
  JSON.generate({ pesanan_id: 12346, total: 250_000 }),
  "tipe_event"      => "pesanan_baru",
  "tipe_pelanggan"  => "premium",
  "versi_schema"    => "v2",
  "lingkungan"      => Rails.env
)

# Publish dengan ordering key — pesan dengan key yang sama dikirim berurutan
# (subscription harus enable_message_ordering: true)
topic.publish(
  JSON.generate({ pesanan_id: 100, status: "dibuat" }),
  ordering_key: "pengguna-#{pengguna_id}"
)

topic.publish(
  JSON.generate({ pesanan_id: 100, status: "dibayar" }),
  ordering_key: "pengguna-#{pengguna_id}"
  # Pesan ini dijamin dikirim SETELAH pesan di atas untuk key yang sama
)

Batch Publish — Efisiensi Throughput Tinggi #

# publish dengan blok — Pub/Sub akan batch otomatis
topic.publish do |batch|
  pesanan_list.each do |pesanan|
    batch.publish(
      JSON.generate({
        pesanan_id: pesanan.id,
        total:      pesanan.total,
        status:     pesanan.status
      }),
      "tipe_event" => "pesanan_sync"
    )
  end
end

# Async publisher — publish di background thread, tidak blocking
publisher = topic.async_publisher(
  max_bytes:    1_000_000,   # flush jika buffer mencapai 1MB
  max_messages: 100,         # flush jika buffer punya 100 pesan
  interval:     0.25         # flush setiap 250ms meski belum penuh
)

# Callback untuk konfirmasi atau error
publisher.on_error do |exception|
  Rails.logger.error "Error publish: #{exception.message}"
end

# Publish asinkron — langsung return tanpa tunggu konfirmasi
1000.times do |i|
  publisher.publish(
    JSON.generate({ seq: i, ts: Time.now.to_i }),
    "batch_id" => "batch-#{Time.now.to_i}"
  )
end

# Flush semua pesan yang masih di buffer
publisher.flush
publisher.stop.wait!   # tunggu semua selesai sebelum shutdown

Subscriber — Menerima Pesan #

Pull Subscriber — Streaming (Direkomendasikan) #

sub = pubsub.subscription("pesanan-notifikasi")

# Streaming pull — subscriber berjalan di background thread
# Jauh lebih efisien dari polling manual
subscriber = sub.listen(
  streams:           4,    # jumlah streaming connection paralel
  inventory:         1000, # jumlah pesan yang di-buffer
  threads: {
    callback: 8,   # thread untuk menjalankan callback
    push:     4    # thread untuk push ack/nack ke Pub/Sub
  }
) do |received_message|

  begin
    data = JSON.parse(received_message.data, symbolize_names: true)
    attrs = received_message.attributes
    puts "Pesan: #{received_message.message_id}"
    puts "Publish time: #{received_message.published_at}"
    puts "Delivery attempt: #{received_message.delivery_attempt}"
    puts "Data: #{data}"
    puts "Attrs: #{attrs}"

    # Proses pesan
    proses_pesanan(data)

    # ACK — konfirmasi berhasil diproses
    received_message.acknowledge!
    puts "ACK: #{received_message.message_id}"

  rescue JSON::ParserError => e
    Rails.logger.error "JSON tidak valid: #{e.message}"
    received_message.acknowledge!   # hapus pesan tidak valid agar tidak diulangi

  rescue => e
    Rails.logger.error "Error: #{e.message}"
    # NACK — tolak, kembalikan untuk delivery ulang
    received_message.nack!
    # Pub/Sub akan retry sesuai retry policy (dengan backoff)
  end
end

# Mulai subscriber
subscriber.start

# Graceful shutdown
trap("SIGTERM") do
  subscriber.stop
  subscriber.wait!
  puts "Subscriber berhenti"
end

# Blokir thread utama
subscriber.wait!

Pull Manual — Satu per Satu #

sub = pubsub.subscription("pesanan-analitik")

# Pull sejumlah pesan secara sinkron
loop do
  received_messages = sub.pull(
    max_messages:    10,     # maks pesan per pull
    return_immediately: false  # tunggu jika tidak ada pesan
  )

  received_messages.each do |pesan|
    begin
      data = JSON.parse(pesan.data, symbolize_names: true)
      proses_analitik(data)

      # ACK via subscription
      sub.acknowledge(pesan)

    rescue => e
      puts "Error: #{e.message}"
      sub.nack(pesan)
    end
  end

  sleep 1 if received_messages.empty?
end

Acknowledgment dan Nack #

# Tiga pilihan setelah menerima pesan:

# 1. ACK — berhasil diproses
received_message.acknowledge!
# atau: sub.acknowledge(received_message)

# 2. NACK — gagal, minta delivery ulang
received_message.nack!
# Pub/Sub akan retry sesuai backoff policy

# 3. Modify Ack Deadline — perpanjang waktu sebelum retry
# Berguna jika proses butuh waktu lebih dari ack deadline default
received_message.modify_ack_deadline!(120)   # tambah 120 detik lagi

# Perpanjang secara berkala dalam thread terpisah
Thread.new do
  loop do
    sleep 50   # perpanjang sebelum deadline 60 detik habis
    received_message.modify_ack_deadline!(60)
  end
end

# Proses yang butuh waktu lama
proses_berat(JSON.parse(received_message.data))
received_message.acknowledge!

Dead Letter Topic #

# Setup Dead Letter Topic
dlq_topic = pubsub.create_topic("pesanan-dlq")

# Buat subscription DLQ untuk memantau pesan gagal
dlq_sub = dlq_topic.subscribe("pesanan-dlq-monitor")

# Subscription utama dengan DLQ
sub = topic.subscribe(
  "pesanan-dengan-dlq",
  dead_letter_topic:     dlq_topic,
  max_delivery_attempts: 5   # setelah 5x → DLQ
)

# Monitor DLQ
dlq_subscriber = dlq_sub.listen do |pesan|
  attempts = pesan.delivery_attempt
  puts "DLQ: #{pesan.message_id} setelah #{attempts} percobaan"

  data = JSON.parse(pesan.data) rescue pesan.data

  # Log dan simpan untuk analisis
  PesananGagal.create!(
    message_id:       pesan.message_id,
    payload:          pesan.data,
    delivery_attempt: attempts,
    attributes:       pesan.attributes,
    gagal_pada:       Time.now
  )

  # Kirim notifikasi ke tim
  Monitoring.alert("Pesan di DLQ", message_id: pesan.message_id, data: data)

  pesan.acknowledge!
end

dlq_subscriber.start

Snapshot dan Replay #

Snapshot memungkinkan consumer membaca ulang pesan dari titik tertentu di masa lalu — mirip dengan Kafka offset:

sub = pubsub.subscription("pesanan-analitik")

# Buat snapshot — simpan posisi saat ini
snapshot = sub.create_snapshot("sebelum-migrasi-#{Time.now.strftime('%Y%m%d%H%M%S')}")
puts "Snapshot: #{snapshot.name}"
puts "Berlaku sampai: #{snapshot.expiration_time}"

# Jalankan proses migrasi atau eksperimen
# ...

# Jika ada masalah, rollback ke snapshot
sub.seek(snapshot)   # mundur ke posisi saat snapshot dibuat
puts "Seek ke snapshot berhasil"

# Atau seek ke waktu tertentu (replay pesan dari 2 jam lalu)
sub.seek(2.hours.ago)

# Daftar semua snapshot
pubsub.snapshots.each { |s| puts "#{s.name}: #{s.expiration_time}" }

# Hapus snapshot
snapshot.delete

Filter Subscription #

Filter subscription memungkinkan routing berdasarkan attribute pesan tanpa perlu kode routing di consumer:

# Topic utama yang menerima semua event
topic = pubsub.topic("semua_event")

# Subscription yang hanya menerima pesanan premium
sub_premium = topic.subscribe(
  "event-pesanan-premium",
  filter: 'attributes.tipe_pelanggan = "premium" AND attributes.tipe_event = "pesanan_baru"'
)

# Subscription yang hanya menerima pembayaran di atas 1 juta
sub_besar = topic.subscribe(
  "event-pembayaran-besar",
  filter: 'attributes.tipe_event = "pembayaran" AND attributes.jumlah > "1000000"'
)

# Subscription untuk semua error
sub_error = topic.subscribe(
  "event-error-semua",
  filter: 'attributes.severity = "ERROR" OR attributes.severity = "CRITICAL"'
)

# Publisher cukup publish ke satu topic dengan attributes
topic.publish(
  JSON.generate({ pesanan_id: 999, total: 5_000_000 }),
  "tipe_event"     => "pesanan_baru",
  "tipe_pelanggan" => "premium",
  "jumlah"         => "5000000"
)
# Hanya sub_premium dan sub_besar yang akan menerima pesan ini

Push Subscription — Serverless Receiver #

Push subscription membalik arah — Pub/Sub yang aktif mengirim HTTP POST ke endpoint kamu:

# Buat push subscription yang mengirim ke Cloud Run endpoint
sub = topic.subscribe(
  "pesanan-push",
  endpoint:              "https://api.example.com/pubsub/pesanan",
  authentication: {
    service_account_email: "[email protected]",
    audience:              "https://api.example.com"
  }
)

# Di sisi server Rails yang menerima push
# app/controllers/pubsub_controller.rb
class PubsubController < ApplicationController
  skip_before_action :verify_authenticity_token

  def pesanan
    # Pub/Sub mengirim base64-encoded data dalam format JSON
    envelope = JSON.parse(request.body.read)
    payload  = Base64.decode64(envelope["message"]["data"])
    data     = JSON.parse(payload, symbolize_names: true)
    attrs    = envelope["message"]["attributes"] || {}

    Rails.logger.info "Pub/Sub message: #{envelope['message']['messageId']}"

    proses_pesanan(data)

    # HTTP 200 = ACK. Non-2xx = NACK (Pub/Sub akan retry)
    head :ok

  rescue => e
    Rails.logger.error "Error: #{e.message}"
    head :internal_server_error   # trigger retry
  end
end

Integrasi dengan Rails #

# config/initializers/pubsub.rb
module Pubsub
  def self.client
    @client ||= Google::Cloud::PubSub.new(
      project_id: ENV.fetch("GOOGLE_CLOUD_PROJECT"),
      credentials: ENV["GOOGLE_APPLICATION_CREDENTIALS"]
    )
  end

  def self.topic(nama)
    @topics ||= {}
    @topics[nama] ||= client.topic(nama) || client.create_topic(nama)
  end

  def self.publish(nama_topic, data, **attributes)
    topic(nama_topic).publish(
      JSON.generate(data),
      **attributes.transform_keys(&:to_s),
      "lingkungan"    => Rails.env,
      "versi_schema"  => "v1",
      "timestamp"     => Time.now.iso8601
    )
  end
end

# Gunakan dari mana saja di Rails
class PesananService
  def self.buat(pengguna, params)
    pesanan = Pesanan.create!(pengguna: pengguna, **params)

    Pubsub.publish(
      "pesanan",
      { pesanan_id: pesanan.id, total: pesanan.total, status: pesanan.status },
      tipe_event:     "pesanan_dibuat",
      tipe_pelanggan: pengguna.tier
    )

    pesanan
  end
end

# app/workers/pubsub_worker.rb — worker untuk pull subscription
class PubsubWorker
  SUBSCRIPTION_NAME = ENV.fetch("PUBSUB_SUBSCRIPTION", "pesanan-worker")

  def self.jalankan
    sub = Pubsub.client.subscription(SUBSCRIPTION_NAME)

    subscriber = sub.listen(streams: 4, threads: { callback: 8 }) do |msg|
      begin
        data = JSON.parse(msg.data, symbolize_names: true)

        case data[:event]
        when "pesanan_dibuat"
          ProsesanPesanan.call(data)
        when "pembayaran_sukses"
          ProsesanPembayaran.call(data)
        else
          Rails.logger.warn "Event tidak dikenal: #{data[:event]}"
        end

        msg.acknowledge!

      rescue => e
        Rails.logger.error "PubsubWorker error: #{e.message}\n#{e.backtrace.first(5).join("\n")}"
        msg.nack!
      end
    end

    subscriber.start
    trap("SIGTERM") { subscriber.stop.wait! }
    subscriber.wait!
  end
end

Pub/Sub Lite — Untuk Throughput Ultra-Tinggi #

Pub/Sub Lite adalah varian yang lebih murah untuk throughput sangat tinggi, dengan trade-off: tidak globally replicated dan perlu manage kapasitas:

require 'google/cloud/pubsub/v1'

# Pub/Sub Lite menggunakan API berbeda
lite_client = Google::Cloud::PubSub::V1::AdminService::Client.new

# Buat Lite Topic dengan provisioned kapasitas
lite_topic = lite_client.create_topic(
  parent: "projects/my-project/locations/us-central1-a",
  topic_id: "transaksi-lite",
  topic: {
    partition_config: {
      count:    8,                    # 8 partisi
      capacity: {
        publish_mib_per_sec:   16,    # 16 MB/s publish capacity
        subscribe_mib_per_sec: 32     # 32 MB/s subscribe capacity
      }
    },
    retention_config: {
      per_partition_bytes: 30.gigabytes,
      period: { seconds: 604_800 }   # 7 hari
    }
  }
)

Monitoring dengan Google Cloud Monitoring #

# Metric penting Pub/Sub yang perlu dipantau:
# subscription/num_undelivered_messages  → jumlah pesan belum diproses
# subscription/oldest_unacked_message_age → usia pesan tertua yang belum di-ack
# topic/send_message_operation_count      → throughput publisher
# subscription/pull_message_operation_count → throughput subscriber

# Buat alert via gcloud CLI:
# gcloud alpha monitoring policies create \
#   --notification-channels=... \
#   --condition-filter='resource.type="pubsub_subscription"
#     AND metric.type="pubsub.googleapis.com/subscription/num_undelivered_messages"
#     AND metric.labels.subscription_id="pesanan-notifikasi"' \
#   --condition-threshold-value=1000 \
#   --condition-threshold-comparison=COMPARISON_GT \
#   --display-name="Pub/Sub pesanan-notifikasi lag tinggi"

Perbandingan Google Pub/Sub vs Amazon SQS #

AspekGoogle Pub/SubAmazon SQS
Model deliveryAt-least-onceAt-least-once (Standard) / Exactly-once (FIFO)
Fan-outNative — setiap subscription terima semuaPerlu SNS + SQS
OrderingDengan ordering key (per-partition)FIFO queue saja
ReplaySnapshot + seek ke waktu/snapshotTidak ada (SQS), retensi di Kafka
PushNative push subscriptionTidak ada (perlu Lambda trigger)
FilterFilter di level subscriptionTidak ada native filter
Retensi pesanMaks 7 hariMaks 14 hari
Global replicatedYa (default)Ya (dalam region)
Harga modelPer operasiPer request + data transfer
EkosistemGCP nativeAWS native

Ringkasan #

  • Setiap subscription menerima semua pesan — berbeda dari SQS yang membagi pesan antar consumer; di Pub/Sub, satu topic + dua subscription = setiap subscription menerima salinan semua pesan secara independen.
  • Streaming pull lebih efisien dari polling manualsub.listen { |msg| ... } mengelola koneksi, thread pool, dan batching secara otomatis; gunakan ini untuk consumer production.
  • ACK deadline harus realistis — set deadline lebih lama dari estimasi waktu proses; jika pesan sering timeout sebelum di-ack, Pub/Sub akan terus mengirim ulang dan menyebabkan duplikat.
  • Filter subscription untuk routing — daripada membuat logic routing di kode consumer, gunakan filter di level subscription; jauh lebih bersih dan tidak perlu deployment ulang saat menambah filter.
  • Snapshot untuk replay dan rollback — buat snapshot sebelum deployment besar atau migrasi data; jika ada masalah, bisa seek kembali ke snapshot untuk replay semua pesan yang sudah diproses.
  • Dead Letter Topic wajib untuk subscription produksi — konfigurasi max_delivery_attempts: 5 dan buat consumer untuk DLQ; pesan yang terus gagal tidak akan memblokir pesan lain.
  • Async publisher untuk throughput tinggitopic.async_publisher men-batch pesan di buffer dan mengirim secara asinkron; jauh lebih efisien dari publish sinkron satu per satu.
  • Push subscription untuk serverless — Cloud Run dan Cloud Functions bisa menerima pesan tanpa perlu keep-alive connection; Pub/Sub yang aktif mengirim HTTP POST saat ada pesan.
  • Ordering key untuk urutan terjamin — aktifkan enable_message_ordering: true pada subscription dan kirim dengan ordering_key yang sama untuk memastikan urutan per-key.
  • ADC (Application Default Credentials) untuk autentikasi — di Cloud Run/GKE gunakan service account yang di-attach tanpa credential file; di lokal gunakan gcloud auth application-default login.

← Sebelumnya: Amazon SQS   Berikutnya: Redis →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact