125 lines
4.0 KiB
Python
125 lines
4.0 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import bcrypt
|
|
import dns.exception
|
|
import dns.resolver
|
|
import os
|
|
import requests
|
|
import socket
|
|
import ssl
|
|
import subprocess
|
|
|
|
# Placeholder address (bracketed literal from the IPv6 discard prefix) that
# get_container_ip() returns when it cannot obtain a real container address.
NULL_IP = '[100::1]'
|
|
|
|
def compile_url(domain, port, proto='https'):
    """Build a URL (or bare host) from its parts, omitting default ports.

    Args:
        domain: Host name to put into the URL.
        port: Port number, as a string or an integer.
        proto: 'https', 'http', another scheme, or None.

    Returns:
        '<proto>://<domain>[:<port>]' when proto is not None, otherwise just
        the domain. The port is appended only for 'https' and 'http' and only
        when it differs from the scheme default (443 and 80 respectively).
    """
    # Normalize so that integer ports compare equal to the default-port
    # strings; otherwise 443 (int) would be rendered as an explicit ':443'.
    port = str(port)
    default_ports = {'https': '443', 'http': '80'}
    if proto in default_ports and port != default_ports[proto]:
        suffix = ':{}'.format(port)
    else:
        suffix = ''
    host = '{}{}'.format(domain, suffix)
    if proto is None:
        return host
    return '{}://{}'.format(proto, host)
|
|
|
|
def get_container_ip(app):
    """Return the IP address of a Docker container, or NULL_IP as a fallback.

    Runs `/usr/bin/docker inspect` with a template that prints the IPAddress
    of every network the container is attached to (concatenated without a
    separator).

    Args:
        app: Container name or ID handed to `docker inspect`.

    Returns:
        The stripped command output, or NULL_IP (an address from the IPv6
        discard prefix) when the command fails or prints no address.
    """
    try:
        address = subprocess.run(
            ['/usr/bin/docker', 'inspect', '-f', '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}', app],
            check=True, stdout=subprocess.PIPE).stdout.decode().strip()
    except Exception:
        # Best-effort lookup: any failure (missing binary, unknown container,
        # undecodable output) maps to the placeholder. KeyboardInterrupt and
        # SystemExit are deliberately no longer swallowed.
        return NULL_IP
    # The inspect call can succeed yet print nothing when the container has no
    # address assigned; fall back to the placeholder in that case as well.
    return address or NULL_IP
|
|
|
|
def get_local_ipv4():
    """Return the first routable local IPv4 address, or None.

    Asks `/sbin/ip route get 1` which route would be used and extracts the
    preferred source address from the reply.

    Returns:
        The address string following the `src` keyword, or None when the
        command fails or its output contains no `src` field.
    """
    try:
        route = subprocess.run(['/sbin/ip', 'route', 'get', '1'], check=True, stdout=subprocess.PIPE).stdout.decode()
        fields = route.split()
        # Locate the address by its 'src' keyword instead of a fixed position:
        # some `ip` implementations append extra fields (e.g. 'uid', 'cache')
        # after the source address.
        return fields[fields.index('src') + 1]
    except Exception:
        # Best-effort: command failure or unexpected output yields None.
        return None
|
|
|
|
def get_local_ipv6():
    """Return the first routable local IPv6 address, or None.

    Asks `/sbin/ip route get 2003::` which route would be used and extracts
    the preferred source address from the reply.

    Returns:
        The address string following the `src` keyword, or None when the
        command fails or its output contains no `src` field.
    """
    try:
        route = subprocess.run(['/sbin/ip', 'route', 'get', '2003::'], check=True, stdout=subprocess.PIPE).stdout.decode()
        fields = route.split()
        # Locate the address by its 'src' keyword instead of a fixed position:
        # the number of trailing fields (metric, pref, ...) differs between
        # `ip` implementations.
        return fields[fields.index('src') + 1]
    except Exception:
        # Best-effort: command failure or unexpected output yields None.
        return None
|
|
|
|
def get_external_ip(family):
    """Return the external IP address of the given family via a 3rd party service.

    Args:
        family: Address family constant (e.g. socket.AF_INET or
            socket.AF_INET6) that the outgoing connection is restricted to.

    Returns:
        The response body of https://tools.dasm.cz/myip.php as text, or None
        when the request fails for any reason.
    """
    # urllib3 consults this module-level hook to pick the address family, so
    # it is overridden for the duration of the request and restored afterwards.
    # The override is process-wide while this call is in flight.
    allowed_gai_family = requests.packages.urllib3.util.connection.allowed_gai_family
    try:
        requests.packages.urllib3.util.connection.allowed_gai_family = lambda: family
        return requests.get('https://tools.dasm.cz/myip.php', timeout=5).text
    except Exception:
        # Best-effort lookup; KeyboardInterrupt/SystemExit still propagate.
        return None
    finally:
        requests.packages.urllib3.util.connection.allowed_gai_family = allowed_gai_family
|
|
|
|
def get_external_ipv4():
    """Return the external IPv4 address as reported by get_external_ip()."""
    ipv4_family = socket.AF_INET
    return get_external_ip(ipv4_family)
|
|
|
|
def get_external_ipv6():
    """Return the external IPv6 address as reported by get_external_ip()."""
    ipv6_family = socket.AF_INET6
    return get_external_ip(ipv6_family)
