You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
716 lines
24 KiB
716 lines
24 KiB
|
13 years ago
|
# -*- coding: utf-8 -*-
|
||
|
|
#
|
||
|
|
# vim:syntax=python:sw=4:ts=4:expandtab
|
||
|
|
|
||
|
|
# ovpntools.py
|
||
|
|
#
|
||
|
|
# Copyright (C) Adelux - 2009
|
||
|
|
#
|
||
|
|
"""
|
||
|
|
OpenVPN tools module :
|
||
|
|
CA management Class
|
||
|
|
openssl config file generation
|
||
|
|
certificate generation tools
|
||
|
|
certificate verification tools
|
||
|
|
"""
|
||
|
|
|
||
|
|
import os
|
||
|
|
import subprocess
|
||
|
|
|
||
|
|
class OVPNCA:
|
||
|
|
"""
|
||
|
|
Class used to manage openvpn certificates
|
||
|
|
"""
|
||
|
|
|
||
|
|
# class constructor
|
||
|
|
def __init__(self):
|
||
|
|
self.commonName = "OpenVPN CA"
|
||
|
|
self.country = "FR"
|
||
|
|
self.state = "France"
|
||
|
|
self.city = "Poitiers"
|
||
|
|
self.organization = "OpenVPN"
|
||
|
|
self.validity_in_days = 365
|
||
|
|
self.email = "openvpn@localhost"
|
||
|
|
self.key_size = 1024
|
||
|
|
self.certs_dir = '/tmp/certs'
|
||
|
|
self.crtfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '.crt'
|
||
|
|
self.keyfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '.key'
|
||
|
|
self.csrfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '.csr'
|
||
|
|
self.crlfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '.pem'
|
||
|
|
self.interkeyfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '_inter' + '.key'
|
||
|
|
self.intercsrfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '_inter' + '.csr'
|
||
|
|
self.intercrtfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '_inter' + '.crt'
|
||
|
|
self.dhfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '%s.dh' % self.key_size
|
||
|
|
self.config_file = self.certs_dir + '/' + 'openssl.cnf'
|
||
|
|
|
||
|
|
|
||
|
|
# Public Methods
|
||
|
|
def generate_ossl_config(self):
|
||
|
|
if not os.path.exists(self.certs_dir):
|
||
|
|
os.mkdir(self.certs_dir)
|
||
|
|
output = ''
|
||
|
|
output += newline('HOME = ' + self.certs_dir)
|
||
|
|
output += newline('RADNFILE = ' + self.certs_dir + '/.rnd')
|
||
|
|
output += newline('oid_section = new_oids')
|
||
|
|
output += newline('')
|
||
|
|
output += newline('[new_oids]')
|
||
|
|
output += newline('')
|
||
|
|
output += newline('[ ca ]')
|
||
|
|
output += newline('default_ca = CA_default')
|
||
|
|
output += newline('')
|
||
|
|
output += newline('[ CA_default ]')
|
||
|
|
output += newline('dir = ' + self.certs_dir)
|
||
|
|
output += newline('certs = $dir')
|
||
|
|
output += newline('crl_dir = $dir')
|
||
|
|
output += newline('database = $dir/index.txt')
|
||
|
|
output += newline('new_certs_dir = $dir')
|
||
|
|
output += newline('certificate = %s') % self.crtfile
|
||
|
|
output += newline('serial = $dir/serial')
|
||
|
|
output += newline('crl = %s') % self.crlfile
|
||
|
|
output += newline('private_key = %s') % self.keyfile
|
||
|
|
output += newline('RANDFILE = $dir/.rand')
|
||
|
|
output += newline('x509_extensions = usr_cert')
|
||
|
|
output += newline('default_days = %d' % self.validity_in_days)
|
||
|
|
output += newline('default_crl_days= 30')
|
||
|
|
output += newline('default_md = md5')
|
||
|
|
output += newline('preserve = no')
|
||
|
|
output += newline('policy = policy_match')
|
||
|
|
output += newline('')
|
||
|
|
output += newline('[ policy_match ]')
|
||
|
|
output += newline('countryName = match')
|
||
|
|
output += newline('stateOrProvinceName = match')
|
||
|
|
output += newline('organizationName = match')
|
||
|
|
output += newline('organizationalUnitName = optional')
|
||
|
|
output += newline('commonName = supplied')
|
||
|
|
output += newline('emailAddress = optional')
|
||
|
|
output += newline('')
|
||
|
|
output += newline('[ policy_anything ]')
|
||
|
|
output += newline('countryName = optional')
|
||
|
|
output += newline('stateOrProvinceName = optional')
|
||
|
|
output += newline('localityName = optional')
|
||
|
|
output += newline('organizationName = optional')
|
||
|
|
output += newline('organizationalUnitName = optional')
|
||
|
|
output += newline('commonName = supplied')
|
||
|
|
output += newline('emailAddress = optional')
|
||
|
|
output += newline('')
|
||
|
|
output += newline('[ req ]')
|
||
|
|
output += newline('default_bits = %s' % self.key_size)
|
||
|
|
output += newline('default_keyfile = privkey.pem')
|
||
|
|
output += newline('distinguished_name = req_distinguished_name')
|
||
|
|
output += newline('attributes = req_attributes')
|
||
|
|
output += newline('x509_extensions = v3_ca ')
|
||
|
|
output += newline('string_mask = nombstr')
|
||
|
|
output += newline('')
|
||
|
|
output += newline('[ req_distinguished_name ]')
|
||
|
|
output += newline('countryName = Country Name (2 letter code)')
|
||
|
|
output += newline('countryName_default = ' + self.country)
|
||
|
|
output += newline('countryName_min = 2')
|
||
|
|
output += newline('countryName_max = 2')
|
||
|
|
output += newline('stateOrProvinceName = State or Province Name')
|
||
|
|
output += newline('stateOrProvinceName_default = ' + self.state)
|
||
|
|
output += newline('localityName = Locality Name (eg, city)')
|
||
|
|
output += newline('localityName_default = ' + self.city)
|
||
|
|
output += newline('0.organizationName = Organization Name')
|
||
|
|
output += newline('0.organizationName_default = ' + self.organization)
|
||
|
|
output += newline('organizationalUnitName = Organizational Unit Name')
|
||
|
|
output += newline('commonName = Common Name ')
|
||
|
|
output += newline('commonName_default = ' + self.commonName)
|
||
|
|
output += newline('commonName_max = 64')
|
||
|
|
output += newline('emailAddress = Email Address')
|
||
|
|
output += newline('emailAddress_default = ' + self.email)
|
||
|
|
output += newline('emailAddress_max = 40')
|
||
|
|
output += newline('')
|
||
|
|
output += newline('[ req_attributes ]')
|
||
|
|
output += newline('challengePassword = A challenge password')
|
||
|
|
output += newline('challengePassword_min = 4')
|
||
|
|
output += newline('challengePassword_max = 20')
|
||
|
|
output += newline('unstructuredName = An optional company name')
|
||
|
|
output += newline('')
|
||
|
|
output += newline('[ usr_cert ]')
|
||
|
|
output += newline('basicConstraints=CA:FALSE')
|
||
|
|
output += newline('nsComment = "OpenSSL Generated Certificate"')
|
||
|
|
output += newline('subjectKeyIdentifier=hash')
|
||
|
|
output += newline('authorityKeyIdentifier=keyid,issuer:always')
|
||
|
|
output += newline('')
|
||
|
|
output += newline('[ v3_req ]')
|
||
|
|
output += newline('basicConstraints = CA:FALSE')
|
||
|
|
output += newline('keyUsage = nonRepudiation, digitalSignature, keyEncipherment')
|
||
|
|
output += newline('')
|
||
|
|
output += newline('[ v3_ca ]')
|
||
|
|
output += newline('subjectKeyIdentifier=hash')
|
||
|
|
output += newline('authorityKeyIdentifier=keyid:always,issuer:always')
|
||
|
|
output += newline('basicConstraints = CA:true')
|
||
|
|
output += newline('')
|
||
|
|
output += newline('[ crl_ext ]')
|
||
|
|
output += newline('authorityKeyIdentifier=keyid:always,issuer:always')
|
||
|
|
f = open(self.config_file ,'w')
|
||
|
|
print >> f, output
|
||
|
|
f.close()
|
||
|
|
|
||
|
|
def read_ossl_config(self):
|
||
|
|
if os.path.exists(self.config_file):
|
||
|
|
f = open(self.config_file, 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def create_CA_certs(self):
|
||
|
|
self.generate_crt_key()
|
||
|
|
self.generate_inter_csr_key()
|
||
|
|
self.generate_inter_crt()
|
||
|
|
self.generate_dh()
|
||
|
|
self.generate_crl()
|
||
|
|
|
||
|
|
def create_user_certs(self,commonName):
|
||
|
|
if not os.path.exists(self.certs_dir):
|
||
|
|
os.makedirs(self.certs_dir)
|
||
|
|
|
||
|
|
|
||
|
|
def generate_crt_key(self):
|
||
|
|
if not os.path.exists(self.certs_dir):
|
||
|
|
os.makedirs(self.certs_dir)
|
||
|
|
cmd = 'openssl req -batch -days %d -nodes -new -x509 -keyout %s -out %s -config %s' % (self.validity_in_days,self.keyfile,self.crtfile,self.config_file)
|
||
|
|
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||
|
|
stdout_value, stderr_value = proc.communicate()
|
||
|
|
if proc.returncode:
|
||
|
|
print "Error while creating CRT/KEY with return code %d: " % proc.returncode
|
||
|
|
print str(stderr_value)
|
||
|
|
return 1
|
||
|
|
else:
|
||
|
|
return 0
|
||
|
|
|
||
|
|
def read_ca_key(self):
|
||
|
|
if os.path.exists(self.keyfile):
|
||
|
|
f = open(self.keyfile, 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def read_ca(self):
|
||
|
|
if os.path.exists(self.crtfile):
|
||
|
|
f = open(self.crtfile, 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def generate_csr(self):
|
||
|
|
if not os.path.exists(self.certs_dir):
|
||
|
|
os.makedirs(self.certs_dir)
|
||
|
|
cmd = 'openssl req -batch -days %d -nodes -new -keyout %s -out %s -config %s' % (self.validity_in_days,self.keyfile,self.csrfile,self.config_file)
|
||
|
|
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||
|
|
stdout_value, stderr_value = proc.communicate()
|
||
|
|
if proc.returncode:
|
||
|
|
print "Error while creating CSR with return code %d: " % proc.returncode
|
||
|
|
print str(stderr_value)
|
||
|
|
return 1
|
||
|
|
else:
|
||
|
|
return 0
|
||
|
|
|
||
|
|
def generate_inter_csr_key(self):
|
||
|
|
if not os.path.exists(self.certs_dir):
|
||
|
|
os.makedirs(self.certs_dir)
|
||
|
|
cmd = 'openssl req -batch -days %d -nodes -new -keyout %s -out %s -config %s' % (self.validity_in_days,self.interkeyfile,self.intercsrfile,self.config_file)
|
||
|
|
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||
|
|
stdout_value, stderr_value = proc.communicate()
|
||
|
|
if proc.returncode:
|
||
|
|
print "Error while creating inter CSR/KEY with return code %d: " % proc.returncode
|
||
|
|
print str(stderr_value)
|
||
|
|
return 1
|
||
|
|
else:
|
||
|
|
return 0
|
||
|
|
|
||
|
|
def read_inter_csr(self):
|
||
|
|
if os.path.exists(self.intercsrfile):
|
||
|
|
f = open(self.intercsrfile, 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def read_key(self):
|
||
|
|
if os.path.exists(self.interkeyfile):
|
||
|
|
f = open(self.interkeyfile, 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def generate_inter_crt(self):
|
||
|
|
if not os.path.exists(self.certs_dir):
|
||
|
|
os.makedirs(self.certs_dir)
|
||
|
|
if not os.path.exists(self.certs_dir + '/index.txt'):
|
||
|
|
f = open(self.certs_dir + '/index.txt','w')
|
||
|
|
f.close()
|
||
|
|
if not os.path.exists(self.certs_dir + '/serial'):
|
||
|
|
f = open(self.certs_dir + '/serial','w')
|
||
|
|
f.write('00\n')
|
||
|
|
f.close()
|
||
|
|
cmd = 'openssl ca -batch -extensions v3_ca -days %d -out %s -in %s -config %s' % (self.validity_in_days,self.intercrtfile,self.intercsrfile,self.config_file)
|
||
|
|
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||
|
|
stdout_value, stderr_value = proc.communicate()
|
||
|
|
if proc.returncode:
|
||
|
|
print "Error while creating inter CRT with return code %d: " % proc.returncode
|
||
|
|
print str(stderr_value)
|
||
|
|
return 1
|
||
|
|
else:
|
||
|
|
return 0
|
||
|
|
|
||
|
|
def read_certificate(self):
|
||
|
|
if os.path.exists(self.intercrtfile):
|
||
|
|
f = open(self.intercrtfile, 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def generate_dh(self):
|
||
|
|
if not os.path.exists(self.certs_dir):
|
||
|
|
os.makedirs(self.certs_dir)
|
||
|
|
cmd = 'openssl dhparam -out %s %s' % (self.dhfile,self.key_size)
|
||
|
|
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||
|
|
stdout_value, stderr_value = proc.communicate()
|
||
|
|
if proc.returncode:
|
||
|
|
print "Error while creating DH with return code %d: " % proc.returncode
|
||
|
|
print str(stderr_value)
|
||
|
|
return 1
|
||
|
|
else:
|
||
|
|
return 0
|
||
|
|
|
||
|
|
def read_dh(self):
|
||
|
|
if os.path.exists(self.dhfile):
|
||
|
|
f = open(self.dhfile, 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def read_crl_verify(self):
|
||
|
|
if os.path.exists(self.crlfile):
|
||
|
|
f = open(self.crlfile, 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def generate_crl(self):
|
||
|
|
if not os.path.exists(self.certs_dir):
|
||
|
|
os.makedirs(self.certs_dir)
|
||
|
|
cmd = 'openssl ca -gencrl -out %s -config %s' % (self.crlfile,self.config_file)
|
||
|
|
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||
|
|
stdout_value, stderr_value = proc.communicate()
|
||
|
|
if proc.returncode:
|
||
|
|
print "Error while creating CRL with return code %d: " % proc.returncode
|
||
|
|
print str(stderr_value)
|
||
|
|
return 1
|
||
|
|
else:
|
||
|
|
return 0
|
||
|
|
|
||
|
|
def read_crl(self):
|
||
|
|
if os.path.exists(self.crlfile):
|
||
|
|
f = open(self.crlfile, 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def generate_user_csr_key(self,commonName):
|
||
|
|
if not os.path.exists(self.certs_dir):
|
||
|
|
os.makedirs(self.certs_dir)
|
||
|
|
userkeyfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.key'
|
||
|
|
usercsrfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.csr'
|
||
|
|
cmd = 'openssl req -batch -days %d -nodes -new -keyout %s -out %s -config %s' % (self.validity_in_days,userkeyfile,usercsrfile,self.config_file)
|
||
|
|
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||
|
|
stdout_value, stderr_value = proc.communicate()
|
||
|
|
if proc.returncode:
|
||
|
|
print "Error while creating user CSR/KEY with return code %d: " % proc.returncode
|
||
|
|
print str(stderr_value)
|
||
|
|
return 1
|
||
|
|
else:
|
||
|
|
return 0
|
||
|
|
|
||
|
|
def read_user_key(self,commonName):
|
||
|
|
userkeyfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.key'
|
||
|
|
if os.path.exists(userkeyfile):
|
||
|
|
f = open(userkeyfile, 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def read_user_csr(self,commonName):
|
||
|
|
usercsrfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.csr'
|
||
|
|
if os.path.exists(usercsrfile):
|
||
|
|
f = open(usercsrfile, 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def generate_user_crt(self,commonName):
|
||
|
|
if not os.path.exists(self.certs_dir):
|
||
|
|
os.makedirs(self.certs_dir)
|
||
|
|
usercrtfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.crt'
|
||
|
|
usercsrfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.csr'
|
||
|
|
cmd = 'openssl ca -batch -days %d -out %s -in %s -config %s' % (self.validity_in_days,usercrtfile,usercsrfile,self.config_file)
|
||
|
|
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||
|
|
stdout_value, stderr_value = proc.communicate()
|
||
|
|
if proc.returncode:
|
||
|
|
print "Error while creating user CRT with return code %d: " % proc.returncode
|
||
|
|
print str(stderr_value)
|
||
|
|
return 1
|
||
|
|
else:
|
||
|
|
return 0
|
||
|
|
|
||
|
|
def read_user_certificate(self,commonName):
|
||
|
|
usercrtfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.crt'
|
||
|
|
if os.path.exists(usercrtfile):
|
||
|
|
f = open(usercrtfile, 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def generate_user_pkcs(self,commonName,password=None,basedir=None):
|
||
|
|
if basedir is None:
|
||
|
|
basedir = self.certs_dir
|
||
|
|
if not os.path.exists(basedir):
|
||
|
|
os.makedirs(basedir)
|
||
|
|
usercrt = basedir + '/' + commonName.replace(' ','_') + '.crt'
|
||
|
|
userp12 = basedir + '/' + commonName.replace(' ','_') + '.p12'
|
||
|
|
userkey = basedir + '/' + commonName.replace(' ','_') + '.key'
|
||
|
|
crtfile = basedir + '/' + self.commonName.replace(' ','_') + '.crt'
|
||
|
|
if password == None:
|
||
|
|
cmd = 'openssl pkcs12 -export -in %s -inkey %s -certfile %s -out %s -passout pass:%s' % (usercrt,userkey,crtfile,userp12,password)
|
||
|
|
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||
|
|
stdout_value, stderr_value = proc.communicate()
|
||
|
|
else:
|
||
|
|
cmd = 'openssl pkcs12 -export -in %s -inkey %s -certfile %s -out %s -passout pass:%s' % (usercrt,userkey,crtfile,userp12,password)
|
||
|
|
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||
|
|
stdout_value, stderr_value = proc.communicate()
|
||
|
|
|
||
|
|
if proc.returncode:
|
||
|
|
output = "Error while creating user PKCS with return code %d: \n " % proc.returncode
|
||
|
|
output += stderr_value
|
||
|
|
return str(output)
|
||
|
|
else:
|
||
|
|
return str(stdout_value)
|
||
|
|
|
||
|
|
def read_user_pkcs(self,commonName,basedir=None):
|
||
|
|
if basedir is None:
|
||
|
|
basedir = self.certs_dir
|
||
|
|
##userp12 = basedir + '/' + commonName.replace(' ','_') + '.p12'
|
||
|
|
userp12 = self.certs_dir + '/' + commonName.replace(' ','_') + '.p12'
|
||
|
|
if os.path.exists(userp12):
|
||
|
|
f = open(userp12, 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def generate_user_crl(self,commonName):
|
||
|
|
if not os.path.exists(self.certs_dir):
|
||
|
|
os.makedirs(self.certs_dir)
|
||
|
|
usercrlfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.pem'
|
||
|
|
cmd = 'openssl ca -gencrl -out %s -config %s' % (usercrlfile,self.config_file)
|
||
|
|
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||
|
|
stdout_value, stderr_value = proc.communicate()
|
||
|
|
if proc.returncode:
|
||
|
|
print "Error while creating user CRL with return code %d: " % proc.returncode
|
||
|
|
print str(stderr_value)
|
||
|
|
return 1
|
||
|
|
else:
|
||
|
|
return 0
|
||
|
|
|
||
|
|
def read_user_crl(self,commonName):
|
||
|
|
usercrlfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.pem'
|
||
|
|
if os.path.exists(usercrlfile):
|
||
|
|
f = open(usercrlfile, 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def clean_all(self):
|
||
|
|
if os.path.exists(self.certs_dir):
|
||
|
|
for file in os.listdir(self.certs_dir):
|
||
|
|
os.remove(self.certs_dir + "/" + file)
|
||
|
|
os.rmdir(self.certs_dir)
|
||
|
|
|
||
|
|
def read_index(self):
|
||
|
|
if os.path.exists(self.certs_dir + '/' + 'index.txt'):
|
||
|
|
f = open(self.certs_dir + '/' + 'index.txt', 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def read_index_old(self):
|
||
|
|
if os.path.exists(self.certs_dir + '/' + 'index.txt.old'):
|
||
|
|
f = open(self.certs_dir + '/' + 'index.txt.old', 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def read_index_attr(self):
|
||
|
|
if os.path.exists(self.certs_dir + '/' + 'index.txt.attr'):
|
||
|
|
f = open(self.certs_dir + '/' + 'index.txt.attr', 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def read_serial(self):
|
||
|
|
if os.path.exists(self.certs_dir + '/' + 'serial'):
|
||
|
|
f = open(self.certs_dir + '/' + 'serial', 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def read_user_cert_index(self):
|
||
|
|
if os.path.exists(self.certs_dir + '/' + 'serial'):
|
||
|
|
f = open(self.certs_dir + '/' + 'serial', 'r')
|
||
|
|
output = f.read()
|
||
|
|
f.close()
|
||
|
|
# convert value in dec, substract 1
|
||
|
|
intval = int(output,16) -1
|
||
|
|
# convert value as 2 digits
|
||
|
|
output = '%.2d' % intval
|
||
|
|
return output
|
||
|
|
else:
|
||
|
|
return 'File not found'
|
||
|
|
|
||
|
|
def revoke_user(self,commonName):
|
||
|
|
cmd='openssl ca -revoke %s/%s.crt -cert %s -keyfile %s -config %s' % (self.certs_dir,commonName,self.crtfile,self.keyfile,self.config_file)
|
||
|
|
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||
|
|
stdout_value, stderr_value = proc.communicate()
|
||
|
|
if proc.returncode:
|
||
|
|
print "Error while revoking user %s with return code %d: " % (commonName,proc.returncode)
|
||
|
|
print str(stderr_value)
|
||
|
|
return 1
|
||
|
|
else:
|
||
|
|
return 0
|
||
|
|
|
||
|
|
# Private Methods
|
||
|
|
|
||
|
|
|
||
|
|
class OVPNConfig:
|
||
|
|
""" Object class used to manage and generate openvpn config """
|
||
|
|
|
||
|
|
|
||
|
|
# constructor/destructor
|
||
|
|
def __init__(self):
|
||
|
|
self.server_config_dir = "/etc/openvpn"
|
||
|
|
self.client_config_dir = self.server_config_dir + "clients"
|
||
|
|
self.ca_cert_file = "/etc/vpnusers/ca.crt"
|
||
|
|
self.inter_cert_file = "/etc/vpnusers/inter.crt"
|
||
|
|
self.inter_key_file = "/etc/vpnusers/inter.key"
|
||
|
|
self.crl_verify_file = "/etc/vpnusers/crl.pem"
|
||
|
|
self.dh_key_file = "/etc/vpnusers/dh1024.pem"
|
||
|
|
self.client_pkcs_file = "pkcs.12"
|
||
|
|
self.server_mode = "server"
|
||
|
|
self.client_to_client = True
|
||
|
|
self.ip_address = "0.0.0.0"
|
||
|
|
self.port = "0"
|
||
|
|
self.protocol = "tcp"
|
||
|
|
self.device = "tun"
|
||
|
|
self.vpn_network = "0.0.0.0"
|
||
|
|
self.vpn_mask = "0.0.0.0"
|
||
|
|
self.dns_list = "0.0.0.0"
|
||
|
|
self.wins_list = "0.0.0.0"
|
||
|
|
self.routes_list = "0.0.0.0"
|
||
|
|
self.suffix_dns = None
|
||
|
|
self.compress_data = False
|
||
|
|
self.floating_server = False
|
||
|
|
self.redirect_gw = True
|
||
|
|
self.log_file = "/dev/null/log"
|
||
|
|
self.log_verbosity = "3"
|
||
|
|
self.status_file = "/dev/null/status"
|
||
|
|
self.status_version = 2
|
||
|
|
self.mute = "0"
|
||
|
|
self.daemon_user = "nobody"
|
||
|
|
self.daemon_group = "nogroup"
|
||
|
|
self.dns_name = None
|
||
|
|
self.keep_alive = 10
|
||
|
|
self.keep_alive_retry = 120
|
||
|
|
self.mtu_test = True
|
||
|
|
self.tls_server = True
|
||
|
|
self.max_clients = 100
|
||
|
|
self.persist_key = True
|
||
|
|
self.persist_tun = True
|
||
|
|
self.management_address = "0.0.0.0"
|
||
|
|
self.management_port = "5555"
|
||
|
|
|
||
|
|
# public methods
|
||
|
|
def generate_server_config(self):
|
||
|
|
config = newline("## OpenVPN Server Config ##")
|
||
|
|
config += newline("# Dir configuration")
|
||
|
|
config += newline("cd %s" % self.server_config_dir)
|
||
|
|
config += newline("client-config-dir %s" % self.client_config_dir)
|
||
|
|
config += newline("")
|
||
|
|
config += newline("# Certificates configuration")
|
||
|
|
config += newline("ca %s" % self.ca_cert_file)
|
||
|
|
config += newline("cert %s" % self.inter_cert_file)
|
||
|
|
config += newline("key %s" % self.inter_key_file)
|
||
|
|
config += newline("crl-verify %s" % self.crl_verify_file)
|
||
|
|
config += newline("dh %s" % self.dh_key_file)
|
||
|
|
config += newline("")
|
||
|
|
config += newline("# Network configuration")
|
||
|
|
config += newline("local %s" % self.ip_address)
|
||
|
|
config += newline("port %s" % self.port)
|
||
|
|
config += newline("proto %s" % self.protocol.lower())
|
||
|
|
config += newline("dev %s" % self.device.lower())
|
||
|
|
config += newline("server %s %s" % (self.vpn_network, self.vpn_mask))
|
||
|
|
config += newline("route %s %s" % (self.vpn_network, self.vpn_mask))
|
||
|
|
config += newline("")
|
||
|
|
config += newline("# Client configuration")
|
||
|
|
if self.redirect_gw:
|
||
|
|
config += newline("push \"redirect-gateway def1\"")
|
||
|
|
else:
|
||
|
|
config += newline("# push redirect-gateway def1")
|
||
|
|
dns_list = self.dns_list.split(',')
|
||
|
|
for dns_address in dns_list:
|
||
|
|
dns_address = dns_address.replace(" ","")
|
||
|
|
if dns_address:
|
||
|
|
config += newline("push \"dhcp-option DNS %s\"" % dns_address)
|
||
|
|
if self.suffix_dns is not None:
|
||
|
|
config += newline("push \"dhcp-option DOMAIN %s\"" % self.suffix_dns)
|
||
|
|
wins_list = self.wins_list.split(',')
|
||
|
|
for wins_address in wins_list:
|
||
|
|
wins_address = wins_address.replace(" ","")
|
||
|
|
if wins_address:
|
||
|
|
config += newline("push \"dhcp-option WINS %s\"" % wins_address)
|
||
|
|
routes_list = self.routes_list.split(',')
|
||
|
|
for route in routes_list:
|
||
|
|
route = route.replace(" ","")
|
||
|
|
if route:
|
||
|
|
config += newline("push \"route %s\"" % route)
|
||
|
|
config += newline("")
|
||
|
|
config += newline("# General configuration")
|
||
|
|
if self.compress_data:
|
||
|
|
config += newline("comp-lzo")
|
||
|
|
else:
|
||
|
|
config += newline("# comp-lzo")
|
||
|
|
if self.floating_server:
|
||
|
|
config += newline("float")
|
||
|
|
else:
|
||
|
|
config += newline("# float")
|
||
|
|
config += newline("log-append %s" % self.log_file)
|
||
|
|
config += newline("status %s" % self.status_file)
|
||
|
|
config += newline("status-version %s" % self.status_version)
|
||
|
|
config += newline("verb %s" % self.log_verbosity)
|
||
|
|
config += newline("mute %s" % self.mute)
|
||
|
|
config += newline("mode %s" % self.server_mode)
|
||
|
|
if self.client_to_client:
|
||
|
|
config += newline("client-to-client")
|
||
|
|
else:
|
||
|
|
config += newline("# client-to-client")
|
||
|
|
config += newline("keepalive %s %s" % (self.keep_alive, self.keep_alive_retry))
|
||
|
|
if self.mtu_test and self.protocol.lower == 'udp':
|
||
|
|
config += newline("mtu-test")
|
||
|
|
else:
|
||
|
|
config += newline("# mtu-test")
|
||
|
|
if self.tls_server:
|
||
|
|
config += newline("tls-server")
|
||
|
|
else:
|
||
|
|
config += newline("# tls-server")
|
||
|
|
config += newline("max-clients %s" % self.max_clients)
|
||
|
|
if self.persist_key:
|
||
|
|
config += newline("persist-key")
|
||
|
|
else:
|
||
|
|
config += newline("# persist-key")
|
||
|
|
if self.persist_tun:
|
||
|
|
config += newline("persist-tun")
|
||
|
|
else:
|
||
|
|
config += newline("# persist-tun")
|
||
|
|
config += newline("management %s %s" % (self.management_address, self.management_port))
|
||
|
|
return config
|
||
|
|
|
||
|
|
def generate_client_config(self):
|
||
|
|
config = newline("## OpenVPN Client configuration ##")
|
||
|
|
config += newline("remote %s" % self.ip_address)
|
||
|
|
if self.dns_name is not None:
|
||
|
|
config += newline("remote %s" % self.dns_name)
|
||
|
|
config += newline("port %s" % self.port)
|
||
|
|
config += newline("proto %s" % self.protocol)
|
||
|
|
config += newline("dev %s" % self.device)
|
||
|
|
config += newline("pull")
|
||
|
|
config += newline("ping %s" % self.keep_alive)
|
||
|
|
if self.compress_data:
|
||
|
|
config += newline("comp-lzo")
|
||
|
|
else:
|
||
|
|
config += newline("# comp-lzo")
|
||
|
|
config += newline("verb %s" % self.log_verbosity)
|
||
|
|
config += newline("mute %s" % self.mute)
|
||
|
|
if self.tls_server:
|
||
|
|
config += newline("tls-client")
|
||
|
|
else:
|
||
|
|
config += newline("# tls-client")
|
||
|
|
config += newline("pkcs12 %s" % self.client_pkcs_file)
|
||
|
|
return config
|
||
|
|
|
||
|
|
# private methods
|
||
|
|
|
||
|
|
|
||
|
|
# Other Funtions
|
||
|
|
def newline(line):
|
||
|
|
return line + '\n'
|
||
|
|
|
||
|
|
|
||
|
|
## TESTS OVPNCA
|
||
|
|
#newca = OVPNCA()
|
||
|
|
|
||
|
|
#newca.generate_ossl_config()
|
||
|
|
#newca.create_CA_certs()
|
||
|
|
|
||
|
|
#print "private_key :"
|
||
|
|
#print newca.read_private_key()
|
||
|
|
|
||
|
|
#print "certificate :"
|
||
|
|
#print newca.read_certificate()
|
||
|
|
|
||
|
|
#newca.generate_user_csr_key('toto')
|
||
|
|
#print "user key :"
|
||
|
|
#print newca.read_user_key('toto')
|
||
|
|
#print "user csr :"
|
||
|
|
#print newca.read_user_csr('toto')
|
||
|
|
|
||
|
|
#newca.generate_user_crt('toto')
|
||
|
|
#print "user certificate :"
|
||
|
|
#print newca.read_user_certificate('toto')
|
||
|
|
|
||
|
|
#newca.generate_user_pkcs('toto')
|
||
|
|
|
||
|
|
#newca.generate_user_crl('toto')
|
||
|
|
#print "user crl :"
|
||
|
|
#print newca.read_user_crl('toto')
|
||
|
|
|
||
|
|
#newca.clean_all()
|
||
|
|
|
||
|
|
## TESTS OVPNConfig
|
||
|
|
#newconfig = OVPNConfig()
|
||
|
|
#print newconfig.generate_server_config()
|
||
|
|
#print newconfig.generate_client_config()
|