1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
import logging
import itertools
import re
import zope.interface
from certbot.plugins.dns_common import DNSAuthenticator
from certbot.interfaces import IAuthenticator, IPluginFactory
from datetime import datetime, timedelta
from loopialib import DnsRecord, Loopia, split_domain
from time import sleep
logger = logging.getLogger(__name__)
@zope.interface.implementer(IAuthenticator)
@zope.interface.provider(IPluginFactory)
class LoopiaAuthenticator(DNSAuthenticator):
"""
Loopia DNS ACME authenticator.
This Authenticator uses the Loopia API to fulfill a dns-01 challenge.
"""
#: Short description of plugin
description = __doc__.strip().split("\n", 1)[0]
#: TTL for the validation TXT record
ttl = 30
def __init__(self, *args, **kwargs):
super(LoopiaAuthenticator, self).__init__(*args, **kwargs)
self._client = None
self.credentials = None
@classmethod
def add_parser_arguments(cls, add, default_propagation_seconds=15 * 60):
super(LoopiaAuthenticator, cls).add_parser_arguments(
add, default_propagation_seconds)
add("credentials", help="Loopia API credentials INI file.")
def more_info(self):
"""
More in-depth description of the plugin.
"""
return "\n".join(line[4:] for line in __doc__.strip().split("\n"))
def _setup_credentials(self):
self.credentials = self._configure_credentials(
"credentials",
"Loopia credentials INI file",
{
"user": "API username for Loopia account",
"password": "API password for Loopia account",
},
)
def _get_loopia_client(self):
return Loopia(
self.credentials.conf("user"),
self.credentials.conf("password"))
def _perform(self, domain, validation_name, validation):
loopia = self._get_loopia_client()
domain_parts = split_domain(validation_name)
dns_record = DnsRecord("TXT", ttl=self.ttl, data=validation)
logger.debug(
"Creating TXT record for {} on subdomain {}".format(*domain_parts))
loopia.add_zone_record(dns_record, *domain_parts)
def _cleanup(self, domain, validation_name, validation):
loopia = self._get_loopia_client()
domain_parts = split_domain(validation_name)
dns_record = DnsRecord("TXT", ttl=self.ttl, data=validation)
records = loopia.get_zone_records(*domain_parts)
delete_subdomain = True
for record in records:
# Make sure the record we delete actually matches the one we created
if dns_record.replace(id=record.id) == record:
logger.debug("Removing zone record {}".format(record))
loopia.remove_zone_record(record.id, *domain_parts)
else:
# This happens if there are other zone records on the current
# sub domain.
delete_subdomain = False
msg = "Record {} prevents the subdomain from being deleted"
logger.debug(msg.format(record))
# Delete subdomain if we emptied it completely
if delete_subdomain:
msg = "Removing subdomain {1} on subdomain {0}"
logger.debug(msg.format(*domain_parts))
loopia.remove_subdomain(*domain_parts)
|