plugins: add eventsearcher

This commit is contained in:
Nicola Murino 2021-10-17 16:43:05 +02:00
parent f131ef130b
commit 3bbe67571f
No known key found for this signature in database
GPG key ID: 2F1FB59433D5A8CB
13 changed files with 1004 additions and 18 deletions

View file

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"os/exec"
"path/filepath"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
@ -116,7 +117,7 @@ func (p *authPlugin) initialize() error {
Managed: false,
Logger: &logger.HCLogAdapter{
Logger: hclog.New(&hclog.LoggerOptions{
Name: fmt.Sprintf("%v.%v", logSender, auth.PluginName),
Name: fmt.Sprintf("%v.%v.%v", logSender, auth.PluginName, filepath.Base(p.config.Cmd)),
Level: pluginsLogLevel,
DisableTime: true,
}),

View file

@ -19,7 +19,7 @@ const (
// Handshake is a common handshake that is shared by plugin and host.
var Handshake = plugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "SFTPGO_AUTH_PLUGIN",
MagicCookieKey: "SFTPGO_PLUGIN_AUTH",
MagicCookieValue: "d1ed507d-d2be-4a38-a460-6fe0b2cc7efc",
}

View file

@ -0,0 +1,59 @@
// Package eventsearcher defines the implementation for events search plugins.
// Events search plugins allow to search for filesystem and provider events.
package eventsearcher
import (
"context"
"time"
"github.com/hashicorp/go-plugin"
"google.golang.org/grpc"
"github.com/drakkan/sftpgo/v2/sdk/plugin/eventsearcher/proto"
)
const (
// PluginName defines the name for an events search plugin
PluginName = "eventsearcher"
)
// Handshake is a common handshake that is shared by plugin and host.
var Handshake = plugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "SFTPGO_PLUGIN_EVENTSEARCHER",
MagicCookieValue: "2b523805-0279-471c-895e-6c0d39002ca4",
}
// PluginMap is the map of plugins we can dispense.
var PluginMap = map[string]plugin.Plugin{
PluginName: &Plugin{},
}
// Searcher defines the interface for events search plugins
type Searcher interface {
SearchFsEvents(startTimestamp, endTimestamp time.Time, action, username, ip, sshCmd, protocol,
instanceID, continuationToken string, status, limit int) (string, []byte, error)
SearchProviderEvents(startTimestamp, endTimestamp time.Time, action, username, ip, objectType,
objectName, instanceID, continuationToken string, limit int) (string, []byte, error)
}
// Plugin defines the implementation to serve/connect to a notifier plugin
type Plugin struct {
plugin.Plugin
Impl Searcher
}
// GRPCServer defines the GRPC server implementation for this plugin
func (p *Plugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
proto.RegisterSearcherServer(s, &GRPCServer{
Impl: p.Impl,
})
return nil
}
// GRPCClient defines the GRPC client implementation for this plugin
func (p *Plugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &GRPCClient{
client: proto.NewSearcherClient(c),
}, nil
}

View file

@ -0,0 +1,99 @@
package eventsearcher
import (
"context"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/drakkan/sftpgo/v2/sdk/plugin/eventsearcher/proto"
)
const (
rpcTimeout = 30 * time.Second
)
// GRPCClient is an implementation of Notifier interface that talks over RPC.
type GRPCClient struct {
client proto.SearcherClient
}
// SearchFsEvents implements the Searcher interface
func (c *GRPCClient) SearchFsEvents(startTimestamp, endTimestamp time.Time, action, username, ip, sshCmd, protocol,
instanceID, continuationToken string, status, limit int) (string, []byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
defer cancel()
resp, err := c.client.SearchFsEvents(ctx, &proto.FsEventsFilter{
StartTimestamp: timestamppb.New(startTimestamp),
EndTimestamp: timestamppb.New(endTimestamp),
Action: action,
Username: username,
Ip: ip,
SshCmd: sshCmd,
Protocol: protocol,
InstanceId: instanceID,
ContinuationToken: continuationToken,
Status: int32(status),
Limit: int32(limit),
})
if err != nil {
return "", nil, err
}
return resp.ContinuationToken, resp.ResponseData, nil
}
// SearchProviderEvents implements the Searcher interface
func (c *GRPCClient) SearchProviderEvents(startTimestamp, endTimestamp time.Time, action, username, ip, objectType,
objectName, instanceID, continuationToken string, limit int) (string, []byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
defer cancel()
resp, err := c.client.SearchProviderEvents(ctx, &proto.ProviderEventsFilter{
StartTimestamp: timestamppb.New(startTimestamp),
EndTimestamp: timestamppb.New(endTimestamp),
Action: action,
Username: username,
Ip: ip,
ObjectType: objectType,
ObjectName: objectName,
InstanceId: instanceID,
ContinuationToken: continuationToken,
Limit: int32(limit),
})
if err != nil {
return "", nil, err
}
return resp.ContinuationToken, resp.ResponseData, nil
}
// GRPCServer defines the gRPC server that GRPCClient talks to.
type GRPCServer struct {
Impl Searcher
}
// SearchFsEvents implement the server side fs events search method
func (s *GRPCServer) SearchFsEvents(ctx context.Context, req *proto.FsEventsFilter) (*proto.SearchResponse, error) {
continuationToken, responseData, err := s.Impl.SearchFsEvents(req.StartTimestamp.AsTime(),
req.EndTimestamp.AsTime(), req.Action, req.Username, req.Ip, req.SshCmd, req.Protocol, req.InstanceId,
req.ContinuationToken, int(req.Status), int(req.Limit))
return &proto.SearchResponse{
ContinuationToken: continuationToken,
ResponseData: responseData,
}, err
}
// SearchProviderEvents implement the server side provider events search method
func (s *GRPCServer) SearchProviderEvents(ctx context.Context, req *proto.ProviderEventsFilter) (*proto.SearchResponse, error) {
continuationToken, responseData, err := s.Impl.SearchProviderEvents(req.StartTimestamp.AsTime(),
req.EndTimestamp.AsTime(), req.Action, req.Username, req.Ip, req.ObjectType, req.ObjectName,
req.InstanceId, req.ContinuationToken, int(req.Limit))
return &proto.SearchResponse{
ContinuationToken: continuationToken,
ResponseData: responseData,
}, err
}

View file

@ -0,0 +1,619 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.17.3
// source: eventsearcher/proto/search.proto
package proto
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type FsEventsFilter struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
StartTimestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=start_timestamp,json=startTimestamp,proto3" json:"start_timestamp,omitempty"`
EndTimestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=end_timestamp,json=endTimestamp,proto3" json:"end_timestamp,omitempty"`
Action string `protobuf:"bytes,3,opt,name=action,proto3" json:"action,omitempty"`
Username string `protobuf:"bytes,4,opt,name=username,proto3" json:"username,omitempty"`
Ip string `protobuf:"bytes,5,opt,name=ip,proto3" json:"ip,omitempty"`
SshCmd string `protobuf:"bytes,6,opt,name=ssh_cmd,json=sshCmd,proto3" json:"ssh_cmd,omitempty"`
Protocol string `protobuf:"bytes,7,opt,name=protocol,proto3" json:"protocol,omitempty"`
Status int32 `protobuf:"varint,8,opt,name=status,proto3" json:"status,omitempty"`
InstanceId string `protobuf:"bytes,9,opt,name=instance_id,json=instanceId,proto3" json:"instance_id,omitempty"`
Limit int32 `protobuf:"varint,10,opt,name=limit,proto3" json:"limit,omitempty"`
ContinuationToken string `protobuf:"bytes,11,opt,name=continuation_token,json=continuationToken,proto3" json:"continuation_token,omitempty"`
}
func (x *FsEventsFilter) Reset() {
*x = FsEventsFilter{}
if protoimpl.UnsafeEnabled {
mi := &file_eventsearcher_proto_search_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *FsEventsFilter) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*FsEventsFilter) ProtoMessage() {}
func (x *FsEventsFilter) ProtoReflect() protoreflect.Message {
mi := &file_eventsearcher_proto_search_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use FsEventsFilter.ProtoReflect.Descriptor instead.
func (*FsEventsFilter) Descriptor() ([]byte, []int) {
return file_eventsearcher_proto_search_proto_rawDescGZIP(), []int{0}
}
func (x *FsEventsFilter) GetStartTimestamp() *timestamppb.Timestamp {
if x != nil {
return x.StartTimestamp
}
return nil
}
func (x *FsEventsFilter) GetEndTimestamp() *timestamppb.Timestamp {
if x != nil {
return x.EndTimestamp
}
return nil
}
func (x *FsEventsFilter) GetAction() string {
if x != nil {
return x.Action
}
return ""
}
func (x *FsEventsFilter) GetUsername() string {
if x != nil {
return x.Username
}
return ""
}
func (x *FsEventsFilter) GetIp() string {
if x != nil {
return x.Ip
}
return ""
}
func (x *FsEventsFilter) GetSshCmd() string {
if x != nil {
return x.SshCmd
}
return ""
}
func (x *FsEventsFilter) GetProtocol() string {
if x != nil {
return x.Protocol
}
return ""
}
func (x *FsEventsFilter) GetStatus() int32 {
if x != nil {
return x.Status
}
return 0
}
func (x *FsEventsFilter) GetInstanceId() string {
if x != nil {
return x.InstanceId
}
return ""
}
func (x *FsEventsFilter) GetLimit() int32 {
if x != nil {
return x.Limit
}
return 0
}
func (x *FsEventsFilter) GetContinuationToken() string {
if x != nil {
return x.ContinuationToken
}
return ""
}
type ProviderEventsFilter struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
StartTimestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=start_timestamp,json=startTimestamp,proto3" json:"start_timestamp,omitempty"`
EndTimestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=end_timestamp,json=endTimestamp,proto3" json:"end_timestamp,omitempty"`
Action string `protobuf:"bytes,3,opt,name=action,proto3" json:"action,omitempty"`
Username string `protobuf:"bytes,4,opt,name=username,proto3" json:"username,omitempty"`
Ip string `protobuf:"bytes,5,opt,name=ip,proto3" json:"ip,omitempty"`
ObjectType string `protobuf:"bytes,6,opt,name=object_type,json=objectType,proto3" json:"object_type,omitempty"`
ObjectName string `protobuf:"bytes,7,opt,name=object_name,json=objectName,proto3" json:"object_name,omitempty"`
InstanceId string `protobuf:"bytes,8,opt,name=instance_id,json=instanceId,proto3" json:"instance_id,omitempty"`
Limit int32 `protobuf:"varint,9,opt,name=limit,proto3" json:"limit,omitempty"`
ContinuationToken string `protobuf:"bytes,10,opt,name=continuation_token,json=continuationToken,proto3" json:"continuation_token,omitempty"`
}
func (x *ProviderEventsFilter) Reset() {
*x = ProviderEventsFilter{}
if protoimpl.UnsafeEnabled {
mi := &file_eventsearcher_proto_search_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ProviderEventsFilter) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ProviderEventsFilter) ProtoMessage() {}
func (x *ProviderEventsFilter) ProtoReflect() protoreflect.Message {
mi := &file_eventsearcher_proto_search_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ProviderEventsFilter.ProtoReflect.Descriptor instead.
func (*ProviderEventsFilter) Descriptor() ([]byte, []int) {
return file_eventsearcher_proto_search_proto_rawDescGZIP(), []int{1}
}
func (x *ProviderEventsFilter) GetStartTimestamp() *timestamppb.Timestamp {
if x != nil {
return x.StartTimestamp
}
return nil
}
func (x *ProviderEventsFilter) GetEndTimestamp() *timestamppb.Timestamp {
if x != nil {
return x.EndTimestamp
}
return nil
}
func (x *ProviderEventsFilter) GetAction() string {
if x != nil {
return x.Action
}
return ""
}
func (x *ProviderEventsFilter) GetUsername() string {
if x != nil {
return x.Username
}
return ""
}
func (x *ProviderEventsFilter) GetIp() string {
if x != nil {
return x.Ip
}
return ""
}
func (x *ProviderEventsFilter) GetObjectType() string {
if x != nil {
return x.ObjectType
}
return ""
}
func (x *ProviderEventsFilter) GetObjectName() string {
if x != nil {
return x.ObjectName
}
return ""
}
func (x *ProviderEventsFilter) GetInstanceId() string {
if x != nil {
return x.InstanceId
}
return ""
}
func (x *ProviderEventsFilter) GetLimit() int32 {
if x != nil {
return x.Limit
}
return 0
}
func (x *ProviderEventsFilter) GetContinuationToken() string {
if x != nil {
return x.ContinuationToken
}
return ""
}
type SearchResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ContinuationToken string `protobuf:"bytes,1,opt,name=continuation_token,json=continuationToken,proto3" json:"continuation_token,omitempty"`
ResponseData []byte `protobuf:"bytes,2,opt,name=response_data,json=responseData,proto3" json:"response_data,omitempty"` // JSON serialized response to return
}
func (x *SearchResponse) Reset() {
*x = SearchResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_eventsearcher_proto_search_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SearchResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SearchResponse) ProtoMessage() {}
func (x *SearchResponse) ProtoReflect() protoreflect.Message {
mi := &file_eventsearcher_proto_search_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SearchResponse.ProtoReflect.Descriptor instead.
func (*SearchResponse) Descriptor() ([]byte, []int) {
return file_eventsearcher_proto_search_proto_rawDescGZIP(), []int{2}
}
func (x *SearchResponse) GetContinuationToken() string {
if x != nil {
return x.ContinuationToken
}
return ""
}
func (x *SearchResponse) GetResponseData() []byte {
if x != nil {
return x.ResponseData
}
return nil
}
var File_eventsearcher_proto_search_proto protoreflect.FileDescriptor
var file_eventsearcher_proto_search_proto_rawDesc = []byte{
0x0a, 0x20, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x65, 0x61, 0x72, 0x63, 0x68, 0x65, 0x72, 0x2f,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x65, 0x61, 0x72, 0x63, 0x68, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8d, 0x03, 0x0a, 0x0e, 0x46,
0x73, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x43, 0x0a,
0x0f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
0x6d, 0x70, 0x52, 0x0e, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
0x6d, 0x70, 0x12, 0x3f, 0x0a, 0x0d, 0x65, 0x6e, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74,
0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65,
0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0c, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74,
0x61, 0x6d, 0x70, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x75,
0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x75,
0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x05, 0x20,
0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x73, 0x68, 0x5f, 0x63,
0x6d, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x73, 0x68, 0x43, 0x6d, 0x64,
0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x07, 0x20, 0x01,
0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x16, 0x0a, 0x06,
0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x08, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x73, 0x74,
0x61, 0x74, 0x75, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65,
0x5f, 0x69, 0x64, 0x18, 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61,
0x6e, 0x63, 0x65, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x0a,
0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x2d, 0x0a, 0x12, 0x63,
0x6f, 0x6e, 0x74, 0x69, 0x6e, 0x75, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x6f, 0x6b, 0x65,
0x6e, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x63, 0x6f, 0x6e, 0x74, 0x69, 0x6e, 0x75,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x88, 0x03, 0x0a, 0x14, 0x50,
0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x46, 0x69, 0x6c,
0x74, 0x65, 0x72, 0x12, 0x43, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d,
0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67,
0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54,
0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0e, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54,
0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x3f, 0x0a, 0x0d, 0x65, 0x6e, 0x64, 0x5f,
0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0c, 0x65, 0x6e, 0x64,
0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x74,
0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f,
0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x75, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20,
0x01, 0x28, 0x09, 0x52, 0x08, 0x75, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x0e, 0x0a,
0x02, 0x69, 0x70, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x12, 0x1f, 0x0a,
0x0b, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x06, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0a, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1f,
0x0a, 0x0b, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x07, 0x20,
0x01, 0x28, 0x09, 0x52, 0x0a, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12,
0x1f, 0x0a, 0x0b, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x08,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64,
0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x09, 0x20, 0x01, 0x28, 0x05, 0x52,
0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x2d, 0x0a, 0x12, 0x63, 0x6f, 0x6e, 0x74, 0x69, 0x6e,
0x75, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x0a, 0x20, 0x01,
0x28, 0x09, 0x52, 0x11, 0x63, 0x6f, 0x6e, 0x74, 0x69, 0x6e, 0x75, 0x61, 0x74, 0x69, 0x6f, 0x6e,
0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x64, 0x0a, 0x0e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x12, 0x63, 0x6f, 0x6e, 0x74, 0x69,
0x6e, 0x75, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x11, 0x63, 0x6f, 0x6e, 0x74, 0x69, 0x6e, 0x75, 0x61, 0x74, 0x69, 0x6f,
0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x72,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x32, 0x96, 0x01, 0x0a, 0x08,
0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x65, 0x72, 0x12, 0x3e, 0x0a, 0x0e, 0x53, 0x65, 0x61, 0x72,
0x63, 0x68, 0x46, 0x73, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x15, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x2e, 0x46, 0x73, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x46, 0x69, 0x6c, 0x74, 0x65,
0x72, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4a, 0x0a, 0x14, 0x53, 0x65, 0x61, 0x72,
0x63, 0x68, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73,
0x12, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65,
0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x1a, 0x15, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x42, 0x20, 0x5a, 0x1e, 0x73, 0x64, 0x6b, 0x2f, 0x70, 0x6c, 0x75, 0x67,
0x69, 0x6e, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x65, 0x61, 0x72, 0x63, 0x68, 0x65, 0x72,
0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_eventsearcher_proto_search_proto_rawDescOnce sync.Once
file_eventsearcher_proto_search_proto_rawDescData = file_eventsearcher_proto_search_proto_rawDesc
)
func file_eventsearcher_proto_search_proto_rawDescGZIP() []byte {
file_eventsearcher_proto_search_proto_rawDescOnce.Do(func() {
file_eventsearcher_proto_search_proto_rawDescData = protoimpl.X.CompressGZIP(file_eventsearcher_proto_search_proto_rawDescData)
})
return file_eventsearcher_proto_search_proto_rawDescData
}
var file_eventsearcher_proto_search_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_eventsearcher_proto_search_proto_goTypes = []interface{}{
(*FsEventsFilter)(nil), // 0: proto.FsEventsFilter
(*ProviderEventsFilter)(nil), // 1: proto.ProviderEventsFilter
(*SearchResponse)(nil), // 2: proto.SearchResponse
(*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp
}
var file_eventsearcher_proto_search_proto_depIdxs = []int32{
3, // 0: proto.FsEventsFilter.start_timestamp:type_name -> google.protobuf.Timestamp
3, // 1: proto.FsEventsFilter.end_timestamp:type_name -> google.protobuf.Timestamp
3, // 2: proto.ProviderEventsFilter.start_timestamp:type_name -> google.protobuf.Timestamp
3, // 3: proto.ProviderEventsFilter.end_timestamp:type_name -> google.protobuf.Timestamp
0, // 4: proto.Searcher.SearchFsEvents:input_type -> proto.FsEventsFilter
1, // 5: proto.Searcher.SearchProviderEvents:input_type -> proto.ProviderEventsFilter
2, // 6: proto.Searcher.SearchFsEvents:output_type -> proto.SearchResponse
2, // 7: proto.Searcher.SearchProviderEvents:output_type -> proto.SearchResponse
6, // [6:8] is the sub-list for method output_type
4, // [4:6] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
}
func init() { file_eventsearcher_proto_search_proto_init() }
func file_eventsearcher_proto_search_proto_init() {
if File_eventsearcher_proto_search_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_eventsearcher_proto_search_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FsEventsFilter); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_eventsearcher_proto_search_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ProviderEventsFilter); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_eventsearcher_proto_search_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SearchResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_eventsearcher_proto_search_proto_rawDesc,
NumEnums: 0,
NumMessages: 3,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_eventsearcher_proto_search_proto_goTypes,
DependencyIndexes: file_eventsearcher_proto_search_proto_depIdxs,
MessageInfos: file_eventsearcher_proto_search_proto_msgTypes,
}.Build()
File_eventsearcher_proto_search_proto = out.File
file_eventsearcher_proto_search_proto_rawDesc = nil
file_eventsearcher_proto_search_proto_goTypes = nil
file_eventsearcher_proto_search_proto_depIdxs = nil
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6
// SearcherClient is the client API for Searcher service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type SearcherClient interface {
SearchFsEvents(ctx context.Context, in *FsEventsFilter, opts ...grpc.CallOption) (*SearchResponse, error)
SearchProviderEvents(ctx context.Context, in *ProviderEventsFilter, opts ...grpc.CallOption) (*SearchResponse, error)
}
type searcherClient struct {
cc grpc.ClientConnInterface
}
func NewSearcherClient(cc grpc.ClientConnInterface) SearcherClient {
return &searcherClient{cc}
}
func (c *searcherClient) SearchFsEvents(ctx context.Context, in *FsEventsFilter, opts ...grpc.CallOption) (*SearchResponse, error) {
out := new(SearchResponse)
err := c.cc.Invoke(ctx, "/proto.Searcher/SearchFsEvents", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *searcherClient) SearchProviderEvents(ctx context.Context, in *ProviderEventsFilter, opts ...grpc.CallOption) (*SearchResponse, error) {
out := new(SearchResponse)
err := c.cc.Invoke(ctx, "/proto.Searcher/SearchProviderEvents", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// SearcherServer is the server API for Searcher service.
type SearcherServer interface {
SearchFsEvents(context.Context, *FsEventsFilter) (*SearchResponse, error)
SearchProviderEvents(context.Context, *ProviderEventsFilter) (*SearchResponse, error)
}
// UnimplementedSearcherServer can be embedded to have forward compatible implementations.
type UnimplementedSearcherServer struct {
}
func (*UnimplementedSearcherServer) SearchFsEvents(context.Context, *FsEventsFilter) (*SearchResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SearchFsEvents not implemented")
}
func (*UnimplementedSearcherServer) SearchProviderEvents(context.Context, *ProviderEventsFilter) (*SearchResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SearchProviderEvents not implemented")
}
func RegisterSearcherServer(s *grpc.Server, srv SearcherServer) {
s.RegisterService(&_Searcher_serviceDesc, srv)
}
func _Searcher_SearchFsEvents_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(FsEventsFilter)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SearcherServer).SearchFsEvents(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Searcher/SearchFsEvents",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SearcherServer).SearchFsEvents(ctx, req.(*FsEventsFilter))
}
return interceptor(ctx, in, info, handler)
}
func _Searcher_SearchProviderEvents_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ProviderEventsFilter)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SearcherServer).SearchProviderEvents(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Searcher/SearchProviderEvents",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SearcherServer).SearchProviderEvents(ctx, req.(*ProviderEventsFilter))
}
return interceptor(ctx, in, info, handler)
}
var _Searcher_serviceDesc = grpc.ServiceDesc{
ServiceName: "proto.Searcher",
HandlerType: (*SearcherServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SearchFsEvents",
Handler: _Searcher_SearchFsEvents_Handler,
},
{
MethodName: "SearchProviderEvents",
Handler: _Searcher_SearchProviderEvents_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "eventsearcher/proto/search.proto",
}

View file

@ -0,0 +1,43 @@
syntax = "proto3";
package proto;
import "google/protobuf/timestamp.proto";
option go_package = "sdk/plugin/eventsearcher/proto";
message FsEventsFilter {
google.protobuf.Timestamp start_timestamp = 1;
google.protobuf.Timestamp end_timestamp = 2;
string action = 3;
string username = 4;
string ip = 5;
string ssh_cmd = 6;
string protocol = 7;
int32 status = 8;
string instance_id = 9;
int32 limit = 10;
string continuation_token = 11;
}
message ProviderEventsFilter {
google.protobuf.Timestamp start_timestamp = 1;
google.protobuf.Timestamp end_timestamp = 2;
string action = 3;
string username = 4;
string ip = 5;
string object_type = 6;
string object_name = 7;
string instance_id = 8;
int32 limit = 9;
string continuation_token = 10;
}
message SearchResponse {
string continuation_token = 1;
bytes response_data = 2; // JSON serialized response to return
}
service Searcher {
rpc SearchFsEvents(FsEventsFilter) returns (SearchResponse);
rpc SearchProviderEvents(ProviderEventsFilter) returns (SearchResponse);
}

View file

@ -77,7 +77,7 @@ func (p *kmsPlugin) initialize() error {
Managed: false,
Logger: &logger.HCLogAdapter{
Logger: hclog.New(&hclog.LoggerOptions{
Name: fmt.Sprintf("%v.%v", logSender, kmsplugin.PluginName),
Name: fmt.Sprintf("%v.%v.%v", logSender, kmsplugin.PluginName, filepath.Base(p.config.Cmd)),
Level: pluginsLogLevel,
DisableTime: true,
}),

View file

@ -19,7 +19,7 @@ const (
// Handshake is a common handshake that is shared by plugin and host.
var Handshake = plugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "SFTPGO_KMS_PLUGIN",
MagicCookieKey: "SFTPGO_PLUGIN_KMS",
MagicCookieValue: "223e3571-7ed2-4b96-b4b3-c7eb87d7ca1d",
}

View file

@ -3,5 +3,4 @@
protoc notifier/proto/notifier.proto --go_out=plugins=grpc:../.. --go_out=../../..
protoc kms/proto/kms.proto --go_out=plugins=grpc:../.. --go_out=../../..
protoc auth/proto/auth.proto --go_out=plugins=grpc:../.. --go_out=../../..
protoc eventsearcher/proto/search.proto --go_out=plugins=grpc:../.. --go_out=../../..

View file

@ -4,6 +4,7 @@ import (
"crypto/sha256"
"fmt"
"os/exec"
"path/filepath"
"sync"
"time"
@ -166,7 +167,7 @@ func (p *notifierPlugin) initialize() error {
Managed: false,
Logger: &logger.HCLogAdapter{
Logger: hclog.New(&hclog.LoggerOptions{
Name: fmt.Sprintf("%v.%v", logSender, notifier.PluginName),
Name: fmt.Sprintf("%v.%v.%v", logSender, notifier.PluginName, filepath.Base(p.config.Cmd)),
Level: pluginsLogLevel,
DisableTime: true,
}),

View file

@ -1,7 +1,7 @@
// Package notifier defines the implementation for event notifier plugins.
// Notifier plugins allow to receive notifications for supported filesystem
// events such as file uploads, downloads etc. and user events such as add,
// update, delete.
// events such as file uploads, downloads etc. and provider events such as
// objects add, update, delete.
package notifier
import (
@ -22,7 +22,7 @@ const (
// Handshake is a common handshake that is shared by plugin and host.
var Handshake = plugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "SFTPGO_NOTIFIER_PLUGIN",
MagicCookieKey: "SFTPGO_PLUGIN_NOTIFIER",
MagicCookieValue: "c499b98b-cd59-4df2-92b3-6268817f4d80",
}

View file

@ -14,6 +14,7 @@ import (
"github.com/drakkan/sftpgo/v2/kms"
"github.com/drakkan/sftpgo/v2/logger"
"github.com/drakkan/sftpgo/v2/sdk/plugin/auth"
"github.com/drakkan/sftpgo/v2/sdk/plugin/eventsearcher"
kmsplugin "github.com/drakkan/sftpgo/v2/sdk/plugin/kms"
"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier"
"github.com/drakkan/sftpgo/v2/util"
@ -27,6 +28,7 @@ var (
// Handler defines the plugins manager
Handler Manager
pluginsLogLevel = hclog.Debug
errNoSearcher = errors.New("no events searcher plugin defined")
)
// Renderer defines the interface for generic objects rendering
@ -75,14 +77,17 @@ type Manager struct {
closed int32
done chan bool
// List of configured plugins
Configs []Config `json:"plugins" mapstructure:"plugins"`
notifLock sync.RWMutex
notifiers []*notifierPlugin
kmsLock sync.RWMutex
kms []*kmsPlugin
authLock sync.RWMutex
auths []*authPlugin
authScopes int
Configs []Config `json:"plugins" mapstructure:"plugins"`
notifLock sync.RWMutex
notifiers []*notifierPlugin
kmsLock sync.RWMutex
kms []*kmsPlugin
authLock sync.RWMutex
auths []*authPlugin
searcherLock sync.RWMutex
searcher *searcherPlugin
authScopes int
hasSearcher bool
}
// Initialize initializes the configured plugins
@ -138,6 +143,12 @@ func Initialize(configs []Config, logVerbose bool) error {
} else {
Handler.authScopes |= config.AuthOptions.Scope
}
case eventsearcher.PluginName:
plugin, err := newSearcherPlugin(config)
if err != nil {
return err
}
Handler.searcher = plugin
default:
return fmt.Errorf("unsupported plugin type: %v", config.Type)
}
@ -149,6 +160,7 @@ func Initialize(configs []Config, logVerbose bool) error {
func (m *Manager) validateConfigs() error {
kmsSchemes := make(map[string]bool)
kmsEncryptions := make(map[string]bool)
m.hasSearcher = false
for _, config := range m.Configs {
if config.Type == kmsplugin.PluginName {
@ -161,6 +173,12 @@ func (m *Manager) validateConfigs() error {
kmsSchemes[config.KMSOptions.Scheme] = true
kmsEncryptions[config.KMSOptions.EncryptedStatus] = true
}
if config.Type == eventsearcher.PluginName {
if m.hasSearcher {
return fmt.Errorf("only one eventsearcher plugin can be defined")
}
m.hasSearcher = true
}
}
return nil
}
@ -190,6 +208,38 @@ func (m *Manager) NotifyProviderEvent(timestamp time.Time, action, username, obj
}
}
// SearchFsEvents returns the filesystem events matching the specified filter and a continuation token
// to use for cursor based pagination
func (m *Manager) SearchFsEvents(startTimestamp, endTimestamp time.Time, action, username, ip, sshCmd, protocol,
instanceID, continuationToken string, status, limit int) (string, []byte, error,
) {
if !m.hasSearcher {
return "", nil, errNoSearcher
}
m.searcherLock.RLock()
plugin := m.searcher
m.searcherLock.RUnlock()
return plugin.searchear.SearchFsEvents(startTimestamp, endTimestamp, action, username, ip, sshCmd, protocol,
instanceID, continuationToken, status, limit)
}
// SearchProviderEvents returns the provider events matching the specified filter and a continuation token
// to use for cursor based pagination
func (m *Manager) SearchProviderEvents(startTimestamp, endTimestamp time.Time, action, username, ip, objectType,
objectName, instanceID, continuationToken string, limit int,
) (string, []byte, error) {
if !m.hasSearcher {
return "", nil, errNoSearcher
}
m.searcherLock.RLock()
plugin := m.searcher
m.searcherLock.RUnlock()
return plugin.searchear.SearchProviderEvents(startTimestamp, endTimestamp, action, username, ip, objectType, objectName,
instanceID, continuationToken, limit)
}
func (m *Manager) kmsEncrypt(secret kms.BaseSecret, url string, masterKey string, kmsID int) (string, string, int32, error) {
m.kmsLock.RLock()
plugin := m.kms[kmsID]
@ -365,6 +415,15 @@ func (m *Manager) checkCrashedPlugins() {
}
}
m.authLock.RUnlock()
if m.hasSearcher {
m.searcherLock.RLock()
if m.searcher.exited() {
defer func(cfg Config) {
Handler.restartSearcherPlugin(cfg)
}(m.searcher.config)
}
m.searcherLock.RUnlock()
}
}
func (m *Manager) restartNotifierPlugin(config Config, idx int) {
@ -417,6 +476,22 @@ func (m *Manager) restartAuthPlugin(config Config, idx int) {
m.authLock.Unlock()
}
func (m *Manager) restartSearcherPlugin(config Config) {
if atomic.LoadInt32(&m.closed) == 1 {
return
}
logger.Info(logSender, "", "try to restart crashed searcher plugin %#v", config.Cmd)
plugin, err := newSearcherPlugin(config)
if err != nil {
logger.Warn(logSender, "", "unable to restart searcher plugin %#v, err: %v", config.Cmd, err)
return
}
m.searcherLock.Lock()
m.searcher = plugin
m.searcherLock.Unlock()
}
// Cleanup releases all the active plugins
func (m *Manager) Cleanup() {
atomic.StoreInt32(&m.closed, 1)
@ -441,6 +516,13 @@ func (m *Manager) Cleanup() {
a.cleanup()
}
m.authLock.Unlock()
if m.hasSearcher {
m.searcherLock.Lock()
logger.Debug(logSender, "", "cleanup searcher plugin %v", m.searcher.config.Cmd)
m.searcher.cleanup()
m.searcherLock.Unlock()
}
}
func startCheckTicker() {

83
sdk/plugin/searcher.go Normal file
View file

@ -0,0 +1,83 @@
package plugin
import (
"crypto/sha256"
"fmt"
"os/exec"
"path/filepath"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/drakkan/sftpgo/v2/logger"
"github.com/drakkan/sftpgo/v2/sdk/plugin/eventsearcher"
)
type searcherPlugin struct {
config Config
searchear eventsearcher.Searcher
client *plugin.Client
}
func newSearcherPlugin(config Config) (*searcherPlugin, error) {
p := &searcherPlugin{
config: config,
}
if err := p.initialize(); err != nil {
logger.Warn(logSender, "", "unable to create events searcher plugin: %v, config %+v", err, config)
return nil, err
}
return p, nil
}
func (p *searcherPlugin) exited() bool {
return p.client.Exited()
}
func (p *searcherPlugin) cleanup() {
p.client.Kill()
}
func (p *searcherPlugin) initialize() error {
killProcess(p.config.Cmd)
logger.Debug(logSender, "", "create new searcher plugin %#v", p.config.Cmd)
var secureConfig *plugin.SecureConfig
if p.config.SHA256Sum != "" {
secureConfig.Checksum = []byte(p.config.SHA256Sum)
secureConfig.Hash = sha256.New()
}
client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: eventsearcher.Handshake,
Plugins: eventsearcher.PluginMap,
Cmd: exec.Command(p.config.Cmd, p.config.Args...),
AllowedProtocols: []plugin.Protocol{
plugin.ProtocolGRPC,
},
AutoMTLS: p.config.AutoMTLS,
SecureConfig: secureConfig,
Managed: false,
Logger: &logger.HCLogAdapter{
Logger: hclog.New(&hclog.LoggerOptions{
Name: fmt.Sprintf("%v.%v.%v", logSender, eventsearcher.PluginName, filepath.Base(p.config.Cmd)),
Level: pluginsLogLevel,
DisableTime: true,
}),
},
})
rpcClient, err := client.Client()
if err != nil {
logger.Debug(logSender, "", "unable to get rpc client for plugin %#v: %v", p.config.Cmd, err)
return err
}
raw, err := rpcClient.Dispense(eventsearcher.PluginName)
if err != nil {
logger.Debug(logSender, "", "unable to get plugin %v from rpc client for command %#v: %v",
eventsearcher.PluginName, p.config.Cmd, err)
return err
}
p.client = client
p.searchear = raw.(eventsearcher.Searcher)
return nil
}