feat: Add Redis upload cache for multi-replica deployments (#5724)
This commit is contained in:
@@ -19,6 +19,7 @@ type modifyRequest struct {
|
||||
func NewHandler(
|
||||
imgSvc ImgService,
|
||||
fileCache FileCache,
|
||||
uploadCache UploadCache,
|
||||
store *storage.Storage,
|
||||
server *settings.Server,
|
||||
assetsFs fs.FS,
|
||||
@@ -67,10 +68,10 @@ func NewHandler(
|
||||
api.PathPrefix("/resources").Handler(monkey(resourcePutHandler, "/api/resources")).Methods("PUT")
|
||||
api.PathPrefix("/resources").Handler(monkey(resourcePatchHandler(fileCache), "/api/resources")).Methods("PATCH")
|
||||
|
||||
api.PathPrefix("/tus").Handler(monkey(tusPostHandler(), "/api/tus")).Methods("POST")
|
||||
api.PathPrefix("/tus").Handler(monkey(tusHeadHandler(), "/api/tus")).Methods("HEAD", "GET")
|
||||
api.PathPrefix("/tus").Handler(monkey(tusPatchHandler(), "/api/tus")).Methods("PATCH")
|
||||
api.PathPrefix("/tus").Handler(monkey(tusDeleteHandler(), "/api/tus")).Methods("DELETE")
|
||||
api.PathPrefix("/tus").Handler(monkey(tusPostHandler(uploadCache), "/api/tus")).Methods("POST")
|
||||
api.PathPrefix("/tus").Handler(monkey(tusHeadHandler(uploadCache), "/api/tus")).Methods("HEAD", "GET")
|
||||
api.PathPrefix("/tus").Handler(monkey(tusPatchHandler(uploadCache), "/api/tus")).Methods("PATCH")
|
||||
api.PathPrefix("/tus").Handler(monkey(tusDeleteHandler(uploadCache), "/api/tus")).Methods("DELETE")
|
||||
|
||||
api.PathPrefix("/usage").Handler(monkey(diskUsage, "/api/usage")).Methods("GET")
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package fbhttp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -12,48 +11,13 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/jellydator/ttlcache/v3"
|
||||
"github.com/spf13/afero"
|
||||
|
||||
"github.com/filebrowser/filebrowser/v2/files"
|
||||
)
|
||||
|
||||
const maxUploadWait = 3 * time.Minute
|
||||
|
||||
// Tracks active uploads along with their respective upload lengths
|
||||
var activeUploads = initActiveUploads()
|
||||
|
||||
func initActiveUploads() *ttlcache.Cache[string, int64] {
|
||||
cache := ttlcache.New[string, int64]()
|
||||
cache.OnEviction(func(_ context.Context, reason ttlcache.EvictionReason, item *ttlcache.Item[string, int64]) {
|
||||
if reason == ttlcache.EvictionReasonExpired {
|
||||
fmt.Printf("deleting incomplete upload file: \"%s\"", item.Key())
|
||||
os.Remove(item.Key())
|
||||
}
|
||||
})
|
||||
go cache.Start()
|
||||
|
||||
return cache
|
||||
}
|
||||
|
||||
func registerUpload(filePath string, fileSize int64) {
|
||||
activeUploads.Set(filePath, fileSize, maxUploadWait)
|
||||
}
|
||||
|
||||
func completeUpload(filePath string) {
|
||||
activeUploads.Delete(filePath)
|
||||
}
|
||||
|
||||
func getActiveUploadLength(filePath string) (int64, error) {
|
||||
item := activeUploads.Get(filePath)
|
||||
if item == nil {
|
||||
return 0, fmt.Errorf("no active upload found for the given path")
|
||||
}
|
||||
|
||||
return item.Value(), nil
|
||||
}
|
||||
|
||||
func keepUploadActive(filePath string) func() {
|
||||
// keepUploadActive periodically touches the cache entry to prevent eviction during transfer
|
||||
func keepUploadActive(cache UploadCache, filePath string) func() {
|
||||
stop := make(chan bool)
|
||||
|
||||
go func() {
|
||||
@@ -65,7 +29,7 @@ func keepUploadActive(filePath string) func() {
|
||||
case <-stop:
|
||||
return
|
||||
case <-ticker.C:
|
||||
activeUploads.Touch(filePath)
|
||||
cache.Touch(filePath)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -75,7 +39,7 @@ func keepUploadActive(filePath string) func() {
|
||||
}
|
||||
}
|
||||
|
||||
func tusPostHandler() handleFunc {
|
||||
func tusPostHandler(cache UploadCache) handleFunc {
|
||||
return withUser(func(w http.ResponseWriter, r *http.Request, d *data) (int, error) {
|
||||
if !d.user.Perm.Create || !d.Check(r.URL.Path) {
|
||||
return http.StatusForbidden, nil
|
||||
@@ -146,7 +110,7 @@ func tusPostHandler() handleFunc {
|
||||
}
|
||||
|
||||
// Enables the user to utilize the PATCH endpoint for uploading file data
|
||||
registerUpload(file.RealPath(), uploadLength)
|
||||
cache.Register(file.RealPath(), uploadLength)
|
||||
|
||||
path, err := url.JoinPath("/", d.server.BaseURL, "/api/tus", r.URL.Path)
|
||||
if err != nil {
|
||||
@@ -158,7 +122,7 @@ func tusPostHandler() handleFunc {
|
||||
})
|
||||
}
|
||||
|
||||
func tusHeadHandler() handleFunc {
|
||||
func tusHeadHandler(cache UploadCache) handleFunc {
|
||||
return withUser(func(w http.ResponseWriter, r *http.Request, d *data) (int, error) {
|
||||
w.Header().Set("Cache-Control", "no-store")
|
||||
if !d.user.Perm.Create || !d.Check(r.URL.Path) {
|
||||
@@ -177,7 +141,7 @@ func tusHeadHandler() handleFunc {
|
||||
return errToStatus(err), err
|
||||
}
|
||||
|
||||
uploadLength, err := getActiveUploadLength(file.RealPath())
|
||||
uploadLength, err := cache.GetLength(file.RealPath())
|
||||
if err != nil {
|
||||
return http.StatusNotFound, err
|
||||
}
|
||||
@@ -189,7 +153,7 @@ func tusHeadHandler() handleFunc {
|
||||
})
|
||||
}
|
||||
|
||||
func tusPatchHandler() handleFunc {
|
||||
func tusPatchHandler(cache UploadCache) handleFunc {
|
||||
return withUser(func(w http.ResponseWriter, r *http.Request, d *data) (int, error) {
|
||||
if !d.user.Perm.Create || !d.Check(r.URL.Path) {
|
||||
return http.StatusForbidden, nil
|
||||
@@ -219,13 +183,13 @@ func tusPatchHandler() handleFunc {
|
||||
return errToStatus(err), err
|
||||
}
|
||||
|
||||
uploadLength, err := getActiveUploadLength(file.RealPath())
|
||||
uploadLength, err := cache.GetLength(file.RealPath())
|
||||
if err != nil {
|
||||
return http.StatusNotFound, err
|
||||
}
|
||||
|
||||
// Prevent the upload from being evicted during the transfer
|
||||
stop := keepUploadActive(file.RealPath())
|
||||
stop := keepUploadActive(cache, file.RealPath())
|
||||
defer stop()
|
||||
|
||||
switch {
|
||||
@@ -266,7 +230,7 @@ func tusPatchHandler() handleFunc {
|
||||
w.Header().Set("Upload-Offset", strconv.FormatInt(newOffset, 10))
|
||||
|
||||
if newOffset >= uploadLength {
|
||||
completeUpload(file.RealPath())
|
||||
cache.Complete(file.RealPath())
|
||||
_ = d.RunHook(func() error { return nil }, "upload", r.URL.Path, "", d.user)
|
||||
}
|
||||
|
||||
@@ -274,7 +238,7 @@ func tusPatchHandler() handleFunc {
|
||||
})
|
||||
}
|
||||
|
||||
func tusDeleteHandler() handleFunc {
|
||||
func tusDeleteHandler(cache UploadCache) handleFunc {
|
||||
return withUser(func(_ http.ResponseWriter, r *http.Request, d *data) (int, error) {
|
||||
if r.URL.Path == "/" || !d.user.Perm.Create {
|
||||
return http.StatusForbidden, nil
|
||||
@@ -292,7 +256,7 @@ func tusDeleteHandler() handleFunc {
|
||||
return errToStatus(err), err
|
||||
}
|
||||
|
||||
_, err = getActiveUploadLength(file.RealPath())
|
||||
_, err = cache.GetLength(file.RealPath())
|
||||
if err != nil {
|
||||
return http.StatusNotFound, err
|
||||
}
|
||||
@@ -302,7 +266,7 @@ func tusDeleteHandler() handleFunc {
|
||||
return errToStatus(err), err
|
||||
}
|
||||
|
||||
completeUpload(file.RealPath())
|
||||
cache.Complete(file.RealPath())
|
||||
|
||||
return http.StatusNoContent, nil
|
||||
})
|
||||
|
||||
85
http/upload_cache_memory.go
Normal file
85
http/upload_cache_memory.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package fbhttp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/jellydator/ttlcache/v3"
|
||||
)
|
||||
|
||||
const uploadCacheTTL = 3 * time.Minute
|
||||
|
||||
// UploadCache is an interface for tracking active uploads.
|
||||
// Allows for different backends (e.g. in-memory or redis)
|
||||
// to support both single instance and multi replica deployments.
|
||||
type UploadCache interface {
|
||||
// Register stores an upload with its expected file size
|
||||
Register(filePath string, fileSize int64)
|
||||
|
||||
// Complete removes an upload from the cache
|
||||
Complete(filePath string)
|
||||
|
||||
// GetLength returns the expected file size for an active upload
|
||||
GetLength(filePath string) (int64, error)
|
||||
|
||||
// Touch refreshes the TTL for an active upload
|
||||
Touch(filePath string)
|
||||
|
||||
// Close cleans up any resources
|
||||
Close()
|
||||
}
|
||||
|
||||
// memoryUploadCache is an upload cache for single replica deployments
|
||||
type memoryUploadCache struct {
|
||||
cache *ttlcache.Cache[string, int64]
|
||||
}
|
||||
|
||||
func newMemoryUploadCache() *memoryUploadCache {
|
||||
cache := ttlcache.New[string, int64]()
|
||||
cache.OnEviction(func(_ context.Context, reason ttlcache.EvictionReason, item *ttlcache.Item[string, int64]) {
|
||||
if reason == ttlcache.EvictionReasonExpired {
|
||||
fmt.Printf("deleting incomplete upload file: \"%s\"\n", item.Key())
|
||||
os.Remove(item.Key())
|
||||
}
|
||||
})
|
||||
go cache.Start()
|
||||
|
||||
return &memoryUploadCache{cache: cache}
|
||||
}
|
||||
|
||||
func (c *memoryUploadCache) Register(filePath string, fileSize int64) {
|
||||
c.cache.Set(filePath, fileSize, uploadCacheTTL)
|
||||
}
|
||||
|
||||
func (c *memoryUploadCache) Complete(filePath string) {
|
||||
c.cache.Delete(filePath)
|
||||
}
|
||||
|
||||
func (c *memoryUploadCache) GetLength(filePath string) (int64, error) {
|
||||
item := c.cache.Get(filePath)
|
||||
if item == nil {
|
||||
return 0, fmt.Errorf("no active upload found for the given path")
|
||||
}
|
||||
return item.Value(), nil
|
||||
}
|
||||
|
||||
func (c *memoryUploadCache) Touch(filePath string) {
|
||||
c.cache.Touch(filePath)
|
||||
}
|
||||
|
||||
func (c *memoryUploadCache) Close() {
|
||||
c.cache.Stop()
|
||||
}
|
||||
|
||||
// NewUploadCache creates a new upload cache.
|
||||
// If redisURL is empty, an in-memory cache will be used (suitable for single instance deployments).
|
||||
// Otherwise, Redis will be used for the cache (suitable for multi-instance deployments).
|
||||
// The redisURL can include credentials, e.g. redis://user:pass@host:port
|
||||
func NewUploadCache(redisURL string) (UploadCache, error) {
|
||||
if redisURL != "" {
|
||||
return newRedisUploadCache(redisURL)
|
||||
}
|
||||
return newMemoryUploadCache(), nil
|
||||
}
|
||||
82
http/upload_cache_redis.go
Normal file
82
http/upload_cache_redis.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package fbhttp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// redisUploadCache is an upload cache for multi replica deployments
|
||||
type redisUploadCache struct {
|
||||
client *redis.Client
|
||||
}
|
||||
|
||||
func newRedisUploadCache(redisURL string) (*redisUploadCache, error) {
|
||||
if redisURL == "" {
|
||||
return nil, fmt.Errorf("redis URL is required")
|
||||
}
|
||||
|
||||
opts, err := redis.ParseURL(redisURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid redis URL: %w", err)
|
||||
}
|
||||
|
||||
client := redis.NewClient(opts)
|
||||
|
||||
// Test connection
|
||||
if err := client.Ping(context.Background()).Err(); err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to redis: %w", err)
|
||||
}
|
||||
|
||||
return &redisUploadCache{client: client}, nil
|
||||
}
|
||||
|
||||
func (c *redisUploadCache) filePathKey(filePath string) string {
|
||||
return "filebrowser:upload:" + filePath
|
||||
}
|
||||
|
||||
func (c *redisUploadCache) Register(filePath string, fileSize int64) {
|
||||
err := c.client.Set(context.Background(), c.filePathKey(filePath), fileSize, uploadCacheTTL).Err()
|
||||
if err != nil {
|
||||
log.Printf("failed to register upload in redis cache: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *redisUploadCache) Complete(filePath string) {
|
||||
err := c.client.Del(context.Background(), c.filePathKey(filePath)).Err()
|
||||
if err != nil {
|
||||
log.Printf("failed to complete upload in redis cache: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *redisUploadCache) GetLength(filePath string) (int64, error) {
|
||||
result, err := c.client.Get(context.Background(), c.filePathKey(filePath)).Result()
|
||||
if err != nil {
|
||||
if errors.Is(err, redis.Nil) {
|
||||
return 0, fmt.Errorf("no active upload found for the given path")
|
||||
}
|
||||
return 0, fmt.Errorf("redis error: %w", err)
|
||||
}
|
||||
|
||||
size, err := strconv.ParseInt(result, 10, 64)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("invalid upload length in cache: %w", err)
|
||||
}
|
||||
|
||||
return size, nil
|
||||
}
|
||||
|
||||
func (c *redisUploadCache) Touch(filePath string) {
|
||||
err := c.client.Expire(context.Background(), c.filePathKey(filePath), uploadCacheTTL).Err()
|
||||
if err != nil {
|
||||
log.Printf("failed to touch upload in redis cache: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *redisUploadCache) Close() {
|
||||
c.client.Close()
|
||||
}
|
||||
Reference in New Issue
Block a user