From f03fdd11551446569cc93c5b6f41ab9373bf3401 Mon Sep 17 00:00:00 2001 From: Nicola Murino Date: Sat, 12 Aug 2023 18:51:47 +0200 Subject: [PATCH] add object metadata to notification events Signed-off-by: Nicola Murino --- docs/custom-actions.md | 4 +- docs/eventmanager.md | 3 + docs/full-configuration.md | 2 + go.mod | 10 ++-- go.sum | 20 +++---- internal/common/actions.go | 18 ++++-- internal/common/actions_test.go | 28 +++++----- internal/common/common.go | 12 +++- internal/common/connection.go | 8 +-- internal/common/eventmanager.go | 41 +++++++++----- internal/common/eventmanager_test.go | 17 +++++- internal/common/transfer.go | 14 +++-- internal/common/transfer_test.go | 1 + internal/config/config.go | 6 ++ internal/ftpd/internal_test.go | 2 +- internal/ftpd/transfer.go | 7 ++- internal/httpd/file.go | 7 ++- internal/sftpd/internal_test.go | 4 +- internal/sftpd/ssh_cmd.go | 2 +- internal/sftpd/transfer.go | 7 ++- internal/vfs/azblobfs.go | 13 +++-- internal/vfs/cryptfs.go | 5 +- internal/vfs/gcsfs.go | 16 +++++- internal/vfs/httpfs.go | 5 +- internal/vfs/osfs.go | 5 +- internal/vfs/s3fs.go | 15 ++++- internal/vfs/sftpfs.go | 6 +- internal/vfs/vfs.go | 84 +++++++++++++++++++++++----- internal/webdavd/file.go | 3 + internal/webdavd/internal_test.go | 4 +- sftpgo.json | 3 + templates/webadmin/eventaction.html | 9 +++ 32 files changed, 276 insertions(+), 105 deletions(-) diff --git a/docs/custom-actions.md b/docs/custom-actions.md index 661dde82..4360d811 100644 --- a/docs/custom-actions.md +++ b/docs/custom-actions.md @@ -51,6 +51,7 @@ If the `hook` defines a path to an external program, then this program can read - `SFTPGO_ACTION_OPEN_FLAGS`, integer. File open flags, can be non-zero for `pre-upload` action. If `SFTPGO_ACTION_FILE_SIZE` is greater than zero and `SFTPGO_ACTION_OPEN_FLAGS&512 == 0` the target file will not be truncated - `SFTPGO_ACTION_ROLE`, string. Role of the user who executed the action - `SFTPGO_ACTION_TIMESTAMP`, int64. Event timestamp as nanoseconds since epoch +- `SFTPGO_ACTION_METADATA`, string. Object metadata serialized as JSON. Omitted if there is no metadata Global environment variables are cleared, for security reasons, when the script is called. You can set additional environment variables in the "command" configuration section. The program must finish within 30 seconds. @@ -76,6 +77,7 @@ If the `hook` defines an HTTP URL then this URL will be invoked as HTTP POST. Th - `open_flags`, integer. File open flags, can be non-zero for `pre-upload` action. If `file_size` is greater than zero and `file_size&512 == 0` the target file will not be truncated - `role`, string. Included if the user who executed the action has a role - `timestamp`, int64. Event timestamp as nanoseconds since epoch +- `metadata`, struct. Object metadata. Both the keys and the values are string. Omitted if there is no metadata The HTTP hook will use the global configuration for HTTP clients and will respect the retry, TLS and headers configurations. See the HTTP Clients (`http`) section of the [config reference](./full-configuration.md). @@ -116,7 +118,7 @@ The program must finish within 15 seconds. If the `hook` defines an HTTP URL then this URL will be invoked as HTTP POST. The action, username, ip, object_type and object_name and timestamp and role are added to the query string, for example `?action=update&username=admin&ip=127.0.0.1&object_type=user&object_name=user1×tamp=1633860803249`, and the full object is sent serialized as JSON inside the POST body with sensitive fields removed. The role is added only if not empty. -The HTTP hook will use the global configuration for HTTP clients and will respect the retry, TLS and headers configurations. See the HTTP Clients (`http`) section of the [config reference](./full-configuration.md). +The HTTP hook will use the global configuration for HTTP clients and will respect the retry, TLS and headers configurations. See the HTTP Clients (`http`) section of the [config reference](./full-configuration.md). The structure for SFTPGo objects can be found within the [OpenAPI schema](../openapi/openapi.yaml). diff --git a/docs/eventmanager.md b/docs/eventmanager.md index b7c4f52b..4df55a6f 100644 --- a/docs/eventmanager.md +++ b/docs/eventmanager.md @@ -48,8 +48,11 @@ The following placeholders are supported: - `{{Timestamp}}`. Event timestamp as nanoseconds since epoch. - `{{Email}}`. For filesystem events, this is the email associated with the user performing the action. For the provider events, this is the email associated with the affected user or admin. Blank in all other cases. - `{{ObjectData}}`. Provider object data serialized as JSON with sensitive fields removed. +- `{{ObjectDataString}}`. Provider object data as JSON escaped string with sensitive fields removed. - `{{RetentionReports}}`. Data retention reports as zip compressed CSV files. Supported as email attachment, file path for multipart HTTP request and as single parameter for HTTP requests body. Data retention reports contain details on the number of files deleted and the total size deleted for each folder. - `{{IDPField}}`. Identity Provider custom fields containing a string. +- `{{Metadata}}`. Cloud storage metadata for the downloaded file serialized as JSON. +- `{{MetadataString}}`. Cloud storage metadata for the downloaded file as JSON escaped string. Event rules are based on the premise that an event occours. To each rule you can associate one or more actions. The following trigger events are supported: diff --git a/docs/full-configuration.md b/docs/full-configuration.md index 2efbad5c..14771e7e 100644 --- a/docs/full-configuration.md +++ b/docs/full-configuration.md @@ -87,6 +87,8 @@ The configuration file contains the following sections: - `allowlist_status`, integer. Set to `1` to enable the allow list. The allow list can be populated using the WebAdmin or the REST API. If enabled, only the listed IPs/networks can access the configured services, all other client connections will be dropped before they even try to authenticate. Ensure to populate your allow list before enabling this setting. In multi-nodes setups, the list entries propagation between nodes may take some minutes. Default: `0`. - `allow_self_connections`, integer. Allow users on this instance to use other users/virtual folders on this instance as storage backend. Enable this setting if you know what you are doing. Set to `1` to enable. Default: `0`. - `umask`, string. Set the file mode creation mask, for example `002`. Leave blank to use the system umask. Supported on *NIX platforms. Default: blank. + - `metadata`, struct containing the configuration for managing the Cloud Storage backends metadata. + - `read`, integer. Set to `1` to read metadata before downloading files from Cloud Storage backends and making them available in notification events. Default: `0`. - `defender`, struct containing the defender configuration. See [Defender](./defender.md) for more details. - `enabled`, boolean. Default `false`. - `driver`, string. Supported drivers are `memory` and `provider`. The `provider` driver will use the configured data provider to store defender events and it is supported for `MySQL`, `PostgreSQL` and `CockroachDB` data providers. Using the `provider` driver you can share the defender events among multiple SFTPGO instances. For a single instance the `memory` driver will be much faster. Default: `memory`. diff --git a/go.mod b/go.mod index f6e07ba8..91dab418 100644 --- a/go.mod +++ b/go.mod @@ -16,14 +16,14 @@ require ( github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.77 github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.15.2 github.com/aws/aws-sdk-go-v2/service/s3 v1.38.2 - github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.20.2 + github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.21.0 github.com/aws/aws-sdk-go-v2/service/sts v1.21.2 github.com/bmatcuk/doublestar/v4 v4.6.0 github.com/cockroachdb/cockroach-go/v2 v2.3.5 github.com/coreos/go-oidc/v3 v3.6.0 github.com/drakkan/webdav v0.0.0-20230227175313-32996838bcd8 github.com/eikenb/pipeat v0.0.0-20210730190139-06b3e6902001 - github.com/fclairamb/ftpserverlib v0.21.1-0.20230719102702-76e3b6785cda + github.com/fclairamb/ftpserverlib v0.22.0 github.com/fclairamb/go-log v0.4.1 github.com/go-acme/lego/v4 v4.13.3 github.com/go-chi/chi/v5 v5.0.10 @@ -46,14 +46,14 @@ require ( github.com/minio/sio v0.3.1 github.com/otiai10/copy v1.12.0 github.com/pires/go-proxyproto v0.7.0 - github.com/pkg/sftp v1.13.6-0.20230213180117-971c283182b6 + github.com/pkg/sftp v1.13.6 github.com/pquerna/otp v1.4.0 github.com/prometheus/client_golang v1.16.0 github.com/robfig/cron/v3 v3.0.1 github.com/rs/cors v1.9.0 github.com/rs/xid v1.5.0 github.com/rs/zerolog v1.30.0 - github.com/sftpgo/sdk v0.1.6-0.20230807170339-3178878ce745 + github.com/sftpgo/sdk v0.1.6-0.20230812162553-b7d33eb36639 github.com/shirou/gopsutil/v3 v3.23.7 github.com/spf13/afero v1.9.5 github.com/spf13/cobra v1.7.0 @@ -113,7 +113,7 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-cmp v0.5.9 // indirect - github.com/google/s2a-go v0.1.4 // indirect + github.com/google/s2a-go v0.1.5 // indirect github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect diff --git a/go.sum b/go.sum index 013db285..03fafcf0 100644 --- a/go.sum +++ b/go.sum @@ -104,8 +104,8 @@ github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.15.2 h1:Sn0OY6ZvpkzD github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.15.2/go.mod h1:Q+KOs5c1mTtEvycj41l1xy9v7QxojZ/c0NhABlYJthY= github.com/aws/aws-sdk-go-v2/service/s3 v1.38.2 h1:v346f1h8sUBKXnEbrv43L37MTBlFHyKXQPIZHNAaghA= github.com/aws/aws-sdk-go-v2/service/s3 v1.38.2/go.mod h1:cwCATiyNrXK9P2FsWdZ89g9mpsYv2rhk0UA/KByl5fY= -github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.20.2 h1:vlkGQk8JiUo1KmZF4wsZP3qclbyQHSUvLMf8aPOS79g= -github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.20.2/go.mod h1:Z6Oq1mXqvgwmUxvMrV/jMkQhwm06A9XO015dzGnS8TM= +github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.21.0 h1:z9faFYBvadv9HdY+oFBgxqCnew9TK+jp9ccxktB5fl4= +github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.21.0/go.mod h1:Z6Oq1mXqvgwmUxvMrV/jMkQhwm06A9XO015dzGnS8TM= github.com/aws/aws-sdk-go-v2/service/sso v1.13.2 h1:A2RlEMo4SJSwbNoUUgkxTAEMduAy/8wG3eB2b2lP4gY= github.com/aws/aws-sdk-go-v2/service/sso v1.13.2/go.mod h1:ju+nNXUunfIFamXUIZQiICjnO/TPlOmWcYhZcSy7xaE= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.15.2 h1:OJELEgyaT2kmaBGZ+myyZbTTLobfe3ox3FSh5eYK9Qs= @@ -174,8 +174,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= -github.com/fclairamb/ftpserverlib v0.21.1-0.20230719102702-76e3b6785cda h1:yoDuXaChUZkcC+lBTbQcFJq78j9lt+7WyAtuop2UVE8= -github.com/fclairamb/ftpserverlib v0.21.1-0.20230719102702-76e3b6785cda/go.mod h1:y3T8eZqo8jAYQ0/wnDVRrcGeTTHV8S7ex6mAf8vQ8A0= +github.com/fclairamb/ftpserverlib v0.22.0 h1:PqzyD6YxS5sdb4fAdXUFSODTo8DelsVAOh3LgeR4VXs= +github.com/fclairamb/ftpserverlib v0.22.0/go.mod h1:dI9/yw/KfJ0g4wmRK8ZukUfqakLr6ZTf9VDydKoLy90= github.com/fclairamb/go-log v0.4.1 h1:rLtdSG9x2pK41AIAnE8WYpl05xBJfw1ZyYxZaXFcBsM= github.com/fclairamb/go-log v0.4.1/go.mod h1:sw1KvnkZ4wKCYkvy4SL3qVZcJSWFP8Ure4pM3z+KNn4= github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= @@ -278,8 +278,8 @@ github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= -github.com/google/s2a-go v0.1.4 h1:1kZ/sQM3srePvKs3tXAvQzo66XfcReoqFpIpIccE7Oc= -github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A= +github.com/google/s2a-go v0.1.5 h1:8IYp3w9nysqv3JH+NJgXJzGbDHzLOTj43BmSkp+O7qg= +github.com/google/s2a-go v0.1.5/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -407,8 +407,8 @@ github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9 github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= -github.com/pkg/sftp v1.13.6-0.20230213180117-971c283182b6 h1:5TvW1dv00Y13njmQ1AWkxSWtPkwE7ZEF6yDuv9q+Als= -github.com/pkg/sftp v1.13.6-0.20230213180117-971c283182b6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk= +github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo= +github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= @@ -443,8 +443,8 @@ github.com/secsy/goftp v0.0.0-20200609142545-aa2de14babf4 h1:PT+ElG/UUFMfqy5HrxJ github.com/secsy/goftp v0.0.0-20200609142545-aa2de14babf4/go.mod h1:MnkX001NG75g3p8bhFycnyIjeQoOjGL6CEIsdE/nKSY= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= -github.com/sftpgo/sdk v0.1.6-0.20230807170339-3178878ce745 h1:IRKfXQ0/P0ON9UzltTmgLKU0HWYSkuafARw3Pv3hDRU= -github.com/sftpgo/sdk v0.1.6-0.20230807170339-3178878ce745/go.mod h1:TjeoMWS0JEXt9RukJveTnaiHj4+MVLtUiDC+mY++Odk= +github.com/sftpgo/sdk v0.1.6-0.20230812162553-b7d33eb36639 h1:KIFQY//0+OslF42WM7Jw24dnkKoHQV0QsW2CWa7Ac2A= +github.com/sftpgo/sdk v0.1.6-0.20230812162553-b7d33eb36639/go.mod h1:TjeoMWS0JEXt9RukJveTnaiHj4+MVLtUiDC+mY++Odk= github.com/shirou/gopsutil/v3 v3.23.7 h1:C+fHO8hfIppoJ1WdsVm1RoI0RwXoNdfTK7yWXV0wVj4= github.com/shirou/gopsutil/v3 v3.23.7/go.mod h1:c4gnmoRC0hQuaLqvxnx1//VXQ0Ms/X9UnJF8pddY5z4= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= diff --git a/internal/common/actions.go b/internal/common/actions.go index 3463fd1d..2d99b264 100644 --- a/internal/common/actions.go +++ b/internal/common/actions.go @@ -92,7 +92,7 @@ func ExecutePreAction(conn *BaseConnection, operation, filePath, virtualPath str return 0, nil } event = newActionNotification(&conn.User, operation, filePath, virtualPath, "", "", "", - conn.protocol, conn.GetRemoteIP(), conn.ID, fileSize, openFlags, conn.getNotificationStatus(nil), 0) + conn.protocol, conn.GetRemoteIP(), conn.ID, fileSize, openFlags, conn.getNotificationStatus(nil), 0, nil) if hasNotifiersPlugin { plugin.Handler.NotifyFsEvent(event) } @@ -128,7 +128,7 @@ func ExecutePreAction(conn *BaseConnection, operation, filePath, virtualPath str // ExecuteActionNotification executes the defined hook, if any, for the specified action func ExecuteActionNotification(conn *BaseConnection, operation, filePath, virtualPath, target, virtualTarget, sshCmd string, - fileSize int64, err error, elapsed int64, + fileSize int64, err error, elapsed int64, metadata map[string]string, ) error { hasNotifiersPlugin := plugin.Handler.HasNotifiers() hasHook := util.Contains(Config.Actions.ExecuteOn, operation) @@ -137,7 +137,7 @@ func ExecuteActionNotification(conn *BaseConnection, operation, filePath, virtua return nil } notification := newActionNotification(&conn.User, operation, filePath, virtualPath, target, virtualTarget, sshCmd, - conn.protocol, conn.GetRemoteIP(), conn.ID, fileSize, 0, conn.getNotificationStatus(err), elapsed) + conn.protocol, conn.GetRemoteIP(), conn.ID, fileSize, 0, conn.getNotificationStatus(err), elapsed, metadata) if hasNotifiersPlugin { plugin.Handler.NotifyFsEvent(notification) } @@ -160,6 +160,7 @@ func ExecuteActionNotification(conn *BaseConnection, operation, filePath, virtua Timestamp: notification.Timestamp, Email: conn.User.Email, Object: nil, + Metadata: metadata, } if err != nil { params.AddError(fmt.Errorf("%q failed: %w", params.Event, err)) @@ -194,6 +195,7 @@ func newActionNotification( operation, filePath, virtualPath, target, virtualTarget, sshCmd, protocol, ip, sessionID string, fileSize int64, openFlags, status int, elapsed int64, + metadata map[string]string, ) *notifier.FsEvent { var bucket, endpoint string @@ -236,6 +238,7 @@ func newActionNotification( Role: user.Role, Timestamp: time.Now().UnixNano(), Elapsed: elapsed, + Metadata: metadata, } } @@ -316,7 +319,7 @@ func (h *defaultActionHandler) handleCommand(event *notifier.FsEvent) error { } func notificationAsEnvVars(event *notifier.FsEvent) []string { - return []string{ + result := []string{ fmt.Sprintf("SFTPGO_ACTION=%s", event.Action), fmt.Sprintf("SFTPGO_ACTION_USERNAME=%s", event.Username), fmt.Sprintf("SFTPGO_ACTION_PATH=%s", event.Path), @@ -337,4 +340,11 @@ func notificationAsEnvVars(event *notifier.FsEvent) []string { fmt.Sprintf("SFTPGO_ACTION_TIMESTAMP=%d", event.Timestamp), fmt.Sprintf("SFTPGO_ACTION_ROLE=%s", event.Role), } + if len(event.Metadata) > 0 { + data, err := json.Marshal(event.Metadata) + if err == nil { + result = append(result, fmt.Sprintf("SFTPGO_ACTION_METADATA=%s", string(data))) + } + } + return result } diff --git a/internal/common/actions_test.go b/internal/common/actions_test.go index e980c3bf..9376da67 100644 --- a/internal/common/actions_test.go +++ b/internal/common/actions_test.go @@ -71,7 +71,7 @@ func TestNewActionNotification(t *testing.T) { c := NewBaseConnection("id", ProtocolSSH, "", "", user) sessionID := xid.New().String() a := newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSFTP, "", sessionID, - 123, 0, c.getNotificationStatus(errors.New("fake error")), 0) + 123, 0, c.getNotificationStatus(errors.New("fake error")), 0, nil) assert.Equal(t, user.Username, a.Username) assert.Equal(t, 0, len(a.Bucket)) assert.Equal(t, 0, len(a.Endpoint)) @@ -79,38 +79,38 @@ func TestNewActionNotification(t *testing.T) { user.FsConfig.Provider = sdk.S3FilesystemProvider a = newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSSH, "", sessionID, - 123, 0, c.getNotificationStatus(nil), 0) + 123, 0, c.getNotificationStatus(nil), 0, nil) assert.Equal(t, "s3bucket", a.Bucket) assert.Equal(t, "endpoint", a.Endpoint) assert.Equal(t, 1, a.Status) user.FsConfig.Provider = sdk.GCSFilesystemProvider a = newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSCP, "", sessionID, - 123, 0, c.getNotificationStatus(ErrQuotaExceeded), 0) + 123, 0, c.getNotificationStatus(ErrQuotaExceeded), 0, nil) assert.Equal(t, "gcsbucket", a.Bucket) assert.Equal(t, 0, len(a.Endpoint)) assert.Equal(t, 3, a.Status) a = newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSCP, "", sessionID, - 123, 0, c.getNotificationStatus(fmt.Errorf("wrapper quota error: %w", ErrQuotaExceeded)), 0) + 123, 0, c.getNotificationStatus(fmt.Errorf("wrapper quota error: %w", ErrQuotaExceeded)), 0, nil) assert.Equal(t, "gcsbucket", a.Bucket) assert.Equal(t, 0, len(a.Endpoint)) assert.Equal(t, 3, a.Status) user.FsConfig.Provider = sdk.HTTPFilesystemProvider a = newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSSH, "", sessionID, - 123, 0, c.getNotificationStatus(nil), 0) + 123, 0, c.getNotificationStatus(nil), 0, nil) assert.Equal(t, "httpendpoint", a.Endpoint) assert.Equal(t, 1, a.Status) user.FsConfig.Provider = sdk.AzureBlobFilesystemProvider a = newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSCP, "", sessionID, - 123, 0, c.getNotificationStatus(nil), 0) + 123, 0, c.getNotificationStatus(nil), 0, nil) assert.Equal(t, "azcontainer", a.Bucket) assert.Equal(t, "azendpoint", a.Endpoint) assert.Equal(t, 1, a.Status) a = newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSCP, "", sessionID, - 123, os.O_APPEND, c.getNotificationStatus(nil), 0) + 123, os.O_APPEND, c.getNotificationStatus(nil), 0, nil) assert.Equal(t, "azcontainer", a.Bucket) assert.Equal(t, "azendpoint", a.Endpoint) assert.Equal(t, 1, a.Status) @@ -118,7 +118,7 @@ func TestNewActionNotification(t *testing.T) { user.FsConfig.Provider = sdk.SFTPFilesystemProvider a = newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSFTP, "", sessionID, - 123, 0, c.getNotificationStatus(nil), 0) + 123, 0, c.getNotificationStatus(nil), 0, nil) assert.Equal(t, "sftpendpoint", a.Endpoint) } @@ -135,7 +135,7 @@ func TestActionHTTP(t *testing.T) { }, } a := newActionNotification(user, operationDownload, "path", "vpath", "target", "", "", ProtocolSFTP, "", - xid.New().String(), 123, 0, 1, 0) + xid.New().String(), 123, 0, 1, 0, nil) status, err := actionHandler.Handle(a) assert.NoError(t, err) assert.Equal(t, 1, status) @@ -175,16 +175,16 @@ func TestActionCMD(t *testing.T) { } sessionID := shortuuid.New() a := newActionNotification(user, operationDownload, "path", "vpath", "target", "", "", ProtocolSFTP, "", sessionID, - 123, 0, 1, 0) + 123, 0, 1, 0, map[string]string{"key": "value"}) status, err := actionHandler.Handle(a) assert.NoError(t, err) assert.Equal(t, 1, status) c := NewBaseConnection("id", ProtocolSFTP, "", "", *user) - err = ExecuteActionNotification(c, OperationSSHCmd, "path", "vpath", "target", "vtarget", "sha1sum", 0, nil, 0) + err = ExecuteActionNotification(c, OperationSSHCmd, "path", "vpath", "target", "vtarget", "sha1sum", 0, nil, 0, nil) assert.NoError(t, err) - err = ExecuteActionNotification(c, operationDownload, "path", "vpath", "", "", "", 0, nil, 0) + err = ExecuteActionNotification(c, operationDownload, "path", "vpath", "", "", "", 0, nil, 0, nil) assert.NoError(t, err) Config.Actions = actionsCopy @@ -208,7 +208,7 @@ func TestWrongActions(t *testing.T) { } a := newActionNotification(user, operationUpload, "", "", "", "", "", ProtocolSFTP, "", xid.New().String(), - 123, 0, 1, 0) + 123, 0, 1, 0, nil) status, err := actionHandler.Handle(a) assert.Error(t, err, "action with bad command must fail") assert.Equal(t, 1, status) @@ -307,7 +307,7 @@ func TestUnconfiguredHook(t *testing.T) { assert.NoError(t, err) assert.Equal(t, status, 0) - err = ExecuteActionNotification(c, operationDownload, "", "", "", "", "", 0, nil, 0) + err = ExecuteActionNotification(c, operationDownload, "", "", "", "", "", 0, nil, 0, nil) assert.NoError(t, err) err = plugin.Initialize(nil, "debug") diff --git a/internal/common/common.go b/internal/common/common.go index 6100afab..3854a347 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -229,6 +229,7 @@ func Initialize(c Configuration, isShared int) error { dataprovider.SetTempPath(c.TempPath) vfs.SetAllowSelfConnections(c.AllowSelfConnections) vfs.SetRenameMode(c.RenameMode) + vfs.SetReadMetadataMode(c.Metadata.Read) dataprovider.SetAllowSelfConnections(c.AllowSelfConnections) transfersChecker = getTransfersChecker(isShared) return nil @@ -487,6 +488,13 @@ func (t *ConnectionTransfer) getConnectionTransferAsString() string { return result } +// MetadataConfig defines how to handle metadata for cloud storage backends +type MetadataConfig struct { + // If not zero the metadata will be read before downloads and will be + // available in notifications + Read int `json:"read" mapstructure:"read"` +} + // Configuration defines configuration parameters common to all supported protocols type Configuration struct { // Maximum idle timeout as minutes. If a client is idle for a time that exceeds this setting it will be disconnected. @@ -572,7 +580,9 @@ type Configuration struct { // Rate limiter configurations RateLimitersConfig []RateLimiterConfig `json:"rate_limiters" mapstructure:"rate_limiters"` // Umask for new uploads. Leave blank to use the system default. - Umask string `json:"umask" mapstructure:"umask"` + Umask string `json:"umask" mapstructure:"umask"` + // Metadata configuration + Metadata MetadataConfig `json:"metadata" mapstructure:"metadata"` idleTimeoutAsDuration time.Duration idleLoginTimeout time.Duration defender Defender diff --git a/internal/common/connection.go b/internal/common/connection.go index 01800d2c..20d08aa4 100644 --- a/internal/common/connection.go +++ b/internal/common/connection.go @@ -381,7 +381,7 @@ func (c *BaseConnection) CreateDir(virtualPath string, checkFilePatterns bool) e logger.CommandLog(mkdirLogSender, fsPath, "", c.User.Username, "", c.ID, c.protocol, -1, -1, "", "", "", -1, c.localAddr, c.remoteAddr, elapsed) - ExecuteActionNotification(c, operationMkdir, fsPath, virtualPath, "", "", "", 0, nil, elapsed) //nolint:errcheck + ExecuteActionNotification(c, operationMkdir, fsPath, virtualPath, "", "", "", 0, nil, elapsed, nil) //nolint:errcheck return nil } @@ -436,7 +436,7 @@ func (c *BaseConnection) RemoveFile(fs vfs.Fs, fsPath, virtualPath string, info dataprovider.UpdateUserQuota(&c.User, -1, -size, false) //nolint:errcheck } } - ExecuteActionNotification(c, operationDelete, fsPath, virtualPath, "", "", "", size, nil, elapsed) //nolint:errcheck + ExecuteActionNotification(c, operationDelete, fsPath, virtualPath, "", "", "", size, nil, elapsed, nil) //nolint:errcheck return nil } @@ -502,7 +502,7 @@ func (c *BaseConnection) RemoveDir(virtualPath string) error { logger.CommandLog(rmdirLogSender, fsPath, "", c.User.Username, "", c.ID, c.protocol, -1, -1, "", "", "", -1, c.localAddr, c.remoteAddr, elapsed) - ExecuteActionNotification(c, operationRmdir, fsPath, virtualPath, "", "", "", 0, nil, elapsed) //nolint:errcheck + ExecuteActionNotification(c, operationRmdir, fsPath, virtualPath, "", "", "", 0, nil, elapsed, nil) //nolint:errcheck return nil } @@ -785,7 +785,7 @@ func (c *BaseConnection) renameInternal(virtualSourcePath, virtualTargetPath str logger.CommandLog(renameLogSender, fsSourcePath, fsTargetPath, c.User.Username, "", c.ID, c.protocol, -1, -1, "", "", "", -1, c.localAddr, c.remoteAddr, elapsed) ExecuteActionNotification(c, operationRename, fsSourcePath, virtualSourcePath, fsTargetPath, //nolint:errcheck - virtualTargetPath, "", 0, nil, elapsed) + virtualTargetPath, "", 0, nil, elapsed, nil) return nil } diff --git a/internal/common/eventmanager.go b/internal/common/eventmanager.go index ecfbda70..64be612e 100644 --- a/internal/common/eventmanager.go +++ b/internal/common/eventmanager.go @@ -53,9 +53,10 @@ import ( ) const ( - ipBlockedEventName = "IP Blocked" - maxAttachmentsSize = int64(10 * 1024 * 1024) - objDataPlaceholder = "{{ObjectData}}" + ipBlockedEventName = "IP Blocked" + maxAttachmentsSize = int64(10 * 1024 * 1024) + objDataPlaceholder = "{{ObjectData}}" + objDataPlaceholderString = "{{ObjectDataString}}" ) // Supported IDP login events @@ -554,6 +555,7 @@ type EventParams struct { Timestamp int64 IDPCustomFields *map[string]string Object plugin.Renderer + Metadata map[string]string sender string updateStatusFromError bool errors []string @@ -587,7 +589,7 @@ func (p *EventParams) getACopy() *EventParams { } func (p *EventParams) addIDPCustomFields(customFields *map[string]any) { - if customFields == nil { + if customFields == nil || len(*customFields) == 0 { return } @@ -785,11 +787,14 @@ func (p *EventParams) getStringReplacements(addObjectData, jsonEscaped bool) []s } else { replacements = append(replacements, "{{ErrorString}}", "") } - replacements = append(replacements, objDataPlaceholder, "") + replacements = append(replacements, objDataPlaceholder, "{}") + replacements = append(replacements, objDataPlaceholderString, "") if addObjectData { data, err := p.Object.RenderAsJSON(p.Event != operationDelete) if err == nil { - replacements[len(replacements)-1] = p.getStringReplacement(string(data), jsonEscaped) + dataString := string(data) + replacements[len(replacements)-3] = p.getStringReplacement(dataString, false) + replacements[len(replacements)-1] = p.getStringReplacement(dataString, true) } } if p.IDPCustomFields != nil { @@ -797,6 +802,16 @@ func (p *EventParams) getStringReplacements(addObjectData, jsonEscaped bool) []s replacements = append(replacements, fmt.Sprintf("{{IDPField%s}}", k), p.getStringReplacement(v, jsonEscaped)) } } + replacements = append(replacements, "{{Metadata}}", "{}") + replacements = append(replacements, "{{MetadataString}}", "") + if len(p.Metadata) > 0 { + data, err := json.Marshal(p.Metadata) + if err == nil { + dataString := string(data) + replacements[len(replacements)-3] = p.getStringReplacement(dataString, false) + replacements[len(replacements)-1] = p.getStringReplacement(dataString, true) + } + } return replacements } @@ -857,7 +872,7 @@ func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualSo logger.CommandLog(copyLogSender, fsSrcPath, fsDstPath, conn.User.Username, "", conn.ID, conn.protocol, -1, -1, "", "", "", info.Size(), conn.localAddr, conn.remoteAddr, elapsed) } - ExecuteActionNotification(conn, operation, fsSrcPath, virtualSourcePath, fsDstPath, virtualTargetPath, "", info.Size(), errTransfer, elapsed) //nolint:errcheck + ExecuteActionNotification(conn, operation, fsSrcPath, virtualSourcePath, fsDstPath, virtualTargetPath, "", info.Size(), errTransfer, elapsed, nil) //nolint:errcheck } } else { eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", targetPath, err) @@ -1227,7 +1242,7 @@ func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto. } if part.Body != "" { cType := h.Get("Content-Type") - if part.Body != objDataPlaceholder && strings.Contains(strings.ToLower(cType), "application/json") { + if strings.Contains(strings.ToLower(cType), "application/json") { replacements := params.getStringReplacements(addObjectData, true) jsonReplacer := strings.NewReplacer(replacements...) _, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, jsonReplacer))) @@ -1260,10 +1275,6 @@ func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto. return nil } -func jsonEscapeRuleActionBody(c *dataprovider.EventActionHTTPConfig) bool { - return c.Body != objDataPlaceholder && c.HasJSONBody() -} - func getHTTPRuleActionBody(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer, cancel context.CancelFunc, user dataprovider.User, params *EventParams, addObjectData bool, ) (io.Reader, string, error) { @@ -1279,7 +1290,7 @@ func getHTTPRuleActionBody(c *dataprovider.EventActionHTTPConfig, replacer *stri } return bytes.NewBuffer(data), "", nil } - if jsonEscapeRuleActionBody(c) { + if c.HasJSONBody() { replacements := params.getStringReplacements(addObjectData, true) jsonReplacer := strings.NewReplacer(replacements...) return bytes.NewBufferString(replaceWithReplacer(c.Body, jsonReplacer)), "", nil @@ -1425,7 +1436,7 @@ func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *E addObjectData := false if params.Object != nil { for _, k := range c.EnvVars { - if strings.Contains(k.Value, objDataPlaceholder) { + if strings.Contains(k.Value, objDataPlaceholder) || strings.Contains(k.Value, objDataPlaceholderString) { addObjectData = true break } @@ -1474,7 +1485,7 @@ func getEmailAddressesWithReplacer(addrs []string, replacer *strings.Replacer) [ func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error { addObjectData := false if params.Object != nil { - if strings.Contains(c.Body, objDataPlaceholder) { + if strings.Contains(c.Body, objDataPlaceholder) || strings.Contains(c.Body, objDataPlaceholderString) { addObjectData = true } } diff --git a/internal/common/eventmanager_test.go b/internal/common/eventmanager_test.go index 296e4a84..e78ff1cd 100644 --- a/internal/common/eventmanager_test.go +++ b/internal/common/eventmanager_test.go @@ -817,7 +817,7 @@ func TestEventRuleActions(t *testing.T) { HTTPConfig: dataprovider.EventActionHTTPConfig{ Endpoint: "http://foo\x7f.com/", // invalid URL SkipTLSVerify: true, - Body: `"data": "{{ObjectData}}"`, + Body: `"data": "{{ObjectDataString}}"`, Method: http.MethodPost, QueryParameters: []dataprovider.KeyValue{ { @@ -2265,3 +2265,18 @@ func TestHTTPEndpointWithPlaceholders(t *testing.T) { expected = c.Endpoint + "?p=" + url.QueryEscape(vPath) + "&u=" + url.QueryEscape(name) assert.Equal(t, expected, u) } + +func TestMetadataReplacement(t *testing.T) { + params := &EventParams{ + Metadata: map[string]string{ + "key": "value", + }, + } + replacements := params.getStringReplacements(false, false) + replacer := strings.NewReplacer(replacements...) + reader, _, err := getHTTPRuleActionBody(&dataprovider.EventActionHTTPConfig{Body: "{{Metadata}} {{MetadataString}}"}, replacer, nil, dataprovider.User{}, params, false) + require.NoError(t, err) + data, err := io.ReadAll(reader) + require.NoError(t, err) + assert.Equal(t, `{"key":"value"} {\"key\":\"value\"}`, string(data)) +} diff --git a/internal/common/transfer.go b/internal/common/transfer.go index 5e02afbe..24ae1d11 100644 --- a/internal/common/transfer.go +++ b/internal/common/transfer.go @@ -56,6 +56,7 @@ type BaseTransfer struct { //nolint:maligned aTime time.Time mTime time.Time transferQuota dataprovider.TransferQuota + metadata map[string]string sync.Mutex errAbort error ErrTransfer error @@ -206,6 +207,11 @@ func (t *BaseTransfer) GetRealFsPath(fsPath string) string { return "" } +// SetMetadata sets the metadata for the file +func (t *BaseTransfer) SetMetadata(val map[string]string) { + t.metadata = val +} + // SetCancelFn sets the cancel function for the transfer func (t *BaseTransfer) SetCancelFn(cancelFn func()) { t.cancelFn = cancelFn @@ -405,7 +411,7 @@ func (t *BaseTransfer) Close() error { logger.TransferLog(downloadLogSender, t.fsPath, elapsed, t.BytesSent.Load(), t.Connection.User.Username, t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode) ExecuteActionNotification(t.Connection, operationDownload, t.fsPath, t.requestPath, "", "", "", //nolint:errcheck - t.BytesSent.Load(), t.ErrTransfer, elapsed) + t.BytesSent.Load(), t.ErrTransfer, elapsed, t.metadata) } else { statSize, deletedFiles, errStat := t.getUploadFileSize() if errStat == nil { @@ -449,7 +455,7 @@ func (t *BaseTransfer) updateTransferTimestamps(uploadFileSize, elapsed int64) { if err := dataprovider.UpdateUserTransferTimestamps(t.Connection.User.Username, true); err == nil { t.Connection.uploadDone.Store(true) ExecuteActionNotification(t.Connection, operationFirstUpload, t.fsPath, t.requestPath, "", //nolint:errcheck - "", "", uploadFileSize, t.ErrTransfer, elapsed) + "", "", uploadFileSize, t.ErrTransfer, elapsed, t.metadata) } } return @@ -458,14 +464,14 @@ func (t *BaseTransfer) updateTransferTimestamps(uploadFileSize, elapsed int64) { if err := dataprovider.UpdateUserTransferTimestamps(t.Connection.User.Username, false); err == nil { t.Connection.downloadDone.Store(true) ExecuteActionNotification(t.Connection, operationFirstDownload, t.fsPath, t.requestPath, "", //nolint:errcheck - "", "", t.BytesSent.Load(), t.ErrTransfer, elapsed) + "", "", t.BytesSent.Load(), t.ErrTransfer, elapsed, t.metadata) } } } func (t *BaseTransfer) executeUploadHook(numFiles int, fileSize, elapsed int64) (int, int64) { err := ExecuteActionNotification(t.Connection, operationUpload, t.fsPath, t.requestPath, "", "", "", - fileSize, t.ErrTransfer, elapsed) + fileSize, t.ErrTransfer, elapsed, t.metadata) if err != nil { if t.ErrTransfer == nil { t.ErrTransfer = err diff --git a/internal/common/transfer_test.go b/internal/common/transfer_test.go index 8f8203bd..2b4ddc80 100644 --- a/internal/common/transfer_test.go +++ b/internal/common/transfer_test.go @@ -232,6 +232,7 @@ func TestTransferErrors(t *testing.T) { assert.ErrorIs(t, err, sftp.ErrSSHFxPermissionDenied) assert.Nil(t, transfer.cancelFn) assert.Equal(t, testFile, transfer.GetFsPath()) + transfer.SetMetadata(map[string]string{"key": "val"}) transfer.SetCancelFn(cancelFn) errFake := errors.New("err fake") transfer.BytesReceived.Store(9) diff --git a/internal/config/config.go b/internal/config/config.go index 7725d85e..51f8002d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -229,6 +229,10 @@ func Init() { EntriesHardLimit: 150, }, RateLimitersConfig: []common.RateLimiterConfig{defaultRateLimiter}, + Umask: "", + Metadata: common.MetadataConfig{ + Read: 0, + }, }, ACME: acme.Configuration{ Email: "", @@ -2006,6 +2010,8 @@ func setViperDefaults() { viper.SetDefault("common.defender.observation_time", globalConf.Common.DefenderConfig.ObservationTime) viper.SetDefault("common.defender.entries_soft_limit", globalConf.Common.DefenderConfig.EntriesSoftLimit) viper.SetDefault("common.defender.entries_hard_limit", globalConf.Common.DefenderConfig.EntriesHardLimit) + viper.SetDefault("common.umask", globalConf.Common.Umask) + viper.SetDefault("common.metadata.read", globalConf.Common.Metadata.Read) viper.SetDefault("acme.email", globalConf.ACME.Email) viper.SetDefault("acme.key_type", globalConf.ACME.KeyType) viper.SetDefault("acme.certs_path", globalConf.ACME.CertsPath) diff --git a/internal/ftpd/internal_test.go b/internal/ftpd/internal_test.go index 3d88e4cd..afd33835 100644 --- a/internal/ftpd/internal_test.go +++ b/internal/ftpd/internal_test.go @@ -853,7 +853,7 @@ func TestTransferErrors(t *testing.T) { assert.NoError(t, err) baseTransfer = common.NewBaseTransfer(nil, connection.BaseConnection, nil, testfile, testfile, testfile, common.TransferUpload, 0, 0, 0, 0, false, fs, dataprovider.TransferQuota{}) - tr = newTransfer(baseTransfer, nil, r, 10) + tr = newTransfer(baseTransfer, nil, vfs.NewPipeReader(r), 10) pos, err := tr.Seek(10, 0) assert.NoError(t, err) assert.Equal(t, pos, tr.expectedOffset) diff --git a/internal/ftpd/transfer.go b/internal/ftpd/transfer.go index 3bc62b65..230df835 100644 --- a/internal/ftpd/transfer.go +++ b/internal/ftpd/transfer.go @@ -18,8 +18,6 @@ import ( "errors" "io" - "github.com/eikenb/pipeat" - "github.com/drakkan/sftpgo/v2/internal/common" "github.com/drakkan/sftpgo/v2/internal/vfs" ) @@ -34,7 +32,7 @@ type transfer struct { expectedOffset int64 } -func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt, +func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *vfs.PipeReader, expectedOffset int64) *transfer { var writer io.WriteCloser var reader io.ReadCloser @@ -137,6 +135,9 @@ func (t *transfer) closeIO() error { t.Unlock() } else if t.reader != nil { err = t.reader.Close() + if metadater, ok := t.reader.(vfs.Metadater); ok { + t.BaseTransfer.SetMetadata(metadater.Metadata()) + } } return err } diff --git a/internal/httpd/file.go b/internal/httpd/file.go index ba428de9..20b801f8 100644 --- a/internal/httpd/file.go +++ b/internal/httpd/file.go @@ -17,8 +17,6 @@ package httpd import ( "io" - "github.com/eikenb/pipeat" - "github.com/drakkan/sftpgo/v2/internal/common" "github.com/drakkan/sftpgo/v2/internal/vfs" ) @@ -30,7 +28,7 @@ type httpdFile struct { isFinished bool } -func newHTTPDFile(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt) *httpdFile { +func newHTTPDFile(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *vfs.PipeReader) *httpdFile { var writer io.WriteCloser var reader io.ReadCloser if baseTransfer.File != nil { @@ -127,6 +125,9 @@ func (f *httpdFile) closeIO() error { f.Unlock() } else if f.reader != nil { err = f.reader.Close() + if metadater, ok := f.reader.(vfs.Metadater); ok { + f.BaseTransfer.SetMetadata(metadater.Metadata()) + } } return err } diff --git a/internal/sftpd/internal_test.go b/internal/sftpd/internal_test.go index baddaea9..a32ddd43 100644 --- a/internal/sftpd/internal_test.go +++ b/internal/sftpd/internal_test.go @@ -233,7 +233,7 @@ func TestReadWriteErrors(t *testing.T) { assert.NoError(t, err) baseTransfer = common.NewBaseTransfer(nil, conn, nil, file.Name(), file.Name(), testfile, common.TransferDownload, 0, 0, 0, 0, false, fs, dataprovider.TransferQuota{}) - transfer = newTransfer(baseTransfer, nil, r, nil) + transfer = newTransfer(baseTransfer, nil, vfs.NewPipeReader(r), nil) err = transfer.Close() assert.NoError(t, err) _, err = transfer.ReadAt(buf, 0) @@ -1807,7 +1807,7 @@ func TestTransferFailingReader(t *testing.T) { baseTransfer := common.NewBaseTransfer(nil, connection.BaseConnection, nil, fsPath, fsPath, filepath.Base(fsPath), common.TransferUpload, 0, 0, 0, 0, false, fs, dataprovider.TransferQuota{}) errRead := errors.New("read is not allowed") - tr := newTransfer(baseTransfer, nil, r, errRead) + tr := newTransfer(baseTransfer, nil, vfs.NewPipeReader(r), errRead) _, err = tr.ReadAt(buf, 0) assert.EqualError(t, err, errRead.Error()) diff --git a/internal/sftpd/ssh_cmd.go b/internal/sftpd/ssh_cmd.go index 458b38fe..81042a6a 100644 --- a/internal/sftpd/ssh_cmd.go +++ b/internal/sftpd/ssh_cmd.go @@ -568,7 +568,7 @@ func (c *sshCommand) sendExitStatus(err error) { } } common.ExecuteActionNotification(c.connection.BaseConnection, common.OperationSSHCmd, cmdPath, vCmdPath, //nolint:errcheck - targetPath, vTargetPath, c.command, 0, err, elapsed) + targetPath, vTargetPath, c.command, 0, err, elapsed, nil) if err == nil { logger.CommandLog(sshCommandLogSender, cmdPath, targetPath, c.connection.User.Username, "", c.connection.ID, common.ProtocolSSH, -1, -1, "", "", c.connection.command, -1, c.connection.GetLocalAddress(), diff --git a/internal/sftpd/transfer.go b/internal/sftpd/transfer.go index 1919d574..b53977a4 100644 --- a/internal/sftpd/transfer.go +++ b/internal/sftpd/transfer.go @@ -18,8 +18,6 @@ import ( "fmt" "io" - "github.com/eikenb/pipeat" - "github.com/drakkan/sftpgo/v2/internal/common" "github.com/drakkan/sftpgo/v2/internal/metric" "github.com/drakkan/sftpgo/v2/internal/vfs" @@ -60,7 +58,7 @@ type transfer struct { isFinished bool } -func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt, +func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *vfs.PipeReader, errForRead error) *transfer { var writer writerAtCloser var reader readerAtCloser @@ -178,6 +176,9 @@ func (t *transfer) closeIO() error { t.Unlock() } else if t.readerAt != nil { err = t.readerAt.Close() + if metadater, ok := t.readerAt.(vfs.Metadater); ok { + t.BaseTransfer.SetMetadata(metadater.Metadata()) + } } return err } diff --git a/internal/vfs/azblobfs.go b/internal/vfs/azblobfs.go index 18c19eca..100ff73e 100644 --- a/internal/vfs/azblobfs.go +++ b/internal/vfs/azblobfs.go @@ -206,24 +206,25 @@ func (fs *AzureBlobFs) Lstat(name string) (os.FileInfo, error) { } // Open opens the named file for reading -func (fs *AzureBlobFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) { +func (fs *AzureBlobFs) Open(name string, offset int64) (File, *PipeReader, func(), error) { r, w, err := pipeat.PipeInDir(fs.localTempDir) if err != nil { return nil, nil, nil, err } + p := NewPipeReader(r) ctx, cancelFn := context.WithCancel(context.Background()) go func() { defer cancelFn() blockBlob := fs.containerClient.NewBlockBlobClient(name) - err := fs.handleMultipartDownload(ctx, blockBlob, offset, w) + err := fs.handleMultipartDownload(ctx, blockBlob, offset, w, p) w.CloseWithError(err) //nolint:errcheck fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, w.GetWrittenBytes(), err) metric.AZTransferCompleted(w.GetWrittenBytes(), 1, err) }() - return nil, r, cancelFn, nil + return nil, p, cancelFn, nil } // Create creates or opens the named file for writing @@ -960,13 +961,17 @@ func (fs *AzureBlobFs) downloadPart(ctx context.Context, blockBlob *blockblob.Cl } func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *blockblob.Client, - offset int64, writer io.WriterAt, + offset int64, writer io.WriterAt, pipeReader *PipeReader, ) error { props, err := blockBlob.GetProperties(ctx, &blob.GetPropertiesOptions{}) + metric.AZHeadObjectCompleted(err) if err != nil { fsLog(fs, logger.LevelError, "unable to get blob properties, download aborted: %+v", err) return err } + if readMetadata > 0 { + pipeReader.setMetadataFromPointerVal(props.Metadata) + } contentLength := util.GetIntFromPointer(props.ContentLength) sizeToDownload := contentLength - offset if sizeToDownload < 0 { diff --git a/internal/vfs/cryptfs.go b/internal/vfs/cryptfs.go index 2a15d213..337b2c52 100644 --- a/internal/vfs/cryptfs.go +++ b/internal/vfs/cryptfs.go @@ -79,7 +79,7 @@ func (fs *CryptFs) Name() string { } // Open opens the named file for reading -func (fs *CryptFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) { +func (fs *CryptFs) Open(name string, offset int64) (File, *PipeReader, func(), error) { f, key, err := fs.getFileAndEncryptionKey(name) if err != nil { return nil, nil, nil, err @@ -94,6 +94,7 @@ func (fs *CryptFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, f.Close() return nil, nil, nil, err } + p := NewPipeReader(r) go func() { if isZeroDownload { @@ -149,7 +150,7 @@ func (fs *CryptFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %v", name, n, err) }() - return nil, r, nil, nil + return nil, p, nil, nil } // Create creates or opens the named file for writing diff --git a/internal/vfs/gcsfs.go b/internal/vfs/gcsfs.go index a55f6433..eec9e88f 100644 --- a/internal/vfs/gcsfs.go +++ b/internal/vfs/gcsfs.go @@ -129,17 +129,27 @@ func (fs *GCSFs) Lstat(name string) (os.FileInfo, error) { } // Open opens the named file for reading -func (fs *GCSFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) { +func (fs *GCSFs) Open(name string, offset int64) (File, *PipeReader, func(), error) { r, w, err := pipeat.PipeInDir(fs.localTempDir) if err != nil { return nil, nil, nil, err } + p := NewPipeReader(r) + if readMetadata > 0 { + attrs, err := fs.headObject(name) + if err != nil { + r.Close() + w.Close() + return nil, nil, nil, err + } + p.setMetadata(attrs.Metadata) + } bkt := fs.svc.Bucket(fs.config.Bucket) obj := bkt.Object(name) ctx, cancelFn := context.WithCancel(context.Background()) objectReader, err := obj.NewRangeReader(ctx, offset, -1) if err == nil && offset > 0 && objectReader.Attrs.ContentEncoding == "gzip" { - err = fmt.Errorf("range request is not possible for gzip content encoding, requested offset %v", offset) + err = fmt.Errorf("range request is not possible for gzip content encoding, requested offset %d", offset) objectReader.Close() } if err != nil { @@ -157,7 +167,7 @@ func (fs *GCSFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, fu fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, n, err) metric.GCSTransferCompleted(n, 1, err) }() - return nil, r, cancelFn, nil + return nil, p, cancelFn, nil } // Create creates or opens the named file for writing diff --git a/internal/vfs/httpfs.go b/internal/vfs/httpfs.go index 94cccb4d..2961ab32 100644 --- a/internal/vfs/httpfs.go +++ b/internal/vfs/httpfs.go @@ -297,11 +297,12 @@ func (fs *HTTPFs) Lstat(name string) (os.FileInfo, error) { } // Open opens the named file for reading -func (fs *HTTPFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) { +func (fs *HTTPFs) Open(name string, offset int64) (File, *PipeReader, func(), error) { r, w, err := pipeat.PipeInDir(fs.localTempDir) if err != nil { return nil, nil, nil, err } + p := NewPipeReader(r) ctx, cancelFn := context.WithCancel(context.Background()) var queryString string @@ -326,7 +327,7 @@ func (fs *HTTPFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, f metric.HTTPFsTransferCompleted(n, 1, err) }() - return nil, r, cancelFn, nil + return nil, p, cancelFn, nil } // Create creates or opens the named file for writing diff --git a/internal/vfs/osfs.go b/internal/vfs/osfs.go index 5d721cd1..5c519f98 100644 --- a/internal/vfs/osfs.go +++ b/internal/vfs/osfs.go @@ -107,7 +107,7 @@ func (fs *OsFs) Lstat(name string) (os.FileInfo, error) { } // Open opens the named file for reading -func (fs *OsFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) { +func (fs *OsFs) Open(name string, offset int64) (File, *PipeReader, func(), error) { f, err := os.Open(name) if err != nil { return nil, nil, nil, err @@ -127,6 +127,7 @@ func (fs *OsFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, fun f.Close() return nil, nil, nil, err } + p := NewPipeReader(r) go func() { br := bufio.NewReaderSize(f, fs.readBufferSize) n, err := doCopy(w, br, nil) @@ -135,7 +136,7 @@ func (fs *OsFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, fun fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %v", name, n, err) }() - return nil, r, nil, nil + return nil, p, nil, nil } // Create creates or opens the named file for writing diff --git a/internal/vfs/s3fs.go b/internal/vfs/s3fs.go index 1a53ce80..c4e22979 100644 --- a/internal/vfs/s3fs.go +++ b/internal/vfs/s3fs.go @@ -197,11 +197,22 @@ func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) { } // Open opens the named file for reading -func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) { +func (fs *S3Fs) Open(name string, offset int64) (File, *PipeReader, func(), error) { r, w, err := pipeat.PipeInDir(fs.localTempDir) if err != nil { return nil, nil, nil, err } + p := NewPipeReader(r) + if readMetadata > 0 { + attrs, err := fs.headObject(name) + if err != nil { + r.Close() + w.Close() + return nil, nil, nil, err + } + p.setMetadata(attrs.Metadata) + } + ctx, cancelFn := context.WithCancel(context.Background()) downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) { d.Concurrency = fs.config.DownloadConcurrency @@ -230,7 +241,7 @@ func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, fun fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, n, err) metric.S3TransferCompleted(n, 1, err) }() - return nil, r, cancelFn, nil + return nil, p, cancelFn, nil } // Create creates or opens the named file for writing diff --git a/internal/vfs/sftpfs.go b/internal/vfs/sftpfs.go index df3f03ca..532e892d 100644 --- a/internal/vfs/sftpfs.go +++ b/internal/vfs/sftpfs.go @@ -336,7 +336,7 @@ func (fs *SFTPFs) Lstat(name string) (os.FileInfo, error) { } // Open opens the named file for reading -func (fs *SFTPFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) { +func (fs *SFTPFs) Open(name string, offset int64) (File, *PipeReader, func(), error) { client, err := fs.conn.getClient() if err != nil { return nil, nil, nil, err @@ -360,6 +360,8 @@ func (fs *SFTPFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, f f.Close() return nil, nil, nil, err } + p := NewPipeReader(r) + go func() { // if we enable buffering the client stalls //br := bufio.NewReaderSize(f, int(fs.config.BufferSize)*1024*1024) @@ -370,7 +372,7 @@ func (fs *SFTPFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, f fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %v", name, n, err) }() - return nil, r, nil, nil + return nil, p, nil, nil } // Create creates or opens the named file for writing diff --git a/internal/vfs/vfs.go b/internal/vfs/vfs.go index f8a8ad01..fb26d3c0 100644 --- a/internal/vfs/vfs.go +++ b/internal/vfs/vfs.go @@ -25,6 +25,7 @@ import ( "path/filepath" "runtime" "strings" + "sync" "time" "github.com/eikenb/pipeat" @@ -60,6 +61,7 @@ var ( sftpFingerprints []string allowSelfConnections int renameMode int + readMetadata int ) // SetAllowSelfConnections sets the desired behaviour for self connections @@ -87,13 +89,18 @@ func SetRenameMode(val int) { renameMode = val } +// SetReadMetadataMode sets the read metadata mode +func SetReadMetadataMode(val int) { + readMetadata = val +} + // Fs defines the interface for filesystem backends type Fs interface { Name() string ConnectionID() string Stat(name string) (os.FileInfo, error) Lstat(name string) (os.FileInfo, error) - Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) + Open(name string, offset int64) (File, *PipeReader, func(), error) Create(name string, flag, checks int) (File, *PipeWriter, func(), error) Rename(source, target string) (int, int64, error) Remove(name string, isDir bool) error @@ -157,6 +164,11 @@ type File interface { Truncate(size int64) error } +// Metadater defines an interface to implement to return metadata for a file +type Metadater interface { + Metadata() map[string]string +} + // QuotaCheckResult defines the result for a quota check type QuotaCheckResult struct { HasSpace bool @@ -687,23 +699,23 @@ func (c *CryptFsConfig) validate() error { // PipeWriter defines a wrapper for pipeat.PipeWriterAt. type PipeWriter struct { - writer *pipeat.PipeWriterAt - err error - done chan bool + *pipeat.PipeWriterAt + err error + done chan bool } // NewPipeWriter initializes a new PipeWriter func NewPipeWriter(w *pipeat.PipeWriterAt) *PipeWriter { return &PipeWriter{ - writer: w, - err: nil, - done: make(chan bool), + PipeWriterAt: w, + err: nil, + done: make(chan bool), } } // Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any. func (p *PipeWriter) Close() error { - p.writer.Close() //nolint:errcheck // the returned error is always null + p.PipeWriterAt.Close() //nolint:errcheck // the returned error is always null <-p.done return p.err } @@ -715,14 +727,58 @@ func (p *PipeWriter) Done(err error) { p.done <- true } -// WriteAt is a wrapper for pipeat WriteAt -func (p *PipeWriter) WriteAt(data []byte, off int64) (int, error) { - return p.writer.WriteAt(data, off) +// NewPipeReader initializes a new PipeReader +func NewPipeReader(r *pipeat.PipeReaderAt) *PipeReader { + return &PipeReader{ + PipeReaderAt: r, + } } -// Write is a wrapper for pipeat Write -func (p *PipeWriter) Write(data []byte) (int, error) { - return p.writer.Write(data) +// PipeReader defines a wrapper for pipeat.PipeReaderAt. +type PipeReader struct { + *pipeat.PipeReaderAt + mu sync.RWMutex + metadata map[string]string +} + +func (p *PipeReader) setMetadata(value map[string]string) { + p.mu.Lock() + defer p.mu.Unlock() + + p.metadata = value +} + +func (p *PipeReader) setMetadataFromPointerVal(value map[string]*string) { + p.mu.Lock() + defer p.mu.Unlock() + + if len(value) == 0 { + p.metadata = nil + return + } + + p.metadata = map[string]string{} + for k, v := range value { + val := util.GetStringFromPointer(v) + if val != "" { + p.metadata[k] = val + } + } +} + +// Metadata implements the Metadater interface +func (p *PipeReader) Metadata() map[string]string { + p.mu.RLock() + defer p.mu.RUnlock() + + if len(p.metadata) == 0 { + return nil + } + result := make(map[string]string) + for k, v := range p.metadata { + result[k] = v + } + return result } func isEqualityCheckModeValid(mode int) bool { diff --git a/internal/webdavd/file.go b/internal/webdavd/file.go index 3a7fb81b..1501efce 100644 --- a/internal/webdavd/file.go +++ b/internal/webdavd/file.go @@ -400,6 +400,9 @@ func (f *webDavFile) closeIO() error { f.Unlock() } else if f.reader != nil { err = f.reader.Close() + if metadater, ok := f.reader.(vfs.Metadater); ok { + f.BaseTransfer.SetMetadata(metadater.Metadata()) + } } return err } diff --git a/internal/webdavd/internal_test.go b/internal/webdavd/internal_test.go index 9cd7d505..7ea8f562 100644 --- a/internal/webdavd/internal_test.go +++ b/internal/webdavd/internal_test.go @@ -286,9 +286,9 @@ func (fs *MockOsFs) Name() string { } // Open returns nil -func (fs *MockOsFs) Open(name string, offset int64) (vfs.File, *pipeat.PipeReaderAt, func(), error) { +func (fs *MockOsFs) Open(name string, offset int64) (vfs.File, *vfs.PipeReader, func(), error) { if fs.reader != nil { - return nil, fs.reader, nil, nil + return nil, vfs.NewPipeReader(fs.reader), nil, nil } return fs.Fs.Open(name, offset) } diff --git a/sftpgo.json b/sftpgo.json index 064b7fe5..5a0c2195 100644 --- a/sftpgo.json +++ b/sftpgo.json @@ -22,6 +22,9 @@ "allowlist_status": 0, "allow_self_connections": 0, "umask": "", + "metadata": { + "read": 0 + }, "defender": { "enabled": false, "driver": "memory", diff --git a/templates/webadmin/eventaction.html b/templates/webadmin/eventaction.html index b4ae9fc0..4ad10902 100644 --- a/templates/webadmin/eventaction.html +++ b/templates/webadmin/eventaction.html @@ -834,12 +834,21 @@ along with this program. If not, see .

{{`{{ObjectData}}`}} => Provider object data serialized as JSON with sensitive fields removed.

+

+ {{`{{ObjectDataString}}`}} => Provider object data as JSON escaped string with sensitive fields removed. +

{{`{{RetentionReports}}`}} => Data retention reports as zip compressed CSV files. Supported as email attachment, file path for multipart HTTP request and as single parameter for HTTP requests body.

{{`{{IDPField}}`}} => Identity Provider custom fields containing a string.

+

+ {{`{{Metadata}}`}} => Cloud storage metadata for the downloaded file serialized as JSON. +

+

+ {{`{{MetadataString}}`}} => Cloud storage metadata for the downloaded file as JSON escaped string. +