You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1032 lines
42 KiB
1032 lines
42 KiB
# -*- coding:utf-8 -*- |
|
# |
|
# vim:syntax=python:sw=4:ts=4:expandtab |
|
|
|
# |
|
# Copyright (C) Adelux - 2009 |
|
# |
|
|
|
from django.db import models |
|
from ovpnmanager.ovpnconsole import * |
|
from django.forms import ModelForm |
|
from django import forms |
|
from django.utils.encoding import force_unicode, smart_str, smart_unicode |
|
from django.conf import settings |
|
from django.contrib.admin import widgets |
|
from django.core.mail import send_mail |
|
from django.contrib.auth.models import * |
|
from django.utils.translation import ugettext as _ |
|
|
|
from ovpntools.ovpntools import * |
|
from synctools.synctools import * |
|
|
|
import os |
|
import sys |
|
import datetime |
|
import tempfile |
|
import re |
|
import shutil |
|
|
|
class OVPNSettings(models.Model): |
|
work_dir = models.CharField(max_length=255, default="/tmp/ovpnmanager") |
|
smtp_server = models.CharField(max_length=255, default="smtp.mydomain.com") |
|
|
|
class OVPNSite(models.Model): |
|
name = models.CharField(max_length=30,unique=True, verbose_name=_("name")) |
|
locality = models.CharField(max_length=30, verbose_name=_("locality")) |
|
|
|
# Private methods |
|
def __unicode__(self): |
|
return self.name |
|
|
|
def _name_in_utf8(self): |
|
""" Public method used to convert the name into utf8 string """ |
|
return smart_str(self.name) |
|
|
|
# Public methods |
|
def save(self): |
|
""" Overload of public method used to save object in database """ |
|
super(OVPNSite, self).save() |
|
|
|
def delete(self): |
|
# Delete each authority from database |
|
for ovpnauthority in self.ovpnauthority_set.all(): |
|
ovpnauthority.delete() |
|
# Delete each server configuration files and dir |
|
for ovpnserver in ovpnauthority.ovpnserver_set.all(): |
|
ovpnserver.delete() |
|
# Remove tmp directory if exist |
|
if os.path.isdir(settings.OUTPUT_DIR + "/" + self.name): |
|
try: |
|
os.rmdir(settings.OUTPUT_DIR + "/" + self.ovpnsite.name) |
|
except: |
|
pass |
|
super(OVPNSite, self).delete() |
|
|
|
class OVPNSiteForm(forms.ModelForm): |
|
class Meta: |
|
model = OVPNSite |
|
|
|
class OVPNAuthority(models.Model): |
|
|
|
RSA_SIZE_LIST = ( |
|
('1024','1024'), |
|
) |
|
|
|
name = models.CharField(max_length=30,verbose_name=_("Name")) |
|
country = models.CharField(max_length=2, verbose_name=_("Country Code")) |
|
state = models.CharField(max_length=30,verbose_name=_("State")) |
|
city = models.CharField(max_length=30, verbose_name=_("City")) |
|
organization = models.CharField(max_length=40, verbose_name=_("Organization")) |
|
expiration_date = models.DateField(editable=False,blank=True) |
|
creation_date = models.DateField(editable=False,auto_now=True) |
|
valid_for = models.SmallIntegerField(verbose_name=_("Validity Period"), default=3650) |
|
email = models.CharField(max_length=50, verbose_name=_("email")) |
|
rsa_size = models.IntegerField(choices=RSA_SIZE_LIST, verbose_name=_("RSA Key Size"), default="1024") |
|
is_valid = models.BooleanField(default=True, editable=False, blank=True) |
|
ovpnsite = models.ForeignKey(OVPNSite, verbose_name=_("Site")) |
|
|
|
# Certificates properties |
|
work_dir = models.CharField(max_length=255,editable=False, blank=True, verbose_name=_("Workdir")) |
|
key_file = models.FilePathField(editable=False, blank=True) |
|
crt_file = models.FilePathField(editable=False, blank=True) |
|
crl_file = models.FilePathField(editable=False, blank=True) |
|
dh_file = models.FilePathField(editable=False, blank=True) |
|
interkey_file = models.FilePathField(editable=False, blank=True) |
|
intercsr_file = models.FilePathField(editable=False, blank=True) |
|
intercrt_file = models.FilePathField(editable=False, blank=True) |
|
ossl_config_file = models.FilePathField(editable=False, blank=True) |
|
ca_index_file = models.FilePathField(editable=False, blank=True) |
|
ca_index_old_file = models.FilePathField(editable=False, blank=True) |
|
ca_index_attr_file = models.FilePathField(editable=False, blank=True) |
|
ca_serial_file = models.FilePathField(editable=False, blank=True) |
|
ossl_config_content = models.TextField(editable=False, blank=True) |
|
ca_content = models.TextField(editable=False, blank=True) |
|
certificate_content = models.TextField(editable=False, blank=True) |
|
inter_csr_content = models.TextField(editable=False, blank=True) |
|
server_key_content = models.TextField(editable=False, blank=True) |
|
ca_key_content = models.TextField(editable=False, blank=True) |
|
crl_content = models.TextField(editable=False, blank=True) |
|
dh_content = models.TextField(editable=False, blank=True) |
|
ca_index = models.TextField(editable=False, blank=True) |
|
ca_index_attr = models.TextField(editable=False, blank=True) |
|
ca_index_old = models.TextField(editable=False, blank=True) |
|
ca_serial = models.TextField(editable=False, blank=True) |
|
ca_index_num = models.CharField(max_length=4,editable=False, blank=True) |
|
|
|
## Methods |
|
|
|
def __unicode__(self): |
|
return self.name |
|
|
|
# private methods |
|
def create_authority_dir(self): |
|
""" Private method used to create authority (sub)directory """ |
|
if not os.path.isdir(settings.OUTPUT_DIR): |
|
os.mkdir(settings.OUTPUT_DIR) |
|
if not os.path.isdir(settings.OUTPUT_DIR + "/" + self.ovpnsite.name): |
|
os.mkdir(settings.OUTPUT_DIR + "/" + self.ovpnsite.name) |
|
if not os.path.isdir(self.work_dir): |
|
os.mkdir(self.work_dir) |
|
|
|
# public methods |
|
def create_ovpntools_object(self): |
|
# create ovpntools object |
|
ovpnca = OVPNCA() |
|
ovpnca.commonName = self.name |
|
ovpnca.country = self.country |
|
ovpnca.city = self.city |
|
ovpnca.organization = self.organization |
|
ovpnca.validity_in_days = self.valid_for |
|
ovpnca.email = self.email |
|
ovpnca.keysize = self.rsa_size |
|
ovpnca.certs_dir = self.work_dir |
|
ovpnca.crtfile = self.crt_file |
|
ovpnca.crlfile = self.crl_file |
|
ovpnca.keyfile = self.key_file |
|
ovpnca.interkeyfile = self.interkey_file |
|
ovpnca.intercsrfile = self.intercsr_file |
|
ovpnca.intercrtfile = self.intercrt_file |
|
ovpnca.dhfile = self.dh_file |
|
ovpnca.config_file = self.ossl_config_file |
|
return ovpnca |
|
|
|
def generate_authority(self): |
|
""" private method used to create authority certificate""" |
|
self.ovpnca = self.create_ovpntools_object() |
|
# generate openssl config |
|
self.ovpnca.generate_ossl_config() |
|
self.ossl_config_content = self.ovpnca.read_ossl_config() |
|
|
|
# generates and reads all CA certs, remove temporary files |
|
self.ca_index_num = '00' |
|
self.ovpnca.create_CA_certs() |
|
self.ca_content = self.ovpnca.read_ca().rstrip('\n') |
|
self.certificate_content = self.ovpnca.read_certificate().rstrip('\n') |
|
self.server_key_content = self.ovpnca.read_key().rstrip('\n') |
|
self.inter_csr_content = self.ovpnca.read_inter_csr().rstrip('\n') |
|
self.ca_key_content = self.ovpnca.read_ca_key().rstrip('\n') |
|
self.dh_content = self.ovpnca.read_dh().rstrip('\n') |
|
self.crl_content = self.ovpnca.read_crl_verify().rstrip('\n') |
|
self.ca_index = self.ovpnca.read_index().rstrip('\n') |
|
self.ca_index_attr = self.ovpnca.read_index_attr().rstrip('\n') |
|
self.ca_index_old = self.ovpnca.read_index_old().rstrip('\n') |
|
self.ca_serial = self.ovpnca.read_serial().rstrip('\n') |
|
# clean temporary files |
|
self.ovpnca.clean_all() |
|
self.clean_work_dir() |
|
|
|
def write_authority_files(self): |
|
# generate temporary authority files |
|
self.create_authority_dir() |
|
f = open(self.ossl_config_file,'w') |
|
print >> f, self.ossl_config_content |
|
f.close() |
|
f = open(self.crt_file,'w') |
|
print >> f, self.ca_content |
|
f.close() |
|
f = open(self.crl_file,'w') |
|
print >> f, self.crl_content |
|
f.close() |
|
f = open(self.intercrt_file,'w') |
|
print >> f, self.certificate_content |
|
f.close() |
|
## converts ca_index in int |
|
ca_index_num = int(self.ca_index_num) |
|
f = open(self.work_dir + '/' + '%.2d.pem' % ca_index_num,'w') |
|
print >> f, self.certificate_content |
|
f.close() |
|
f = open(self.intercsr_file,'w') |
|
print >> f, self.inter_csr_content |
|
f.close() |
|
f = open(self.key_file,'w') |
|
print >> f, self.ca_key_content |
|
f.close() |
|
f = open(self.interkey_file,'w') |
|
print >> f, self.server_key_content |
|
f.close() |
|
f = open(self.dh_file,'w') |
|
print >> f, self.dh_content |
|
f.close() |
|
f = open(self.ca_index_file,'w') |
|
print >> f, self.ca_index |
|
f.close() |
|
f = open(self.ca_index_old_file,'w') |
|
print >> f, self.ca_index_old |
|
f.close() |
|
f = open(self.ca_index_attr_file,'w') |
|
print >> f, self.ca_index_attr |
|
f.close() |
|
f = open(self.ca_serial_file,'w') |
|
print >> f, self.ca_serial |
|
f.close() |
|
|
|
def save(self,force_insert=False,force_update=False): |
|
""" Overload of public method used to save object in database """ |
|
|
|
if force_update is False: |
|
# Calculates the expiration_date |
|
self.expiration_date = datetime.datetime.today() + datetime.timedelta(days=self.valid_for) |
|
|
|
# set country in upper-case |
|
self.country = self.country.upper() |
|
|
|
# sets non editable properties |
|
self.isvalid = self.set_validity() |
|
self.work_dir = settings.OUTPUT_DIR + "/" + self.ovpnsite.name + "/" + self.name |
|
self.ossl_config_file = self.work_dir + '/openssl.cnf' |
|
self.key_file = self.work_dir + '/' + self.name + '.key' |
|
self.crt_file = self.work_dir + '/' + self.name + '.crt' |
|
self.crl_file = self.work_dir + '/' + self.name + '.pem' |
|
self.dh_file = self.work_dir + '/' + self.name + '_' + '%d.dh' % self.rsa_size |
|
self.interkey_file = self.work_dir + '/' + self.name + '_inter.key' |
|
self.intercsr_file = self.work_dir + '/' + self.name + '_inter.csr' |
|
self.intercrt_file = self.work_dir + '/' + self.name + '_inter.crt' |
|
self.ossl_config_file = self.work_dir + '/' + self.name + '_ssl.cnf' |
|
self.ca_index_file = self.work_dir + '/' + 'index.txt' |
|
self.ca_index_old_file = self.work_dir + '/' + 'index.txt.old' |
|
self.ca_index_attr_file = self.work_dir + '/' + 'index.txt.attr' |
|
self.ca_serial_file = self.work_dir + '/' + 'serial' |
|
|
|
# creates directory for this authority |
|
self.create_authority_dir() |
|
# generate certificate of authority |
|
self.generate_authority() |
|
|
|
else: |
|
force_insert = False |
|
|
|
super(OVPNAuthority, self).save(force_insert,force_update) |
|
|
|
def delete(self): |
|
# Remove work dir if exist |
|
if os.path.isdir(self.work_dir): |
|
try: |
|
shutil.rmtree(self.work_dir) |
|
except: |
|
pass |
|
super(OVPNAuthority, self).delete() |
|
|
|
def set_validity(self,isvalid=True): |
|
self.is_valid = isvalid |
|
|
|
def clean_work_dir(self): |
|
if os.path.isdir(self.work_dir): |
|
try: |
|
shutil.rmtree(self.work_dir) |
|
self.create_authority_dir() |
|
except: |
|
pass |
|
|
|
class Meta: |
|
unique_together = ("ovpnsite", "name") |
|
|
|
class OVPNAuthorityForm(forms.ModelForm): |
|
class Meta: |
|
model = OVPNAuthority |
|
|
|
class OVPNServer(models.Model): |
|
PROTO_LIST = ( |
|
('tcp', 'tcp'), |
|
('udp','udp') |
|
) |
|
|
|
DEVICE_LIST = ( |
|
('tun', 'tun'), |
|
('tap', 'tap') |
|
) |
|
|
|
SERVER_MODE_LIST = ( |
|
('p2p', 'p2p'), |
|
('srv', 'server') |
|
) |
|
|
|
STATUS_VERSION_LIST = ( |
|
('1', '1'), |
|
('2', '2') |
|
) |
|
|
|
# Server configuration |
|
ovpnauthority = models.ForeignKey(OVPNAuthority, verbose_name=_("OVPNAuthority")) |
|
name = models.CharField(max_length=30, verbose_name=_("Server Name")) |
|
openvpn_base_dir = models.CharField(max_length=255, verbose_name=_("OpenVPN Base Dir")) |
|
server_config_dir = models.CharField(max_length=255, editable=False, blank=True) |
|
client_config_dir = models.CharField(max_length=255, editable=False, blank=True) |
|
is_configured = models.BooleanField(default=False, editable=False) |
|
is_up_to_date = models.BooleanField(default=False, editable=False) |
|
# OpenVPN Configuration |
|
certs_dir = models.CharField(max_length=255, editable=False, blank=True) |
|
server_config_content = models.TextField(editable=False, blank=True) |
|
local_ip_address = models.IPAddressField(verbose_name=_("Local IP Address")) |
|
public_ip_address = models.IPAddressField(verbose_name=_("Public IP Address")) |
|
port = models.SmallIntegerField(default=1194, verbose_name=_("Port")) |
|
protocol = models.CharField(max_length=3, choices=PROTO_LIST, default="TCP",verbose_name=_("Protocol")) |
|
device = models.CharField(max_length=3, choices=DEVICE_LIST, default="TUN", verbose_name=_("Device")) |
|
server_mode = models.CharField(max_length=3, choices=SERVER_MODE_LIST, default="srv", verbose_name=_("Server Mode")) |
|
client_to_client = models.BooleanField(default=True, verbose_name=_("Client-to-CLient")) |
|
vpn_network = models.IPAddressField() |
|
vpn_mask = models.IPAddressField() |
|
dns_list = models.TextField(blank="True", verbose_name=_("DNS List")) |
|
suffix_dns = models.CharField(max_length=255, blank="True", verbose_name=_("DNS Suffix")) |
|
wins_list = models.TextField(blank="True", verbose_name=_("WINS Server")) |
|
routes_list = models.TextField(blank="True", verbose_name=_("Routes List")) |
|
compress_data = models.BooleanField(default="True", verbose_name=_("CompressData")) |
|
floating_server = models.BooleanField(default="True", verbose_name=_("FloatingServer")) |
|
redirect_gw = models.BooleanField(default="True", verbose_name=_("RedirectGateway")) |
|
log_file = models.CharField(max_length=100, verbose_name=("LogFile")) |
|
log_verbosity = models.SmallIntegerField(default=3, verbose_name=_("Log Level")) |
|
status_file = models.CharField(max_length=100, verbose_name=_("Status file")) |
|
status_version = models.CharField(max_length="1", choices=STATUS_VERSION_LIST, default="2", verbose_name=_("Status Version")) |
|
mute = models.SmallIntegerField(default=5, verbose_name=_("Mute")) |
|
daemon_user = models.CharField(max_length=30, default="nobody", verbose_name=_("Daemon User")) |
|
daemon_group = models.CharField(max_length=30, default="nogroup", verbose_name=_("Daemon Group")) |
|
dns_name = models.CharField(max_length=50, blank=True, verbose_name=_("DNS Name")) |
|
keep_alive = models.SmallIntegerField(default=10, verbose_name=_("Keep Alive")) |
|
keep_alive_retry = models.SmallIntegerField(default=120, verbose_name=_("Keep Alive retry")) |
|
mtu_test = models.BooleanField(default="True", verbose_name=_("Mtu Test")) |
|
tls_server = models.BooleanField(default="True", verbose_name=_("TLS Server")) |
|
max_clients = models.SmallIntegerField(default=100, verbose_name=_("Max Clients")) |
|
persist_key = models.BooleanField(default="True", verbose_name=_("PersistKey")) |
|
persist_tun = models.BooleanField(default="True", verbose_name=_("Persist Tun")) |
|
management_address = models.IPAddressField(default="0.0.0.0", verbose_name=_("Management Addr")) |
|
management_port = models.SmallIntegerField(default="5555", verbose_name=_("Management Port")) |
|
|
|
def __unicode__(self): |
|
return self.name |
|
|
|
# private methods |
|
|
|
# public methods |
|
def save(self,force_insert=False,force_update=False): |
|
""" Overload of public method used to save object in database """ |
|
|
|
# Generates config dir name |
|
self.server_config_dir = self.openvpn_base_dir + '/' + self.name |
|
|
|
## Creates OVPNConfig Object |
|
ovpnserver_config = OVPNConfig() |
|
ovpnserver_config.server_config_dir = self.server_config_dir |
|
ovpnserver_config.client_config_dir = self.server_config_dir + '/' + 'ccd' |
|
ovpnserver_config.ca_cert_file = self.certs_dir + '/' + self.ovpnauthority.name + '.crt' |
|
ovpnserver_config.inter_cert_file = self.certs_dir + '/' + self.ovpnauthority.name + '_inter.crt' |
|
ovpnserver_config.inter_key_file = self.certs_dir + '/' + self.ovpnauthority.name + '_inter.key' |
|
ovpnserver_config.crl_verify_file = self.certs_dir + '/' + self.ovpnauthority.name + '.pem' |
|
ovpnserver_config.dh_key_file = self.certs_dir + '/' + self.ovpnauthority.name + '_' + '%d.dh' % self.ovpnauthority.rsa_size |
|
ovpnserver_config.server_mode = self.get_server_mode_display() |
|
ovpnserver_config.client_to_client = self.client_to_client |
|
ovpnserver_config.ip_address = self.local_ip_address |
|
ovpnserver_config.port = self.port |
|
ovpnserver_config.protocol = self.protocol |
|
ovpnserver_config.device = self.device |
|
ovpnserver_config.vpn_network = self.vpn_network |
|
ovpnserver_config.vpn_mask = self.vpn_mask |
|
ovpnserver_config.dns_list = self.dns_list |
|
if len(self.suffix_dns) != 0: |
|
ovpnserver_config.suffix_dns = self.suffix_dns |
|
ovpnserver_config.wins_list = self.wins_list |
|
ovpnserver_config.routes_list = self.routes_list |
|
ovpnserver_config.compress_data = self.compress_data |
|
ovpnserver_config.floating_server = self.floating_server |
|
ovpnserver_config.redirect_gw = self.redirect_gw |
|
ovpnserver_config.log_file = self.log_file |
|
ovpnserver_config.log_verbosity = self.log_verbosity |
|
ovpnserver_config.status_file = self.status_file |
|
ovpnserver_config.status_version = self.status_version |
|
ovpnserver_config.mute = self.mute |
|
ovpnserver_config.daemon_user = self.daemon_user |
|
ovpnserver_config.daemon_group = self.daemon_group |
|
ovpnserver_config.dns_name = self.dns_name |
|
ovpnserver_config.keep_alive = self.keep_alive |
|
ovpnserver_config.keep_alive_retry = self.keep_alive_retry |
|
ovpnserver_config.mtu_test = self.mtu_test |
|
ovpnserver_config.tls_server = self.tls_server |
|
ovpnserver_config.max_clients = self.max_clients |
|
ovpnserver_config.persist_key = self.persist_key |
|
ovpnserver_config.persist_tun = self.persist_tun |
|
ovpnserver_config.management_address = self.management_address |
|
ovpnserver_config.management_port = self.management_port |
|
## generates server configuration |
|
self.server_config_content = ovpnserver_config.generate_server_config() |
|
|
|
## Sets non-editable values |
|
self.client_config_dir = 'ccd' |
|
self.certs_dir = 'certs' |
|
|
|
self.is_up_to_date = False |
|
super(OVPNServer, self).save(force_insert,force_update) |
|
|
|
# generate all users' config |
|
for current_user in self.ovpnuser_set.all(): |
|
current_user.generate_user_config() |
|
super(OVPNUser, current_user).save(force_update=False, force_insert=False) |
|
|
|
def write_ovpnserver_files(self,basedir): |
|
## server configuration |
|
f = open(basedir + '/' + self.name + '.conf', 'w') |
|
print >> f, self.server_config_content |
|
f.close() |
|
## server certificates |
|
# openssl config |
|
f = open(basedir + '/' + self.name + '/' + self.certs_dir + '/' + self.ovpnauthority.name + '_ssl.cnf', 'w') |
|
print >> f, self.ovpnauthority.ossl_config_content |
|
f.close() |
|
# crt |
|
f = open(basedir + '/' + self.name + '/' + self.certs_dir + '/' + self.ovpnauthority.name + '.crt', 'w') |
|
print >> f, self.ovpnauthority.ca_content |
|
f.close() |
|
# crl |
|
f = open(basedir + '/' + self.name + '/' + self.certs_dir + '/' + self.ovpnauthority.name + '.crl', 'w') |
|
print >> f, self.ovpnauthority.crl_content |
|
f.close() |
|
# inter_crt |
|
f = open(basedir + '/' + self.name + '/' + self.certs_dir + '/' + self.ovpnauthority.name + '_inter.crt', 'w') |
|
print >> f, self.ovpnauthority.certificate_content |
|
f.close() |
|
# pem |
|
ca_index_num = int(self.ovpnauthority.ca_index_num) |
|
f = open(basedir + '/' + self.name + '/' + self.certs_dir + '/' + self.ovpnauthority.ca_index_num + '.pem', 'w') |
|
print >> f, self.ovpnauthority.certificate_content |
|
f.close() |
|
# inter key |
|
f = open(basedir + '/' + self.name + '/' + self.certs_dir + '/' + self.ovpnauthority.name + '_inter.key', 'w') |
|
print >> f, self.ovpnauthority.server_key_content |
|
f.close() |
|
# authority pem |
|
f = open(basedir + '/' + self.name + '/' + self.certs_dir + '/' + self.ovpnauthority.name + '.pem', 'w') |
|
print >> f, self.ovpnauthority.crl_content |
|
f.close() |
|
# rsa |
|
f = open(basedir + '/' + self.name + '/' + self.certs_dir + '/' + self.ovpnauthority.name + '_' + '%d.dh' % self.ovpnauthority.rsa_size, 'w') |
|
print >> f, self.ovpnauthority.dh_content |
|
f.close() |
|
# index |
|
f = open(basedir + '/' + self.name + '/' + self.certs_dir + '/' + 'index.txt' , 'w') |
|
print >> f, self.ovpnauthority.ca_index |
|
f.close() |
|
f = open(basedir + '/' + self.name + '/' + self.certs_dir + '/' + 'index.old' , 'w') |
|
print >> f, self.ovpnauthority.ca_index_old |
|
f.close() |
|
f = open(basedir + '/' + self.name + '/' + self.certs_dir + '/' + 'index.attr' , 'w') |
|
print >> f, self.ovpnauthority.ca_index_attr |
|
f.close() |
|
|
|
## create config and certificates for each user |
|
for current_user in self.ovpnuser_set.all(): |
|
f = open(basedir + '/' + self.name + '/' + self.certs_dir + '/' + current_user.name + '.key', 'w') |
|
print >> f, current_user.user_key_content |
|
f.close() |
|
f = open(basedir + '/' + self.name + '/' + self.certs_dir + '/' + current_user.name + '.csr', 'w') |
|
print >> f, current_user.user_csr_content |
|
f.close() |
|
f = open(basedir + '/' + self.name + '/' + self.certs_dir + '/' + current_user.name + '.crt', 'w') |
|
print >> f, current_user.user_crt_content |
|
f.close() |
|
f = open(basedir + '/' + self.name + '/' + self.certs_dir + '/' + current_user.name + '.crl', 'w') |
|
print >> f, current_user.user_crl_content |
|
f.close() |
|
# generate user pkcs |
|
ovpntools = self.ovpnauthority.create_ovpntools_object() |
|
ovpntools.commonName = current_user.name |
|
ovpntools.email = current_user.email |
|
#ovpntools.certs_dir = basedir + '/' + self.name + '/' + self.certs_dir |
|
ovpntools.generate_user_pkcs(current_user.name,current_user.password,basedir + '/' + self.name + '/' + self.certs_dir) |
|
|
|
if current_user.extra_config: |
|
f = open(basedir + '/' + self.name + '/' + self.client_config_dir + '/' + current_user.name, 'w') |
|
print >> f, current_user.extra_config |
|
f.close() |
|
|
|
def submit_server_config(self): |
|
''' submit all config files and certs ''' |
|
ovpnserversettings = self.ovpnserversettings |
|
|
|
if ovpnserversettings.ovpnserver_type == "loc": |
|
## empty dir, or create if don't exist |
|
self.empty_dirs(self.openvpn_base_dir) |
|
self.create_dirs(self.openvpn_base_dir) |
|
## create server configuration file |
|
self.write_ovpnserver_files(self.openvpn_base_dir) |
|
## creates log and status path if not exist |
|
|
|
## Remove temporary configuration |
|
self.ovpnauthority.clean_work_dir() |
|
output = 'Fichiers copies dans ' + self.openvpn_base_dir |
|
|
|
elif ovpnserversettings.ovpnserver_type == "ssh": |
|
## empty dir, or create if don't exist |
|
self.empty_dirs(self.ovpnauthority.work_dir + '/' + self.name) |
|
self.create_dirs(self.ovpnauthority.work_dir + '/' + self.name) |
|
## Copy all configuration to temporary dir |
|
self.write_ovpnserver_files(self.ovpnauthority.work_dir + '/' + self.name) |
|
|
|
## Try to connect and copy file to server |
|
# create synctools object |
|
syncserver = OVPNServerSync(ovpnserversettings.connection_address, ovpnserversettings.connection_login, ovpnserversettings.connection_password) |
|
syncserver.source_dir = self.ovpnauthority.work_dir + '/' + self.name |
|
syncserver.dst_dir = self.openvpn_base_dir |
|
syncserver.copy_method = 'ssh' |
|
|
|
# sync temporary directory content with distant server |
|
output = syncserver.sync() |
|
|
|
## Remove temporary configuration |
|
self.ovpnauthority.clean_work_dir() |
|
|
|
self.is_up_to_date = True |
|
super(OVPNServer, self).save(False, False) |
|
return 'OK',output |
|
|
|
def create_dirs(self, basedir): |
|
if not os.path.isdir(basedir + '/' + self.name): |
|
os.makedirs(basedir + '/' + self.name) |
|
if not os.path.isdir(basedir + '/' + self.name + '/' + self.client_config_dir): |
|
os.mkdir(basedir + '/' + self.name + '/' + self.client_config_dir) |
|
if not os.path.isdir(basedir + '/' + self.name + '/' + self.certs_dir): |
|
os.mkdir(basedir + '/' + self.name + '/' + self.certs_dir) |
|
|
|
def empty_dirs(self, basedir): |
|
# Clean certs dir |
|
if os.path.exists(basedir + '/' + self.name + '/' + self.certs_dir): |
|
for file in os.listdir(basedir + '/' + self.name + '/' + self.certs_dir): |
|
os.remove(basedir + '/' + self.name + '/' + self.certs_dir + '/' + file) |
|
os.rmdir(basedir + '/' + self.name + '/' + self.certs_dir) |
|
# Clean clients config dir |
|
if os.path.exists(basedir + '/' + self.name + self.client_config_dir): |
|
for file in os.listdir(basedir + '/' + self.name + '/' + self.client_config_dir): |
|
os.remove(basedir + '/' + self.name + '/' + self.client_config_dir + '/' + file) |
|
os.rmdir(basedir + '/' + self.name + '/' + self.client_config_dir) |
|
# Clean server_configuration dir |
|
if os.path.exists(basedir + '/' + self.name + '.conf'): |
|
os.remove(basedir + '/' + self.name + '.conf') |
|
|
|
def delete(self): |
|
# Empty dirs |
|
self.empty_dirs(self.openvpn_base_dir) |
|
# Remove each user from database |
|
for ovpnuser in self.ovpnuser_set.all(): |
|
ovpnuser.delete() |
|
super(OVPNServer, self).delete() |
|
|
|
def get_server_status(self): |
|
#ovpnserversettings = self.ovpnserversettings_set.all()[0] |
|
ovpnserversettings = self.ovpnserversettings |
|
ovpnserver_admin = OVPNServerAdmin() |
|
ovpnserver_admin.server_type = ovpnserversettings.ovpnserver_type |
|
ovpnserver_admin.init_script = ovpnserversettings.daemon_init_script |
|
ovpnserver_admin.hostname = ovpnserversettings.connection_address |
|
ovpnserver_admin.login = ovpnserversettings.connection_login |
|
ovpnserver_admin.password = ovpnserversettings.connection_password |
|
|
|
def get_connected_users(self): |
|
#ovpnserversettings = self.ovpnserversettings_set.all()[0] |
|
ovpnserversettings = self.ovpnserversettings |
|
ovpnserver_admin = OVPNServerAdmin() |
|
if ovpnserversettings.ovpnserver_type == 'ssh': |
|
ovpnserver_admin.server_type = 'ssh' |
|
ovpnserver_admin.init_script = ovpnserversettings.daemon_init_script |
|
ovpnserver_admin.hostname = ovpnserversettings.connection_address |
|
ovpnserver_admin.login = ovpnserversettings.connection_login |
|
ovpnserver_admin.password = ovpnserversettings.connection_password |
|
(ret, connected_users, error) = ovpnserver_admin.get_connected_users(self.status_file) |
|
return connected_users |
|
|
|
def get_connected_users_num(self): |
|
return len(self.get_connected_users()) |
|
|
|
|
|
def restart_server(self): |
|
#ovpnserversettings = self.ovpnserversettings_set.all()[0] |
|
ovpnserversettings = self.ovpnserversettings |
|
ovpnserver_admin = OVPNServerAdmin() |
|
if ovpnserversettings.ovpnserver_type == 'ssh': |
|
ovpnserver_admin.server_type = 'ssh' |
|
ovpnserver_admin.init_script = ovpnserversettings.daemon_init_script |
|
ovpnserver_admin.hostname = ovpnserversettings.connection_address |
|
ovpnserver_admin.login = ovpnserversettings.connection_login |
|
ovpnserver_admin.password = ovpnserversettings.connection_password |
|
ovpnserver_admin.cmd_prefix = ovpnserversettings.prefix_command |
|
(ret, stdout, stderr) = ovpnserver_admin.restart_server(self.name) |
|
output = [] |
|
output.append(ret) |
|
output.append(stdout) |
|
output.append(stderr) |
|
return output |
|
|
|
def ovpnusers_sorted_by_name(self): |
|
return self.ovpnuser_set.all().order_by('name') |
|
|
|
def get_last_five_connhist(self): |
|
return self.ovpnserverconnhist_set.all().order_by('-conn_date', '-date')[:5] |
|
|
|
def get_connhist(self): |
|
return self.ovpnserverconnhist_set.all().order_by('-date') |
|
|
|
|
|
class OVPNServerForm(forms.ModelForm): |
|
class Meta: |
|
model = OVPNServer |
|
|
|
def __init__(self, *args, **kwargs): |
|
super(OVPNServerForm, self).__init__(*args, **kwargs) |
|
self.fields['dns_list'].widget = forms.Textarea(attrs={'cols':'40', 'rows':'2'}) |
|
self.fields['wins_list'].widget = forms.Textarea(attrs={'cols':'40', 'rows':'2'}) |
|
self.fields['routes_list'].widget = forms.Textarea(attrs={'cols':'40', 'rows':'2'}) |
|
|
|
def clean_dns_list(self): |
|
if self.cleaned_data: |
|
data = self.cleaned_data.get('dns_list') |
|
if data: |
|
dns_list = data.split(',') |
|
for dns_address in dns_list: |
|
dns_address = dns_address.replace(" ","") |
|
if not re.search("^\d+\.\d+\.\d+\.\d+$", dns_address): |
|
raise forms.ValidationError("Value %s is not a valid address definition" % dns_address) |
|
return data |
|
|
|
def clean_wins_list(self): |
|
if self.cleaned_data: |
|
data = self.cleaned_data.get('wins_list') |
|
if data: |
|
wins_list = data.split(',') |
|
for wins_address in wins_list: |
|
wins_address = wins_address.replace(" ","") |
|
if not re.search("^\d+\.\d+\.\d+\.\d+$", wins_address): |
|
raise forms.ValidationError("Value %s is not a valid address definition" % wins_address) |
|
return data |
|
|
|
def clean_routes_list(self): |
|
if self.cleaned_data: |
|
data = self.cleaned_data.get('routes_list') |
|
if data: |
|
route_list = data.split(',') |
|
for route in route_list: |
|
route = route.replace(" ","") |
|
if not re.search("^\d+\.\d+\.\d+\.\d+\/\d+\.\d+\.\d+\.\d+$", route): |
|
raise forms.ValidationError("Value %s is not a valid network definition" % route) |
|
return data |
|
|
|
class OVPNServerConnHist(models.Model): |
|
''' OVPNServer Connections history class ''' |
|
|
|
STATE_LIST = ( |
|
('c', 'connected'), |
|
('d', 'disconnected'), |
|
) |
|
|
|
date = models.DateTimeField(auto_now=True, verbose_name=_("Date")) |
|
conn_date = models.DateTimeField(auto_now=False, verbose_name=_("Connection")) |
|
disc_date = models.DateTimeField(auto_now=False,blank=True, verbose_name=_("Disonnection")) |
|
recv_from = models.CharField(max_length=128, verbose_name=_("Received from")) |
|
user_name = models.CharField(max_length=255, verbose_name=_("User Name")) |
|
real_ip = models.IPAddressField(verbose_name=_("Real IP")) |
|
virt_ip = models.IPAddressField(verbose_name=_("VPN IP")) |
|
brecv = models.CharField(max_length=255, verbose_name=_("Brecv")) |
|
bsent = models.CharField(max_length=255, verbose_name=_("Bsent")) |
|
connected_since = models.DateTimeField(verbose_name=_("Connected Since")) |
|
state = models.CharField(max_length=50,choices=STATE_LIST, verbose_name=_("State")) |
|
ovpnserver = models.ForeignKey(OVPNServer, verbose_name=_("OVPNServer")) |
|
|
|
|
|
class OVPNServerSettings(models.Model): |
|
''' OvpnServer Settings, used to configure server type, connections settings... ''' |
|
|
|
SERVER_TYPE_LIST = ( |
|
('loc', 'local'), |
|
('ssh', 'Distant (ssh)') |
|
) |
|
|
|
default_email_message = """ |
|
Veuillez trouver ci-joint les fichiers de configuration de votre connexion VPN. |
|
Ces fichiers sont à copier dans le répertoire de configuration du client. |
|
* Client Windows : |
|
Le client OpenVPN est à télécharger ici : |
|
http://www.openvpn.net/release/openvpn-2.1_rc18-install.exe |
|
Copier les fichiers ci-joints dans le répertoire c:\program files\openvpn\config\ |
|
* Client Linux : |
|
Installer le client OpenVpn de votre distribution : |
|
par exemple : apt-get install openvpn |
|
copirer les fichiers ci-joint dans le répertoire /etc/openvpn/, en prenant soin de renommer le fichier .ovpn en .conf |
|
|
|
Pour plus d'informations : http://openvpn.se/install.txt |
|
""" |
|
ovpnserver_type = models.CharField(max_length=3, choices=SERVER_TYPE_LIST, default="LOC", verbose_name=_("Server Type")) |
|
connection_address = models.IPAddressField(blank=True,verbose_name=_("IP Address")) |
|
connection_login = models.CharField(max_length=30, blank=True, verbose_name=_("Login")) |
|
connection_password = models.CharField(max_length=30, blank=True, verbose_name=_("Password")) |
|
daemon_init_script = models.CharField(max_length=255, default='/etc/init.d/openvpn', verbose_name=_("Init Script")) |
|
prefix_command = models.CharField(max_length=255, blank=True, verbose_name=_("Prefix Cmd")) |
|
email_subject = models.CharField(max_length=255,blank=False, default='Compte OpenVPN', verbose_name=_("Email Subject")) |
|
email_message = models.TextField(blank=True, default=default_email_message, verbose_name=_("Email Message")) |
|
ovpnserver = models.OneToOneField(OVPNServer, verbose_name=_("OVPNServer")) |
|
ovpnagent_key = models.CharField(max_length=24, verbose_name=_("OVPNAgentKey")) |
|
|
|
def save(self,force_insert=False,force_update=False): |
|
if force_update is False: |
|
self.ovpnserver.is_configured = True |
|
super(OVPNServer, self.ovpnserver).save(force_update) |
|
super(OVPNServerSettings, self).save(force_insert,force_update) |
|
|
|
class OVPNServerSettingsForm(forms.ModelForm): |
|
class Meta: |
|
model = OVPNServerSettings |
|
|
|
def __init__(self, *args, **kwargs): |
|
super(OVPNServerSettingsForm, self).__init__(*args, **kwargs) |
|
self.fields['email_message'].widget.attrs['cols'] = 100 |
|
|
|
class OVPNUser(models.Model): |
|
name = models.CharField(max_length=30, verbose_name=_("Name")) |
|
email = models.EmailField(verbose_name=_("Email")) |
|
is_valid = models.BooleanField(editable=False,blank=True) |
|
type = models.CharField(max_length=20, blank=True, verbose_name=_("Type")) |
|
_country = models.CharField(max_length=2,editable=False,blank=True) |
|
_state = models.CharField(max_length=30,editable=False,blank=True) |
|
_city = models.CharField(max_length=30,editable=False,blank=True) |
|
_organization = models.CharField(max_length=40,editable=False,blank=True) |
|
_validity = models.DateField(editable=False,blank=True) |
|
password = models.CharField(max_length=30, verbose_name=_("Password")) |
|
password_validation = models.CharField(max_length=30, verbose_name=_("Confirm Password")) |
|
extra_config = models.TextField(blank=True) |
|
user_key_content = models.TextField(editable=False, blank=True) |
|
user_csr_content = models.TextField(editable=False, blank=True) |
|
user_crt_content = models.TextField(editable=False, blank=True) |
|
user_crl_content = models.TextField(editable=False, blank=True) |
|
user_cert_index = models.TextField(max_length=4,editable=False, blank=True) |
|
config_content = models.TextField(editable=False, blank=True) |
|
ovpnserver = models.ForeignKey(OVPNServer, verbose_name=_("OVPNServer")) |
|
|
|
def __unicode__(self): |
|
return self.name |
|
|
|
# private methods |
|
|
|
# public methods |
|
|
|
def save(self,force_insert=False,force_update=False): |
|
""" Overload of public method used to save object in database """ |
|
|
|
self.set_validity() |
|
# set certificate values as of authority |
|
self._country = self.ovpnserver.ovpnauthority.country |
|
self._state = self.ovpnserver.ovpnauthority.state |
|
self._city = self.ovpnserver.ovpnauthority.city |
|
self._organization = self.ovpnserver.ovpnauthority.organization |
|
self._validity = self.ovpnserver.ovpnauthority.expiration_date |
|
# generate user certificates |
|
self.generate_user_certificate() |
|
|
|
self.generate_user_config() |
|
|
|
self.ovpnserver.is_up_to_date = False |
|
super(OVPNServer, self.ovpnserver).save() |
|
|
|
super(OVPNUser, self).save(force_insert,force_update) |
|
|
|
def set_validity(self,isvalid=True): |
|
self.is_valid = isvalid |
|
|
|
def generate_user_config(self): |
|
# create ovpnconfig object |
|
ovpnclient_config = OVPNConfig() |
|
ovpnclient_config.ip_address = self.ovpnserver.public_ip_address |
|
if len(self.ovpnserver.dns_name) != 0: |
|
ovpnclient_config.dns_name = self.ovpnserver.dns_name |
|
ovpnclient_config.port = self.ovpnserver.port |
|
ovpnclient_config.protocol = self.ovpnserver.protocol |
|
ovpnclient_config.device = self.ovpnserver.device |
|
ovpnclient_config.keep_alive = self.ovpnserver.keep_alive |
|
ovpnclient_config.compress_data = self.ovpnserver.compress_data |
|
ovpnclient_config.tls_server = self.ovpnserver.tls_server |
|
ovpnclient_config.client_pkcs_file = self.name + '-' + self.ovpnserver.ovpnauthority.ovpnsite.name + '-' + self.ovpnserver.ovpnauthority.name + '-' + self.ovpnserver.name + '.p12' |
|
ovpnclient_config.log_verbosity = self.ovpnserver.log_verbosity |
|
|
|
# generate client config file |
|
self.config_content = ovpnclient_config.generate_client_config() |
|
|
|
def generate_user_certificate(self): |
|
# creates ovpntools object |
|
ovpntools = self.ovpnserver.ovpnauthority.create_ovpntools_object() |
|
ovpntools.commonName = self.name |
|
ovpntools.email = self.email |
|
|
|
# generate temporary authority files |
|
self.ovpnserver.ovpnauthority.write_authority_files() |
|
ovpntools.generate_ossl_config() |
|
|
|
# generate user csr key |
|
ovpntools.generate_user_csr_key(self.name) |
|
self.user_csr_content = ovpntools.read_user_csr(self.name).rstrip('\n') |
|
self.user_key_content = ovpntools.read_user_key(self.name).rstrip('\n') |
|
|
|
# generate user crt |
|
ovpntools.generate_user_crt(self.name) |
|
self.user_crt_content = ovpntools.read_user_certificate(self.name).rstrip('\n') |
|
|
|
# generate user crl |
|
ovpntools.generate_user_crl(self.name) |
|
self.user_crl_content = ovpntools.read_user_crl(self.name).rstrip('\n') |
|
|
|
# read user certificate index |
|
self.user_cert_index = ovpntools.read_user_cert_index() |
|
|
|
# generate user pkcs |
|
#ovpntools.generate_user_pkcs(self.name,self.password) |
|
|
|
# update authority index and serial |
|
self.ovpnserver.ovpnauthority.ca_index = ovpntools.read_index().rstrip('\n') |
|
self.ovpnserver.ovpnauthority.ca_index_attr = ovpntools.read_index_attr().rstrip('\n') |
|
self.ovpnserver.ovpnauthority.ca_index_old = ovpntools.read_index_old().rstrip('\n') |
|
self.ovpnserver.ovpnauthority.ca_serial = ovpntools.read_serial().rstrip('\n') |
|
#self.ovpnserver.ovpnauthority.save(force_update=True) |
|
super(OVPNAuthority,self.ovpnserver.ovpnauthority).save() |
|
|
|
def revoke_user(self): |
|
# generate temporary certs files |
|
self.ovpnserver.ovpnauthority.write_authority_files() |
|
f = open(self.ovpnserver.ovpnauthority.work_dir + '/' + self.name + '.key', 'w') |
|
print >> f, self.user_key_content |
|
f.close() |
|
f = open(self.ovpnserver.ovpnauthority.work_dir + '/' + self.name + '.csr', 'w') |
|
print >> f, self.user_csr_content |
|
f.close() |
|
f = open(self.ovpnserver.ovpnauthority.work_dir + '/' + self.name + '.crt', 'w') |
|
print >> f, self.user_crt_content |
|
f.close() |
|
f = open(self.ovpnserver.ovpnauthority.work_dir + '/' + self.user_cert_index + '.pem', 'w') |
|
print >> f, self.user_crt_content |
|
f.close() |
|
f = open(self.ovpnserver.ovpnauthority.work_dir + '/' + self.name + '.crl', 'w') |
|
print >> f, self.user_crl_content |
|
f.close() |
|
# creates ovpntools object |
|
ovpntools = self.ovpnserver.ovpnauthority.create_ovpntools_object() |
|
ovpntools.commonName = self.name |
|
ovpntools.email = self.email |
|
|
|
ovpntools.revoke_user(self.name) |
|
self.is_valid = False; |
|
|
|
# update authority index and serial |
|
self.ovpnserver.ovpnauthority.ca_index = ovpntools.read_index().rstrip('\n') |
|
self.ovpnserver.ovpnauthority.ca_index_attr = ovpntools.read_index_attr().rstrip('\n') |
|
self.ovpnserver.ovpnauthority.ca_index_old = ovpntools.read_index_old().rstrip('\n') |
|
self.ovpnserver.ovpnauthority.ca_serial = ovpntools.read_serial().rstrip('\n') |
|
super(OVPNAuthority, self.ovpnserver.ovpnauthority).save() |
|
|
|
self.ovpnserver.is_up_to_date = False |
|
super(OVPNServer, self.ovpnserver).save() |
|
|
|
# save ovpnuser object |
|
super(OVPNUser, self).save() |
|
|
|
def notify_user(self, subject, message, copyto=None): |
|
""" function used to send user, connexion parameters and certificate by email """ |
|
if copyto is not None: |
|
self.email.append(copyto) |
|
send_mail(subject, message, sender, self.email) |
|
|
|
def delete(self): |
|
# generate temporary certs files |
|
self.ovpnserver.ovpnauthority.write_authority_files() |
|
# delete the user's line |
|
f = open(self.ovpnserver.ovpnauthority.work_dir + '/' + 'index.txt', 'r') |
|
newlines = '' |
|
# converts user_cert_index into hexa |
|
user_cert_index_hex = '%.2X' % int(self.user_cert_index) |
|
for line in f: |
|
(flag, serial, serial2, id, type, cn) = line.split("\t") |
|
#if user_cert_index_hex in line: |
|
if id == user_cert_index_hex: |
|
print "removed line %s" % line |
|
else: |
|
newlines += line |
|
f.close() |
|
f = open(self.ovpnserver.ovpnauthority.work_dir + '/' + 'index.txt', 'w') |
|
print >> f, newlines |
|
f.close() |
|
|
|
# update authority index |
|
# creates ovpntools object |
|
ovpntools = self.ovpnserver.ovpnauthority.create_ovpntools_object() |
|
ovpntools.commonName = self.name |
|
ovpntools.email = self.email |
|
|
|
self.ovpnserver.ovpnauthority.ca_index = ovpntools.read_index().rstrip('\n') |
|
# save the ovpnathority |
|
super(OVPNAuthority, self.ovpnserver.ovpnauthority).save() |
|
|
|
self.ovpnserver.is_up_to_date = False |
|
super(OVPNServer, self.ovpnserver).save() |
|
|
|
# delete object |
|
super(OVPNUser,self).delete() |
|
|
|
class Meta: |
|
unique_together = ("ovpnserver", "name") |
|
|
|
|
|
class OVPNUserParamForm(forms.ModelForm): |
|
class Meta: |
|
model = OVPNUser |
|
|
|
def __init__(self, *args, **kwargs): |
|
super(OVPNUserParamForm, self).__init__(*args, **kwargs) |
|
if self.instance: |
|
#self.fields['password'].initial = OVPNUser.objects.get(id = self.instance.id).password |
|
#self.fields['password_validation'].initial = OVPNUser.objects.get(id = self.instance.id).password |
|
self.fields['password'].required = False |
|
self.fields['password_validation'].required = False |
|
self.fields['extra_config'].widget = forms.Textarea(attrs={'cols':'40', 'rows':'2'}) |
|
self.fields['password'].widget = forms.PasswordInput() |
|
self.fields['password_validation'].widget = forms.PasswordInput() |
|
|
|
def clean_password_validation(self): |
|
if self.cleaned_data.get('password') != self.cleaned_data.get('password_validation'): |
|
raise forms.ValidationError('Passwords must match.') |
|
else: |
|
return self.cleaned_data.get('password') |
|
|
|
class OVPNUserForm(forms.ModelForm): |
|
class Meta: |
|
model = OVPNUser |
|
|
|
def __init__(self, *args, **kwargs): |
|
super(OVPNUserForm, self).__init__(*args, **kwargs) |
|
self.fields['extra_config'].widget = forms.Textarea(attrs={'cols':'40', 'rows':'2'}) |
|
self.fields['password'].widget = forms.PasswordInput() |
|
self.fields['password_validation'].widget = forms.PasswordInput() |
|
|
|
def clean_password_validation(self): |
|
if self.cleaned_data.get('password') != self.cleaned_data.get('password_validation'): |
|
raise forms.ValidationError('Passwords must match.') |
|
else: |
|
return self.cleaned_data.get('password') |
|
|
|
class OVPNUserPasswdForm(forms.ModelForm): |
|
class Meta: |
|
model = OVPNUser |
|
|
|
def __init__(self, *args, **kwargs): |
|
super(OVPNUserPasswdForm, self).__init__(*args, **kwargs) |
|
self.fields['name'].widget = forms.TextInput(attrs={'readonly':''}) |
|
self.fields['email'].widget = forms.TextInput(attrs={'readonly':''}) |
|
self.fields['type'].widget = forms.TextInput(attrs={'readonly':''}) |
|
self.fields['extra_config'].widget = forms.Textarea(attrs={'cols':'40', 'rows':'2', 'readonly':''}) |
|
self.fields['password'].widget = forms.PasswordInput() |
|
self.fields['password_validation'].widget = forms.PasswordInput() |
|
|
|
def clean_password_validation(self): |
|
if self.cleaned_data.get('password') != self.cleaned_data.get('password_validation'): |
|
raise forms.ValidationError('Passwords must match.') |
|
else: |
|
return self.cleaned_data.get('password') |
|
|
|
class OVPNLogs(models.Model): |
|
""" Class used to manage log events in database """ |
|
date = models.DateTimeField(auto_now=True) |
|
title = models.CharField(max_length=255) |
|
message = models.TextField() |
|
user = models.CharField(max_length=255) |
|
related_object = models.CharField(max_length=255, default='Global') |
|
related_object_id = models.PositiveIntegerField() |
|
|
|
# overload of init function to auto save Logs when created |
|
def __init__(self,*args,**kwargs): |
|
super(OVPNLogs,self).__init__(*args,**kwargs) |
|
self.save(force_insert=True) |
|
|
|
class UserForm_temp(forms.Form): |
|
first_name = forms.CharField() |
|
last_name = forms.CharField() |
|
email = forms.EmailField() |
|
username = forms.CharField() |
|
password = forms.CharField(widget=forms.PasswordInput(render_value=False)) |
|
|
|
def save(self): |
|
new_user = User.objects.create_user(username=self.cleaned_data['username'], |
|
email=self.cleaned_data['email'], |
|
password=self.cleaned_data['password']) |
|
new_user.first_name = self.cleaned_data['first_name'] |
|
new_user.last_name = self.cleaned_data['last_name'] |
|
new_user.is_active = False |
|
new_user.save() |
|
return new_user |
|
|
|
class UserForm(forms.ModelForm): |
|
class Meta: |
|
model = User |
|
|
|
password = forms.CharField(_('password')) |
|
|
|
def __init__(self, *args, **kwargs): |
|
super(UserForm, self).__init__(*args, **kwargs) |
|
self.fields['password'].widget = forms.PasswordInput() |
|
|
|
def clean_password(self): |
|
import random |
|
algo = 'sha1' |
|
salt = get_hexdigest(algo, str(random.random()), str(random.random()))[:5] |
|
hsh = get_hexdigest(algo, salt, self.cleaned_data.get('password')) |
|
hash_passwd = '%s$%s$%s' % (algo, salt, hsh) |
|
return hash_passwd |
|
|
|
|