added select auto fail/retry

This commit is contained in:
Gani Georgiev
2023-02-21 16:38:12 +02:00
parent 0afb09b3bd
commit 41c3cc8a90
9 changed files with 335 additions and 129 deletions

View File

@@ -4,19 +4,12 @@
package daos
import (
"context"
"errors"
"strings"
"sync"
"time"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/models"
"golang.org/x/sync/semaphore"
)
const DefaultMaxFailRetries = 5
// New creates a new Dao instance with the provided db builder
// (for both async and sync db operations).
func New(db dbx.Builder) *Dao {
@@ -39,10 +32,6 @@ type Dao struct {
concurrentDB dbx.Builder
nonconcurrentDB dbx.Builder
// @todo delete after removing Block and Continue
sem *semaphore.Weighted
mux sync.RWMutex
BeforeCreateFunc func(eventDao *Dao, m models.Model) error
AfterCreateFunc func(eventDao *Dao, m models.Model)
BeforeUpdateFunc func(eventDao *Dao, m models.Model) error
@@ -74,55 +63,15 @@ func (dao *Dao) NonconcurrentDB() dbx.Builder {
return dao.nonconcurrentDB
}
// Deprecated: Will be removed in the next releases. Use [Dao.NonconcurrentDB()] instead.
//
// Block acquires a lock and blocks all other go routines that uses
// the Dao instance until dao.Continue() is called, effectively making
// the concurrent requests to perform synchronous db operations.
//
// This method should be used only as a last resort and as a workaround
// for the SQLITE_BUSY error when mixing read&write in a transaction.
//
// Example:
//
// func someLongRunningTransaction() error {
// ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// defer cancel()
// if err := app.Dao().Block(ctx); err != nil {
// return err
// }
// defer app.Dao().Continue()
//
// return app.Dao().RunInTransaction(func (txDao *daos.Dao) error {
// // some long running read&write transaction...
// })
// }
func (dao *Dao) Block(ctx context.Context) error {
if dao.sem == nil {
dao.mux.Lock()
dao.sem = semaphore.NewWeighted(1)
dao.mux.Unlock()
}
return dao.sem.Acquire(ctx, 1)
}
// Deprecated: Will be removed in the next releases. Use [Dao.NonconcurrentDB()] instead.
//
// Continue releases the previously acquired Block() lock.
func (dao *Dao) Continue() {
if dao.sem == nil {
return
}
dao.sem.Release(1)
}
// ModelQuery creates a new query with preset Select and From fields
// based on the provided model argument.
func (dao *Dao) ModelQuery(m models.Model) *dbx.SelectQuery {
tableName := m.TableName()
return dao.DB().Select("{{" + tableName + "}}.*").From(tableName)
return dao.DB().
Select("{{" + tableName + "}}.*").
From(tableName).
WithExecHook(onLockErrorRetry)
}
// FindById finds a single db record with the specified id and
@@ -224,7 +173,7 @@ func (dao *Dao) Delete(m models.Model) error {
return errors.New("ID is not set")
}
return dao.failRetry(func(retryDao *Dao) error {
return dao.lockRetry(func(retryDao *Dao) error {
if retryDao.BeforeDeleteFunc != nil {
if err := retryDao.BeforeDeleteFunc(retryDao, m); err != nil {
return err
@@ -240,20 +189,20 @@ func (dao *Dao) Delete(m models.Model) error {
}
return nil
}, DefaultMaxFailRetries)
}, defaultMaxRetries)
}
// Save upserts (update or create if primary key is not set) the provided model.
func (dao *Dao) Save(m models.Model) error {
if m.IsNew() {
return dao.failRetry(func(retryDao *Dao) error {
return dao.lockRetry(func(retryDao *Dao) error {
return retryDao.create(m)
}, DefaultMaxFailRetries)
}, defaultMaxRetries)
}
return dao.failRetry(func(retryDao *Dao) error {
return dao.lockRetry(func(retryDao *Dao) error {
return retryDao.update(m)
}, DefaultMaxFailRetries)
}, defaultMaxRetries)
}
func (dao *Dao) update(m models.Model) error {
@@ -347,32 +296,19 @@ func (dao *Dao) create(m models.Model) error {
return nil
}
func (dao *Dao) failRetry(op func(retryDao *Dao) error, maxRetries int) error {
func (dao *Dao) lockRetry(op func(retryDao *Dao) error, maxRetries int) error {
retryDao := dao
attempts := 1
Retry:
if attempts == 2 {
// assign new Dao without the before hooks to avoid triggering
// the already fired before callbacks multiple times
retryDao = NewMultiDB(dao.concurrentDB, dao.nonconcurrentDB)
retryDao.AfterCreateFunc = dao.AfterCreateFunc
retryDao.AfterUpdateFunc = dao.AfterUpdateFunc
retryDao.AfterDeleteFunc = dao.AfterDeleteFunc
}
return baseLockRetry(func(attempt int) error {
if attempt == 2 {
// assign new Dao without the before hooks to avoid triggering
// the already fired before callbacks multiple times
retryDao = NewMultiDB(dao.concurrentDB, dao.nonconcurrentDB)
retryDao.AfterCreateFunc = dao.AfterCreateFunc
retryDao.AfterUpdateFunc = dao.AfterUpdateFunc
retryDao.AfterDeleteFunc = dao.AfterDeleteFunc
}
// execute
err := op(retryDao)
if err != nil &&
attempts < maxRetries &&
// note: we are checking the err message so that we can handle both the cgo and noncgo errors
strings.Contains(err.Error(), "database is locked") {
// wait and retry
time.Sleep(time.Duration(200*attempts) * time.Millisecond)
attempts++
goto Retry
}
return err
return op(retryDao)
}, maxRetries)
}