diff --git a/Gemfile.lock b/Gemfile.lock index 26a08348..2765ecd4 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -4,6 +4,7 @@ PATH mrsk (0.15.1) activesupport (>= 7.0) bcrypt_pbkdf (~> 1.0) + concurrent-ruby (~> 1.2) dotenv (~> 2.8) ed25519 (~> 1.2) net-ssh (~> 7.0) diff --git a/lib/mrsk/commander.rb b/lib/mrsk/commander.rb index bdfef962..7b3af31f 100644 --- a/lib/mrsk/commander.rb +++ b/lib/mrsk/commander.rb @@ -143,7 +143,10 @@ class Mrsk::Commander private # Lazy setup of SSHKit def configure_sshkit_with(config) - SSHKit::Backend::Netssh.configure { |ssh| ssh.ssh_options = config.ssh_options } + SSHKit::Backend::Netssh.configure do |sshkit| + sshkit.max_concurrent_starts = config.sshkit_max_concurrent_starts if config.sshkit_max_concurrent_starts + sshkit.ssh_options = config.ssh_options + end SSHKit.config.command_map[:docker] = "docker" # No need to use /usr/bin/env, just clogs up the logs SSHKit.config.output_verbosity = verbosity end diff --git a/lib/mrsk/configuration.rb b/lib/mrsk/configuration.rb index ee6dce05..df389158 100644 --- a/lib/mrsk/configuration.rb +++ b/lib/mrsk/configuration.rb @@ -157,6 +157,10 @@ class Mrsk::Configuration end + def sshkit_max_concurrent_starts + raw_config.sshkit["max_concurrent_starts"] if raw_config.sshkit.present? + end + def healthcheck { "path" => "/up", "port" => 3000, "max_attempts" => 7 }.merge(raw_config.healthcheck || {}) end diff --git a/lib/mrsk/sshkit_with_ext.rb b/lib/mrsk/sshkit_with_ext.rb index 6cd9c3e4..58b1f4e3 100644 --- a/lib/mrsk/sshkit_with_ext.rb +++ b/lib/mrsk/sshkit_with_ext.rb @@ -54,3 +54,51 @@ class SSHKit::Backend::Abstract end prepend CommandEnvMerge end + +class SSHKit::Backend::Netssh::Configuration + DEFAULT_MAX_CONCURRENT_STARTS = 30 + + attr_writer :max_concurrent_starts + + def max_concurrent_starts + @max_concurrent_starts ||= DEFAULT_MAX_CONCURRENT_STARTS + end +end + +class SSHKit::Backend::Netssh + module LimitConcurrentStartsClass + attr_reader :start_semaphore + + def configure(&block) + super &block + # Create this here to avoid lazy creation by multiple threads + @start_semaphore = Concurrent::Semaphore.new(config.max_concurrent_starts) + end + end + + class << self + prepend LimitConcurrentStartsClass + end + + module LimitConcurrentStartsInstance + private + def with_ssh(&block) + host.ssh_options = self.class.config.ssh_options.merge(host.ssh_options || {}) + self.class.pool.with( + method(:start_with_concurrency_limit), + String(host.hostname), + host.username, + host.netssh_options, + &block + ) + end + + def start_with_concurrency_limit(*args) + self.class.start_semaphore.acquire do + Net::SSH.start(*args) + end + end + end + + prepend LimitConcurrentStartsInstance +end diff --git a/mrsk.gemspec b/mrsk.gemspec index 24c40bcf..7a10c2be 100644 --- a/mrsk.gemspec +++ b/mrsk.gemspec @@ -20,6 +20,7 @@ Gem::Specification.new do |spec| spec.add_dependency "zeitwerk", "~> 2.5" spec.add_dependency "ed25519", "~> 1.2" spec.add_dependency "bcrypt_pbkdf", "~> 1.0" + spec.add_dependency "concurrent-ruby", "~> 1.2" spec.add_development_dependency "debug" spec.add_development_dependency "mocha" diff --git a/test/configuration_test.rb b/test/configuration_test.rb index 0f7e12ff..e635cd04 100644 --- a/test/configuration_test.rb +++ b/test/configuration_test.rb @@ -224,6 +224,11 @@ class ConfigurationTest < ActiveSupport::TestCase assert_equal "app@1.2.3.4", @config.ssh_options[:proxy].jump_proxies end + test "sshkit max concurrent starts" do + config = Mrsk::Configuration.new(@deploy.tap { |c| c.merge!(sshkit: { "max_concurrent_starts" => 50 }) }) + assert_equal 50, @config.sshkit_max_concurrent_starts + end + test "volume_args" do assert_equal ["--volume", "/local/path:/container/path"], @config.volume_args end