diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 09e83a185..d4275d102 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -32,16 +32,18 @@ import ( const ( // maxEmbeddingDataSize is the min size of an embedding object in bytes minEmbeddingDataSize = 2048 - embeddingFetchTimeout = 15 * gTime.Second + embeddingFetchTimeout = 10 * gTime.Second ) // _fetchConfig is the configuration for the fetching objects from S3 type _fetchConfig struct { - RetryCount int - FetchTimeOut gTime.Duration + RetryCount int + InitialTimeout gTime.Duration + MaxTimeout gTime.Duration } -var _defaultFetchConfig = _fetchConfig{RetryCount: 3, FetchTimeOut: 15 * gTime.Second} +var _defaultFetchConfig = _fetchConfig{RetryCount: 3, InitialTimeout: 5 * gTime.Second, MaxTimeout: 30 * gTime.Second} +var _b2FetchConfig = _fetchConfig{RetryCount: 3, InitialTimeout: 15 * gTime.Second, MaxTimeout: 30 * gTime.Second} type Controller struct { Repo *embedding.Repository @@ -333,11 +335,20 @@ func (c *Controller) getEmbeddingObjectsParallelV2(userID int64, dbEmbeddingRows func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, dc string) (ente.EmbeddingObject, error) { opt := _defaultFetchConfig + if dc == c.S3Config.GetHotBackblazeDC() { + opt = _b2FetchConfig + } ctxLogger := log.WithField("objectKey", objectKey) totalAttempts := opt.RetryCount + 1 + timeout := opt.InitialTimeout for i := 0; i < totalAttempts; i++ { - // Create a new context with a timeout for each fetch - fetchCtx, cancel := context.WithTimeout(ctx, opt.FetchTimeOut) + if i > 0 { + timeout = timeout * 2 + if timeout > opt.MaxTimeout { + timeout = opt.MaxTimeout + } + } + fetchCtx, cancel := context.WithTimeout(ctx, timeout) select { case <-ctx.Done(): cancel()