grpc.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package notifier
  2. import (
  3. "context"
  4. "time"
  5. "google.golang.org/protobuf/types/known/emptypb"
  6. "github.com/drakkan/sftpgo/v2/sdk/plugin/notifier/proto"
  7. )
  // rpcTimeout is the deadline applied to every RPC issued by GRPCClient.
  const rpcTimeout = 20 * time.Second
  11. // GRPCClient is an implementation of Notifier interface that talks over RPC.
  12. type GRPCClient struct {
  13. client proto.NotifierClient
  14. }
  15. // NotifyFsEvent implements the Notifier interface
  16. func (c *GRPCClient) NotifyFsEvent(event *FsEvent) error {
  17. ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
  18. defer cancel()
  19. _, err := c.client.SendFsEvent(ctx, &proto.FsEvent{
  20. Timestamp: event.Timestamp,
  21. Action: event.Action,
  22. Username: event.Username,
  23. FsPath: event.Path,
  24. FsTargetPath: event.TargetPath,
  25. SshCmd: event.SSHCmd,
  26. FileSize: event.FileSize,
  27. Protocol: event.Protocol,
  28. Ip: event.IP,
  29. Status: int32(event.Status),
  30. VirtualPath: event.VirtualPath,
  31. VirtualTargetPath: event.VirtualTargetPath,
  32. SessionId: event.SessionID,
  33. FsProvider: int32(event.FsProvider),
  34. Bucket: event.Bucket,
  35. Endpoint: event.Endpoint,
  36. OpenFlags: int32(event.OpenFlags),
  37. })
  38. return err
  39. }
  40. // NotifyProviderEvent implements the Notifier interface
  41. func (c *GRPCClient) NotifyProviderEvent(event *ProviderEvent) error {
  42. ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
  43. defer cancel()
  44. _, err := c.client.SendProviderEvent(ctx, &proto.ProviderEvent{
  45. Timestamp: event.Timestamp,
  46. Action: event.Action,
  47. ObjectType: event.ObjectType,
  48. Username: event.Username,
  49. Ip: event.IP,
  50. ObjectName: event.ObjectName,
  51. ObjectData: event.ObjectData,
  52. })
  53. return err
  54. }
  55. // GRPCServer defines the gRPC server that GRPCClient talks to.
  56. type GRPCServer struct {
  57. Impl Notifier
  58. }
  59. // SendFsEvent implements the serve side fs notify method
  60. func (s *GRPCServer) SendFsEvent(ctx context.Context, req *proto.FsEvent) (*emptypb.Empty, error) {
  61. event := &FsEvent{
  62. Action: req.Action,
  63. Username: req.Username,
  64. Path: req.FsPath,
  65. TargetPath: req.FsTargetPath,
  66. VirtualPath: req.VirtualPath,
  67. SSHCmd: req.SshCmd,
  68. FileSize: req.FileSize,
  69. Status: int(req.Status),
  70. Protocol: req.Protocol,
  71. IP: req.Ip,
  72. SessionID: req.SessionId,
  73. Timestamp: req.Timestamp,
  74. FsProvider: int(req.FsProvider),
  75. Bucket: req.Bucket,
  76. Endpoint: req.Endpoint,
  77. OpenFlags: int(req.OpenFlags),
  78. }
  79. err := s.Impl.NotifyFsEvent(event)
  80. return &emptypb.Empty{}, err
  81. }
  82. // SendProviderEvent implements the serve side provider event notify method
  83. func (s *GRPCServer) SendProviderEvent(ctx context.Context, req *proto.ProviderEvent) (*emptypb.Empty, error) {
  84. event := &ProviderEvent{
  85. Action: req.Action,
  86. Username: req.Username,
  87. ObjectType: req.ObjectType,
  88. ObjectName: req.ObjectName,
  89. IP: req.Ip,
  90. ObjectData: req.ObjectData,
  91. Timestamp: req.Timestamp,
  92. }
  93. err := s.Impl.NotifyProviderEvent(event)
  94. return &emptypb.Empty{}, err
  95. }