fix(plugin-import-export): pre-scan columns before streaming CSV export (#13009)
### What?

Fixes an issue where only the fields from the first batch of documents were used to generate CSV column headers during streaming exports.

### Why?

Previously, columns were determined during the first streaming batch. If a field appeared only in later documents, it was omitted from the CSV entirely, leading to incomplete exports when fields were sparsely populated across the dataset.

### How?

- Adds a **pre-scan step** before streaming begins to collect all column keys across all pages
- Uses this superset of keys to define the final CSV header
- Ensures every row is padded to match the full column set

This matches the behavior of non-streamed exports and guarantees that the streamed CSV output includes all relevant fields, regardless of when they appear in pagination.
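Conceptually, the change turns the streamed export into a two-pass process. The sketch below illustrates that idea in isolation: it assumes a hypothetical `fetchPage(page)` pager returning `{ docs, hasNextPage }` and the synchronous `stringify` from `csv-stringify`; the function and parameter names are illustrative, not the plugin's actual code (see the diff below for that).

```ts
// Minimal sketch of the two-pass streamed export, independent of Payload internals.
// `fetchPage` is a hypothetical pager; `exportToCSV` is an illustrative name.
import { stringify } from 'csv-stringify/sync'

type Page = { docs: Record<string, unknown>[]; hasNextPage: boolean }

export async function exportToCSV(fetchPage: (page: number) => Promise<Page>): Promise<string> {
  // Pass 1: pre-scan every page to collect the full, ordered set of column keys.
  const columns: string[] = []
  const seen = new Set<string>()
  for (let page = 1, hasMore = true; hasMore; page += 1) {
    const result = await fetchPage(page)
    for (const doc of result.docs) {
      for (const key of Object.keys(doc)) {
        if (!seen.has(key)) {
          seen.add(key)
          columns.push(key)
        }
      }
    }
    hasMore = result.hasNextPage
  }

  // Pass 2: emit rows padded to the full column set, so sparse fields that
  // only appear in later pages still get a header and an (empty) cell.
  const chunks: string[] = []
  let isFirstBatch = true
  for (let page = 1, hasMore = true; hasMore; page += 1) {
    const result = await fetchPage(page)
    const paddedRows = result.docs.map((doc) => {
      const fullRow: Record<string, unknown> = {}
      for (const col of columns) {
        fullRow[col] = doc[col] ?? ''
      }
      return fullRow
    })
    chunks.push(stringify(paddedRows, { header: isFirstBatch, columns }))
    isFirstBatch = false
    hasMore = result.hasNextPage
  }

  return chunks.join('')
}
```

Note that the pre-scan pass fetches every page a second time; that is the cost of knowing the full header up front.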
```diff
@@ -111,23 +111,45 @@ export const createExport = async (args: CreateExportArgs) => {
 
   if (download) {
     if (debug) {
-      req.payload.logger.info('Starting download stream')
+      req.payload.logger.info('Pre-scanning all columns before streaming')
+    }
+
+    const allColumnsSet = new Set<string>()
+    const allColumns: string[] = []
+    let scanPage = 1
+    let hasMore = true
+
+    while (hasMore) {
+      const result = await payload.find({ ...findArgs, page: scanPage })
+
+      result.docs.forEach((doc) => {
+        const flat = flattenObject({ doc, fields, toCSVFunctions })
+        Object.keys(flat).forEach((key) => {
+          if (!allColumnsSet.has(key)) {
+            allColumnsSet.add(key)
+            allColumns.push(key)
+          }
+        })
+      })
+
+      hasMore = result.hasNextPage
+      scanPage += 1
+    }
+
+    if (debug) {
+      req.payload.logger.info(`Discovered ${allColumns.length} columns`)
     }
 
     const encoder = new TextEncoder()
     let isFirstBatch = true
-    let columns: string[] | undefined
-    let page = 1
+    let streamPage = 1
 
     const stream = new Readable({
       async read() {
-        const result = await payload.find({
-          ...findArgs,
-          page,
-        })
+        const result = await payload.find({ ...findArgs, page: streamPage })
 
         if (debug) {
-          req.payload.logger.info(`Processing batch ${page} with ${result.docs.length} documents`)
+          req.payload.logger.info(`Streaming batch ${streamPage} with ${result.docs.length} docs`)
         }
 
         if (result.docs.length === 0) {
@@ -135,19 +157,24 @@ export const createExport = async (args: CreateExportArgs) => {
           return
         }
 
-        const csvInput = result.docs.map((doc) => flattenObject({ doc, fields, toCSVFunctions }))
+        const batchRows = result.docs.map((doc) => flattenObject({ doc, fields, toCSVFunctions }))
 
-        if (isFirstBatch) {
-          columns = Object.keys(csvInput[0] ?? {})
-        }
+        const paddedRows = batchRows.map((row) => {
+          const fullRow: Record<string, unknown> = {}
+          for (const col of allColumns) {
+            fullRow[col] = row[col] ?? ''
+          }
+          return fullRow
+        })
 
-        const csvString = stringify(csvInput, {
+        const csvString = stringify(paddedRows, {
           header: isFirstBatch,
-          columns,
+          columns: allColumns,
         })
 
         this.push(encoder.encode(csvString))
         isFirstBatch = false
+        streamPage += 1
 
         if (!result.hasNextPage) {
           if (debug) {
@@ -155,8 +182,6 @@ export const createExport = async (args: CreateExportArgs) => {
           }
           this.push(null) // End the stream
         }
-
-        page += 1
       },
     })
 
@@ -168,11 +193,15 @@ export const createExport = async (args: CreateExportArgs) => {
     })
   }
 
+  // Non-download path (buffered export)
   if (debug) {
     req.payload.logger.info('Starting file generation')
   }
+
   const outputData: string[] = []
-  let isFirstBatch = true
+  const rows: Record<string, unknown>[] = []
+  const columnsSet = new Set<string>()
+  const columns: string[] = []
   let page = 1
   let hasNextPage = true
 
@@ -189,9 +218,19 @@ export const createExport = async (args: CreateExportArgs) => {
     }
 
     if (isCSV) {
-      const csvInput = result.docs.map((doc) => flattenObject({ doc, fields, toCSVFunctions }))
-      outputData.push(stringify(csvInput, { header: isFirstBatch }))
-      isFirstBatch = false
+      const batchRows = result.docs.map((doc) => flattenObject({ doc, fields, toCSVFunctions }))
+
+      // Track discovered column keys
+      batchRows.forEach((row) => {
+        Object.keys(row).forEach((key) => {
+          if (!columnsSet.has(key)) {
+            columnsSet.add(key)
+            columns.push(key)
+          }
+        })
+      })
+
+      rows.push(...batchRows)
     } else {
       const jsonInput = result.docs.map((doc) => JSON.stringify(doc))
      outputData.push(jsonInput.join(',\n'))
@@ -201,6 +240,23 @@ export const createExport = async (args: CreateExportArgs) => {
     page += 1
   }
 
+  if (isCSV) {
+    const paddedRows = rows.map((row) => {
+      const fullRow: Record<string, unknown> = {}
+      for (const col of columns) {
+        fullRow[col] = row[col] ?? ''
+      }
+      return fullRow
+    })
+
+    outputData.push(
+      stringify(paddedRows, {
+        header: true,
+        columns,
+      }),
+    )
+  }
+
   const buffer = Buffer.from(format === 'json' ? `[${outputData.join(',')}]` : outputData.join(''))
   if (debug) {
     req.payload.logger.info(`${format} file generation complete`)
```