Merge pull request #399 from mrsked/manage-ssh-connection-starts

Manage SSH connection starts
This commit is contained in:
Donal McBreen
2023-08-07 14:37:34 +01:00
committed by GitHub
13 changed files with 201 additions and 43 deletions

View File

@@ -143,7 +143,11 @@ class Mrsk::Commander
private
# Lazy setup of SSHKit
def configure_sshkit_with(config)
SSHKit::Backend::Netssh.configure { |ssh| ssh.ssh_options = config.ssh_options }
SSHKit::Backend::Netssh.pool.idle_timeout = config.sshkit.pool_idle_timeout
SSHKit::Backend::Netssh.configure do |sshkit|
sshkit.max_concurrent_starts = config.sshkit.max_concurrent_starts
sshkit.ssh_options = config.ssh.options
end
SSHKit.config.command_map[:docker] = "docker" # No need to use /usr/bin/env, just clogs up the logs
SSHKit.config.output_verbosity = verbosity
end

View File

@@ -13,12 +13,12 @@ module Mrsk::Commands
def run_over_ssh(*command, host:)
"ssh".tap do |cmd|
if config.ssh_proxy && config.ssh_proxy.is_a?(Net::SSH::Proxy::Jump)
cmd << " -J #{config.ssh_proxy.jump_proxies}"
elsif config.ssh_proxy && config.ssh_proxy.is_a?(Net::SSH::Proxy::Command)
cmd << " -o ProxyCommand='#{config.ssh_proxy.command_line_template}'"
if config.ssh.proxy && config.ssh.proxy.is_a?(Net::SSH::Proxy::Jump)
cmd << " -J #{config.ssh.proxy.jump_proxies}"
elsif config.ssh.proxy && config.ssh.proxy.is_a?(Net::SSH::Proxy::Command)
cmd << " -o ProxyCommand='#{config.ssh.proxy.command_line_template}'"
end
cmd << " -t #{config.ssh_user}@#{host} '#{command.join(" ")}'"
cmd << " -t #{config.ssh.user}@#{host} '#{command.join(" ")}'"
end
end

View File

@@ -135,25 +135,12 @@ class Mrsk::Configuration
end
def ssh_user
if raw_config.ssh.present?
raw_config.ssh["user"] || "root"
else
"root"
end
def ssh
Mrsk::Configuration::Ssh.new(config: self)
end
def ssh_proxy
if raw_config.ssh.present? && raw_config.ssh["proxy"]
Net::SSH::Proxy::Jump.new \
raw_config.ssh["proxy"].include?("@") ? raw_config.ssh["proxy"] : "root@#{raw_config.ssh["proxy"]}"
elsif raw_config.ssh.present? && raw_config.ssh["proxy_command"]
Net::SSH::Proxy::Command.new(raw_config.ssh["proxy_command"])
end
end
def ssh_options
{ user: ssh_user, proxy: ssh_proxy, auth_methods: [ "publickey" ] }.compact
def sshkit
Mrsk::Configuration::Sshkit.new(config: self)
end
@@ -185,7 +172,8 @@ class Mrsk::Configuration
service_with_version: service_with_version,
env_args: env_args,
volume_args: volume_args,
ssh_options: ssh_options,
ssh_options: ssh.options,
sshkit: sshkit.to_h,
builder: builder.to_h,
accessories: raw_config.accessories,
logging: logging_args,

View File

@@ -0,0 +1,24 @@
class Mrsk::Configuration::Ssh
def initialize(config:)
@config = config.raw_config.ssh || {}
end
def user
config.fetch("user", "root")
end
def proxy
if (proxy = config["proxy"])
Net::SSH::Proxy::Jump.new(proxy.include?("@") ? proxy : "root@#{proxy}")
elsif (proxy_command = config["proxy_command"])
Net::SSH::Proxy::Command.new(proxy_command)
end
end
def options
{ user: user, proxy: proxy, auth_methods: [ "publickey" ], keepalive: true, keepalive_interval: 30 }.compact
end
private
attr_accessor :config
end

View File

@@ -0,0 +1,20 @@
class Mrsk::Configuration::Sshkit
def initialize(config:)
@options = config.raw_config.sshkit || {}
end
def max_concurrent_starts
options.fetch("max_concurrent_starts", 30)
end
def pool_idle_timeout
options.fetch("pool_idle_timeout", 900)
end
def to_h
options
end
private
attr_accessor :options
end

View File

@@ -54,3 +54,51 @@ class SSHKit::Backend::Abstract
end
prepend CommandEnvMerge
end
class SSHKit::Backend::Netssh::Configuration
attr_accessor :max_concurrent_starts
end
class SSHKit::Backend::Netssh
module LimitConcurrentStartsClass
attr_reader :start_semaphore
def configure(&block)
super &block
# Create this here to avoid lazy creation by multiple threads
if config.max_concurrent_starts
@start_semaphore = Concurrent::Semaphore.new(config.max_concurrent_starts)
end
end
end
class << self
prepend LimitConcurrentStartsClass
end
module LimitConcurrentStartsInstance
private
def with_ssh(&block)
host.ssh_options = self.class.config.ssh_options.merge(host.ssh_options || {})
self.class.pool.with(
method(:start_with_concurrency_limit),
String(host.hostname),
host.username,
host.netssh_options,
&block
)
end
def start_with_concurrency_limit(*args)
if self.class.start_semaphore
self.class.start_semaphore.acquire do
Net::SSH.start(*args)
end
else
Net::SSH.start(*args)
end
end
end
prepend LimitConcurrentStartsInstance
end