Jelajahi Sumber

push to CAPI in go routine (#489)

Co-authored-by: AlteredCoder <AlteredCoder>
AlteredCoder 4 tahun lalu
induk
melakukan
c6eb2afa20
1 mengubah file dengan 35 tambahan dan 16 penghapusan
  1. 35 16
      pkg/apiserver/apic.go

+ 35 - 16
pkg/apiserver/apic.go

@@ -151,8 +151,8 @@ func (a *apic) Push() error {
 			if len(cache) == 0 {
 			if len(cache) == 0 {
 				return nil
 				return nil
 			}
 			}
-			err := a.Send(&cache)
-			return err
+			go a.Send(&cache)
+			return nil
 		case <-ticker.C:
 		case <-ticker.C:
 			if len(cache) > 0 {
 			if len(cache) > 0 {
 				a.mu.Lock()
 				a.mu.Lock()
@@ -160,15 +160,7 @@ func (a *apic) Push() error {
 				cache = make(models.AddSignalsRequest, 0)
 				cache = make(models.AddSignalsRequest, 0)
 				a.mu.Unlock()
 				a.mu.Unlock()
 				log.Infof("Signal push: %d signals to push", len(cacheCopy))
 				log.Infof("Signal push: %d signals to push", len(cacheCopy))
-				err := a.Send(&cacheCopy)
-				if err != nil {
-					log.Errorf("while sending signal to Central API : %s", err)
-					log.Debugf("dump: %+v", cacheCopy)
-					/*
-						even in case of error, we don't want to return here, or we need to kill everything.
-						this go-routine is in charge of pushing the signals to LAPI and is emptying the CAPIChan
-					*/
-				}
+				go a.Send(&cacheCopy)
 			}
 			}
 		case alerts := <-a.alertToPush:
 		case alerts := <-a.alertToPush:
 			var signals []*models.AddSignalsRequestItem
 			var signals []*models.AddSignalsRequestItem
@@ -182,7 +174,7 @@ func (a *apic) Push() error {
 	}
 	}
 }
 }
 
 
-func (a *apic) Send(cache *models.AddSignalsRequest) error {
+func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
 	/*we do have a problem with this :
 	/*we do have a problem with this :
 	The apic.Push background routine reads from alertToPush chan.
 	The apic.Push background routine reads from alertToPush chan.
 	This chan is filled by Controller.CreateAlert
 	This chan is filled by Controller.CreateAlert
@@ -194,10 +186,37 @@ func (a *apic) Send(cache *models.AddSignalsRequest) error {
 
 
 	I don't know enough about gin to tell how much of an issue it can be.
 	I don't know enough about gin to tell how much of an issue it can be.
 	*/
 	*/
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-	defer cancel()
-	_, _, err := a.apiClient.Signal.Add(ctx, cache)
-	return err
+	var cache []*models.AddSignalsRequestItem = *cacheOrig
+	var send models.AddSignalsRequest
+
+	bulkSize := 50
+	pageStart := 0
+	pageEnd := bulkSize
+
+	for {
+
+		if pageEnd >= len(cache) {
+			send = cache[pageStart:]
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			defer cancel()
+			_, _, err := a.apiClient.Signal.Add(ctx, &send)
+			if err != nil {
+				log.Errorf("Error while sending final chunk to central API : %s", err)
+				return
+			}
+			break
+		}
+		send = cache[pageStart:pageEnd]
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+		_, _, err := a.apiClient.Signal.Add(ctx, &send)
+		if err != nil {
+			//we log it here as well, because the return value of func might be discarded
+			log.Errorf("Error while sending chunk to central API : %s", err)
+		}
+		pageStart += bulkSize
+		pageEnd += bulkSize
+	}
 }
 }
 
 
 func (a *apic) PullTop() error {
 func (a *apic) PullTop() error {