add the 'expire by age' func

This commit is contained in:
Thibault bui Koechlin 2020-06-26 10:09:06 +02:00
parent ff9e00d411
commit 853dd773f8

View file

@ -35,8 +35,47 @@ func (c *Context) Flush() error {
}
c.tx = c.Db.Begin()
c.lastCommit = time.Now()
c.DeleteExpired()
return nil
}
func (c *Context) CleanUpRecordsByAge() error {
//let's fetch all expired records that are more than XX days olds
sos := []types.BanApplication{}
if c.maxDurationRetention == 0 {
return nil
}
//look for soft-deleted events that are OLDER than maxDurationRetention
ret := c.Db.Unscoped().Table("ban_applications").Where("deleted_at is not NULL").
Where(fmt.Sprintf("deleted_at > data('now','-%d minutes')", int(c.maxDurationRetention.Minutes()))).
Order("updated_at desc").Find(&sos)
if ret.Error != nil {
return errors.Wrap(ret.Error, "failed to get count of old records")
}
//no events elligible
if len(sos) == 0 || ret.RowsAffected == 0 {
return nil
}
//let's do it in a single transaction
delTx := c.Db.Unscoped().Begin()
delRecords := 0
for _, record := range sos {
copy := record
delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{})
delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{})
delTx.Unscoped().Table("ban_applications").Delete(&copy)
//we need to delete associations : event_sequences, signal_occurences
delRecords++
}
ret = delTx.Unscoped().Commit()
if ret.Error != nil {
return errors.Wrap(ret.Error, "failed to delete records")
}
log.Printf("Deleted %d expired records older than %s", delRecords, c.maxDurationRetention)
return nil
}
@ -95,6 +134,7 @@ func (c *Context) AutoCommit() {
log.Warningf("starting autocommit")
ticker := time.NewTicker(200 * time.Millisecond)
cleanUpTicker := time.NewTicker(1 * time.Minute)
expireTicker := time.NewTicker(1 * time.Second)
for {
select {
case <-c.PusherTomb.Dying():
@ -113,6 +153,10 @@ func (c *Context) AutoCommit() {
log.Errorf("error while closing db : %s", err)
}
return
case <-expireTicker.C:
if err := c.DeleteExpired(); err != nil {
log.Errorf("Error while deleting expired records: %s", err)
}
case <-ticker.C:
if atomic.LoadInt32(&c.count) != 0 &&
(atomic.LoadInt32(&c.count)%100 == 0 || time.Since(c.lastCommit) >= 500*time.Millisecond) {
@ -123,9 +167,12 @@ func (c *Context) AutoCommit() {
}
case <-cleanUpTicker.C:
if err := c.CleanUpRecordsByCount(); err != nil {
log.Errorf("error in auto-cleanup : %s", err)
log.Errorf("error in max records cleanup : %s", err)
}
if err := c.CleanUpRecordsByAge(); err != nil {
log.Errorf("error in old records cleanup : %s", err)
}
}
}
}