2021-04-26 19:53:09 +00:00
#!/usr/bin/env python
from datetime import datetime
import sys
import time

import pytz
import requests

try:
    import urllib.parse as urlparse
except ImportError:
    import urlparse
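# change base_url to point to your SFTPGo installation, including the scheme,
# for example "https://sftpgo.example.com/". API paths are appended with
# urljoin, so end any path prefix with a trailing slash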
base_url = ""
# set to False if you want to skip TLS certificate validation
verify_tls_cert = True
# set the credentials for a valid admin here
admin_user = "admin"
admin_password = "password"
# set your update conditions here
def needQuotaUpdate(user):
    """Return True when a quota scan should be started for ``user``.

    ``user`` is one element of the JSON list returned by the users API.
    The keys read here are "status", "quota_size" and "quota_files"; a
    missing key raises KeyError.
    """
    if user["status"] == 0:  # inactive user
        return False
    if user["quota_size"] == 0 and user["quota_files"] == 0:  # no quota restrictions
        return False
    return True
class UpdateQuota:
    """Start a quota scan for every user that needs one, one user at a time.

    Users are listed page by page through the REST API at ``base_url``.
    For each user accepted by ``needQuotaUpdate`` a scan is started and
    polled until it no longer appears among the active scans.

    Any unexpected HTTP status is logged and terminates the process with
    exit code 1 (``SystemExit``).
    """

    # seconds to sleep between two polls of the active scans list
    scan_poll_interval = 2

    def __init__(self):
        # page size and current position used while listing users
        self.limit = 100
        self.offset = 0
        # bearer token and its timezone-aware (UTC) expiration time
        self.access_token = ""
        self.access_token_expiration = None

    def printLog(self, message):
        """Print ``message`` prefixed with the current local time."""
        print("{} - {}".format(datetime.now(), message))

    def checkAccessToken(self):
        """Ensure ``self.access_token`` is valid for more than 180 seconds.

        A new token is requested with the admin credentials when no token
        is stored or the stored one is about to expire.
        """
        if self.access_token != "" and self.access_token_expiration:
            expire_diff = self.access_token_expiration - datetime.now(tz=pytz.UTC)
            # we don't use total_seconds to be python 2 compatible
            seconds_to_expire = expire_diff.days * 86400 + expire_diff.seconds
            if seconds_to_expire > 180:
                # the current token is still fresh enough, keep using it
                return

        auth = requests.auth.HTTPBasicAuth(admin_user, admin_password)
        r = requests.get(urlparse.urljoin(base_url, "api/v2/token"), auth=auth,
                         verify=verify_tls_cert, timeout=10)
        if r.status_code != 200:
            self.printLog("error getting access token: {}".format(r.text))
            sys.exit(1)

        token = r.json()
        self.access_token = token["access_token"]
        # NOTE(review): the timestamp layout below is assumed (UTC, second
        # precision, trailing "Z") - confirm against the token response
        expires_at = datetime.strptime(token["expires_at"], "%Y-%m-%dT%H:%M:%SZ")
        self.access_token_expiration = pytz.timezone("UTC").localize(expires_at)

    def getAuthHeader(self):
        """Return the Authorization header, refreshing the token if needed."""
        self.checkAccessToken()
        return {"Authorization": "Bearer " + self.access_token}

    def waitForQuotaUpdate(self, username):
        """Block until no active quota scan is reported for ``username``."""
        while True:
            auth_header = self.getAuthHeader()
            r = requests.get(urlparse.urljoin(base_url, "api/v2/quotas/users/scans"),
                             headers=auth_header, verify=verify_tls_cert, timeout=10)
            if r.status_code != 200:
                self.printLog("error getting quota scans while waiting for {}: {}".format(username, r.text))
                sys.exit(1)

            scanning = any(scan["username"] == username for scan in r.json())
            if not scanning:
                break
            self.printLog("waiting for the quota scan to complete for user {}".format(username))
            # pause between polls so the API is not hammered
            time.sleep(self.scan_poll_interval)

        self.printLog("quota update for user {} finished".format(username))

    def updateUserQuota(self, username):
        """Start a quota scan for ``username`` and wait for it to finish."""
        self.printLog("starting quota update for user {}".format(username))
        auth_header = self.getAuthHeader()
        r = requests.post(urlparse.urljoin(base_url, "api/v2/quotas/users/" + username + "/scan"),
                          headers=auth_header, verify=verify_tls_cert, timeout=10)
        if r.status_code != 202:
            self.printLog("error starting quota scan for user {}: {}".format(username, r.text))
            sys.exit(1)
        self.waitForQuotaUpdate(username)

    def updateUsersQuota(self):
        """Page through all users and update the quota of those that need it."""
        while True:
            self.printLog("get users, limit {} offset {}".format(self.limit, self.offset))
            auth_header = self.getAuthHeader()
            payload = {"limit": self.limit, "offset": self.offset}
            r = requests.get(urlparse.urljoin(base_url, "api/v2/users"), headers=auth_header,
                             params=payload, verify=verify_tls_cert, timeout=10)
            if r.status_code != 200:
                self.printLog("error getting users: {}".format(r.text))
                sys.exit(1)

            users = r.json()
            for user in users:
                if needQuotaUpdate(user):
                    self.updateUserQuota(user["username"])
                else:
                    self.printLog("user {} does not need a quota update".format(user["username"]))

            self.offset += len(users)
            # a short page means there are no more users to fetch
            if len(users) < self.limit:
                break
if __name__ == '__main__':
    # when executed as a script, run one full pass over all the users
    q = UpdateQuota()
    q.updateUsersQuota()