sftpgo/examples/convertusers/convertusers
2021-03-27 19:18:01 +01:00

208 lines
7.1 KiB
Python
Executable file

#!/usr/bin/env python
import argparse
import json
import sys
import time
try:
import pwd
import spwd
except ImportError:
pwd = None
class ConvertUsers:
def __init__(self, input_file, users_format, output_file, min_uid, max_uid, usernames, force_uid, force_gid):
self.input_file = input_file
self.users_format = users_format
self.output_file = output_file
self.min_uid = min_uid
self.max_uid = max_uid
self.usernames = usernames
self.force_uid = force_uid
self.force_gid = force_gid
self.SFTPGoUsers = []
def buildUserObject(self, username, password, home_dir, uid, gid, max_sessions, quota_size, quota_files, upload_bandwidth,
download_bandwidth, status, expiration_date, allowed_ip=[], denied_ip=[]):
return {'id':0, 'username':username, 'password':password, 'home_dir':home_dir, 'uid':uid, 'gid':gid,
'max_sessions':max_sessions, 'quota_size':quota_size, 'quota_files':quota_files, 'permissions':{'/':["*"]},
'upload_bandwidth':upload_bandwidth, 'download_bandwidth':download_bandwidth,
'status':status, 'expiration_date':expiration_date,
'filters':{'allowed_ip':allowed_ip, 'denied_ip':denied_ip}}
def addUser(self, user):
user['id'] = len(self.SFTPGoUsers) + 1
print('')
print('New user imported: {}'.format(user))
print('')
self.SFTPGoUsers.append(user)
def saveUsers(self):
if self.SFTPGoUsers:
data = {'users':self.SFTPGoUsers}
jsonData = json.dumps(data)
with open(self.output_file, 'w') as f:
f.write(jsonData)
print()
print('Number of users saved to "{}": {}. You can import them using loaddata'.format(self.output_file,
len(self.SFTPGoUsers)))
print()
sys.exit(0)
else:
print('No user imported')
sys.exit(1)
def convert(self):
if self.users_format == 'unix-passwd':
self.convertFromUnixPasswd()
elif self.users_format == 'pure-ftpd':
self.convertFromPureFTPD()
else:
self.convertFromProFTPD()
self.saveUsers()
def isUserValid(self, username, uid):
if self.usernames and not username in self.usernames:
return False
if self.min_uid >= 0 and uid < self.min_uid:
return False
if self.max_uid >= 0 and uid > self.max_uid:
return False
return True
def convertFromUnixPasswd(self):
days_from_epoch_time = time.time() / 86400
for user in pwd.getpwall():
username = user.pw_name
password = user.pw_passwd
uid = user.pw_uid
gid = user.pw_gid
home_dir = user.pw_dir
status = 1
expiration_date = 0
if not self.isUserValid(username, uid):
continue
if self.force_uid >= 0:
uid = self.force_uid
if self.force_gid >= 0:
gid = self.force_gid
# FIXME: if the passwords aren't in /etc/shadow they are probably DES encrypted and we don't support them
if password == 'x' or password == '*':
user_info = spwd.getspnam(username)
password = user_info.sp_pwdp
if not password or password == '!!' or password == '!*':
print('cannot import user "{}" without a password'.format(username))
continue
if user_info.sp_inact > 0:
last_pwd_change_diff = days_from_epoch_time - user_info.sp_lstchg
if last_pwd_change_diff > user_info.sp_inact:
status = 0
if user_info.sp_expire > 0:
expiration_date = user_info.sp_expire * 86400
self.addUser(self.buildUserObject(username, password, home_dir, uid, gid, 0, 0, 0, 0, 0, status,
expiration_date))
def convertFromProFTPD(self):
with open(self.input_file, 'r') as f:
for line in f:
fields = line.split(':')
if len(fields) > 6:
username = fields[0]
password = fields[1]
uid = int(fields[2])
gid = int(fields[3])
home_dir = fields[5]
if not self.isUserValid(username, uid):
continue
if self.force_uid >= 0:
uid = self.force_uid
if self.force_gid >= 0:
gid = self.force_gid
self.addUser(self.buildUserObject(username, password, home_dir, uid, gid, 0, 0, 0, 0, 0, 1, 0))
def convertPureFTPDIP(self, fields):
result = []
if not fields:
return result
for v in fields.split(','):
ip_mask = v.strip()
if not ip_mask:
continue
if ip_mask.count('.') < 3 and ip_mask.count(':') < 3:
print('cannot import pure-ftpd IP: {}'.format(ip_mask))
continue
if '/' not in ip_mask:
ip_mask += '/32'
result.append(ip_mask)
return result
def convertFromPureFTPD(self):
with open(self.input_file, 'r') as f:
for line in f:
fields = line.split(':')
if len(fields) > 16:
username = fields[0]
password = fields[1]
uid = int(fields[2])
gid = int(fields[3])
home_dir = fields[5]
upload_bandwidth = 0
if fields[6]:
upload_bandwidth = int(int(fields[6]) / 1024)
download_bandwidth = 0
if fields[7]:
download_bandwidth = int(int(fields[7]) / 1024)
max_sessions = 0
if fields[10]:
max_sessions = int(fields[10])
quota_files = 0
if fields[11]:
quota_files = int(fields[11])
quota_size = 0
if fields[12]:
quota_size = int(fields[12])
allowed_ip = self.convertPureFTPDIP(fields[15])
denied_ip = self.convertPureFTPDIP(fields[16])
if not self.isUserValid(username, uid):
continue
if self.force_uid >= 0:
uid = self.force_uid
if self.force_gid >= 0:
gid = self.force_gid
self.addUser(self.buildUserObject(username, password, home_dir, uid, gid, max_sessions, quota_size,
quota_files, upload_bandwidth, download_bandwidth, 1, 0, allowed_ip,
denied_ip))
if __name__ == '__main__':
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, description=
'Convert users to a JSON format suitable to use with loadddata')
supportedUsersFormats = []
help_text = ''
if pwd is not None:
supportedUsersFormats.append('unix-passwd')
help_text = 'To import from unix-passwd format you need the permission to read /etc/shadow that is typically granted to the root user only'
supportedUsersFormats.append('pure-ftpd')
supportedUsersFormats.append('proftpd')
parser.add_argument('input_file', type=str)
parser.add_argument('users_format', type=str, choices=supportedUsersFormats, help=help_text)
parser.add_argument('output_file', type=str)
parser.add_argument('--min-uid', type=int, default=-1, help='if >= 0 only import users with UID greater or equal ' +
'to this value. Default: %(default)s')
parser.add_argument('--max-uid', type=int, default=-1, help='if >= 0 only import users with UID lesser or equal ' +
'to this value. Default: %(default)s')
parser.add_argument('--usernames', type=str, nargs='+', default=[], help='Only import users with these usernames. ' +
'Default: %(default)s')
parser.add_argument('--force-uid', type=int, default=-1, help='if >= 0 the imported users will have this UID in ' +
'SFTPGo. Default: %(default)s')
parser.add_argument('--force-gid', type=int, default=-1, help='if >= 0 the imported users will have this GID in ' +
'SFTPGo. Default: %(default)s')
args = parser.parse_args()
convertUsers = ConvertUsers(args.input_file, args.users_format, args.output_file, args.min_uid, args.max_uid,
args.usernames, args.force_uid, args.force_gid)
convertUsers.convert()