perf(storage-s3): stream files and abort s3 request from static handler (#13430)
### What? Stream S3 object directly to response instead of creating a Buffer in memory, and wire up an abort controller to stop streaming if the user aborts the download ### Why? To avoid excessive memory usage and to abort the S3 download if the user has aborted the request anyway. ### How? In a Node environment AWS S3 always returns a Readable. The streamToBuffer method always required this, but the any type hid the fact that this was actually needed. Now there is an explicit type check, but this should never trigger in a Node server environment. Wire up an abort controller to the request so that we tell the S3 object to also stop streaming if the user aborts. Fixes #10286 Maybe this also helps with other issues around S3 and resource usage
This commit is contained in:
committed by
GitHub
parent
c67ceca8e2
commit
36fd6e905a
@@ -27,34 +27,34 @@ interface Args {
|
||||
signedDownloads?: SignedDownloadsConfig
|
||||
}
|
||||
|
||||
// Type guard for NodeJS.Readable streams
|
||||
const isNodeReadableStream = (body: unknown): body is Readable => {
|
||||
const isNodeReadableStream = (body: AWS.GetObjectOutput['Body']): body is Readable => {
|
||||
return (
|
||||
typeof body === 'object' &&
|
||||
body !== null &&
|
||||
'pipe' in body &&
|
||||
typeof (body as any).pipe === 'function' &&
|
||||
typeof body.pipe === 'function' &&
|
||||
'destroy' in body &&
|
||||
typeof (body as any).destroy === 'function'
|
||||
typeof body.destroy === 'function'
|
||||
)
|
||||
}
|
||||
|
||||
const destroyStream = (object: AWS.GetObjectOutput | undefined) => {
|
||||
const abortRequestAndDestroyStream = ({
|
||||
abortController,
|
||||
object,
|
||||
}: {
|
||||
abortController: AbortController
|
||||
object?: AWS.GetObjectOutput
|
||||
}) => {
|
||||
try {
|
||||
abortController.abort()
|
||||
} catch {
|
||||
/* noop */
|
||||
}
|
||||
if (object?.Body && isNodeReadableStream(object.Body)) {
|
||||
object.Body.destroy()
|
||||
}
|
||||
}
|
||||
|
||||
// Convert a stream into a promise that resolves with a Buffer
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const streamToBuffer = async (readableStream: any) => {
|
||||
const chunks = []
|
||||
for await (const chunk of readableStream) {
|
||||
chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk)
|
||||
}
|
||||
return Buffer.concat(chunks)
|
||||
}
|
||||
|
||||
export const getHandler = ({
|
||||
bucket,
|
||||
collection,
|
||||
@@ -63,6 +63,15 @@ export const getHandler = ({
|
||||
}: Args): StaticHandler => {
|
||||
return async (req, { headers: incomingHeaders, params: { clientUploadContext, filename } }) => {
|
||||
let object: AWS.GetObjectOutput | undefined = undefined
|
||||
let streamed = false
|
||||
|
||||
const abortController = new AbortController()
|
||||
if (req.signal) {
|
||||
req.signal.addEventListener('abort', () => {
|
||||
abortRequestAndDestroyStream({ abortController, object })
|
||||
})
|
||||
}
|
||||
|
||||
try {
|
||||
const prefix = await getFilePrefix({ clientUploadContext, collection, filename, req })
|
||||
|
||||
@@ -89,10 +98,13 @@ export const getHandler = ({
|
||||
}
|
||||
}
|
||||
|
||||
object = await getStorageClient().getObject({
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
})
|
||||
object = await getStorageClient().getObject(
|
||||
{
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
},
|
||||
{ abortSignal: abortController.signal },
|
||||
)
|
||||
|
||||
if (!object.Body) {
|
||||
return new Response(null, { status: 404, statusText: 'Not Found' })
|
||||
@@ -130,25 +142,26 @@ export const getHandler = ({
|
||||
})
|
||||
}
|
||||
|
||||
// On error, manually destroy stream to close socket
|
||||
if (object.Body && isNodeReadableStream(object.Body)) {
|
||||
const stream = object.Body
|
||||
stream.on('error', (err) => {
|
||||
req.payload.logger.error({
|
||||
err,
|
||||
key,
|
||||
msg: 'Error streaming S3 object, destroying stream',
|
||||
})
|
||||
stream.destroy()
|
||||
if (!isNodeReadableStream(object.Body)) {
|
||||
req.payload.logger.error({
|
||||
key,
|
||||
msg: 'S3 object body is not a readable stream',
|
||||
})
|
||||
return new Response('Internal Server Error', { status: 500 })
|
||||
}
|
||||
|
||||
const bodyBuffer = await streamToBuffer(object.Body)
|
||||
|
||||
return new Response(bodyBuffer, {
|
||||
headers,
|
||||
status: 200,
|
||||
const stream = object.Body
|
||||
stream.on('error', (err) => {
|
||||
req.payload.logger.error({
|
||||
err,
|
||||
key,
|
||||
msg: 'Error while streaming S3 object (aborting)',
|
||||
})
|
||||
abortRequestAndDestroyStream({ abortController, object })
|
||||
})
|
||||
|
||||
streamed = true
|
||||
return new Response(stream, { headers, status: 200 })
|
||||
} catch (err) {
|
||||
if (err instanceof AWS.NoSuchKey) {
|
||||
return new Response(null, { status: 404, statusText: 'Not Found' })
|
||||
@@ -156,7 +169,9 @@ export const getHandler = ({
|
||||
req.payload.logger.error(err)
|
||||
return new Response('Internal Server Error', { status: 500 })
|
||||
} finally {
|
||||
destroyStream(object)
|
||||
if (!streamed) {
|
||||
abortRequestAndDestroyStream({ abortController, object })
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user