#!/usr/bin/env python from datetime import datetime import sys import time import pytz import requests try: import urllib.parse as urlparse except ImportError: import urlparse # change base_url to point to your SFTPGo installation base_url = "http://127.0.0.1:8080" # set to False if you want to skip TLS certificate validation verify_tls_cert = True # set the credentials for a valid admin here admin_user = "admin" admin_password = "password" # set your update conditions here def needQuotaUpdate(user): if user["status"] == 0: # inactive user return False if user["quota_size"] == 0 and user["quota_files"] == 0: # no quota restrictions return False return True class UpdateQuota: def __init__(self): self.limit = 100 self.offset = 0 self.access_token = "" self.access_token_expiration = None def printLog(self, message): print("{} - {}".format(datetime.now(), message)) def checkAccessToken(self): if self.access_token != "" and self.access_token_expiration: expire_diff = self.access_token_expiration - datetime.now(tz=pytz.UTC) # we don't use total_seconds to be python 2 compatible seconds_to_expire = expire_diff.days * 86400 + expire_diff.seconds if seconds_to_expire > 180: return auth = requests.auth.HTTPBasicAuth(admin_user, admin_password) r = requests.get(urlparse.urljoin(base_url, "api/v2/token"), auth=auth, verify=verify_tls_cert, timeout=10) if r.status_code != 200: self.printLog("error getting access token: {}".format(r.text)) sys.exit(1) self.access_token = r.json()["access_token"] self.access_token_expiration = pytz.timezone("UTC").localize(datetime.strptime(r.json()["expires_at"], "%Y-%m-%dT%H:%M:%SZ")) def getAuthHeader(self): self.checkAccessToken() return {"Authorization": "Bearer " + self.access_token} def waitForQuotaUpdate(self, username): while True: auth_header = self.getAuthHeader() r = requests.get(urlparse.urljoin(base_url, "api/v2/quotas/users/scans"), headers=auth_header, verify=verify_tls_cert, timeout=10) if r.status_code != 200: self.printLog("error getting quota scans while waiting for {}: {}".format(username, r.text)) sys.exit(1) scanning = False for scan in r.json(): if scan["username"] == username: scanning = True if not scanning: break self.printLog("waiting for the quota scan to complete for user {}".format(username)) time.sleep(2) self.printLog("quota update for user {} finished".format(username)) def updateUserQuota(self, username): self.printLog("starting quota update for user {}".format(username)) auth_header = self.getAuthHeader() r = requests.post(urlparse.urljoin(base_url, "api/v2/quotas/users/" + username + "/scan"), headers=auth_header, verify=verify_tls_cert, timeout=10) if r.status_code != 202: self.printLog("error starting quota scan for user {}: {}".format(username, r.text)) sys.exit(1) self.waitForQuotaUpdate(username) def updateUsersQuota(self): while True: self.printLog("get users, limit {} offset {}".format(self.limit, self.offset)) auth_header = self.getAuthHeader() payload = {"limit":self.limit, "offset":self.offset} r = requests.get(urlparse.urljoin(base_url, "api/v2/users"), headers=auth_header, params=payload, verify=verify_tls_cert, timeout=10) if r.status_code != 200: self.printLog("error getting users: {}".format(r.text)) sys.exit(1) users = r.json() for user in users: if needQuotaUpdate(user): self.updateUserQuota(user["username"]) else: self.printLog("user {} does not need a quota update".format(user["username"])) self.offset += len(users) if len(users) < self.limit: break if __name__ == '__main__': q = UpdateQuota() q.updateUsersQuota()