Pārlūkot izejas kodu

chore: simplify pkg/database/alerts (#2062)

mmetc 2 gadi atpakaļ
vecāks
revīzija
20a1bc7d44
1 mainītis faili ar 241 papildinājumiem un 226 dzēšanām
  1. 241 226
      pkg/database/alerts.go

+ 241 - 226
pkg/database/alerts.go

@@ -32,37 +32,45 @@ const (
 	decisionBulkSize = 50
 )
 
-func formatAlertAsString(machineId string, alert *models.Alert) []string {
-	var retStr []string
+func formatAlertCN(source models.Source) string {
+	cn := source.Cn
 
-	/**/
-	src := ""
-	if alert.Source != nil {
-		if *alert.Source.Scope == types.Ip {
-			src = fmt.Sprintf("ip %s", *alert.Source.Value)
-			if alert.Source.Cn != "" {
-				src += " (" + alert.Source.Cn
-				if alert.Source.AsNumber != "" {
-					src += "/" + alert.Source.AsNumber
-				}
-				src += ")"
-			}
-		} else if *alert.Source.Scope == types.Range {
-			src = fmt.Sprintf("range %s", *alert.Source.Value)
-			if alert.Source.Cn != "" {
-				src += " (" + alert.Source.Cn
-				if alert.Source.AsNumber != "" {
-					src += "/" + alert.Source.AsNumber
-				}
-				src += ")"
-			}
-		} else {
-			src = fmt.Sprintf("%s %s", *alert.Source.Scope, *alert.Source.Value)
+	if source.AsNumber != "" {
+		cn += "/" + source.AsNumber
+	}
+
+	return cn
+}
+
+func formatAlertSource(alert *models.Alert) string {
+	if alert.Source == nil {
+		return "empty source"
+	}
+
+	if *alert.Source.Scope == types.Ip {
+		ret := "ip " + *alert.Source.Value
+		cn := formatAlertCN(*alert.Source)
+		if cn != "" {
+			ret += " (" + cn + ")"
 		}
-	} else {
-		src = "empty source"
+		return ret
 	}
 
+	if *alert.Source.Scope == types.Range {
+		ret := "range " + *alert.Source.Value
+		cn := formatAlertCN(*alert.Source)
+		if cn != "" {
+			ret += " (" + cn + ")"
+		}
+		return ret
+	}
+
+	return *alert.Source.Scope + " " + *alert.Source.Value
+}
+
+func formatAlertAsString(machineId string, alert *models.Alert) []string {
+	src := formatAlertSource(alert)
+
 	/**/
 	msg := ""
 	if alert.Scenario != nil && *alert.Scenario != "" {
@@ -75,35 +83,37 @@ func formatAlertAsString(machineId string, alert *models.Alert) []string {
 
 	reason := fmt.Sprintf("%s by %s", msg, src)
 
-	if len(alert.Decisions) > 0 {
-		for i, decisionItem := range alert.Decisions {
-			decision := ""
-			if alert.Simulated != nil && *alert.Simulated {
-				decision = "(simulated alert)"
-			} else if decisionItem.Simulated != nil && *decisionItem.Simulated {
-				decision = "(simulated decision)"
-			}
-			if log.GetLevel() >= log.DebugLevel {
-				/*spew is expensive*/
-				log.Debugf("%s", spew.Sdump(decisionItem))
-			}
-			if len(alert.Decisions) > 1 {
-				reason = fmt.Sprintf("%s for %d/%d decisions", msg, i+1, len(alert.Decisions))
-			}
-			machineIdOrigin := ""
-			if machineId == "" {
-				machineIdOrigin = *decisionItem.Origin
-			} else {
-				machineIdOrigin = fmt.Sprintf("%s/%s", machineId, *decisionItem.Origin)
-			}
+	if len(alert.Decisions) == 0 {
+		return []string{fmt.Sprintf("(%s) alert : %s", machineId, reason)}
+	}
+
+	var retStr []string
 
-			decision += fmt.Sprintf("%s %s on %s %s", *decisionItem.Duration,
-				*decisionItem.Type, *decisionItem.Scope, *decisionItem.Value)
-			retStr = append(retStr,
-				fmt.Sprintf("(%s) %s : %s", machineIdOrigin, reason, decision))
+	for i, decisionItem := range alert.Decisions {
+		decision := ""
+		if alert.Simulated != nil && *alert.Simulated {
+			decision = "(simulated alert)"
+		} else if decisionItem.Simulated != nil && *decisionItem.Simulated {
+			decision = "(simulated decision)"
 		}
-	} else {
-		retStr = append(retStr, fmt.Sprintf("(%s) alert : %s", machineId, reason))
+		if log.GetLevel() >= log.DebugLevel {
+			/*spew is expensive*/
+			log.Debugf("%s", spew.Sdump(decisionItem))
+		}
+		if len(alert.Decisions) > 1 {
+			reason = fmt.Sprintf("%s for %d/%d decisions", msg, i+1, len(alert.Decisions))
+		}
+		machineIdOrigin := ""
+		if machineId == "" {
+			machineIdOrigin = *decisionItem.Origin
+		} else {
+			machineIdOrigin = fmt.Sprintf("%s/%s", machineId, *decisionItem.Origin)
+		}
+
+		decision += fmt.Sprintf("%s %s on %s %s", *decisionItem.Duration,
+			*decisionItem.Type, *decisionItem.Scope, *decisionItem.Value)
+		retStr = append(retStr,
+			fmt.Sprintf("(%s) %s : %s", machineIdOrigin, reason, decision))
 	}
 	return retStr
 }
@@ -161,90 +171,91 @@ func (c *Client) CreateOrUpdateAlert(machineID string, alertItem *models.Alert)
 		}
 	}
 
+	if len(missingUuids) == 0 {
+		log.Warningf("alert %s was already complete with decisions %+v", alertItem.UUID, foundUuids)
+		return "", nil
+	}
+
 	//add any and all missing decisions based on their uuids
-	if len(missingUuids) > 0 {
-		//prepare missing decisions
-		missingDecisions := []*models.Decision{}
-		for _, uuid := range missingUuids {
-			for _, newDecision := range alertItem.Decisions {
-				if newDecision.UUID == uuid {
-					missingDecisions = append(missingDecisions, newDecision)
-				}
+	//prepare missing decisions
+	missingDecisions := []*models.Decision{}
+	for _, uuid := range missingUuids {
+		for _, newDecision := range alertItem.Decisions {
+			if newDecision.UUID == uuid {
+				missingDecisions = append(missingDecisions, newDecision)
 			}
 		}
+	}
 
-		//add missing decisions
-		log.Debugf("Adding %d missing decisions to alert %s", len(missingDecisions), foundAlert.UUID)
+	//add missing decisions
+	log.Debugf("Adding %d missing decisions to alert %s", len(missingDecisions), foundAlert.UUID)
 
-		decisions := make([]*ent.Decision, 0)
-		decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
+	decisions := make([]*ent.Decision, 0)
+	decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
 
-		for i, decisionItem := range missingDecisions {
-			var start_ip, start_sfx, end_ip, end_sfx int64
-			var sz int
+	for i, decisionItem := range missingDecisions {
+		var start_ip, start_sfx, end_ip, end_sfx int64
+		var sz int
 
-			/*if the scope is IP or Range, convert the value to integers */
-			if strings.ToLower(*decisionItem.Scope) == "ip" || strings.ToLower(*decisionItem.Scope) == "range" {
-				sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decisionItem.Value)
-				if err != nil {
-					return "", errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
-				}
-			}
-			decisionDuration, err := time.ParseDuration(*decisionItem.Duration)
-			if err != nil {
-				log.Warningf("invalid duration %s for decision %s", *decisionItem.Duration, decisionItem.UUID)
-				continue
-			}
-			//use the created_at from the alert instead
-			alertTime, err := time.Parse(time.RFC3339, alertItem.CreatedAt)
+		/*if the scope is IP or Range, convert the value to integers */
+		if strings.ToLower(*decisionItem.Scope) == "ip" || strings.ToLower(*decisionItem.Scope) == "range" {
+			sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decisionItem.Value)
 			if err != nil {
-				log.Errorf("unable to parse alert time %s : %s", alertItem.CreatedAt, err)
-				alertTime = time.Now()
-			}
-			decisionUntil := alertTime.UTC().Add(decisionDuration)
-
-			decisionCreate := c.Ent.Decision.Create().
-				SetUntil(decisionUntil).
-				SetScenario(*decisionItem.Scenario).
-				SetType(*decisionItem.Type).
-				SetStartIP(start_ip).
-				SetStartSuffix(start_sfx).
-				SetEndIP(end_ip).
-				SetEndSuffix(end_sfx).
-				SetIPSize(int64(sz)).
-				SetValue(*decisionItem.Value).
-				SetScope(*decisionItem.Scope).
-				SetOrigin(*decisionItem.Origin).
-				SetSimulated(*alertItem.Simulated).
-				SetUUID(decisionItem.UUID)
-
-			decisionBulk = append(decisionBulk, decisionCreate)
-			if len(decisionBulk) == decisionBulkSize {
-				decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
-				if err != nil {
-					return "", errors.Wrapf(BulkError, "creating alert decisions: %s", err)
-
-				}
-				decisions = append(decisions, decisionsCreateRet...)
-				if len(missingDecisions)-i <= decisionBulkSize {
-					decisionBulk = make([]*ent.DecisionCreate, 0, (len(missingDecisions) - i))
-				} else {
-					decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
-				}
+				return "", errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
 			}
 		}
-		decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
+		decisionDuration, err := time.ParseDuration(*decisionItem.Duration)
 		if err != nil {
-			return "", errors.Wrapf(BulkError, "creating alert decisions: %s", err)
+			log.Warningf("invalid duration %s for decision %s", *decisionItem.Duration, decisionItem.UUID)
+			continue
 		}
-		decisions = append(decisions, decisionsCreateRet...)
-		//now that we bulk created missing decisions, let's update the alert
-		err = c.Ent.Alert.Update().Where(alert.UUID(alertItem.UUID)).AddDecisions(decisions...).Exec(c.CTX)
+		//use the created_at from the alert instead
+		alertTime, err := time.Parse(time.RFC3339, alertItem.CreatedAt)
 		if err != nil {
-			return "", errors.Wrapf(err, "updating alert %s : %s", alertItem.UUID, err)
+			log.Errorf("unable to parse alert time %s : %s", alertItem.CreatedAt, err)
+			alertTime = time.Now()
 		}
-	} else {
-		log.Warningf("alert %s was already complete with decisions %+v", alertItem.UUID, foundUuids)
+		decisionUntil := alertTime.UTC().Add(decisionDuration)
+
+		decisionCreate := c.Ent.Decision.Create().
+			SetUntil(decisionUntil).
+			SetScenario(*decisionItem.Scenario).
+			SetType(*decisionItem.Type).
+			SetStartIP(start_ip).
+			SetStartSuffix(start_sfx).
+			SetEndIP(end_ip).
+			SetEndSuffix(end_sfx).
+			SetIPSize(int64(sz)).
+			SetValue(*decisionItem.Value).
+			SetScope(*decisionItem.Scope).
+			SetOrigin(*decisionItem.Origin).
+			SetSimulated(*alertItem.Simulated).
+			SetUUID(decisionItem.UUID)
+
+		decisionBulk = append(decisionBulk, decisionCreate)
+		if len(decisionBulk) == decisionBulkSize {
+			decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
+			if err != nil {
+				return "", errors.Wrapf(BulkError, "creating alert decisions: %s", err)
+
+			}
+			decisions = append(decisions, decisionsCreateRet...)
+			if len(missingDecisions)-i <= decisionBulkSize {
+				decisionBulk = make([]*ent.DecisionCreate, 0, (len(missingDecisions) - i))
+			} else {
+				decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
+			}
+		}
+	}
+	decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
+	if err != nil {
+		return "", errors.Wrapf(BulkError, "creating alert decisions: %s", err)
+	}
+	decisions = append(decisions, decisionsCreateRet...)
+	//now that we bulk created missing decisions, let's update the alert
+	err = c.Ent.Alert.Update().Where(alert.UUID(alertItem.UUID)).AddDecisions(decisions...).Exec(c.CTX)
+	if err != nil {
+		return "", errors.Wrapf(err, "updating alert %s : %s", alertItem.UUID, err)
 	}
 
 	return "", nil
@@ -280,9 +291,7 @@ func (c *Client) CreateAlert(machineID string, alertList []*models.Alert) ([]str
 // 1st pull, you get decisions [1,2,3]. it inserts [1,2,3]
 // 2nd pull, you get decisions [1,2,3,4]. it inserts [1,2,3,4] and will try to delete [1,2,3,4] with a different alert ID and same origin
 func (c *Client) UpdateCommunityBlocklist(alertItem *models.Alert) (int, int, int, error) {
-
 	var err error
-	var deleted, inserted int
 
 	if alertItem == nil {
 		return 0, 0, 0, fmt.Errorf("nil alert")
@@ -335,118 +344,92 @@ func (c *Client) UpdateCommunityBlocklist(alertItem *models.Alert) (int, int, in
 		return 0, 0, 0, errors.Wrapf(BulkError, "error creating alert : %s", err)
 	}
 
-	if len(alertItem.Decisions) > 0 {
-		txClient, err := c.Ent.Tx(c.CTX)
+	if len(alertItem.Decisions) == 0 {
+		return alertRef.ID, 0, 0, nil
+	}
+
+	txClient, err := c.Ent.Tx(c.CTX)
+	if err != nil {
+		return 0, 0, 0, errors.Wrapf(BulkError, "error creating transaction : %s", err)
+	}
+	decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
+	valueList := make([]string, 0, decisionBulkSize)
+	DecOrigin := CapiMachineID
+	if *alertItem.Decisions[0].Origin == CapiMachineID || *alertItem.Decisions[0].Origin == CapiListsMachineID {
+		DecOrigin = *alertItem.Decisions[0].Origin
+	} else {
+		log.Warningf("unexpected origin %s", *alertItem.Decisions[0].Origin)
+	}
+
+	deleted := 0
+	inserted := 0
+
+	for i, decisionItem := range alertItem.Decisions {
+		var start_ip, start_sfx, end_ip, end_sfx int64
+		var sz int
+		if decisionItem.Duration == nil {
+			log.Warning("nil duration in community decision")
+			continue
+		}
+		duration, err := time.ParseDuration(*decisionItem.Duration)
 		if err != nil {
-			return 0, 0, 0, errors.Wrapf(BulkError, "error creating transaction : %s", err)
+			rollbackErr := txClient.Rollback()
+			if rollbackErr != nil {
+				log.Errorf("rollback error: %s", rollbackErr)
+			}
+			return 0, 0, 0, errors.Wrapf(ParseDurationFail, "decision duration '%+v' : %s", *decisionItem.Duration, err)
 		}
-		decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
-		valueList := make([]string, 0, decisionBulkSize)
-		DecOrigin := CapiMachineID
-		if *alertItem.Decisions[0].Origin == CapiMachineID || *alertItem.Decisions[0].Origin == CapiListsMachineID {
-			DecOrigin = *alertItem.Decisions[0].Origin
-		} else {
-			log.Warningf("unexpected origin %s", *alertItem.Decisions[0].Origin)
+		if decisionItem.Scope == nil {
+			log.Warning("nil scope in community decision")
+			continue
 		}
-		for i, decisionItem := range alertItem.Decisions {
-			var start_ip, start_sfx, end_ip, end_sfx int64
-			var sz int
-			if decisionItem.Duration == nil {
-				log.Warning("nil duration in community decision")
-				continue
-			}
-			duration, err := time.ParseDuration(*decisionItem.Duration)
+		/*if the scope is IP or Range, convert the value to integers */
+		if strings.ToLower(*decisionItem.Scope) == "ip" || strings.ToLower(*decisionItem.Scope) == "range" {
+			sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decisionItem.Value)
 			if err != nil {
 				rollbackErr := txClient.Rollback()
 				if rollbackErr != nil {
 					log.Errorf("rollback error: %s", rollbackErr)
 				}
-				return 0, 0, 0, errors.Wrapf(ParseDurationFail, "decision duration '%+v' : %s", *decisionItem.Duration, err)
-			}
-			if decisionItem.Scope == nil {
-				log.Warning("nil scope in community decision")
-				continue
-			}
-			/*if the scope is IP or Range, convert the value to integers */
-			if strings.ToLower(*decisionItem.Scope) == "ip" || strings.ToLower(*decisionItem.Scope) == "range" {
-				sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decisionItem.Value)
-				if err != nil {
-					rollbackErr := txClient.Rollback()
-					if rollbackErr != nil {
-						log.Errorf("rollback error: %s", rollbackErr)
-					}
-					return 0, 0, 0, errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
-				}
-			}
-			/*bulk insert some new decisions*/
-			decisionBulk = append(decisionBulk, c.Ent.Decision.Create().
-				SetUntil(ts.Add(duration)).
-				SetScenario(*decisionItem.Scenario).
-				SetType(*decisionItem.Type).
-				SetStartIP(start_ip).
-				SetStartSuffix(start_sfx).
-				SetEndIP(end_ip).
-				SetEndSuffix(end_sfx).
-				SetIPSize(int64(sz)).
-				SetValue(*decisionItem.Value).
-				SetScope(*decisionItem.Scope).
-				SetOrigin(*decisionItem.Origin).
-				SetSimulated(*alertItem.Simulated).
-				SetOwner(alertRef))
-
-			/*for bulk delete of duplicate decisions*/
-			if decisionItem.Value == nil {
-				log.Warning("nil value in community decision")
-				continue
+				return 0, 0, 0, errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
 			}
-			valueList = append(valueList, *decisionItem.Value)
+		}
+		/*bulk insert some new decisions*/
+		decisionBulk = append(decisionBulk, c.Ent.Decision.Create().
+			SetUntil(ts.Add(duration)).
+			SetScenario(*decisionItem.Scenario).
+			SetType(*decisionItem.Type).
+			SetStartIP(start_ip).
+			SetStartSuffix(start_sfx).
+			SetEndIP(end_ip).
+			SetEndSuffix(end_sfx).
+			SetIPSize(int64(sz)).
+			SetValue(*decisionItem.Value).
+			SetScope(*decisionItem.Scope).
+			SetOrigin(*decisionItem.Origin).
+			SetSimulated(*alertItem.Simulated).
+			SetOwner(alertRef))
 
-			if len(decisionBulk) == decisionBulkSize {
+		/*for bulk delete of duplicate decisions*/
+		if decisionItem.Value == nil {
+			log.Warning("nil value in community decision")
+			continue
+		}
+		valueList = append(valueList, *decisionItem.Value)
 
-				insertedDecisions, err := txClient.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
-				if err != nil {
-					rollbackErr := txClient.Rollback()
-					if rollbackErr != nil {
-						log.Errorf("rollback error: %s", rollbackErr)
-					}
-					return 0, 0, 0, errors.Wrapf(BulkError, "bulk creating decisions : %s", err)
-				}
-				inserted += len(insertedDecisions)
-
-				/*Deleting older decisions from capi*/
-				deletedDecisions, err := txClient.Decision.Delete().
-					Where(decision.And(
-						decision.OriginEQ(DecOrigin),
-						decision.Not(decision.HasOwnerWith(alert.IDEQ(alertRef.ID))),
-						decision.ValueIn(valueList...),
-					)).Exec(c.CTX)
-				if err != nil {
-					rollbackErr := txClient.Rollback()
-					if rollbackErr != nil {
-						log.Errorf("rollback error: %s", rollbackErr)
-					}
-					return 0, 0, 0, errors.Wrap(err, "while deleting older community blocklist decisions")
-				}
-				deleted += deletedDecisions
-
-				if len(alertItem.Decisions)-i <= decisionBulkSize {
-					decisionBulk = make([]*ent.DecisionCreate, 0, (len(alertItem.Decisions) - i))
-					valueList = make([]string, 0, (len(alertItem.Decisions) - i))
-				} else {
-					decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
-					valueList = make([]string, 0, decisionBulkSize)
+		if len(decisionBulk) == decisionBulkSize {
+
+			insertedDecisions, err := txClient.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
+			if err != nil {
+				rollbackErr := txClient.Rollback()
+				if rollbackErr != nil {
+					log.Errorf("rollback error: %s", rollbackErr)
 				}
+				return 0, 0, 0, errors.Wrapf(BulkError, "bulk creating decisions : %s", err)
 			}
+			inserted += len(insertedDecisions)
 
-		}
-		log.Debugf("deleted %d decisions for %s vs %s", deleted, DecOrigin, *alertItem.Decisions[0].Origin)
-		insertedDecisions, err := txClient.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
-		if err != nil {
-			return 0, 0, 0, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
-		}
-		inserted += len(insertedDecisions)
-		/*Deleting older decisions from capi*/
-		if len(valueList) > 0 {
+			/*Deleting older decisions from capi*/
 			deletedDecisions, err := txClient.Decision.Delete().
 				Where(decision.And(
 					decision.OriginEQ(DecOrigin),
@@ -461,15 +444,47 @@ func (c *Client) UpdateCommunityBlocklist(alertItem *models.Alert) (int, int, in
 				return 0, 0, 0, errors.Wrap(err, "while deleting older community blocklist decisions")
 			}
 			deleted += deletedDecisions
+
+			if len(alertItem.Decisions)-i <= decisionBulkSize {
+				decisionBulk = make([]*ent.DecisionCreate, 0, (len(alertItem.Decisions) - i))
+				valueList = make([]string, 0, (len(alertItem.Decisions) - i))
+			} else {
+				decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
+				valueList = make([]string, 0, decisionBulkSize)
+			}
 		}
-		err = txClient.Commit()
+
+	}
+	log.Debugf("deleted %d decisions for %s vs %s", deleted, DecOrigin, *alertItem.Decisions[0].Origin)
+	insertedDecisions, err := txClient.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
+	if err != nil {
+		return 0, 0, 0, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
+	}
+	inserted += len(insertedDecisions)
+	/*Deleting older decisions from capi*/
+	if len(valueList) > 0 {
+		deletedDecisions, err := txClient.Decision.Delete().
+			Where(decision.And(
+				decision.OriginEQ(DecOrigin),
+				decision.Not(decision.HasOwnerWith(alert.IDEQ(alertRef.ID))),
+				decision.ValueIn(valueList...),
+			)).Exec(c.CTX)
 		if err != nil {
 			rollbackErr := txClient.Rollback()
 			if rollbackErr != nil {
 				log.Errorf("rollback error: %s", rollbackErr)
 			}
-			return 0, 0, 0, errors.Wrapf(BulkError, "error committing transaction : %s", err)
+			return 0, 0, 0, errors.Wrap(err, "while deleting older community blocklist decisions")
+		}
+		deleted += deletedDecisions
+	}
+	err = txClient.Commit()
+	if err != nil {
+		rollbackErr := txClient.Rollback()
+		if rollbackErr != nil {
+			log.Errorf("rollback error: %s", rollbackErr)
 		}
+		return 0, 0, 0, errors.Wrapf(BulkError, "error committing transaction : %s", err)
 	}
 
 	return alertRef.ID, inserted, deleted, nil