search.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649
  1. package cache
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/blevesearch/bleve/v2"
  11. "github.com/blevesearch/bleve/v2/analysis/lang/en"
  12. "github.com/blevesearch/bleve/v2/mapping"
  13. "github.com/blevesearch/bleve/v2/search/query"
  14. "github.com/gabriel-vasile/mimetype"
  15. "github.com/uozi-tech/cosy/logger"
  16. )
  // SearchDocument is one entry in the search index: a configuration file
  // plus the metadata used to filter and display it.
  type SearchDocument struct {
  	ID        string    `json:"id"`         // index key; handleConfigScan uses the file path
  	Type      string    `json:"type"`       // "site", "stream", or "config"
  	Name      string    `json:"name"`       // base name of the file
  	Path      string    `json:"path"`       // full file path
  	Content   string    `json:"content"`    // raw file content
  	UpdatedAt time.Time `json:"updated_at"` // handleConfigScan sets this to time.Now()
  }

  // SearchResult pairs a matched document with the relevance score Bleve
  // assigned to it.
  type SearchResult struct {
  	Document SearchDocument `json:"document"`
  	Score    float64        `json:"score"`
  }
  31. // SearchIndexer manages the Bleve search index
  32. type SearchIndexer struct {
  33. index bleve.Index
  34. indexPath string
  35. indexMutex sync.RWMutex
  36. ctx context.Context
  37. cancel context.CancelFunc
  38. cleanupOnce sync.Once
  39. // Memory management
  40. totalContentSize int64
  41. documentCount int64
  42. maxMemoryUsage int64
  43. memoryMutex sync.RWMutex
  44. }
  45. var (
  46. searchIndexer *SearchIndexer
  47. searchIndexerOnce sync.Once
  48. )
  49. // GetSearchIndexer returns the singleton search indexer instance
  50. func GetSearchIndexer() *SearchIndexer {
  51. searchIndexerOnce.Do(func() {
  52. // Create a temporary directory for the index
  53. tempDir, err := os.MkdirTemp("", "nginx-ui-search-index-*")
  54. if err != nil {
  55. logger.Fatalf("Failed to create temp directory for search index: %v", err)
  56. }
  57. searchIndexer = &SearchIndexer{
  58. indexPath: tempDir,
  59. maxMemoryUsage: 100 * 1024 * 1024, // 100MB memory limit for indexed content
  60. }
  61. })
  62. return searchIndexer
  63. }
  64. // InitSearchIndex initializes the search index
  65. func InitSearchIndex(ctx context.Context) error {
  66. indexer := GetSearchIndexer()
  67. return indexer.Initialize(ctx)
  68. }
  69. // Initialize sets up the Bleve search index
  70. func (si *SearchIndexer) Initialize(ctx context.Context) error {
  71. si.indexMutex.Lock()
  72. defer si.indexMutex.Unlock()
  73. // Create a derived context for cleanup
  74. si.ctx, si.cancel = context.WithCancel(ctx)
  75. // Check if context is cancelled
  76. select {
  77. case <-ctx.Done():
  78. return ctx.Err()
  79. default:
  80. }
  81. // Try to open existing index, create new if it fails
  82. var err error
  83. si.index, err = bleve.Open(si.indexPath)
  84. if err != nil {
  85. // Check context again before creating new index
  86. select {
  87. case <-ctx.Done():
  88. return ctx.Err()
  89. default:
  90. }
  91. logger.Info("Creating new search index at:", si.indexPath)
  92. si.index, err = bleve.New(si.indexPath, si.createIndexMapping())
  93. if err != nil {
  94. return fmt.Errorf("failed to create search index: %w", err)
  95. }
  96. }
  97. // Register callback for config scanning
  98. RegisterCallback(si.handleConfigScan)
  99. // Start cleanup goroutine
  100. go si.watchContext()
  101. logger.Info("Search index initialized successfully")
  102. return nil
  103. }
  104. // watchContext monitors the context and cleans up when it's cancelled
  105. func (si *SearchIndexer) watchContext() {
  106. <-si.ctx.Done()
  107. si.cleanup()
  108. }
  109. // cleanup closes the index and removes the temporary directory
  110. func (si *SearchIndexer) cleanup() {
  111. si.cleanupOnce.Do(func() {
  112. logger.Info("Cleaning up search index...")
  113. si.indexMutex.Lock()
  114. defer si.indexMutex.Unlock()
  115. if si.index != nil {
  116. si.index.Close()
  117. si.index = nil
  118. }
  119. // Reset memory tracking
  120. si.memoryMutex.Lock()
  121. si.totalContentSize = 0
  122. si.documentCount = 0
  123. si.memoryMutex.Unlock()
  124. // Remove the temporary directory
  125. if err := os.RemoveAll(si.indexPath); err != nil {
  126. logger.Error("Failed to remove search index directory:", err)
  127. } else {
  128. logger.Info("Search index directory removed successfully")
  129. }
  130. })
  131. }
  132. // createIndexMapping creates the mapping for the search index
  133. func (si *SearchIndexer) createIndexMapping() mapping.IndexMapping {
  134. docMapping := bleve.NewDocumentMapping()
  135. // Text fields with standard analyzer
  136. textField := bleve.NewTextFieldMapping()
  137. textField.Analyzer = en.AnalyzerName
  138. textField.Store = true
  139. textField.Index = true
  140. // Keyword fields for exact match
  141. keywordField := bleve.NewKeywordFieldMapping()
  142. keywordField.Store = true
  143. keywordField.Index = true
  144. // Date field
  145. dateField := bleve.NewDateTimeFieldMapping()
  146. dateField.Store = true
  147. dateField.Index = true
  148. // Map fields to types
  149. fieldMappings := map[string]*mapping.FieldMapping{
  150. "id": keywordField,
  151. "type": keywordField,
  152. "path": keywordField,
  153. "name": textField,
  154. "content": textField,
  155. "updated_at": dateField,
  156. }
  157. for field, fieldMapping := range fieldMappings {
  158. docMapping.AddFieldMappingsAt(field, fieldMapping)
  159. }
  160. indexMapping := bleve.NewIndexMapping()
  161. indexMapping.DefaultMapping = docMapping
  162. indexMapping.DefaultAnalyzer = en.AnalyzerName
  163. return indexMapping
  164. }
  165. // handleConfigScan processes scanned config files and indexes them
  166. func (si *SearchIndexer) handleConfigScan(configPath string, content []byte) (err error) {
  167. // Add panic recovery to prevent the entire application from crashing
  168. defer func() {
  169. if r := recover(); r != nil {
  170. err = fmt.Errorf("panic during config scan: %v", r)
  171. logger.Error("Panic occurred while scanning config", "config_path", configPath, "content_size", len(content), "error", err)
  172. }
  173. }()
  174. // File size limit: 1MB to prevent memory overflow and improve performance
  175. const maxFileSize = 1024 * 1024 // 1MB
  176. if len(content) > maxFileSize {
  177. logger.Debugf("Skipping file due to size limit, path: %s, size: %d, limit: %d", configPath, len(content), maxFileSize)
  178. return nil
  179. }
  180. // Skip empty files
  181. if len(content) == 0 {
  182. return nil
  183. }
  184. // Basic content validation: check if it's a configuration file
  185. if !isConfigFile(content) {
  186. logger.Debugf("Skipping non-config file: %s", configPath)
  187. return nil
  188. }
  189. docType := si.determineConfigType(configPath)
  190. if docType == "" {
  191. return nil // Skip unsupported file types
  192. }
  193. doc := SearchDocument{
  194. ID: configPath,
  195. Type: docType,
  196. Name: filepath.Base(configPath),
  197. Path: configPath,
  198. Content: string(content),
  199. UpdatedAt: time.Now(),
  200. }
  201. return si.IndexDocument(doc)
  202. }
  203. // determineConfigType determines the type of config file based on path
  204. func (si *SearchIndexer) determineConfigType(configPath string) string {
  205. normalizedPath := filepath.ToSlash(configPath)
  206. switch {
  207. case strings.Contains(normalizedPath, "sites-available") || strings.Contains(normalizedPath, "sites-enabled"):
  208. return "site"
  209. case strings.Contains(normalizedPath, "streams-available") || strings.Contains(normalizedPath, "streams-enabled"):
  210. return "stream"
  211. default:
  212. return "config"
  213. }
  214. }
  215. // IndexDocument indexes a single document
  216. func (si *SearchIndexer) IndexDocument(doc SearchDocument) (err error) {
  217. // Add panic recovery to prevent the entire application from crashing
  218. defer func() {
  219. if r := recover(); r != nil {
  220. err = fmt.Errorf("panic during indexing: %v", r)
  221. logger.Error("Panic occurred while indexing document", "document_id", doc.ID, "error", err)
  222. }
  223. }()
  224. // Additional size check as a safety measure
  225. if len(doc.Content) > 2*1024*1024 { // 2MB absolute limit
  226. return fmt.Errorf("document content too large: %d bytes", len(doc.Content))
  227. }
  228. // Check memory usage before indexing
  229. contentSize := int64(len(doc.Content))
  230. if !si.checkMemoryLimitBeforeIndexing(contentSize) {
  231. logger.Warn("Skipping document due to memory limit", "document_id", doc.ID, "content_size", contentSize)
  232. return nil
  233. }
  234. si.indexMutex.RLock()
  235. defer si.indexMutex.RUnlock()
  236. if si.index == nil {
  237. return fmt.Errorf("search index not initialized")
  238. }
  239. // Index the document
  240. err = si.index.Index(doc.ID, doc)
  241. if err != nil {
  242. return err
  243. }
  244. // Update memory usage tracking
  245. si.updateMemoryUsage(doc.ID, contentSize, true)
  246. // logger.Debugf("Indexing document: ID=%s, Type=%s, Name=%s, Path=%s",
  247. // doc.ID, doc.Type, doc.Name, doc.Path)
  248. return nil
  249. }
  250. // Search performs a search query
  251. func (si *SearchIndexer) Search(ctx context.Context, queryStr string, limit int) ([]SearchResult, error) {
  252. return si.searchWithType(ctx, queryStr, "", limit)
  253. }
  254. // SearchByType performs a search filtered by document type
  255. func (si *SearchIndexer) SearchByType(ctx context.Context, queryStr string, docType string, limit int) ([]SearchResult, error) {
  256. return si.searchWithType(ctx, queryStr, docType, limit)
  257. }
  258. // searchWithType performs the actual search with optional type filtering
  259. func (si *SearchIndexer) searchWithType(ctx context.Context, queryStr string, docType string, limit int) ([]SearchResult, error) {
  260. si.indexMutex.RLock()
  261. defer si.indexMutex.RUnlock()
  262. // Check if context is cancelled
  263. select {
  264. case <-ctx.Done():
  265. return nil, ctx.Err()
  266. default:
  267. }
  268. if si.index == nil {
  269. return nil, fmt.Errorf("search index not initialized")
  270. }
  271. if limit <= 0 {
  272. limit = 500 // Increase default limit to handle more results
  273. }
  274. query := si.buildQuery(queryStr, docType)
  275. searchRequest := bleve.NewSearchRequest(query)
  276. searchRequest.Size = limit
  277. searchRequest.Fields = []string{"*"}
  278. // Use a channel to handle search with context cancellation
  279. type searchResult struct {
  280. result *bleve.SearchResult
  281. err error
  282. }
  283. resultChan := make(chan searchResult, 1)
  284. go func() {
  285. result, err := si.index.Search(searchRequest)
  286. resultChan <- searchResult{result: result, err: err}
  287. }()
  288. // Wait for search result or context cancellation
  289. select {
  290. case <-ctx.Done():
  291. return nil, ctx.Err()
  292. case res := <-resultChan:
  293. if res.err != nil {
  294. return nil, fmt.Errorf("search execution failed: %w", res.err)
  295. }
  296. results := si.convertResults(res.result)
  297. // log the search execution
  298. logger.Debugf("Search index query '%s' (type: %s, limit: %d) returned %d results",
  299. queryStr, docType, limit, len(results))
  300. return results, nil
  301. }
  302. }
  303. // buildQuery builds a search query with optional type filtering
  304. func (si *SearchIndexer) buildQuery(queryStr string, docType string) query.Query {
  305. mainQuery := bleve.NewBooleanQuery()
  306. // Add type filter if specified
  307. if docType != "" {
  308. typeQuery := bleve.NewTermQuery(docType)
  309. typeQuery.SetField("type")
  310. mainQuery.AddMust(typeQuery)
  311. }
  312. // Add text search across name and content fields only
  313. textQuery := bleve.NewBooleanQuery()
  314. searchFields := []string{"name", "content"}
  315. for _, field := range searchFields {
  316. // Create a boolean query for this field to combine multiple query types
  317. fieldQuery := bleve.NewBooleanQuery()
  318. // 1. Exact match query (highest priority)
  319. matchQuery := bleve.NewMatchQuery(queryStr)
  320. matchQuery.SetField(field)
  321. matchQuery.SetBoost(3.0) // Higher boost for exact matches
  322. fieldQuery.AddShould(matchQuery)
  323. // 2. Prefix query for partial matches (e.g., "access" matches "access_log")
  324. prefixQuery := bleve.NewPrefixQuery(queryStr)
  325. prefixQuery.SetField(field)
  326. prefixQuery.SetBoost(2.0) // Medium boost for prefix matches
  327. fieldQuery.AddShould(prefixQuery)
  328. // 3. Wildcard query for more flexible matching
  329. wildcardQuery := bleve.NewWildcardQuery("*" + queryStr + "*")
  330. wildcardQuery.SetField(field)
  331. wildcardQuery.SetBoost(1.5) // Lower boost for wildcard matches
  332. fieldQuery.AddShould(wildcardQuery)
  333. // 4. Fuzzy match query (allows 1 character difference)
  334. fuzzyQuery := bleve.NewFuzzyQuery(queryStr)
  335. fuzzyQuery.SetField(field)
  336. fuzzyQuery.SetFuzziness(1)
  337. fuzzyQuery.SetBoost(1.0) // Lowest boost for fuzzy matches
  338. fieldQuery.AddShould(fuzzyQuery)
  339. textQuery.AddShould(fieldQuery)
  340. }
  341. if docType != "" {
  342. mainQuery.AddMust(textQuery)
  343. } else {
  344. return textQuery
  345. }
  346. return mainQuery
  347. }
  348. // convertResults converts Bleve search results to our SearchResult format
  349. func (si *SearchIndexer) convertResults(searchResult *bleve.SearchResult) []SearchResult {
  350. results := make([]SearchResult, 0, len(searchResult.Hits))
  351. for _, hit := range searchResult.Hits {
  352. doc := SearchDocument{
  353. ID: si.getStringField(hit.Fields, "id"),
  354. Type: si.getStringField(hit.Fields, "type"),
  355. Name: si.getStringField(hit.Fields, "name"),
  356. Path: si.getStringField(hit.Fields, "path"),
  357. Content: si.getStringField(hit.Fields, "content"),
  358. }
  359. // Parse updated_at if present
  360. if updatedAtStr := si.getStringField(hit.Fields, "updated_at"); updatedAtStr != "" {
  361. if updatedAt, err := time.Parse(time.RFC3339, updatedAtStr); err == nil {
  362. doc.UpdatedAt = updatedAt
  363. }
  364. }
  365. results = append(results, SearchResult{
  366. Document: doc,
  367. Score: hit.Score,
  368. })
  369. }
  370. return results
  371. }
  372. // getStringField safely gets a string field from search results
  373. func (si *SearchIndexer) getStringField(fields map[string]interface{}, fieldName string) string {
  374. if value, ok := fields[fieldName]; ok {
  375. if str, ok := value.(string); ok {
  376. return str
  377. }
  378. }
  379. return ""
  380. }
  381. // DeleteDocument removes a document from the index
  382. func (si *SearchIndexer) DeleteDocument(docID string) error {
  383. si.indexMutex.RLock()
  384. defer si.indexMutex.RUnlock()
  385. if si.index == nil {
  386. return fmt.Errorf("search index not initialized")
  387. }
  388. // Note: We don't track the exact size of deleted documents here
  389. // as it would require storing document sizes separately.
  390. // The memory tracking will reset during periodic cleanups or restarts.
  391. return si.index.Delete(docID)
  392. }
  393. // RebuildIndex rebuilds the entire search index
  394. func (si *SearchIndexer) RebuildIndex(ctx context.Context) error {
  395. si.indexMutex.Lock()
  396. defer si.indexMutex.Unlock()
  397. // Check if context is cancelled
  398. select {
  399. case <-ctx.Done():
  400. return ctx.Err()
  401. default:
  402. }
  403. if si.index != nil {
  404. si.index.Close()
  405. }
  406. // Check context before removing old index
  407. select {
  408. case <-ctx.Done():
  409. return ctx.Err()
  410. default:
  411. }
  412. // Remove old index
  413. if err := os.RemoveAll(si.indexPath); err != nil {
  414. logger.Error("Failed to remove old index:", err)
  415. }
  416. // Check context before creating new index
  417. select {
  418. case <-ctx.Done():
  419. return ctx.Err()
  420. default:
  421. }
  422. // Create new index
  423. var err error
  424. si.index, err = bleve.New(si.indexPath, si.createIndexMapping())
  425. if err != nil {
  426. return fmt.Errorf("failed to create new index: %w", err)
  427. }
  428. logger.Info("Search index rebuilt successfully")
  429. return nil
  430. }
  431. // GetIndexStats returns statistics about the search index
  432. func (si *SearchIndexer) GetIndexStats() (map[string]interface{}, error) {
  433. si.indexMutex.RLock()
  434. defer si.indexMutex.RUnlock()
  435. if si.index == nil {
  436. return nil, fmt.Errorf("search index not initialized")
  437. }
  438. docCount, err := si.index.DocCount()
  439. if err != nil {
  440. return nil, err
  441. }
  442. // Get memory usage statistics
  443. totalContentSize, trackedDocCount, maxMemoryUsage := si.getMemoryUsage()
  444. return map[string]interface{}{
  445. "document_count": docCount,
  446. "tracked_document_count": trackedDocCount,
  447. "total_content_size": totalContentSize,
  448. "max_memory_usage": maxMemoryUsage,
  449. "memory_usage_percent": float64(totalContentSize) / float64(maxMemoryUsage) * 100,
  450. "index_path": si.indexPath,
  451. }, nil
  452. }
  453. // Close closes the search index and triggers cleanup
  454. func (si *SearchIndexer) Close() error {
  455. if si.cancel != nil {
  456. si.cancel()
  457. }
  458. si.cleanup()
  459. return nil
  460. }
  461. // Convenience functions for different search types
  462. // SearchSites searches only site configurations
  463. func SearchSites(ctx context.Context, query string, limit int) ([]SearchResult, error) {
  464. return GetSearchIndexer().SearchByType(ctx, query, "site", limit)
  465. }
  466. // SearchStreams searches only stream configurations
  467. func SearchStreams(ctx context.Context, query string, limit int) ([]SearchResult, error) {
  468. return GetSearchIndexer().SearchByType(ctx, query, "stream", limit)
  469. }
  470. // SearchConfigs searches only general configurations
  471. func SearchConfigs(ctx context.Context, query string, limit int) ([]SearchResult, error) {
  472. return GetSearchIndexer().SearchByType(ctx, query, "config", limit)
  473. }
  474. // SearchAll searches across all configuration types
  475. func SearchAll(ctx context.Context, query string, limit int) ([]SearchResult, error) {
  476. return GetSearchIndexer().Search(ctx, query, limit)
  477. }
  478. // checkMemoryLimitBeforeIndexing checks if adding new content would exceed memory limits
  479. func (si *SearchIndexer) checkMemoryLimitBeforeIndexing(contentSize int64) bool {
  480. si.memoryMutex.RLock()
  481. defer si.memoryMutex.RUnlock()
  482. // Check if adding this content would exceed the memory limit
  483. newTotalSize := si.totalContentSize + contentSize
  484. if newTotalSize > si.maxMemoryUsage {
  485. logger.Debugf("Memory limit would be exceeded: current=%d, new=%d, limit=%d",
  486. si.totalContentSize, newTotalSize, si.maxMemoryUsage)
  487. return false
  488. }
  489. // Also check document count limit (max 1000 documents)
  490. if si.documentCount >= 1000 {
  491. logger.Debugf("Document count limit reached: %d", si.documentCount)
  492. return false
  493. }
  494. return true
  495. }
  496. // updateMemoryUsage updates the memory usage tracking
  497. func (si *SearchIndexer) updateMemoryUsage(documentID string, contentSize int64, isAddition bool) {
  498. si.memoryMutex.Lock()
  499. defer si.memoryMutex.Unlock()
  500. if isAddition {
  501. si.totalContentSize += contentSize
  502. si.documentCount++
  503. logger.Debugf("Added document %s: size=%d, total_size=%d, count=%d",
  504. documentID, contentSize, si.totalContentSize, si.documentCount)
  505. } else {
  506. si.totalContentSize -= contentSize
  507. si.documentCount--
  508. if si.totalContentSize < 0 {
  509. si.totalContentSize = 0
  510. }
  511. if si.documentCount < 0 {
  512. si.documentCount = 0
  513. }
  514. logger.Debugf("Removed document %s: size=%d, total_size=%d, count=%d",
  515. documentID, contentSize, si.totalContentSize, si.documentCount)
  516. }
  517. }
  518. // getMemoryUsage returns current memory usage statistics
  519. func (si *SearchIndexer) getMemoryUsage() (int64, int64, int64) {
  520. si.memoryMutex.RLock()
  521. defer si.memoryMutex.RUnlock()
  522. return si.totalContentSize, si.documentCount, si.maxMemoryUsage
  523. }
  524. // isConfigFile checks if the content is a text/plain file (most nginx configs)
  525. func isConfigFile(content []byte) bool {
  526. if len(content) == 0 {
  527. return false // Empty files are not useful for configuration
  528. }
  529. // Detect MIME type and only accept text/plain
  530. mtype := mimetype.Detect(content)
  531. if mtype.Is("text/plain") {
  532. return true
  533. }
  534. logger.Debugf("Skipping non-text/plain file with MIME type: %s", mtype.String())
  535. return false
  536. }