fix: graphql query concurrency issues (#6925)

## Description

This is the beta (v3) PR for the v2 PR
[here](https://github.com/payloadcms/payload/pull/6857)

Addresses #6800, #5108

- [x] I have read and understand the
[CONTRIBUTING.md](https://github.com/payloadcms/payload/blob/main/CONTRIBUTING.md)
document in this repository.

## Type of change

- [x] Bug fix (non-breaking change which fixes an issue)

## Checklist:

- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] Existing test suite passes locally with my changes
This commit is contained in:
Patrik
2024-07-08 11:55:04 -04:00
committed by GitHub
parent cd9df738c1
commit fb72d19d6c
49 changed files with 209 additions and 79 deletions

View File

@@ -12,7 +12,7 @@ export const count: Count = async function count(
{ collection, locale, req = {} as PayloadRequest, where },
) {
const Model = this.collections[collection]
const options: QueryOptions = withSession(this, req.transactionID)
const options: QueryOptions = await withSession(this, req)
let hasNearConstraint = false

View File

@@ -10,7 +10,7 @@ export const create: Create = async function create(
{ collection, data, req = {} as PayloadRequest },
) {
const Model = this.collections[collection]
const options = withSession(this, req.transactionID)
const options = await withSession(this, req)
let doc
try {
;[doc] = await Model.create([data], options)

View File

@@ -14,7 +14,7 @@ export const createGlobal: CreateGlobal = async function createGlobal(
globalType: slug,
...data,
}
const options = withSession(this, req.transactionID)
const options = await withSession(this, req)
let [result] = (await Model.create([global], options)) as any

View File

@@ -9,7 +9,7 @@ export const createGlobalVersion: CreateGlobalVersion = async function createGlo
{ autosave, createdAt, globalSlug, parent, req = {} as PayloadRequest, updatedAt, versionData },
) {
const VersionModel = this.versions[globalSlug]
const options = withSession(this, req.transactionID)
const options = await withSession(this, req)
const [doc] = await VersionModel.create(
[

View File

@@ -17,7 +17,7 @@ export const createVersion: CreateVersion = async function createVersion(
},
) {
const VersionModel = this.versions[collectionSlug]
const options = withSession(this, req.transactionID)
const options = await withSession(this, req)
const [doc] = await VersionModel.create(
[

View File

@@ -10,7 +10,7 @@ export const deleteMany: DeleteMany = async function deleteMany(
) {
const Model = this.collections[collection]
const options = {
...withSession(this, req.transactionID),
...(await withSession(this, req)),
lean: true,
}

View File

@@ -10,7 +10,7 @@ export const deleteOne: DeleteOne = async function deleteOne(
{ collection, req = {} as PayloadRequest, where },
) {
const Model = this.collections[collection]
const options = withSession(this, req.transactionID)
const options = await withSession(this, req)
const query = await Model.buildQuery({
payload: this.payload,

View File

@@ -10,7 +10,7 @@ export const deleteVersions: DeleteVersions = async function deleteVersions(
) {
const VersionsModel = this.versions[collection]
const options = {
...withSession(this, req.transactionID),
...(await withSession(this, req)),
lean: true,
}

View File

@@ -15,7 +15,7 @@ export const find: Find = async function find(
) {
const Model = this.collections[collection]
const collectionConfig = this.payload.collections[collection].config
const options = withSession(this, req.transactionID)
const options = await withSession(this, req)
let hasNearConstraint = false

View File

@@ -13,7 +13,7 @@ export const findGlobal: FindGlobal = async function findGlobal(
) {
const Model = this.globals
const options = {
...withSession(this, req.transactionID),
...(await withSession(this, req)),
lean: true,
}

View File

@@ -28,7 +28,7 @@ export const findGlobalVersions: FindGlobalVersions = async function findGlobalV
this.payload.globals.config.find(({ slug }) => slug === global),
)
const options = {
...withSession(this, req.transactionID),
...(await withSession(this, req)),
limit,
skip,
}

View File

@@ -12,7 +12,7 @@ export const findOne: FindOne = async function findOne(
) {
const Model = this.collections[collection]
const options: MongooseQueryOptions = {
...withSession(this, req.transactionID),
...(await withSession(this, req)),
lean: true,
}

View File

@@ -26,7 +26,7 @@ export const findVersions: FindVersions = async function findVersions(
const Model = this.versions[collection]
const collectionConfig = this.payload.collections[collection].config
const options = {
...withSession(this, req.transactionID),
...(await withSession(this, req)),
limit,
skip,
}

View File

@@ -15,7 +15,7 @@ export const queryDrafts: QueryDrafts = async function queryDrafts(
) {
const VersionModel = this.versions[collection]
const collectionConfig = this.payload.collections[collection].config
const options = withSession(this, req.transactionID)
const options = await withSession(this, req)
let hasNearConstraint
let sort

View File

@@ -1,6 +1,8 @@
import type { CommitTransaction } from 'payload'
export const commitTransaction: CommitTransaction = async function commitTransaction(id) {
if (id instanceof Promise) return
if (!this.sessions[id]?.inTransaction()) {
return
}

View File

@@ -1,27 +1,35 @@
import type { RollbackTransaction } from 'payload'
export const rollbackTransaction: RollbackTransaction = async function rollbackTransaction(
id = '',
incomingID = '',
) {
let transactionID: number | string
if (incomingID instanceof Promise) {
transactionID = await incomingID
} else {
transactionID = incomingID
}
// if multiple operations are using the same transaction, the first will flow through and delete the session.
// subsequent calls should be ignored.
if (!this.sessions[id]) {
if (!this.sessions[transactionID]) {
return
}
// when session exists but is not inTransaction something unexpected is happening to the session
if (!this.sessions[id].inTransaction()) {
if (!this.sessions[transactionID].inTransaction()) {
this.payload.logger.warn('rollbackTransaction called when no transaction exists')
delete this.sessions[id]
delete this.sessions[transactionID]
return
}
// the first call for rollback should be aborted and deleted causing any other operations with the same transaction to fail
try {
await this.sessions[id].abortTransaction()
await this.sessions[id].endSession()
await this.sessions[transactionID].abortTransaction()
await this.sessions[transactionID].endSession()
} catch (error) {
// ignore the error as it is likely a race condition from multiple errors
}
delete this.sessions[id]
delete this.sessions[transactionID]
}

View File

@@ -11,7 +11,7 @@ export const updateGlobal: UpdateGlobal = async function updateGlobal(
) {
const Model = this.globals
const options = {
...withSession(this, req.transactionID),
...(await withSession(this, req)),
lean: true,
new: true,
}

View File

@@ -18,7 +18,7 @@ export async function updateGlobalVersion<T extends TypeWithID>(
const VersionModel = this.versions[global]
const whereToUse = where || { id: { equals: id } }
const options = {
...withSession(this, req.transactionID),
...(await withSession(this, req)),
lean: true,
new: true,
}

View File

@@ -13,7 +13,7 @@ export const updateOne: UpdateOne = async function updateOne(
const where = id ? { id: { equals: id } } : whereArg
const Model = this.collections[collection]
const options = {
...withSession(this, req.transactionID),
...(await withSession(this, req)),
lean: true,
new: true,
}

View File

@@ -11,7 +11,7 @@ export const updateVersion: UpdateVersion = async function updateVersion(
const VersionModel = this.versions[collection]
const whereToUse = where || { id: { equals: id } }
const options = {
...withSession(this, req.transactionID),
...(await withSession(this, req)),
lean: true,
new: true,
}

View File

@@ -1,4 +1,5 @@
import type { ClientSession } from 'mongoose'
import type { PayloadRequest } from 'payload'
import type { MongooseAdapter } from './index.js'
@@ -6,9 +7,15 @@ import type { MongooseAdapter } from './index.js'
* returns the session belonging to the transaction of the req.session if exists
* @returns ClientSession
*/
export function withSession(
export async function withSession(
db: MongooseAdapter,
transactionID?: number | string,
): { session: ClientSession } | object {
return db.sessions[transactionID] ? { session: db.sessions[transactionID] } : {}
req: PayloadRequest,
): Promise<{ session: ClientSession } | object> {
let transactionID = req.transactionID
if (transactionID instanceof Promise) {
transactionID = await req.transactionID
}
if (req) return db.sessions[transactionID] ? { session: db.sessions[transactionID] } : {}
}

View File

@@ -17,7 +17,7 @@ export const count: Count = async function count(
const tableName = this.tableNameMap.get(toSnakeCase(collectionConfig.slug))
const db = this.sessions[req.transactionID]?.db || this.drizzle
const db = this.sessions[await req.transactionID]?.db || this.drizzle
const table = this.tables[tableName]
const { joins, where } = await buildQuery({

View File

@@ -10,7 +10,7 @@ export const create: Create = async function create(
this: PostgresAdapter,
{ collection: collectionSlug, data, req },
) {
const db = this.sessions[req.transactionID]?.db || this.drizzle
const db = this.sessions[await req.transactionID]?.db || this.drizzle
const collection = this.payload.collections[collectionSlug].config
const tableName = this.tableNameMap.get(toSnakeCase(collection.slug))

View File

@@ -10,7 +10,7 @@ export async function createGlobal<T extends Record<string, unknown>>(
this: PostgresAdapter,
{ slug, data, req = {} as PayloadRequest }: CreateGlobalArgs,
): Promise<T> {
const db = this.sessions[req.transactionID]?.db || this.drizzle
const db = this.sessions[await req.transactionID]?.db || this.drizzle
const globalConfig = this.payload.globals.config.find((config) => config.slug === slug)
const tableName = this.tableNameMap.get(toSnakeCase(globalConfig.slug))

View File

@@ -1,7 +1,7 @@
import type { PayloadRequest, TypeWithID, TypeWithVersion } from 'payload'
import type { CreateGlobalVersionArgs, PayloadRequest, TypeWithID, TypeWithVersion } from 'payload'
import { sql } from 'drizzle-orm'
import { type CreateGlobalVersionArgs, buildVersionGlobalFields } from 'payload'
import { buildVersionGlobalFields } from 'payload'
import toSnakeCase from 'to-snake-case'
import type { PostgresAdapter } from './types.js'
@@ -12,7 +12,7 @@ export async function createGlobalVersion<T extends TypeWithID>(
this: PostgresAdapter,
{ autosave, globalSlug, req = {} as PayloadRequest, versionData }: CreateGlobalVersionArgs,
) {
const db = this.sessions[req.transactionID]?.db || this.drizzle
const db = this.sessions[await req.transactionID]?.db || this.drizzle
const global = this.payload.globals.config.find(({ slug }) => slug === globalSlug)
const tableName = this.tableNameMap.get(`_${toSnakeCase(global.slug)}${this.versionsSuffix}`)

View File

@@ -18,7 +18,7 @@ export async function createVersion<T extends TypeWithID>(
versionData,
}: CreateVersionArgs<T>,
) {
const db = this.sessions[req.transactionID]?.db || this.drizzle
const db = this.sessions[await req.transactionID]?.db || this.drizzle
const collection = this.payload.collections[collectionSlug].config
const defaultTableName = toSnakeCase(collection.slug)

View File

@@ -11,7 +11,7 @@ export const deleteMany: DeleteMany = async function deleteMany(
this: PostgresAdapter,
{ collection, req = {} as PayloadRequest, where },
) {
const db = this.sessions[req.transactionID]?.db || this.drizzle
const db = this.sessions[await req.transactionID]?.db || this.drizzle
const collectionConfig = this.payload.collections[collection].config
const tableName = this.tableNameMap.get(toSnakeCase(collectionConfig.slug))

View File

@@ -14,7 +14,7 @@ export const deleteOne: DeleteOne = async function deleteOne(
this: PostgresAdapter,
{ collection: collectionSlug, req = {} as PayloadRequest, where: whereArg },
) {
const db = this.sessions[req.transactionID]?.db || this.drizzle
const db = this.sessions[await req.transactionID]?.db || this.drizzle
const collection = this.payload.collections[collectionSlug].config
const tableName = this.tableNameMap.get(toSnakeCase(collection.slug))

View File

@@ -12,7 +12,7 @@ export const deleteVersions: DeleteVersions = async function deleteVersion(
this: PostgresAdapter,
{ collection, locale, req = {} as PayloadRequest, where: where },
) {
const db = this.sessions[req.transactionID]?.db || this.drizzle
const db = this.sessions[await req.transactionID]?.db || this.drizzle
const collectionConfig: SanitizedCollectionConfig = this.payload.collections[collection].config
const tableName = this.tableNameMap.get(

View File

@@ -30,7 +30,7 @@ export const findMany = async function find({
tableName,
where: whereArg,
}: Args) {
const db = adapter.sessions[req.transactionID]?.db || adapter.drizzle
const db = adapter.sessions[await req.transactionID]?.db || adapter.drizzle
const table = adapter.tables[tableName]
const limit = limitArg ?? 10

View File

@@ -38,7 +38,7 @@ type Args = {
*/
export const migratePostgresV2toV3 = async ({ debug, payload, req }: Args) => {
const adapter = payload.db as PostgresAdapter
const db = adapter.sessions[req.transactionID]?.db
const db = adapter.sessions[await req.transactionID]?.db
const dir = payload.db.migrationDir
// get the drizzle migrateUpSQL from drizzle using the last schema

View File

@@ -1,6 +1,6 @@
import type { PayloadRequest, SanitizedCollectionConfig } from 'payload'
import type { PayloadRequest, QueryDrafts, SanitizedCollectionConfig } from 'payload'
import { type QueryDrafts, buildVersionCollectionFields, combineQueries } from 'payload'
import { buildVersionCollectionFields, combineQueries } from 'payload'
import toSnakeCase from 'to-snake-case'
import type { PostgresAdapter } from './types.js'

View File

@@ -1,6 +1,8 @@
import type { CommitTransaction } from 'payload'
export const commitTransaction: CommitTransaction = async function commitTransaction(id) {
if (id instanceof Promise) return
// if the session was deleted it has already been aborted
if (!this.sessions[id]) {
return

View File

@@ -1,17 +1,19 @@
import type { RollbackTransaction } from 'payload'
export const rollbackTransaction: RollbackTransaction = async function rollbackTransaction(
id = '',
incomingID = '',
) {
const transactionID = incomingID instanceof Promise ? await incomingID : incomingID
// if multiple operations are using the same transaction, the first will flow through and delete the session.
// subsequent calls should be ignored.
if (!this.sessions[id]) {
if (!this.sessions[transactionID]) {
return
}
// end the session promise in failure by calling reject
await this.sessions[id].reject()
await this.sessions[transactionID].reject()
// delete the session causing any other operations with the same transaction to fail
delete this.sessions[id]
delete this.sessions[transactionID]
}

View File

@@ -12,7 +12,7 @@ export const updateOne: UpdateOne = async function updateOne(
this: PostgresAdapter,
{ id, collection: collectionSlug, data, draft, locale, req, where: whereArg },
) {
const db = this.sessions[req.transactionID]?.db || this.drizzle
const db = this.sessions[await req.transactionID]?.db || this.drizzle
const collection = this.payload.collections[collectionSlug].config
const tableName = this.tableNameMap.get(toSnakeCase(collection.slug))
const whereToUse = whereArg || { id: { equals: id } }

View File

@@ -10,7 +10,7 @@ export async function updateGlobal<T extends Record<string, unknown>>(
this: PostgresAdapter,
{ slug, data, req = {} as PayloadRequest }: UpdateGlobalArgs,
): Promise<T> {
const db = this.sessions[req.transactionID]?.db || this.drizzle
const db = this.sessions[await req.transactionID]?.db || this.drizzle
const globalConfig = this.payload.globals.config.find((config) => config.slug === slug)
const tableName = this.tableNameMap.get(toSnakeCase(globalConfig.slug))

View File

@@ -25,7 +25,7 @@ export async function updateGlobalVersion<T extends TypeWithID>(
where: whereArg,
}: UpdateGlobalVersionArgs<T>,
) {
const db = this.sessions[req.transactionID]?.db || this.drizzle
const db = this.sessions[await req.transactionID]?.db || this.drizzle
const globalConfig: SanitizedGlobalConfig = this.payload.globals.config.find(
({ slug }) => slug === global,
)

View File

@@ -25,7 +25,7 @@ export async function updateVersion<T extends TypeWithID>(
where: whereArg,
}: UpdateVersionArgs<T>,
) {
const db = this.sessions[req.transactionID]?.db || this.drizzle
const db = this.sessions[await req.transactionID]?.db || this.drizzle
const collectionConfig: SanitizedCollectionConfig = this.payload.collections[collection].config
const whereToUse = whereArg || { id: { equals: id } }
const tableName = this.tableNameMap.get(

View File

@@ -27,8 +27,8 @@ export function findResolver(collection: Collection): Resolver {
let { req } = context
const locale = req.locale
const fallbackLocale = req.fallbackLocale
req = isolateObjectProperty(req, 'locale')
req = isolateObjectProperty(req, 'fallbackLocale')
req = isolateObjectProperty(req, ['locale', 'fallbackLocale', 'transactionID'])
req.locale = args.locale || locale
req.fallbackLocale = args.fallbackLocale || fallbackLocale
if (!req.query) req.query = {}
@@ -49,7 +49,7 @@ export function findResolver(collection: Collection): Resolver {
draft: args.draft,
limit: args.limit,
page: args.page,
req: isolateObjectProperty(req, 'transactionID'),
req,
sort: args.sort,
where: args.where,
}

View File

@@ -166,7 +166,7 @@ type CreateCacheKeyArgs = {
locale: string
overrideAccess: boolean
showHiddenFields: boolean
transactionID: number | string
transactionID: Promise<number | string> | number | string
}
export const createDataloaderCacheKey = ({
collectionSlug,

View File

@@ -159,9 +159,9 @@ export type BeginTransaction = (
options?: Record<string, unknown>,
) => Promise<null | number | string>
export type RollbackTransaction = (id: number | string) => Promise<void>
export type RollbackTransaction = (id: Promise<number | string> | number | string) => Promise<void>
export type CommitTransaction = (id: number | string) => Promise<void>
export type CommitTransaction = (id: Promise<number | string> | number | string) => Promise<void>
export type QueryDraftsArgs = {
collection: string

View File

@@ -43,8 +43,9 @@ export type CustomPayloadRequestProperties = {
t: TFunction
/**
* Identifier for the database transaction for interactions in a single, all-or-nothing operation.
* Can also be used to ensure consistency when multiple operations try to create a transaction concurrently on the same request.
*/
transactionID?: number | string
transactionID?: Promise<number | string> | number | string
/**
* Used to ensure consistency when multiple operations try to create a transaction concurrently on the same request
*/

View File

@@ -5,25 +5,27 @@ import type { PayloadRequest } from '../types/index.js'
* @returns true if beginning a transaction and false when req already has a transaction to use
*/
export async function initTransaction(req: PayloadRequest): Promise<boolean> {
const { payload, transactionID, transactionIDPromise } = req
const { payload, transactionID } = req
if (transactionID instanceof Promise) {
// wait for whoever else is already creating the transaction
await transactionID
return false
}
if (transactionID) {
// we already have a transaction, we're not in charge of committing it
return false
}
if (transactionIDPromise) {
// wait for whoever else is already creating the transaction
await transactionIDPromise
return false
}
if (typeof payload.db.beginTransaction === 'function') {
// create a new transaction
req.transactionIDPromise = payload.db.beginTransaction().then((transactionID) => {
req.transactionID = payload.db.beginTransaction().then((transactionID) => {
if (transactionID) {
req.transactionID = transactionID
}
delete req.transactionIDPromise
return transactionID
})
await req.transactionIDPromise
await req.transactionID
return !!req.transactionID
}
return false

View File

@@ -1,20 +1,33 @@
/* eslint-disable no-restricted-exports */
/**
* Creates a proxy for the given object that has its own property
*/
export default function isolateObjectProperty<T>(object: T, key): T {
const delegate = {}
const handler = {
export default function isolateObjectProperty<T extends object>(
object: T,
key: (keyof T)[] | keyof T,
): T {
const keys = Array.isArray(key) ? key : [key]
const delegate = {} as T
// Initialize delegate with the keys, if they exist in the original object
for (const k of keys) {
if (k in object) {
delegate[k] = object[k]
}
}
const handler: ProxyHandler<T> = {
deleteProperty(target, p): boolean {
return Reflect.deleteProperty(p === key ? delegate : target, p)
return Reflect.deleteProperty(keys.includes(p as keyof T) ? delegate : target, p)
},
get(target, p, receiver) {
return Reflect.get(p === key ? delegate : target, p, receiver)
return Reflect.get(keys.includes(p as keyof T) ? delegate : target, p, receiver)
},
has(target, p) {
return Reflect.has(p === key ? delegate : target, p)
return Reflect.has(keys.includes(p as keyof T) ? delegate : target, p)
},
set(target, p, newValue, receiver) {
if (p === key) {
if (keys.includes(p as keyof T)) {
// in case of transactionID we must ignore any receiver, because
// "If provided and target does not have a setter for propertyKey, the property will be set on receiver instead."
return Reflect.set(delegate, p, newValue)

View File

@@ -5,7 +5,7 @@ import type { PayloadRequest } from '../types/index.js'
*/
export async function killTransaction(req: PayloadRequest): Promise<void> {
const { payload, transactionID } = req
if (transactionID) {
if (transactionID && !(transactionID instanceof Promise)) {
await payload.db.rollbackTransaction(req.transactionID)
delete req.transactionID
}

View File

@@ -1,5 +1,5 @@
'use client'
import type { ElementNode, LexicalEditor, LexicalNode } from 'lexical'
import type { ElementNode, LexicalEditor, LexicalNode, TextNode } from 'lexical'
import { useLexicalComposerContext } from '@lexical/react/LexicalComposerContext.js'
import { mergeRegister } from '@lexical/utils'
@@ -11,7 +11,6 @@ import {
$isNodeSelection,
$isRangeSelection,
$isTextNode,
type TextNode,
TextNode as TextNodeValue,
} from 'lexical'
import { useEffect } from 'react'

View File

@@ -1,7 +1,7 @@
'use client'
import type { ClientTranslationKeys, I18nClient } from '@payloadcms/translations'
import { getTranslation } from '@payloadcms/translations';
import { getTranslation } from '@payloadcms/translations'
import type { FieldMap } from '../../utilities/buildComponentMap.js'

View File

@@ -67,6 +67,48 @@ export default buildConfigWithDefaults({
singular: 'Relation B',
},
},
{
fields: [
{
name: 'name',
type: 'text',
},
{
name: 'items',
type: 'relationship',
relationTo: 'items',
hasMany: true,
},
],
slug: 'shops',
access: { read: () => true },
},
{
fields: [
{
name: 'name',
type: 'text',
},
{
name: 'itemTags',
type: 'relationship',
relationTo: 'itemTags',
hasMany: true,
},
],
slug: 'items',
access: { read: () => true },
},
{
fields: [
{
name: 'name',
type: 'text',
},
],
slug: 'itemTags',
access: { read: () => true },
},
],
onInit: async (payload) => {
const user = await payload.create({
@@ -82,6 +124,19 @@ export default buildConfigWithDefaults({
data: postDoc,
user,
})
const tag = await payload.create({
collection: 'itemTags',
data: { name: 'tag1' },
})
const item = await payload.create({
collection: 'items',
data: { name: 'item1', itemTags: [tag.id] },
})
const shop = await payload.create({
collection: 'shops',
data: { name: 'shop1', items: [item.id] },
})
},
typescript: {
outputFile: path.resolve(dirname, 'payload-types.ts'),

View File

@@ -32,6 +32,45 @@ describe('dataloader', () => {
})
describe('graphql', () => {
it('should allow multiple parallel queries', async () => {
for (let i = 0; i < 100; i++) {
const query = `
query {
Shops {
docs {
name
items {
name
}
}
}
Items {
docs {
name
itemTags {
name
}
}
}
}`
const { data } = await restClient
.GRAPHQL_POST({
body: JSON.stringify({ query }),
headers: {
Authorization: `JWT ${token}`,
},
})
.then((res) => res.json())
const normalizedResponse = JSON.parse(JSON.stringify(data))
expect(normalizedResponse).toStrictEqual({
Shops: { docs: [{ name: 'shop1', items: [{ name: 'item1' }] }] },
Items: { docs: [{ name: 'item1', itemTags: [{ name: 'tag1' }] }] },
})
}
})
it('should allow querying via graphql', async () => {
const query = `query {
Posts {