add metrics for httpgs and sftpfs

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino 2022-06-14 19:37:25 +02:00
parent 686166f2ce
commit 2b0b19da9e
No known key found for this signature in database
GPG key ID: 2F1FB59433D5A8CB
6 changed files with 138 additions and 10 deletions

View file

@ -247,7 +247,8 @@ func (t *BaseTransfer) Truncate(fsPath string, size int64) (int64, error) {
if t.MaxWriteSize > 0 {
sizeDiff := initialSize - size
t.MaxWriteSize += sizeDiff
metric.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.transferType, t.ErrTransfer)
metric.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived),
t.transferType, t.ErrTransfer, vfs.IsSFTPFs(t.Fs))
if t.transferQuota.HasSizeLimits() {
go func(ulSize, dlSize int64, user dataprovider.User) {
dataprovider.UpdateUserTransferQuota(&user, ulSize, dlSize, false) //nolint:errcheck
@ -337,7 +338,7 @@ func (t *BaseTransfer) Close() error {
numFiles = 1
}
metric.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived),
t.transferType, t.ErrTransfer)
t.transferType, t.ErrTransfer, vfs.IsSFTPFs(t.Fs))
if t.transferQuota.HasSizeLimits() {
dataprovider.UpdateUserTransferQuota(&t.Connection.User, atomic.LoadInt64(&t.BytesReceived), //nolint:errcheck
atomic.LoadInt64(&t.BytesSent), false)

4
go.mod
View file

@ -68,7 +68,7 @@ require (
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e
golang.org/x/net v0.0.0-20220607020251-c690dde0001d
golang.org/x/oauth2 v0.0.0-20220608161450-d0670ef3b1eb
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d
golang.org/x/sys v0.0.0-20220614162138-6c1b26c55098
golang.org/x/time v0.0.0-20220609170525-579cf78fd858
google.golang.org/api v0.83.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
@ -154,7 +154,7 @@ require (
golang.org/x/tools v0.1.11 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220614154056-d2c91c45c995 // indirect
google.golang.org/genproto v0.0.0-20220614165028-45ed7f3ff16e // indirect
google.golang.org/grpc v1.47.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/ini.v1 v1.66.6 // indirect

8
go.sum
View file

@ -957,8 +957,8 @@ golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220513210249-45d2b4557a2a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d h1:Zu/JngovGLVi6t2J3nmAf3AoTDwuzw85YZ3b9o4yU7s=
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220614162138-6c1b26c55098 h1:PgOr27OhUx2IRqGJ2RxAWI4dJQ7bi9cSrB82uzFzfUA=
golang.org/x/sys v0.0.0-20220614162138-6c1b26c55098/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@ -1202,8 +1202,8 @@ google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP
google.golang.org/genproto v0.0.0-20220518221133-4f43b3371335/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220523171625-347a074981d8/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220602131408-e326c6e8e9c8/go.mod h1:yKyY4AMRwFiC8yMMNaMi+RkCnjZJt9LoWuvhXjMs+To=
google.golang.org/genproto v0.0.0-20220614154056-d2c91c45c995 h1:RJSqnopW/SLDXggSc2Psf704BMQ0Yz7AE6fjhQ62qYI=
google.golang.org/genproto v0.0.0-20220614154056-d2c91c45c995/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220614165028-45ed7f3ff16e h1:ubR4JUtqN3ffdFjpKylv8scWk/mZstGmzXbgYSkuMl0=
google.golang.org/genproto v0.0.0-20220614165028-45ed7f3ff16e/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=

View file

@ -588,6 +588,78 @@ var (
Name: "sftpgo_az_head_container_errors",
Help: "The total number of Azure head container errors",
})
// totalSFTPFsUploads is the metric that reports the total number of successful SFTPFs uploads
totalSFTPFsUploads = promauto.NewCounter(prometheus.CounterOpts{
Name: "sftpgo_sftpfs_uploads_total",
Help: "The total number of successful SFTPFs uploads",
})
// totalSFTPFsDownloads is the metric that reports the total number of successful SFTPFs downloads
totalSFTPFsDownloads = promauto.NewCounter(prometheus.CounterOpts{
Name: "sftpgo_sftpfs_downloads_total",
Help: "The total number of successful SFTPFs downloads",
})
// totalSFTPFsUploadErrors is the metric that reports the total number of SFTPFs upload errors
totalSFTPFsUploadErrors = promauto.NewCounter(prometheus.CounterOpts{
Name: "sftpgo_sftpfs_upload_errors_total",
Help: "The total number of SFTPFs upload errors",
})
// totalSFTPFsDownloadErrors is the metric that reports the total number of SFTPFs download errors
totalSFTPFsDownloadErrors = promauto.NewCounter(prometheus.CounterOpts{
Name: "sftpgo_sftpfs_download_errors_total",
Help: "The total number of SFTPFs download errors",
})
// totalSFTPFsUploadSize is the metric that reports the total SFTPFs uploads size as bytes
totalSFTPFsUploadSize = promauto.NewCounter(prometheus.CounterOpts{
Name: "sftpgo_sftpfs_upload_size",
Help: "The total SFTPFs upload size as bytes, partial uploads are included",
})
// totalSFTPFsDownloadSize is the metric that reports the total SFTPFs downloads size as bytes
totalSFTPFsDownloadSize = promauto.NewCounter(prometheus.CounterOpts{
Name: "sftpgo_sftpfs_download_size",
Help: "The total SFTPFs download size as bytes, partial downloads are included",
})
// totalHTTPFsUploads is the metric that reports the total number of successful HTTPFs uploads
totalHTTPFsUploads = promauto.NewCounter(prometheus.CounterOpts{
Name: "sftpgo_httpfs_uploads_total",
Help: "The total number of successful HTTPFs uploads",
})
// totalHTTPFsDownloads is the metric that reports the total number of successful HTTPFs downloads
totalHTTPFsDownloads = promauto.NewCounter(prometheus.CounterOpts{
Name: "sftpgo_httpfs_downloads_total",
Help: "The total number of successful HTTPFs downloads",
})
// totalHTTPFsUploadErrors is the metric that reports the total number of HTTPFs upload errors
totalHTTPFsUploadErrors = promauto.NewCounter(prometheus.CounterOpts{
Name: "sftpgo_httpfs_upload_errors_total",
Help: "The total number of HTTPFs upload errors",
})
// totalHTTPFsDownloadErrors is the metric that reports the total number of HTTPFs download errors
totalHTTPFsDownloadErrors = promauto.NewCounter(prometheus.CounterOpts{
Name: "sftpgo_httpfs_download_errors_total",
Help: "The total number of HTTPFs download errors",
})
// totalHTTPFsUploadSize is the metric that reports the total HTTPFs uploads size as bytes
totalHTTPFsUploadSize = promauto.NewCounter(prometheus.CounterOpts{
Name: "sftpgo_httpfs_upload_size",
Help: "The total HTTPFs upload size as bytes, partial uploads are included",
})
// totalHTTPFsDownloadSize is the metric that reports the total HTTPFs downloads size as bytes
totalHTTPFsDownloadSize = promauto.NewCounter(prometheus.CounterOpts{
Name: "sftpgo_httpfs_download_size",
Help: "The total HTTPFs download size as bytes, partial downloads are included",
})
)
// AddMetricsEndpoint exposes metrics to the specified endpoint
@ -596,7 +668,7 @@ func AddMetricsEndpoint(metricsPath string, handler chi.Router) {
}
// TransferCompleted updates metrics after an upload or a download
func TransferCompleted(bytesSent, bytesReceived int64, transferKind int, err error) {
func TransferCompleted(bytesSent, bytesReceived int64, transferKind int, err error, isSFTPFs bool) {
if transferKind == 0 {
// upload
if err == nil {
@ -618,6 +690,9 @@ func TransferCompleted(bytesSent, bytesReceived int64, transferKind int, err err
if bytesSent > 0 {
totalDownloadSize.Add(float64(bytesSent))
}
if isSFTPFs {
sftpFsTransferCompleted(bytesSent, bytesReceived, transferKind, err)
}
}
// S3TransferCompleted updates metrics after an S3 upload or a download
@ -818,6 +893,52 @@ func AZHeadContainerCompleted(err error) {
}
}
// sftpFsTransferCompleted updates metrics after an SFTPFs upload or a download
func sftpFsTransferCompleted(bytesSent, bytesReceived int64, transferKind int, err error) {
if transferKind == 0 {
// upload
if err == nil {
totalSFTPFsUploads.Inc()
} else {
totalSFTPFsUploadErrors.Inc()
}
} else {
// download
if err == nil {
totalSFTPFsDownloads.Inc()
} else {
totalSFTPFsDownloadErrors.Inc()
}
}
if bytesReceived > 0 {
totalSFTPFsUploadSize.Add(float64(bytesReceived))
}
if bytesSent > 0 {
totalSFTPFsDownloadSize.Add(float64(bytesSent))
}
}
// HTTPFsTransferCompleted updates metrics after an HTTPFs upload or a download
func HTTPFsTransferCompleted(bytes int64, transferKind int, err error) {
if transferKind == 0 {
// upload
if err == nil {
totalHTTPFsUploads.Inc()
} else {
totalHTTPFsUploadErrors.Inc()
}
totalHTTPFsUploadSize.Add(float64(bytes))
} else {
// download
if err == nil {
totalHTTPFsDownloads.Inc()
} else {
totalHTTPFsDownloadErrors.Inc()
}
totalHTTPFsDownloadSize.Add(float64(bytes))
}
}
// SSHCommandCompleted update metrics after an SSH command terminates
func SSHCommandCompleted(err error) {
if err == nil {

View file

@ -231,7 +231,8 @@ func (t *transfer) copyFromReaderToWriter(dst io.Writer, src io.Reader) (int64,
}
t.ErrTransfer = err
if written > 0 || err != nil {
metric.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.GetType(), t.ErrTransfer)
metric.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.GetType(),
t.ErrTransfer, vfs.IsSFTPFs(t.Fs))
}
return written, err
}

View file

@ -24,6 +24,7 @@ import (
"github.com/drakkan/sftpgo/v2/kms"
"github.com/drakkan/sftpgo/v2/logger"
"github.com/drakkan/sftpgo/v2/metric"
"github.com/drakkan/sftpgo/v2/util"
)
@ -289,12 +290,14 @@ func (fs *HTTPFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, f
if err != nil {
fsLog(fs, logger.LevelError, "download error, path %q, err: %v", name, err)
w.CloseWithError(err) //nolint:errcheck
metric.HTTPFsTransferCompleted(0, 1, err)
return
}
defer resp.Body.Close()
n, err := io.Copy(w, resp.Body)
w.CloseWithError(err) //nolint:errcheck
fsLog(fs, logger.LevelDebug, "download completed, path %q size: %v, err: %+v", name, n, err)
metric.HTTPFsTransferCompleted(n, 1, err)
}()
return nil, r, cancelFn, nil
@ -324,6 +327,7 @@ func (fs *HTTPFs) Create(name string, flag int) (File, *PipeWriter, func(), erro
fsLog(fs, logger.LevelError, "upload error, path %q, err: %v", name, err)
r.CloseWithError(err) //nolint:errcheck
p.Done(err)
metric.HTTPFsTransferCompleted(0, 0, err)
return
}
defer resp.Body.Close()
@ -331,6 +335,7 @@ func (fs *HTTPFs) Create(name string, flag int) (File, *PipeWriter, func(), erro
r.CloseWithError(err) //nolint:errcheck
p.Done(err)
fsLog(fs, logger.LevelDebug, "upload completed, path: %q, readed bytes: %d", name, r.GetReadedBytes())
metric.HTTPFsTransferCompleted(r.GetReadedBytes(), 0, err)
}()
return nil, p, cancelFn, nil