summaryrefslogtreecommitdiffstats
path: root/certbot_loopia.py
blob: 1569c5181c5fc822d4eb400e0fcb051628f6a3c9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import logging
import itertools
import re
import zope.interface

from certbot.plugins.dns_common import DNSAuthenticator
from certbot.interfaces import IAuthenticator, IPluginFactory
from datetime import datetime, timedelta
from loopialib import DnsRecord, Loopia, split_domain
from time import sleep


logger = logging.getLogger(__name__)


@zope.interface.implementer(IAuthenticator)
@zope.interface.provider(IPluginFactory)
class LoopiaAuthenticator(DNSAuthenticator):
    """
    Loopia DNS ACME authenticator.

    This Authenticator uses the Loopia API to fulfill a dns-01 challenge.
    """

    #: Short description of plugin
    description = __doc__.strip().split("\n", 1)[0]

    #: TTL for the validation TXT record
    ttl = 30

    def __init__(self, *args, **kwargs):
        super(LoopiaAuthenticator, self).__init__(*args, **kwargs)
        self._client = None
        self.credentials = None

    @classmethod
    def add_parser_arguments(cls, add, default_propagation_seconds=15 * 60):
        super(LoopiaAuthenticator, cls).add_parser_arguments(
            add, default_propagation_seconds)
        add("credentials", help="Loopia API credentials INI file.")


    def more_info(self):
        """
        More in-depth description of the plugin.
        """

        return "\n".join(line[4:] for line in __doc__.strip().split("\n"))

    def _setup_credentials(self):
        self.credentials = self._configure_credentials(
            "credentials",
            "Loopia credentials INI file",
            {
                "user": "API username for Loopia account",
                "password": "API password for Loopia account",
            },
        )

    def _get_loopia_client(self):
        return Loopia(
            self.credentials.conf("user"),
            self.credentials.conf("password"))

    def _perform(self, domain, validation_name, validation):
        loopia = self._get_loopia_client()
        domain_parts = split_domain(validation_name)

        dns_record = DnsRecord("TXT", ttl=self.ttl, data=validation)

        logger.debug(
            "Creating TXT record for {} on subdomain {}".format(*domain_parts))
        loopia.add_zone_record(dns_record, *domain_parts)

    def _cleanup(self, domain, validation_name, validation):
        loopia = self._get_loopia_client()
        domain_parts = split_domain(validation_name)
        dns_record = DnsRecord("TXT", ttl=self.ttl, data=validation)

        records = loopia.get_zone_records(*domain_parts)
        delete_subdomain = True
        for record in records:
            # Make sure the record we delete actually matches the one we created
            if dns_record.replace(id=record.id) == record:
                logger.debug("Removing zone record {}".format(record))
                loopia.remove_zone_record(record.id, *domain_parts)
            else:
                # This happens if there are other zone records on the current
                # sub domain.
                delete_subdomain = False

                msg = "Record {} prevents the subdomain from being deleted"
                logger.debug(msg.format(record))

        # Delete subdomain if we emptied it completely
        if delete_subdomain:
            msg = "Removing subdomain {1} on subdomain {0}"
            logger.debug(msg.format(*domain_parts))
            loopia.remove_subdomain(*domain_parts)