sqlite.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package sqlite
  2. import (
  3. "fmt"
  4. "strconv"
  5. "sync"
  6. "time"
  7. "github.com/crowdsecurity/crowdsec/pkg/types"
  8. log "github.com/sirupsen/logrus"
  9. "github.com/jinzhu/gorm"
  10. _ "github.com/jinzhu/gorm/dialects/sqlite"
  11. _ "github.com/mattn/go-sqlite3"
  12. "gopkg.in/tomb.v2"
  13. )
  14. type Context struct {
  15. Db *gorm.DB //Pointer to sqlite db
  16. tx *gorm.DB //Pointer to current transaction (flushed on a regular basis)
  17. lastCommit time.Time
  18. flush bool
  19. count int32
  20. lock sync.Mutex //booboo
  21. PusherTomb tomb.Tomb
  22. //to manage auto cleanup : max number of records *or* oldest
  23. maxEventRetention int
  24. maxDurationRetention time.Duration
  25. }
  26. func NewSQLite(cfg map[string]string) (*Context, error) {
  27. var err error
  28. c := &Context{}
  29. if v, ok := cfg["max_records"]; ok {
  30. c.maxEventRetention, err = strconv.Atoi(v)
  31. if err != nil {
  32. log.Errorf("Ignoring invalid max_records '%s' : %s", v, err)
  33. }
  34. }
  35. if v, ok := cfg["max_records_duration"]; ok {
  36. c.maxDurationRetention, err = time.ParseDuration(v)
  37. if err != nil {
  38. log.Errorf("Ignoring invalid duration '%s' : %s", v, err)
  39. }
  40. }
  41. log.Warningf("NEW SQLITE : %+v", cfg)
  42. if _, ok := cfg["db_path"]; !ok {
  43. return nil, fmt.Errorf("please specify a 'db_path' to SQLite db in the configuration")
  44. }
  45. if cfg["db_path"] == "" {
  46. return nil, fmt.Errorf("please specify a 'db_path' to SQLite db in the configuration")
  47. }
  48. c.Db, err = gorm.Open("sqlite3", cfg["db_path"]+"?_busy_timeout=1000")
  49. if err != nil {
  50. return nil, fmt.Errorf("failed to open %s : %s", cfg["db_path"], err)
  51. }
  52. if val, ok := cfg["debug"]; ok && val == "true" {
  53. log.Infof("Enabling debug for sqlite")
  54. c.Db.LogMode(true)
  55. }
  56. c.Db.LogMode(true)
  57. c.flush, _ = strconv.ParseBool(cfg["flush"])
  58. // Migrate the schema
  59. c.Db.AutoMigrate(&types.EventSequence{}, &types.SignalOccurence{}, &types.BanApplication{})
  60. c.Db.Model(&types.SignalOccurence{}).Related(&types.EventSequence{})
  61. c.Db.Model(&types.SignalOccurence{}).Related(&types.BanApplication{})
  62. c.tx = c.Db.Begin()
  63. c.lastCommit = time.Now()
  64. ret := c.tx.Commit()
  65. if ret.Error != nil {
  66. return nil, fmt.Errorf("failed to commit records : %v", ret.Error)
  67. }
  68. c.tx = c.Db.Begin()
  69. if c.tx == nil {
  70. return nil, fmt.Errorf("failed to begin sqlite transac : %s", err)
  71. }
  72. //random attempt
  73. //c.maxEventRetention = 100
  74. c.PusherTomb.Go(func() error {
  75. c.AutoCommit()
  76. return nil
  77. })
  78. return c, nil
  79. }