Console OpenVPN, développée en Django Permet de gérer des instances, serveurs, et comptes OpenVPN
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

715 lines
24 KiB

# -*- coding: utf-8 -*-
#
# vim:syntax=python:sw=4:ts=4:expandtab
# ovpntools.py
#
# Copyright (C) Adelux - 2009
#
"""
OpenVPN tools module :
CA management Class
openssl config file generation
certificate generation tools
certificate verification tools
"""
import os
import subprocess
class OVPNCA:
"""
Class used to manage openvpn certificates
"""
# class constructor
def __init__(self):
self.commonName = "OpenVPN CA"
self.country = "FR"
self.state = "France"
self.city = "Poitiers"
self.organization = "OpenVPN"
self.validity_in_days = 365
self.email = "openvpn@localhost"
self.key_size = 1024
self.certs_dir = '/tmp/certs'
self.crtfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '.crt'
self.keyfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '.key'
self.csrfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '.csr'
self.crlfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '.pem'
self.interkeyfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '_inter' + '.key'
self.intercsrfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '_inter' + '.csr'
self.intercrtfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '_inter' + '.crt'
self.dhfile = self.certs_dir + '/' + self.commonName.replace(' ','_') + '%s.dh' % self.key_size
self.config_file = self.certs_dir + '/' + 'openssl.cnf'
# Public Methods
def generate_ossl_config(self):
if not os.path.exists(self.certs_dir):
os.mkdir(self.certs_dir)
output = ''
output += newline('HOME = ' + self.certs_dir)
output += newline('RADNFILE = ' + self.certs_dir + '/.rnd')
output += newline('oid_section = new_oids')
output += newline('')
output += newline('[new_oids]')
output += newline('')
output += newline('[ ca ]')
output += newline('default_ca = CA_default')
output += newline('')
output += newline('[ CA_default ]')
output += newline('dir = ' + self.certs_dir)
output += newline('certs = $dir')
output += newline('crl_dir = $dir')
output += newline('database = $dir/index.txt')
output += newline('new_certs_dir = $dir')
output += newline('certificate = %s') % self.crtfile
output += newline('serial = $dir/serial')
output += newline('crl = %s') % self.crlfile
output += newline('private_key = %s') % self.keyfile
output += newline('RANDFILE = $dir/.rand')
output += newline('x509_extensions = usr_cert')
output += newline('default_days = %d' % self.validity_in_days)
output += newline('default_crl_days= 30')
output += newline('default_md = md5')
output += newline('preserve = no')
output += newline('policy = policy_match')
output += newline('')
output += newline('[ policy_match ]')
output += newline('countryName = match')
output += newline('stateOrProvinceName = match')
output += newline('organizationName = match')
output += newline('organizationalUnitName = optional')
output += newline('commonName = supplied')
output += newline('emailAddress = optional')
output += newline('')
output += newline('[ policy_anything ]')
output += newline('countryName = optional')
output += newline('stateOrProvinceName = optional')
output += newline('localityName = optional')
output += newline('organizationName = optional')
output += newline('organizationalUnitName = optional')
output += newline('commonName = supplied')
output += newline('emailAddress = optional')
output += newline('')
output += newline('[ req ]')
output += newline('default_bits = %s' % self.key_size)
output += newline('default_keyfile = privkey.pem')
output += newline('distinguished_name = req_distinguished_name')
output += newline('attributes = req_attributes')
output += newline('x509_extensions = v3_ca ')
output += newline('string_mask = nombstr')
output += newline('')
output += newline('[ req_distinguished_name ]')
output += newline('countryName = Country Name (2 letter code)')
output += newline('countryName_default = ' + self.country)
output += newline('countryName_min = 2')
output += newline('countryName_max = 2')
output += newline('stateOrProvinceName = State or Province Name')
output += newline('stateOrProvinceName_default = ' + self.state)
output += newline('localityName = Locality Name (eg, city)')
output += newline('localityName_default = ' + self.city)
output += newline('0.organizationName = Organization Name')
output += newline('0.organizationName_default = ' + self.organization)
output += newline('organizationalUnitName = Organizational Unit Name')
output += newline('commonName = Common Name ')
output += newline('commonName_default = ' + self.commonName)
output += newline('commonName_max = 64')
output += newline('emailAddress = Email Address')
output += newline('emailAddress_default = ' + self.email)
output += newline('emailAddress_max = 40')
output += newline('')
output += newline('[ req_attributes ]')
output += newline('challengePassword = A challenge password')
output += newline('challengePassword_min = 4')
output += newline('challengePassword_max = 20')
output += newline('unstructuredName = An optional company name')
output += newline('')
output += newline('[ usr_cert ]')
output += newline('basicConstraints=CA:FALSE')
output += newline('nsComment = "OpenSSL Generated Certificate"')
output += newline('subjectKeyIdentifier=hash')
output += newline('authorityKeyIdentifier=keyid,issuer:always')
output += newline('')
output += newline('[ v3_req ]')
output += newline('basicConstraints = CA:FALSE')
output += newline('keyUsage = nonRepudiation, digitalSignature, keyEncipherment')
output += newline('')
output += newline('[ v3_ca ]')
output += newline('subjectKeyIdentifier=hash')
output += newline('authorityKeyIdentifier=keyid:always,issuer:always')
output += newline('basicConstraints = CA:true')
output += newline('')
output += newline('[ crl_ext ]')
output += newline('authorityKeyIdentifier=keyid:always,issuer:always')
f = open(self.config_file ,'w')
print >> f, output
f.close()
def read_ossl_config(self):
if os.path.exists(self.config_file):
f = open(self.config_file, 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def create_CA_certs(self):
self.generate_crt_key()
self.generate_inter_csr_key()
self.generate_inter_crt()
self.generate_dh()
self.generate_crl()
def create_user_certs(self,commonName):
if not os.path.exists(self.certs_dir):
os.makedirs(self.certs_dir)
def generate_crt_key(self):
if not os.path.exists(self.certs_dir):
os.makedirs(self.certs_dir)
cmd = 'openssl req -batch -days %d -nodes -new -x509 -keyout %s -out %s -config %s' % (self.validity_in_days,self.keyfile,self.crtfile,self.config_file)
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
stdout_value, stderr_value = proc.communicate()
if proc.returncode:
print "Error while creating CRT/KEY with return code %d: " % proc.returncode
print str(stderr_value)
return 1
else:
return 0
def read_ca_key(self):
if os.path.exists(self.keyfile):
f = open(self.keyfile, 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def read_ca(self):
if os.path.exists(self.crtfile):
f = open(self.crtfile, 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def generate_csr(self):
if not os.path.exists(self.certs_dir):
os.makedirs(self.certs_dir)
cmd = 'openssl req -batch -days %d -nodes -new -keyout %s -out %s -config %s' % (self.validity_in_days,self.keyfile,self.csrfile,self.config_file)
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
stdout_value, stderr_value = proc.communicate()
if proc.returncode:
print "Error while creating CSR with return code %d: " % proc.returncode
print str(stderr_value)
return 1
else:
return 0
def generate_inter_csr_key(self):
if not os.path.exists(self.certs_dir):
os.makedirs(self.certs_dir)
cmd = 'openssl req -batch -days %d -nodes -new -keyout %s -out %s -config %s' % (self.validity_in_days,self.interkeyfile,self.intercsrfile,self.config_file)
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
stdout_value, stderr_value = proc.communicate()
if proc.returncode:
print "Error while creating inter CSR/KEY with return code %d: " % proc.returncode
print str(stderr_value)
return 1
else:
return 0
def read_inter_csr(self):
if os.path.exists(self.intercsrfile):
f = open(self.intercsrfile, 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def read_key(self):
if os.path.exists(self.interkeyfile):
f = open(self.interkeyfile, 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def generate_inter_crt(self):
if not os.path.exists(self.certs_dir):
os.makedirs(self.certs_dir)
if not os.path.exists(self.certs_dir + '/index.txt'):
f = open(self.certs_dir + '/index.txt','w')
f.close()
if not os.path.exists(self.certs_dir + '/serial'):
f = open(self.certs_dir + '/serial','w')
f.write('00\n')
f.close()
cmd = 'openssl ca -batch -extensions v3_ca -days %d -out %s -in %s -config %s' % (self.validity_in_days,self.intercrtfile,self.intercsrfile,self.config_file)
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
stdout_value, stderr_value = proc.communicate()
if proc.returncode:
print "Error while creating inter CRT with return code %d: " % proc.returncode
print str(stderr_value)
return 1
else:
return 0
def read_certificate(self):
if os.path.exists(self.intercrtfile):
f = open(self.intercrtfile, 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def generate_dh(self):
if not os.path.exists(self.certs_dir):
os.makedirs(self.certs_dir)
cmd = 'openssl dhparam -out %s %s' % (self.dhfile,self.key_size)
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
stdout_value, stderr_value = proc.communicate()
if proc.returncode:
print "Error while creating DH with return code %d: " % proc.returncode
print str(stderr_value)
return 1
else:
return 0
def read_dh(self):
if os.path.exists(self.dhfile):
f = open(self.dhfile, 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def read_crl_verify(self):
if os.path.exists(self.crlfile):
f = open(self.crlfile, 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def generate_crl(self):
if not os.path.exists(self.certs_dir):
os.makedirs(self.certs_dir)
cmd = 'openssl ca -gencrl -out %s -config %s' % (self.crlfile,self.config_file)
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
stdout_value, stderr_value = proc.communicate()
if proc.returncode:
print "Error while creating CRL with return code %d: " % proc.returncode
print str(stderr_value)
return 1
else:
return 0
def read_crl(self):
if os.path.exists(self.crlfile):
f = open(self.crlfile, 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def generate_user_csr_key(self,commonName):
if not os.path.exists(self.certs_dir):
os.makedirs(self.certs_dir)
userkeyfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.key'
usercsrfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.csr'
cmd = 'openssl req -batch -days %d -nodes -new -keyout %s -out %s -config %s' % (self.validity_in_days,userkeyfile,usercsrfile,self.config_file)
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
stdout_value, stderr_value = proc.communicate()
if proc.returncode:
print "Error while creating user CSR/KEY with return code %d: " % proc.returncode
print str(stderr_value)
return 1
else:
return 0
def read_user_key(self,commonName):
userkeyfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.key'
if os.path.exists(userkeyfile):
f = open(userkeyfile, 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def read_user_csr(self,commonName):
usercsrfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.csr'
if os.path.exists(usercsrfile):
f = open(usercsrfile, 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def generate_user_crt(self,commonName):
if not os.path.exists(self.certs_dir):
os.makedirs(self.certs_dir)
usercrtfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.crt'
usercsrfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.csr'
cmd = 'openssl ca -batch -days %d -out %s -in %s -config %s' % (self.validity_in_days,usercrtfile,usercsrfile,self.config_file)
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
stdout_value, stderr_value = proc.communicate()
if proc.returncode:
print "Error while creating user CRT with return code %d: " % proc.returncode
print str(stderr_value)
return 1
else:
return 0
def read_user_certificate(self,commonName):
usercrtfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.crt'
if os.path.exists(usercrtfile):
f = open(usercrtfile, 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def generate_user_pkcs(self,commonName,password=None,basedir=None):
if basedir is None:
basedir = self.certs_dir
if not os.path.exists(basedir):
os.makedirs(basedir)
usercrt = basedir + '/' + commonName.replace(' ','_') + '.crt'
userp12 = basedir + '/' + commonName.replace(' ','_') + '.p12'
userkey = basedir + '/' + commonName.replace(' ','_') + '.key'
crtfile = basedir + '/' + self.commonName.replace(' ','_') + '.crt'
if password == None:
cmd = 'openssl pkcs12 -export -in %s -inkey %s -certfile %s -out %s -passout pass:%s' % (usercrt,userkey,crtfile,userp12,password)
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
stdout_value, stderr_value = proc.communicate()
else:
cmd = 'openssl pkcs12 -export -in %s -inkey %s -certfile %s -out %s -passout pass:%s' % (usercrt,userkey,crtfile,userp12,password)
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
stdout_value, stderr_value = proc.communicate()
if proc.returncode:
output = "Error while creating user PKCS with return code %d: \n " % proc.returncode
output += stderr_value
return str(output)
else:
return str(stdout_value)
def read_user_pkcs(self,commonName,basedir=None):
if basedir is None:
basedir = self.certs_dir
##userp12 = basedir + '/' + commonName.replace(' ','_') + '.p12'
userp12 = self.certs_dir + '/' + commonName.replace(' ','_') + '.p12'
if os.path.exists(userp12):
f = open(userp12, 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def generate_user_crl(self,commonName):
if not os.path.exists(self.certs_dir):
os.makedirs(self.certs_dir)
usercrlfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.pem'
cmd = 'openssl ca -gencrl -out %s -config %s' % (usercrlfile,self.config_file)
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
stdout_value, stderr_value = proc.communicate()
if proc.returncode:
print "Error while creating user CRL with return code %d: " % proc.returncode
print str(stderr_value)
return 1
else:
return 0
def read_user_crl(self,commonName):
usercrlfile = self.certs_dir + '/' + commonName.replace(' ','_') + '.pem'
if os.path.exists(usercrlfile):
f = open(usercrlfile, 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def clean_all(self):
if os.path.exists(self.certs_dir):
for file in os.listdir(self.certs_dir):
os.remove(self.certs_dir + "/" + file)
os.rmdir(self.certs_dir)
def read_index(self):
if os.path.exists(self.certs_dir + '/' + 'index.txt'):
f = open(self.certs_dir + '/' + 'index.txt', 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def read_index_old(self):
if os.path.exists(self.certs_dir + '/' + 'index.txt.old'):
f = open(self.certs_dir + '/' + 'index.txt.old', 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def read_index_attr(self):
if os.path.exists(self.certs_dir + '/' + 'index.txt.attr'):
f = open(self.certs_dir + '/' + 'index.txt.attr', 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def read_serial(self):
if os.path.exists(self.certs_dir + '/' + 'serial'):
f = open(self.certs_dir + '/' + 'serial', 'r')
output = f.read()
f.close()
return output
else:
return 'File not found'
def read_user_cert_index(self):
if os.path.exists(self.certs_dir + '/' + 'serial'):
f = open(self.certs_dir + '/' + 'serial', 'r')
output = f.read()
f.close()
# convert value in dec, substract 1
intval = int(output,16) -1
# convert value as 2 digits
output = '%.2d' % intval
return output
else:
return 'File not found'
def revoke_user(self,commonName):
cmd='openssl ca -revoke %s/%s.crt -cert %s -keyfile %s -config %s' % (self.certs_dir,commonName,self.crtfile,self.keyfile,self.config_file)
proc = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
stdout_value, stderr_value = proc.communicate()
if proc.returncode:
print "Error while revoking user %s with return code %d: " % (commonName,proc.returncode)
print str(stderr_value)
return 1
else:
return 0
# Private Methods
class OVPNConfig:
""" Object class used to manage and generate openvpn config """
# constructor/destructor
def __init__(self):
self.server_config_dir = "/etc/openvpn"
self.client_config_dir = self.server_config_dir + "clients"
self.ca_cert_file = "/etc/vpnusers/ca.crt"
self.inter_cert_file = "/etc/vpnusers/inter.crt"
self.inter_key_file = "/etc/vpnusers/inter.key"
self.crl_verify_file = "/etc/vpnusers/crl.pem"
self.dh_key_file = "/etc/vpnusers/dh1024.pem"
self.client_pkcs_file = "pkcs.12"
self.server_mode = "server"
self.client_to_client = True
self.ip_address = "0.0.0.0"
self.port = "0"
self.protocol = "tcp"
self.device = "tun"
self.vpn_network = "0.0.0.0"
self.vpn_mask = "0.0.0.0"
self.dns_list = "0.0.0.0"
self.wins_list = "0.0.0.0"
self.routes_list = "0.0.0.0"
self.suffix_dns = None
self.compress_data = False
self.floating_server = False
self.redirect_gw = True
self.log_file = "/dev/null/log"
self.log_verbosity = "3"
self.status_file = "/dev/null/status"
self.status_version = 2
self.mute = "0"
self.daemon_user = "nobody"
self.daemon_group = "nogroup"
self.dns_name = None
self.keep_alive = 10
self.keep_alive_retry = 120
self.mtu_test = True
self.tls_server = True
self.max_clients = 100
self.persist_key = True
self.persist_tun = True
self.management_address = "0.0.0.0"
self.management_port = "5555"
# public methods
def generate_server_config(self):
config = newline("## OpenVPN Server Config ##")
config += newline("# Dir configuration")
config += newline("cd %s" % self.server_config_dir)
config += newline("client-config-dir %s" % self.client_config_dir)
config += newline("")
config += newline("# Certificates configuration")
config += newline("ca %s" % self.ca_cert_file)
config += newline("cert %s" % self.inter_cert_file)
config += newline("key %s" % self.inter_key_file)
config += newline("crl-verify %s" % self.crl_verify_file)
config += newline("dh %s" % self.dh_key_file)
config += newline("")
config += newline("# Network configuration")
config += newline("local %s" % self.ip_address)
config += newline("port %s" % self.port)
config += newline("proto %s" % self.protocol.lower())
config += newline("dev %s" % self.device.lower())
config += newline("server %s %s" % (self.vpn_network, self.vpn_mask))
config += newline("route %s %s" % (self.vpn_network, self.vpn_mask))
config += newline("")
config += newline("# Client configuration")
if self.redirect_gw:
config += newline("push \"redirect-gateway def1\"")
else:
config += newline("# push redirect-gateway def1")
dns_list = self.dns_list.split(',')
for dns_address in dns_list:
dns_address = dns_address.replace(" ","")
if dns_address:
config += newline("push \"dhcp-option DNS %s\"" % dns_address)
if self.suffix_dns is not None:
config += newline("push \"dhcp-option DOMAIN %s\"" % self.suffix_dns)
wins_list = self.wins_list.split(',')
for wins_address in wins_list:
wins_address = wins_address.replace(" ","")
if wins_address:
config += newline("push \"dhcp-option WINS %s\"" % wins_address)
routes_list = self.routes_list.split(',')
for route in routes_list:
route = route.replace(" ","")
if route:
config += newline("push \"route %s\"" % route)
config += newline("")
config += newline("# General configuration")
if self.compress_data:
config += newline("comp-lzo")
else:
config += newline("# comp-lzo")
if self.floating_server:
config += newline("float")
else:
config += newline("# float")
config += newline("log-append %s" % self.log_file)
config += newline("status %s" % self.status_file)
config += newline("status-version %s" % self.status_version)
config += newline("verb %s" % self.log_verbosity)
config += newline("mute %s" % self.mute)
config += newline("mode %s" % self.server_mode)
if self.client_to_client:
config += newline("client-to-client")
else:
config += newline("# client-to-client")
config += newline("keepalive %s %s" % (self.keep_alive, self.keep_alive_retry))
if self.mtu_test and self.protocol.lower == 'udp':
config += newline("mtu-test")
else:
config += newline("# mtu-test")
if self.tls_server:
config += newline("tls-server")
else:
config += newline("# tls-server")
config += newline("max-clients %s" % self.max_clients)
if self.persist_key:
config += newline("persist-key")
else:
config += newline("# persist-key")
if self.persist_tun:
config += newline("persist-tun")
else:
config += newline("# persist-tun")
config += newline("management %s %s" % (self.management_address, self.management_port))
return config
def generate_client_config(self):
config = newline("## OpenVPN Client configuration ##")
config += newline("remote %s" % self.ip_address)
if self.dns_name is not None:
config += newline("remote %s" % self.dns_name)
config += newline("port %s" % self.port)
config += newline("proto %s" % self.protocol)
config += newline("dev %s" % self.device)
config += newline("pull")
config += newline("ping %s" % self.keep_alive)
if self.compress_data:
config += newline("comp-lzo")
else:
config += newline("# comp-lzo")
config += newline("verb %s" % self.log_verbosity)
config += newline("mute %s" % self.mute)
if self.tls_server:
config += newline("tls-client")
else:
config += newline("# tls-client")
config += newline("pkcs12 %s" % self.client_pkcs_file)
return config
# private methods
# Other Funtions
def newline(line):
return line + '\n'
## TESTS OVPNCA
#newca = OVPNCA()
#newca.generate_ossl_config()
#newca.create_CA_certs()
#print "private_key :"
#print newca.read_private_key()
#print "certificate :"
#print newca.read_certificate()
#newca.generate_user_csr_key('toto')
#print "user key :"
#print newca.read_user_key('toto')
#print "user csr :"
#print newca.read_user_csr('toto')
#newca.generate_user_crt('toto')
#print "user certificate :"
#print newca.read_user_certificate('toto')
#newca.generate_user_pkcs('toto')
#newca.generate_user_crl('toto')
#print "user crl :"
#print newca.read_user_crl('toto')
#newca.clean_all()
## TESTS OVPNConfig
#newconfig = OVPNConfig()
#print newconfig.generate_server_config()
#print newconfig.generate_client_config()