|
|
|
|
# Module-wide DNS resolver pointed at Google Public DNS (IPv4 and IPv6
# servers), with both its timeout and its lifetime set to 3 seconds.
resolver = dns.resolver.Resolver()
resolver.nameservers = ['8.8.8.8', '8.8.4.4', '2001:4860:4860::8888', '2001:4860:4860::8844']
resolver.timeout = resolver.lifetime = 3
|
|
|
|
def resolve_ip(domain, type):
    """Resolve a domain name using the module-level Google Public DNS resolver.

    Args:
        domain: Domain name to look up.
        type: DNS record type passed to the resolver query.

    Returns:
        The `address` of the first answer record, or None when the lookup
        fails for any reason other than a timeout.

    Raises:
        dns.exception.Timeout: Propagated so callers can tell an unreachable
            resolver apart from a name that does not resolve.
    """
    try:
        return resolver.query(domain, type)[0].address
    except dns.exception.Timeout:
        raise
    except Exception:
        # Any other failure is reported as "no address".
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        return None
|
|
|
|
def ping_url(url):
    """Ask the remote vm-ping endpoint to check the given URL.

    Posts the URL to https://tools.dasm.cz/vm-ping.php with a 5 second timeout.

    Args:
        url: URL sent to the service in the `url` form field.

    Returns:
        True when the response body is exactly 'vm-pong', False otherwise or
        when the request fails for a reason other than a timeout.

    Raises:
        requests.exceptions.Timeout: Propagated so callers can tell an
            unreachable service apart from a negative answer.
    """
    try:
        return requests.post('https://tools.dasm.cz/vm-ping.php', data={'url': url}, timeout=5).text == 'vm-pong'
    except requests.exceptions.Timeout:
        raise
    except Exception:
        # Any other failure counts as "not reachable".
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        return False
|
|
|
|
def is_service_started(app):
    """Tell whether an OpenRC service is started, without calling any binary.

    Checks for an entry named after the service under /run/openrc/started.
    """
    started_marker = os.path.join('/run/openrc/started', app)
    return os.path.exists(started_marker)
|
|
|
|
def is_service_autostarted(app):
    """Tell whether an OpenRC service is enabled.

    Checks for an entry named after the service under /etc/runlevels/default.
    """
    runlevel_entry = os.path.join('/etc/runlevels/default', app)
    return os.path.exists(runlevel_entry)
|
|
|
|
def start_service(service):
    """Start a service via /sbin/service.

    Raises subprocess.CalledProcessError when the command exits non-zero.
    """
    command = ['/sbin/service', service, 'start']
    subprocess.run(command, check=True)
|
|
|
|
def stop_service(service):
    """Stop a service via /sbin/service.

    Raises subprocess.CalledProcessError when the command exits non-zero.
    """
    command = ['/sbin/service', service, 'stop']
    subprocess.run(command, check=True)
|
|
|
|
def restart_service(service):
    """Restart a service via /sbin/service; the exit status is not checked."""
    command = ['/sbin/service', service, 'restart']
    subprocess.run(command)
|
|
|
|
def reload_nginx():
    """Reload nginx via /sbin/service; the exit status is not checked."""
    command = ['/sbin/service', 'nginx', 'reload']
    subprocess.run(command)
|
|
|
|
def restart_nginx():
    """Restart the nginx service by delegating to restart_service()."""
    service_name = 'nginx'
    restart_service(service_name)
|
|
|
|
def get_cert_info(cert):
    """Decode a certificate and flatten its subject and issuer into dicts.

    `cert` is handed unchanged to ssl._ssl._test_decode_cert (a private
    CPython helper; presumably it expects a path to a certificate file -
    confirm against callers). In the decoded result, 'subject' and 'issuer'
    are replaced by dicts built from the first (key, value) pair of each of
    their entries; all other fields are returned as decoded.
    """
    info = ssl._ssl._test_decode_cert(cert)
    for field in ('subject', 'issuer'):
        # Each entry is a sequence of pairs; only its first pair is kept.
        info[field] = dict(entry[0] for entry in info[field])
    return info
|
|
|
|
def adminpwd_hash(password):
    """Return a bcrypt hash of the password, using a freshly generated salt, as str."""
    raw_password = password.encode()
    digest = bcrypt.hashpw(raw_password, bcrypt.gensalt())
    return digest.decode()
|
|
|
|
def adminpwd_verify(password, hash):
    """Check a plaintext password against a bcrypt hash string via bcrypt.checkpw."""
    raw_password = password.encode()
    raw_hash = hash.encode()
    return bcrypt.checkpw(raw_password, raw_hash)
|
|
|
|
def update_luks_password(oldpassword, newpassword):
    """Change the LUKS key of /dev/sda2 with `cryptsetup luksChangeKey`.

    The old and the new password are fed to the command on standard input,
    separated by a newline. Raises subprocess.CalledProcessError when the
    command exits non-zero.
    """
    passphrases = '{}\n{}'.format(oldpassword, newpassword).encode()
    command = ['cryptsetup', 'luksChangeKey', '/dev/sda2']
    subprocess.run(command, input=passphrases, check=True)
|
|
|
|
def shutdown_vm():
    """Power the machine off by running /sbin/poweroff; the exit status is not checked."""
    command = ['/sbin/poweroff']
    subprocess.run(command)
|
|
|
|
def reboot_vm():
    """Reboot the machine by running /sbin/reboot; the exit status is not checked."""
    command = ['/sbin/reboot']
    subprocess.run(command)
|