Kaynağa Gözat

enhance: memory management in search indexing #1240

0xJacky 1 hafta önce
ebeveyn
işleme
f67bff7a4f
2 değiştirilmiş dosya ile 182 ekleme ve 61 silme
  1. 66 9
      internal/cache/index.go
  2. 116 52
      internal/cache/search.go

+ 66 - 9
internal/cache/index.go

@@ -47,6 +47,31 @@ func InitScanner(ctx context.Context) {
 	}
 }
 
+// shouldSkipPath checks if a path should be skipped during scanning or watching
+func shouldSkipPath(path string) bool {
+	// Define directories to exclude from scanning/watching
+	excludedDirs := []string{
+		nginx.GetConfPath("ssl"),              // SSL certificates and keys
+		nginx.GetConfPath("cache"),            // Nginx cache files
+		nginx.GetConfPath("logs"),             // Log files directory
+		nginx.GetConfPath("temp"),             // Temporary files directory
+		nginx.GetConfPath("proxy_temp"),       // Proxy temporary files
+		nginx.GetConfPath("client_body_temp"), // Client body temporary files
+		nginx.GetConfPath("fastcgi_temp"),     // FastCGI temporary files
+		nginx.GetConfPath("uwsgi_temp"),       // uWSGI temporary files
+		nginx.GetConfPath("scgi_temp"),        // SCGI temporary files
+	}
+
+	// Check if path starts with any excluded directory
+	for _, excludedDir := range excludedDirs {
+		if excludedDir != "" && strings.HasPrefix(path, excludedDir) {
+			return true
+		}
+	}
+
+	return false
+}
+
 // GetScanner returns the singleton scanner instance
 func GetScanner() *Scanner {
 	scannerInitMutex.Lock()
@@ -95,7 +120,6 @@ func (s *Scanner) Initialize(ctx context.Context) error {
 // watchAllDirectories recursively adds all directories under nginx config path to watcher
 func (s *Scanner) watchAllDirectories() error {
 	root := nginx.GetConfPath()
-	sslDir := nginx.GetConfPath("ssl")
 
 	return filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
 		if err != nil {
@@ -103,8 +127,8 @@ func (s *Scanner) watchAllDirectories() error {
 		}
 
 		if d.IsDir() {
-			// Skip ssl directory
-			if path == sslDir {
+			// Skip excluded directories (ssl, cache, logs, temp, etc.)
+			if shouldSkipPath(path) {
 				return filepath.SkipDir
 			}
 
@@ -170,9 +194,8 @@ func (s *Scanner) handleFileEvent(event fsnotify.Event) {
 		return
 	}
 
-	// Skip ssl directory
-	sslDir := nginx.GetConfPath("ssl")
-	if strings.HasPrefix(event.Name, sslDir) {
+	// Skip excluded directories (ssl, cache, etc.)
+	if shouldSkipPath(event.Name) {
 		return
 	}
 
@@ -212,6 +235,41 @@ func (s *Scanner) scanSingleFile(filePath string) error {
 	s.setScanningState(true)
 	defer s.setScanningState(false)
 
+	// Check if path should be skipped
+	if shouldSkipPath(filePath) {
+		return nil
+	}
+
+	// Get file info to check type and size
+	fileInfo, err := os.Lstat(filePath) // Use Lstat to avoid following symlinks
+	if err != nil {
+		return err
+	}
+
+	// Skip directories
+	if fileInfo.IsDir() {
+		logger.Debugf("Skipping directory: %s", filePath)
+		return nil
+	}
+
+	// Skip symlinks to avoid potential issues
+	if fileInfo.Mode()&os.ModeSymlink != 0 {
+		logger.Debugf("Skipping symlink: %s", filePath)
+		return nil
+	}
+
+	// Skip non-regular files (devices, pipes, sockets, etc.)
+	if !fileInfo.Mode().IsRegular() {
+		logger.Debugf("Skipping non-regular file: %s (mode: %s)", filePath, fileInfo.Mode())
+		return nil
+	}
+
+	// Skip files larger than 1MB before reading
+	if fileInfo.Size() > 1024*1024 {
+		logger.Debugf("Skipping large file: %s (size: %d bytes)", filePath, fileInfo.Size())
+		return nil
+	}
+
 	// Read file content
 	content, err := os.ReadFile(filePath)
 	if err != nil {
@@ -256,7 +314,6 @@ func (s *Scanner) ScanAllConfigs() error {
 	defer s.setScanningState(false)
 
 	root := nginx.GetConfPath()
-	sslDir := nginx.GetConfPath("ssl")
 
 	// Scan all files in the config directory and subdirectories
 	return filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
@@ -264,8 +321,8 @@ func (s *Scanner) ScanAllConfigs() error {
 			return err
 		}
 
-		// Skip ssl directory
-		if d.IsDir() && path == sslDir {
+		// Skip excluded directories (ssl, cache, logs, temp, etc.)
+		if d.IsDir() && shouldSkipPath(path) {
 			return filepath.SkipDir
 		}
 

+ 116 - 52
internal/cache/search.go

@@ -13,6 +13,7 @@ import (
 	"github.com/blevesearch/bleve/v2/analysis/lang/en"
 	"github.com/blevesearch/bleve/v2/mapping"
 	"github.com/blevesearch/bleve/v2/search/query"
+	"github.com/gabriel-vasile/mimetype"
 	"github.com/uozi-tech/cosy/logger"
 )
 
@@ -40,6 +41,12 @@ type SearchIndexer struct {
 	ctx         context.Context
 	cancel      context.CancelFunc
 	cleanupOnce sync.Once
+
+	// Memory management
+	totalContentSize int64
+	documentCount    int64
+	maxMemoryUsage   int64
+	memoryMutex      sync.RWMutex
 }
 
 var (
@@ -57,7 +64,8 @@ func GetSearchIndexer() *SearchIndexer {
 		}
 
 		searchIndexer = &SearchIndexer{
-			indexPath: tempDir,
+			indexPath:      tempDir,
+			maxMemoryUsage: 100 * 1024 * 1024, // 100MB memory limit for indexed content
 		}
 	})
 	return searchIndexer
@@ -131,6 +139,12 @@ func (si *SearchIndexer) cleanup() {
 			si.index = nil
 		}
 
+		// Reset memory tracking
+		si.memoryMutex.Lock()
+		si.totalContentSize = 0
+		si.documentCount = 0
+		si.memoryMutex.Unlock()
+
 		// Remove the temporary directory
 		if err := os.RemoveAll(si.indexPath); err != nil {
 			logger.Error("Failed to remove search index directory:", err)
@@ -191,10 +205,10 @@ func (si *SearchIndexer) handleConfigScan(configPath string, content []byte) (er
 		}
 	}()
 
-	// File size limit: 10MB to prevent memory overflow
-	const maxFileSize = 10 * 1024 * 1024 // 10MB
+	// File size limit: 1MB to prevent memory overflow and improve performance
+	const maxFileSize = 1024 * 1024 // 1MB
 	if len(content) > maxFileSize {
-		logger.Warn("Skipping file due to size limit", "path", configPath, "size", len(content), "limit", maxFileSize)
+		logger.Debugf("Skipping file due to size limit, path: %s, size: %d, limit: %d", configPath, len(content), maxFileSize)
 		return nil
 	}
 
@@ -203,9 +217,9 @@ func (si *SearchIndexer) handleConfigScan(configPath string, content []byte) (er
 		return nil
 	}
 
-	// Basic content validation: check if it's text content
-	if !isTextContent(content) {
-		logger.Warn("Skipping non-text file", "path", configPath)
+	// Basic content validation: check if it's a configuration file
+	if !isConfigFile(content) {
+		logger.Debugf("Skipping non-config file: %s", configPath)
 		return nil
 	}
 
@@ -249,6 +263,18 @@ func (si *SearchIndexer) IndexDocument(doc SearchDocument) (err error) {
 		}
 	}()
 
+	// Additional size check as a safety measure
+	if len(doc.Content) > 2*1024*1024 { // 2MB absolute limit
+		return fmt.Errorf("document content too large: %d bytes", len(doc.Content))
+	}
+
+	// Check memory usage before indexing
+	contentSize := int64(len(doc.Content))
+	if !si.checkMemoryLimitBeforeIndexing(contentSize) {
+		logger.Warn("Skipping document due to memory limit", "document_id", doc.ID, "content_size", contentSize)
+		return nil
+	}
+
 	si.indexMutex.RLock()
 	defer si.indexMutex.RUnlock()
 
@@ -256,15 +282,19 @@ func (si *SearchIndexer) IndexDocument(doc SearchDocument) (err error) {
 		return fmt.Errorf("search index not initialized")
 	}
 
-	// Additional size check as a safety measure
-	if len(doc.Content) > 50*1024*1024 { // 50MB absolute limit
-		return fmt.Errorf("document content too large: %d bytes", len(doc.Content))
+	// Index the document
+	err = si.index.Index(doc.ID, doc)
+	if err != nil {
+		return err
 	}
 
+	// Update memory usage tracking
+	si.updateMemoryUsage(doc.ID, contentSize, true)
+
 	// logger.Debugf("Indexing document: ID=%s, Type=%s, Name=%s, Path=%s",
 	// 	doc.ID, doc.Type, doc.Name, doc.Path)
 
-	return si.index.Index(doc.ID, doc)
+	return nil
 }
 
 // Search performs a search query
@@ -324,7 +354,7 @@ func (si *SearchIndexer) searchWithType(ctx context.Context, queryStr string, do
 		}
 		results := si.convertResults(res.result)
 
-		// Debug log the search execution
+		// log the search execution
 		logger.Debugf("Search index query '%s' (type: %s, limit: %d) returned %d results",
 			queryStr, docType, limit, len(results))
 
@@ -436,6 +466,10 @@ func (si *SearchIndexer) DeleteDocument(docID string) error {
 		return fmt.Errorf("search index not initialized")
 	}
 
+	// Note: We don't track the exact size of deleted documents here
+	// as it would require storing document sizes separately.
+	// The memory tracking will reset during periodic cleanups or restarts.
+
 	return si.index.Delete(docID)
 }
 
@@ -499,9 +533,16 @@ func (si *SearchIndexer) GetIndexStats() (map[string]interface{}, error) {
 		return nil, err
 	}
 
+	// Get memory usage statistics
+	totalContentSize, trackedDocCount, maxMemoryUsage := si.getMemoryUsage()
+
 	return map[string]interface{}{
-		"document_count": docCount,
-		"index_path":     si.indexPath,
+		"document_count":         docCount,
+		"tracked_document_count": trackedDocCount,
+		"total_content_size":     totalContentSize,
+		"max_memory_usage":       maxMemoryUsage,
+		"memory_usage_percent":   float64(totalContentSize) / float64(maxMemoryUsage) * 100,
+		"index_path":             si.indexPath,
 	}, nil
 }
 
@@ -537,49 +578,72 @@ func SearchAll(ctx context.Context, query string, limit int) ([]SearchResult, er
 	return GetSearchIndexer().Search(ctx, query, limit)
 }
 
-// isTextContent checks if the content appears to be text-based
-// This helps prevent indexing binary files that might have been misidentified
-func isTextContent(content []byte) bool {
-	if len(content) == 0 {
-		return true // Empty content is considered text
-	}
-
-	// Check for common binary file signatures
-	if len(content) >= 4 {
-		// Check for some common binary file headers
-		switch {
-		case content[0] == 0x7F && content[1] == 0x45 && content[2] == 0x4C && content[3] == 0x46: // ELF
-			return false
-		case content[0] == 0x89 && content[1] == 0x50 && content[2] == 0x4E && content[3] == 0x47: // PNG
-			return false
-		case content[0] == 0xFF && content[1] == 0xD8 && content[2] == 0xFF: // JPEG
-			return false
-		case content[0] == 0x50 && content[1] == 0x4B && content[2] == 0x03 && content[3] == 0x04: // ZIP
-			return false
-		case content[0] == 0x50 && content[1] == 0x4B && content[2] == 0x05 && content[3] == 0x06: // ZIP (empty)
-			return false
-		case content[0] == 0x50 && content[1] == 0x4B && content[2] == 0x07 && content[3] == 0x08: // ZIP (spanned)
-			return false
-		}
+// checkMemoryLimitBeforeIndexing checks if adding new content would exceed memory limits
+func (si *SearchIndexer) checkMemoryLimitBeforeIndexing(contentSize int64) bool {
+	si.memoryMutex.RLock()
+	defer si.memoryMutex.RUnlock()
+
+	// Check if adding this content would exceed the memory limit
+	newTotalSize := si.totalContentSize + contentSize
+	if newTotalSize > si.maxMemoryUsage {
+		logger.Debugf("Memory limit would be exceeded: current=%d, new=%d, limit=%d",
+			si.totalContentSize, newTotalSize, si.maxMemoryUsage)
+		return false
 	}
 
-	// Check if the first part of the content contains mostly printable characters
-	// Sample up to 8KB for performance
-	sampleSize := len(content)
-	if sampleSize > 8192 {
-		sampleSize = 8192
+	// Also check document count limit (max 1000 documents)
+	if si.documentCount >= 1000 {
+		logger.Debugf("Document count limit reached: %d", si.documentCount)
+		return false
 	}
 
-	nonPrintableCount := 0
-	for i := 0; i < sampleSize; i++ {
-		b := content[i]
-		// Allow printable ASCII characters, newlines, tabs, and carriage returns
-		if (b < 32 && b != 9 && b != 10 && b != 13) || b > 126 {
-			nonPrintableCount++
+	return true
+}
+
+// updateMemoryUsage updates the memory usage tracking
+func (si *SearchIndexer) updateMemoryUsage(documentID string, contentSize int64, isAddition bool) {
+	si.memoryMutex.Lock()
+	defer si.memoryMutex.Unlock()
+
+	if isAddition {
+		si.totalContentSize += contentSize
+		si.documentCount++
+		logger.Debugf("Added document %s: size=%d, total_size=%d, count=%d",
+			documentID, contentSize, si.totalContentSize, si.documentCount)
+	} else {
+		si.totalContentSize -= contentSize
+		si.documentCount--
+		if si.totalContentSize < 0 {
+			si.totalContentSize = 0
 		}
+		if si.documentCount < 0 {
+			si.documentCount = 0
+		}
+		logger.Debugf("Removed document %s: size=%d, total_size=%d, count=%d",
+			documentID, contentSize, si.totalContentSize, si.documentCount)
+	}
+}
+
+// getMemoryUsage returns current memory usage statistics
+func (si *SearchIndexer) getMemoryUsage() (int64, int64, int64) {
+	si.memoryMutex.RLock()
+	defer si.memoryMutex.RUnlock()
+	return si.totalContentSize, si.documentCount, si.maxMemoryUsage
+}
+
+// isConfigFile checks if the content is a text/plain file (most nginx configs)
+func isConfigFile(content []byte) bool {
+	if len(content) == 0 {
+		return false // Empty files are not useful for configuration
+	}
+
+	// Detect MIME type and only accept text/plain
+	mtype := mimetype.Detect(content)
+
+	if mtype.Is("text/plain") {
+		return true
 	}
 
-	// If more than 30% of the sampled content is non-printable, consider it binary
-	threshold := float64(sampleSize) * 0.3
-	return float64(nonPrintableCount) <= threshold
+	logger.Debugf("Skipping non-text/plain file with MIME type: %s", mtype.String())
+	return false
 }