refactor: connector socket getter

This commit is contained in:
Tyler Stewart
2017-08-14 16:44:18 -06:00
parent f3183314cc
commit d763820c86
5 changed files with 18 additions and 26 deletions

View File

@@ -13,13 +13,9 @@ module.exports = {
const simple = command.directive === 'NLST';
let dataSocket;
const path = command.arg || '.';
return this.connector.waitForConnection()
.then(socket => {
this.commandSocket.pause();
dataSocket = socket;
})
.tap(() => this.commandSocket.pause())
.then(() => when.try(this.fs.get.bind(this.fs), path))
.then(stat => stat.isDirectory() ? when.try(this.fs.list.bind(this.fs), path) : [stat])
.then(files => {
@@ -33,7 +29,7 @@ module.exports = {
return {
raw: true,
message,
socket: dataSocket
socket: this.connector.socket
};
});
return this.reply(150)

View File

@@ -1,7 +1,7 @@
module.exports = {
directive: 'QUIT',
handler: function () {
return this.close(221);
return this.close(221, 'Client called QUIT');
},
syntax: '{{cmd}}',
description: 'Disconnect',

View File

@@ -6,22 +6,18 @@ module.exports = {
if (!this.fs) return this.reply(550, 'File system not instantiated');
if (!this.fs.read) return this.reply(402, 'Not supported by file system');
let dataSocket;
return this.connector.waitForConnection()
.then(socket => {
this.commandSocket.pause();
dataSocket = socket;
})
.tap(() => this.commandSocket.pause())
.then(() => when.try(this.fs.read.bind(this.fs), command.arg, {start: this.restByteCount}))
.then(stream => {
this.restByteCount = 0;
return when.promise((resolve, reject) => {
dataSocket.on('error', err => stream.emit('error', err));
this.connector.socket.on('error', err => stream.emit('error', err));
stream.on('data', data => dataSocket.write(data, this.transferType));
stream.on('data', data => this.connector.socket.write(data, this.transferType));
stream.on('end', () => resolve(this.reply(226)));
stream.on('error', err => reject(err));
this.reply(150).then(() => dataSocket.resume());
this.reply(150).then(() => this.connector.socket.resume());
});
})
.catch(when.TimeoutError, err => {

View File

@@ -9,28 +9,24 @@ module.exports = {
const append = command.directive === 'APPE';
const fileName = command.arg;
let dataSocket;
return this.connector.waitForConnection()
.then(socket => {
this.commandSocket.pause();
dataSocket = socket;
})
.tap(() => this.commandSocket.pause())
.then(() => when.try(this.fs.write.bind(this.fs), fileName, {append, start: this.restByteCount}))
.then(stream => {
this.restByteCount = 0;
return when.promise((resolve, reject) => {
stream.once('error', err => dataSocket.emit('error', err));
stream.once('error', err => this.connector.socket.emit('error', err));
stream.once('finish', () => resolve(this.reply(226, fileName)));
// Emit `close` if stream has a close listener, otherwise emit `finish` with the end() method
// It is assumed that the `close` handler will call the end() method
dataSocket.once('end', () => stream.listenerCount('close') ? stream.emit('close') : stream.end());
dataSocket.once('error', err => reject(err));
dataSocket.on('data', data => stream.write(data, this.transferType));
this.connector.socket.once('end', () => stream.listenerCount('close') ? stream.emit('close') : stream.end());
this.connector.socket.once('error', err => reject(err));
this.connector.socket.on('data', data => stream.write(data, this.transferType));
this.reply(150).then(() => dataSocket.resume());
this.reply(150).then(() => this.connector.socket.resume());
})
.finally(() => when.try(stream.destroy.bind(stream)));
.finally(() => stream.destroy ? when.try(stream.destroy.bind(stream)) : null);
})
.catch(when.TimeoutError, err => {
log.error(err);

View File

@@ -14,6 +14,10 @@ class Connector {
return this.connection.log;
}
get socket() {
return this.dataSocket;
}
get server() {
return this.connection.server;
}