initial v0.8 pre-release
This commit is contained in:
126
apis/realtime.go
126
apis/realtime.go
@@ -15,13 +15,12 @@ import (
|
||||
"github.com/pocketbase/pocketbase/forms"
|
||||
"github.com/pocketbase/pocketbase/models"
|
||||
"github.com/pocketbase/pocketbase/resolvers"
|
||||
"github.com/pocketbase/pocketbase/tools/rest"
|
||||
"github.com/pocketbase/pocketbase/tools/search"
|
||||
"github.com/pocketbase/pocketbase/tools/subscriptions"
|
||||
)
|
||||
|
||||
// BindRealtimeApi registers the realtime api endpoints.
|
||||
func BindRealtimeApi(app core.App, rg *echo.Group) {
|
||||
// bindRealtimeApi registers the realtime api endpoints.
|
||||
func bindRealtimeApi(app core.App, rg *echo.Group) {
|
||||
api := realtimeApi{app: app}
|
||||
|
||||
subGroup := rg.Group("/realtime", ActivityLogger(app))
|
||||
@@ -113,25 +112,25 @@ func (api *realtimeApi) setSubscriptions(c echo.Context) error {
|
||||
|
||||
// read request data
|
||||
if err := c.Bind(form); err != nil {
|
||||
return rest.NewBadRequestError("", err)
|
||||
return NewBadRequestError("", err)
|
||||
}
|
||||
|
||||
// validate request data
|
||||
if err := form.Validate(); err != nil {
|
||||
return rest.NewBadRequestError("", err)
|
||||
return NewBadRequestError("", err)
|
||||
}
|
||||
|
||||
// find subscription client
|
||||
client, err := api.app.SubscriptionsBroker().ClientById(form.ClientId)
|
||||
if err != nil {
|
||||
return rest.NewNotFoundError("Missing or invalid client id.", err)
|
||||
return NewNotFoundError("Missing or invalid client id.", err)
|
||||
}
|
||||
|
||||
// check if the previous request was authorized
|
||||
oldAuthId := extractAuthIdFromGetter(client)
|
||||
newAuthId := extractAuthIdFromGetter(c)
|
||||
if oldAuthId != "" && oldAuthId != newAuthId {
|
||||
return rest.NewForbiddenError("The current and the previous request authorization don't match.", nil)
|
||||
return NewForbiddenError("The current and the previous request authorization don't match.", nil)
|
||||
}
|
||||
|
||||
event := &core.RealtimeSubscribeEvent{
|
||||
@@ -143,7 +142,7 @@ func (api *realtimeApi) setSubscriptions(c echo.Context) error {
|
||||
handlerErr := api.app.OnRealtimeBeforeSubscribeRequest().Trigger(event, func(e *core.RealtimeSubscribeEvent) error {
|
||||
// update auth state
|
||||
e.Client.Set(ContextAdminKey, e.HttpContext.Get(ContextAdminKey))
|
||||
e.Client.Set(ContextUserKey, e.HttpContext.Get(ContextUserKey))
|
||||
e.Client.Set(ContextAuthRecordKey, e.HttpContext.Get(ContextAuthRecordKey))
|
||||
|
||||
// unsubscribe from any previous existing subscriptions
|
||||
e.Client.Unsubscribe()
|
||||
@@ -161,53 +160,52 @@ func (api *realtimeApi) setSubscriptions(c echo.Context) error {
|
||||
return handlerErr
|
||||
}
|
||||
|
||||
// updateClientsAuthModel updates the existing clients auth model with the new one (matched by ID).
|
||||
func (api *realtimeApi) updateClientsAuthModel(contextKey string, newModel models.Model) error {
|
||||
for _, client := range api.app.SubscriptionsBroker().Clients() {
|
||||
clientModel, _ := client.Get(contextKey).(models.Model)
|
||||
if clientModel != nil && clientModel.GetId() == newModel.GetId() {
|
||||
client.Set(contextKey, newModel)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// unregisterClientsByAuthModel unregister all clients that has the provided auth model.
|
||||
func (api *realtimeApi) unregisterClientsByAuthModel(contextKey string, model models.Model) error {
|
||||
for _, client := range api.app.SubscriptionsBroker().Clients() {
|
||||
clientModel, _ := client.Get(contextKey).(models.Model)
|
||||
if clientModel != nil && clientModel.GetId() == model.GetId() {
|
||||
api.app.SubscriptionsBroker().Unregister(client.Id())
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (api *realtimeApi) bindEvents() {
|
||||
userTable := (&models.User{}).TableName()
|
||||
adminTable := (&models.Admin{}).TableName()
|
||||
|
||||
// update user/admin auth state
|
||||
// update the clients that has admin or auth record association
|
||||
api.app.OnModelAfterUpdate().PreAdd(func(e *core.ModelEvent) error {
|
||||
modelTable := e.Model.TableName()
|
||||
|
||||
var contextKey string
|
||||
switch modelTable {
|
||||
case userTable:
|
||||
contextKey = ContextUserKey
|
||||
case adminTable:
|
||||
contextKey = ContextAdminKey
|
||||
default:
|
||||
return nil
|
||||
if record, ok := e.Model.(*models.Record); ok && record != nil && record.Collection().IsAuth() {
|
||||
return api.updateClientsAuthModel(ContextAuthRecordKey, record)
|
||||
}
|
||||
|
||||
for _, client := range api.app.SubscriptionsBroker().Clients() {
|
||||
model, _ := client.Get(contextKey).(models.Model)
|
||||
if model != nil && model.GetId() == e.Model.GetId() {
|
||||
client.Set(contextKey, e.Model)
|
||||
}
|
||||
if admin, ok := e.Model.(*models.Admin); ok && admin != nil {
|
||||
return api.updateClientsAuthModel(ContextAdminKey, admin)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
// remove user/admin client(s)
|
||||
// remove the client(s) associated to the deleted admin or auth record
|
||||
api.app.OnModelAfterDelete().PreAdd(func(e *core.ModelEvent) error {
|
||||
modelTable := e.Model.TableName()
|
||||
|
||||
var contextKey string
|
||||
switch modelTable {
|
||||
case userTable:
|
||||
contextKey = ContextUserKey
|
||||
case adminTable:
|
||||
contextKey = ContextAdminKey
|
||||
default:
|
||||
return nil
|
||||
if record, ok := e.Model.(*models.Record); ok && record != nil && record.Collection().IsAuth() {
|
||||
return api.unregisterClientsByAuthModel(ContextAuthRecordKey, record)
|
||||
}
|
||||
|
||||
for _, client := range api.app.SubscriptionsBroker().Clients() {
|
||||
model, _ := client.Get(contextKey).(models.Model)
|
||||
if model != nil && model.GetId() == e.Model.GetId() {
|
||||
api.app.SubscriptionsBroker().Unregister(client.Id())
|
||||
}
|
||||
if admin, ok := e.Model.(*models.Admin); ok && admin != nil {
|
||||
return api.unregisterClientsByAuthModel(ContextAdminKey, admin)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -254,17 +252,17 @@ func (api *realtimeApi) canAccessRecord(client subscriptions.Client, record *mod
|
||||
|
||||
// emulate request data
|
||||
requestData := map[string]any{
|
||||
"method": "get",
|
||||
"method": "GET",
|
||||
"query": map[string]any{},
|
||||
"data": map[string]any{},
|
||||
"user": nil,
|
||||
"auth": nil,
|
||||
}
|
||||
user, _ := client.Get(ContextUserKey).(*models.User)
|
||||
if user != nil {
|
||||
requestData["user"], _ = user.AsMap()
|
||||
authRecord, _ := client.Get(ContextAuthRecordKey).(*models.Record)
|
||||
if authRecord != nil {
|
||||
requestData["auth"] = authRecord.PublicExport()
|
||||
}
|
||||
|
||||
resolver := resolvers.NewRecordFieldResolver(api.app.Dao(), record.Collection(), requestData)
|
||||
resolver := resolvers.NewRecordFieldResolver(api.app.Dao(), record.Collection(), requestData, true)
|
||||
expr, err := search.FilterData(*accessRule).BuildExpr(resolver)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -275,7 +273,7 @@ func (api *realtimeApi) canAccessRecord(client subscriptions.Client, record *mod
|
||||
return nil
|
||||
}
|
||||
|
||||
foundRecord, err := api.app.Dao().FindRecordById(record.Collection(), record.Id, ruleFunc)
|
||||
foundRecord, err := api.app.Dao().FindRecordById(record.Collection().Id, record.Id, ruleFunc)
|
||||
if err == nil && foundRecord != nil {
|
||||
return true
|
||||
}
|
||||
@@ -303,6 +301,8 @@ func (api *realtimeApi) broadcastRecord(action string, record *models.Record) er
|
||||
// know if the clients have access to view the expanded records
|
||||
cleanRecord := *record
|
||||
cleanRecord.SetExpand(nil)
|
||||
cleanRecord.WithUnkownData(false)
|
||||
cleanRecord.IgnoreEmailVisibility(false)
|
||||
|
||||
subscriptionRuleMap := map[string]*string{
|
||||
(collection.Name + "/" + cleanRecord.Id): collection.ViewRule,
|
||||
@@ -316,7 +316,7 @@ func (api *realtimeApi) broadcastRecord(action string, record *models.Record) er
|
||||
Record: &cleanRecord,
|
||||
}
|
||||
|
||||
serializedData, err := json.Marshal(data)
|
||||
dataBytes, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
if api.app.IsDebug() {
|
||||
log.Println(err)
|
||||
@@ -324,6 +324,8 @@ func (api *realtimeApi) broadcastRecord(action string, record *models.Record) er
|
||||
return err
|
||||
}
|
||||
|
||||
encodedData := string(dataBytes)
|
||||
|
||||
for _, client := range clients {
|
||||
for subscription, rule := range subscriptionRuleMap {
|
||||
if !client.HasSubscription(subscription) {
|
||||
@@ -336,7 +338,21 @@ func (api *realtimeApi) broadcastRecord(action string, record *models.Record) er
|
||||
|
||||
msg := subscriptions.Message{
|
||||
Name: subscription,
|
||||
Data: string(serializedData),
|
||||
Data: encodedData,
|
||||
}
|
||||
|
||||
// ignore the auth record email visibility checks for
|
||||
// auth owner, admin or manager
|
||||
if collection.IsAuth() {
|
||||
authId := extractAuthIdFromGetter(client)
|
||||
if authId == data.Record.Id ||
|
||||
api.canAccessRecord(client, data.Record, collection.AuthOptions().ManageRule) {
|
||||
data.Record.IgnoreEmailVisibility(true) // ignore
|
||||
if newData, err := json.Marshal(data); err == nil {
|
||||
msg.Data = string(newData)
|
||||
}
|
||||
data.Record.IgnoreEmailVisibility(false) // restore
|
||||
}
|
||||
}
|
||||
|
||||
client.Channel() <- msg
|
||||
@@ -351,9 +367,9 @@ type getter interface {
|
||||
}
|
||||
|
||||
func extractAuthIdFromGetter(val getter) string {
|
||||
user, _ := val.Get(ContextUserKey).(*models.User)
|
||||
if user != nil {
|
||||
return user.Id
|
||||
record, _ := val.Get(ContextAuthRecordKey).(*models.Record)
|
||||
if record != nil {
|
||||
return record.Id
|
||||
}
|
||||
|
||||
admin, _ := val.Get(ContextAdminKey).(*models.Admin)
|
||||
|
||||
Reference in New Issue
Block a user