mirror of
https://github.com/drakkan/sftpgo.git
synced 2024-11-25 09:00:27 +00:00
da5a061b65
Fixes #495
115 lines
3.6 KiB
Python
Executable file
115 lines
3.6 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
from datetime import datetime
|
|
import sys
|
|
import time
|
|
|
|
import pytz
|
|
import requests
|
|
|
|
try:
|
|
import urllib.parse as urlparse
|
|
except ImportError:
|
|
import urlparse
|
|
|
|
# Script configuration: edit the values below before running.

# change base_url to point to your SFTPGo installation
base_url = "http://127.0.0.1:8080"
# set to False if you want to skip TLS certificate validation
# (passed as the "verify" argument of every HTTP request)
verify_tls_cert = True
# set the credentials for a valid admin here, they are sent using
# HTTP basic auth to obtain the API access token
admin_user = "admin"
admin_password = "password"
|
|
|
|
|
|
class CheckRetention:
    """Start a data retention check for every user and wait for each one.

    Users are listed page by page from ``api/v2/users``. For each user a
    retention check is started and the list of active checks is polled until
    the user is no longer in it. Any unexpected HTTP status code is logged
    and terminates the script with exit code 1.
    """

    def __init__(self):
        # page size and current position used while listing users
        self.limit = 100
        self.offset = 0
        # cached bearer token and its timezone-aware (UTC) expiration time
        self.access_token = ""
        self.access_token_expiration = None

    @staticmethod
    def _quotePathSegment(value):
        """Return value percent-encoded so it is safe as one URL path segment.

        Every reserved character, "/" included, is escaped: otherwise a
        username containing "#", "?", "%" or "/" would change the request URL.
        """
        try:
            from urllib.parse import quote  # Python 3
        except ImportError:
            from urllib import quote  # Python 2
        if not isinstance(value, (str, bytes)):
            # Python 2 unicode: quote() there only handles byte strings
            value = value.encode("utf-8")
        return quote(value, safe="")

    def printLog(self, message):
        """Print message prefixed with the current local date and time."""
        print("{} - {}".format(datetime.now(), message))

    def checkAccessToken(self):
        """Make sure self.access_token is set and not about to expire.

        The cached token is kept if it is valid for more than 180 seconds,
        otherwise a new one is requested using the admin credentials. The
        script exits with code 1 if the token request does not return 200.
        """
        if self.access_token != "" and self.access_token_expiration:
            expire_diff = self.access_token_expiration - datetime.now(tz=pytz.UTC)
            # we don't use total_seconds to be python 2 compatible
            seconds_to_expire = expire_diff.days * 86400 + expire_diff.seconds
            if seconds_to_expire > 180:
                return

        auth = requests.auth.HTTPBasicAuth(admin_user, admin_password)
        r = requests.get(urlparse.urljoin(base_url, "api/v2/token"), auth=auth, verify=verify_tls_cert, timeout=10)
        if r.status_code != 200:
            self.printLog("error getting access token: {}".format(r.text))
            sys.exit(1)
        # parse the response body once and read both fields from it
        token = r.json()
        self.access_token = token["access_token"]
        # "expires_at" is parsed as a naive timestamp and then marked as UTC
        self.access_token_expiration = pytz.timezone("UTC").localize(
            datetime.strptime(token["expires_at"], "%Y-%m-%dT%H:%M:%SZ"))

    def getAuthHeader(self):
        """Return the Authorization header, refreshing the token if needed."""
        self.checkAccessToken()
        return {"Authorization": "Bearer " + self.access_token}

    def waitForRentionCheck(self, username):
        """Block until no active retention check is listed for username.

        The active checks are polled every 2 seconds, there is no overall
        timeout. The script exits with code 1 if the list cannot be fetched.
        """
        while True:
            auth_header = self.getAuthHeader()
            r = requests.get(urlparse.urljoin(base_url, "api/v2/retention/users/checks"), headers=auth_header,
                             verify=verify_tls_cert, timeout=10)
            if r.status_code != 200:
                self.printLog("error getting retention checks while waiting for {}: {}".format(username, r.text))
                sys.exit(1)

            # the check is over as soon as the user is no longer listed
            if not any(check["username"] == username for check in r.json()):
                break
            self.printLog("waiting for the retention check to complete for user {}".format(username))
            time.sleep(2)

        self.printLog("retention check for user {} finished".format(username))

    # correctly spelled alias, the original name is kept for compatibility
    waitForRetentionCheck = waitForRentionCheck

    def checkUserRetention(self, username):
        """Start a retention check for username and wait until it completes.

        The script exits with code 1 if the server does not answer with 202.
        """
        self.printLog("starting retention check for user {}".format(username))
        auth_header = self.getAuthHeader()
        # retention settings sent to the server, a single entry for the root
        # path. NOTE(review): 168 is presumably expressed in hours (7 days),
        # confirm against the SFTPGo REST API documentation
        retention = [
            {
                "path": "/",
                "retention": 168,
                "delete_empty_dirs": True,
                "ignore_user_permissions": False
            }
        ]
        # the username is a path segment and must be percent-encoded
        check_path = "api/v2/retention/users/" + self._quotePathSegment(username) + "/check"
        r = requests.post(urlparse.urljoin(base_url, check_path), headers=auth_header,
                          json=retention, verify=verify_tls_cert, timeout=10)
        if r.status_code != 202:
            self.printLog("error starting retention check for user {}: {}".format(username, r.text))
            sys.exit(1)
        self.waitForRentionCheck(username)

    def checkUsersRetention(self):
        """Run the retention check, one user at a time, for all the users.

        Users are read in pages of self.limit entries starting from
        self.offset. A page shorter than self.limit ends the loop. The script
        exits with code 1 if the users list cannot be fetched.
        """
        while True:
            self.printLog("get users, limit {} offset {}".format(self.limit, self.offset))
            auth_header = self.getAuthHeader()
            payload = {"limit": self.limit, "offset": self.offset}
            r = requests.get(urlparse.urljoin(base_url, "api/v2/users"), headers=auth_header, params=payload,
                             verify=verify_tls_cert, timeout=10)
            if r.status_code != 200:
                self.printLog("error getting users: {}".format(r.text))
                sys.exit(1)
            users = r.json()
            for user in users:
                self.checkUserRetention(user["username"])

            self.offset += len(users)
            if len(users) < self.limit:
                break
|
|
|
|
|
|
if __name__ == '__main__':
    # executed as a script: run the retention check for every user
    c = CheckRetention()
    c.checkUsersRetention()
|