Add GCP Secret Manager adapter

This commit is contained in:
André Laszlo
2024-11-19 22:59:19 +01:00
parent 9cf8da64c4
commit 3793bdc2c3
3 changed files with 337 additions and 0 deletions

View File

@@ -3,6 +3,7 @@ module Kamal::Secrets::Adapters
def self.lookup(name)
name = "one_password" if name.downcase == "1password"
name = "last_pass" if name.downcase == "lastpass"
name = "gcp_secret_manager" if %w[gcp secret_manager].include? name.downcase
adapter_class(name)
end

View File

@@ -0,0 +1,125 @@
class Kamal::Secrets::Adapters::GcpSecretManager < Kamal::Secrets::Adapters::Base
private
def login(account)
# Since only the account option is passed from the cli, we'll use it for:
# - Account
# - GCP project
# - Service account impersonation
#
# Syntax:
# ACCOUNT: USER | USER "," DELEGATION_CHAIN
# USER: DEFAULT_USER | EMAIL
# DELEGATION_CHAIN: EMAIL | EMAIL "," DELEGATION_CHAIN
# EMAIL: <The email address of the user or service account, like "my-user@example.com" >
# DEFAULT_USER: "default"
#
# Some valid examples:
# - "my-user@example.com" sets the user
# - "my-user@example.com,my-service-user@example.com" will use my-user and enable service account impersonation as my-service-user
# - "default" will use the default user and no impersonation
# - "default,my-service-user@example.com" will use the default user, and enable service account impersonation as my-service-user
# - "default,my-service-user@example.com,another-service-user@example.com" same as above, but with an impersonation delegation chain
if !logged_in?
raise RuntimeError, "gcloud is not authenticated, please run `gcloud auth login`"
end
user, impersonate_service_account = parse_account(account)
{
user: user,
impersonate_service_account: impersonate_service_account
}
end
def fetch_secrets(secrets, account:, session:)
# puts("secrets spec: #{secrets.inspect}")
{}.tap do |results|
secrets_with_metadata(secrets).each do |secret, metadata|
project, secret_name, secret_version = metadata
item_name = project == "default" ? secret_name : "#{project}/#{secret_name}"
results[item_name] = fetch_secret(session, project, secret_name, secret_version)
raise RuntimeError, "Could not read #{item_name} from Google Secret Manager" unless $?.success?
end
end
end
def fetch_secret(session, project, secret_name, secret_version)
secret = run_command("secrets versions access #{secret_version} --secret=#{secret_name.shellescape}", session: session, project: project)
Base64.decode64(secret.dig("payload", "data"))
end
# The secret needs to at least contain a secret name, but project name, and secret version can also be specified.
#
# The string "default" can be used to refer to the default project configured for gcloud.
#
# The version can be either the string "latest", or a version number.
#
# The following formats are valid:
#
# - The following are all equivalent, and sets project: default, secret name: my-secret, version: latest
# - "my-secret"
# - "default/my-secret"
# - "default/my-secret/latest"
# - "my-secret/latest" in combination with --from=default
# - "my-secret/123" (only in combination with --from=some-project) -> project: some-project, secret name: my-secret, version: 123
# - "some-project/my-secret/123" -> project: some-project, secret name: my-secret, version: 123
def secrets_with_metadata(secrets)
{}.tap do |items|
secrets.each do |secret|
parts = secret.split("/")
parts.unshift("default") if parts.length == 1
project = parts.shift
secret_name = parts.shift
secret_version = parts.shift || "latest"
items[secret] = [ project, secret_name, secret_version ]
end
end
end
def run_command(command, session: nil, project: nil)
full_command = [ "gcloud", command ]
full_command << "--project=#{project}" unless project == "default"
full_command << "--account=#{session[:user]}" unless session[:user] == "default"
full_command << "--impersonate-service-account=#{session[:impersonate_service_account]}" if session[:impersonate_service_account]
full_command << "--format=json"
full_command = full_command.join(" ")
result = `#{full_command}`.strip
JSON.parse(result)
end
def check_dependencies!
raise RuntimeError, "gcloud CLI is not installed" unless cli_installed?
end
def cli_installed?
`gcloud --version 2> /dev/null`
$?.success?
end
def logged_in?
JSON.parse(`gcloud auth list --format=json`).any?
end
def parse_account(account)
return "default", nil if account == "default"
parts = account.split(",", 2)
if parts.length == 2
return parts.shift, parts.shift
elsif parts.length != 1
raise RuntimeError, "Invalid account, too many parts: #{account}"
elsif is_user?(account)
return account, nil
end
raise RuntimeError, "Invalid account, not a user: #{account}"
end
def is_user?(candidate)
candidate.include?("@")
end
end