summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRob Stradling <rob@comodo.com>2017-01-18 14:37:59 +0000
committerRob Stradling <rob@comodo.com>2017-01-18 14:37:59 +0000
commitabd0416387d8c8680b5293fa21e7e6d1f80b0a3d (patch)
treee5b3b6c06c49eca7ecddd750aa9c41fcf7aed57e
parent793ad6ede2b9ba5da97bc65b270b502698b31301 (diff)
downloadcrl_monitor-abd0416387d8c8680b5293fa21e7e6d1f80b0a3d.zip
crl_monitor-abd0416387d8c8680b5293fa21e7e6d1f80b0a3d.tar.gz
crl_monitor-abd0416387d8c8680b5293fa21e7e6d1f80b0a3d.tar.bz2
Initial code drop
-rw-r--r--Makefile8
-rw-r--r--crl_monitor.go277
-rw-r--r--processor_main.go174
3 files changed, 459 insertions, 0 deletions
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..f5962b6
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,8 @@
+all: clean crl_monitor
+
+# Tidy up files created by compiler/linker.
+clean:
+ rm -f crl_monitor
+
+crl_monitor:
+ GOPATH=/root/go go build crl_monitor.go processor_main.go
diff --git a/crl_monitor.go b/crl_monitor.go
new file mode 100644
index 0000000..343872a
--- /dev/null
+++ b/crl_monitor.go
@@ -0,0 +1,277 @@
+/* crl_monitor - Certificate Revocation List Monitor
+ * Written by Rob Stradling
+ * Copyright (C) 2016-2017 COMODO CA Limited
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package main
+
+import (
+ "bytes"
+ "crypto/sha256"
+ "crypto/tls"
+ "crypto/x509"
+ "crypto/x509/pkix"
+ "database/sql"
+ "errors"
+ "flag"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "net/http"
+ "strings"
+ "time"
+)
+
+type Work struct {
+ timeout time.Duration
+ transport http.Transport
+ http_client http.Client
+ upsert_statement *sql.Stmt
+}
+
+type WorkItem struct {
+ ca_id int32
+ crl_url string
+ this_update time.Time
+ next_update time.Time
+ last_checked time.Time
+ issuer_cert []byte
+ error_message sql.NullString
+ crl_sha256 [sha256.Size]byte
+}
+
+func checkRedirectURL(req *http.Request, via []*http.Request) error {
+ // Fixup incorrectly encoded redirect URLs
+ req.URL.RawQuery = strings.Replace(req.URL.RawQuery, " ", "%20", -1)
+ return nil
+}
+
+func (w *Work) CustomFlags() string {
+ flag.DurationVar(&w.timeout, "timeout", 15 * time.Second, "HTTP timeout")
+ return fmt.Sprintf(" timeout: %s\n", w.timeout)
+}
+
+func (w *Work) Init() {
+ w.transport = http.Transport { TLSClientConfig: &tls.Config { InsecureSkipVerify: true } }
+ w.http_client = http.Client { CheckRedirect: checkRedirectURL, Timeout: w.timeout, Transport: &w.transport }
+}
+
+// Work.Begin
+// Do any DB stuff that needs to happen before a batch of work.
+func (w *Work) Begin(db *sql.DB) {
+ us, err := db.Prepare(`
+INSERT INTO crl_revoked (
+ CA_ID, SERIAL_NUMBER, REASON_CODE,
+ REVOCATION_DATE, LAST_SEEN_CHECK_DATE
+)
+VALUES (
+ $1, decode($2, 'hex'), $3::smallint,
+ $4,
+ statement_timestamp()
+)
+ON CONFLICT ON CONSTRAINT crlr_pk
+ DO UPDATE SET REASON_CODE = $3::smallint,
+ REVOCATION_DATE = $4,
+ LAST_SEEN_CHECK_DATE = statement_timestamp()
+`)
+ checkErr(err)
+
+ w.upsert_statement = us
+}
+
+// Work.End
+// Do any DB stuff that needs to happen after a batch of work.
+func (w *Work) End() {
+ w.upsert_statement.Close()
+}
+
+// Work.Prepare()
+// Prepare the driving SELECT query.
+func (w *Work) SelectQuery(batch_size int) string {
+ return fmt.Sprintf(`
+SELECT crl.CA_ID, crl.DISTRIBUTION_POINT_URL, coalesce(crl.THIS_UPDATE, 'epoch'::timestamp), coalesce(crl.NEXT_UPDATE, 'epoch'::timestamp), coalesce(crl.LAST_CHECKED, 'epoch'::timestamp), c.CERTIFICATE
+ FROM crl LEFT JOIN LATERAL
+ (SELECT c.CERTIFICATE
+ FROM ca_certificate cac, certificate c
+ WHERE crl.CA_ID = cac.CA_ID
+ AND cac.CERTIFICATE_ID = c.ID
+ LIMIT 1) c ON TRUE
+ WHERE crl.IS_ACTIVE = 't'
+ AND crl.NEXT_CHECK_DUE < statement_timestamp()
+ ORDER BY crl.IS_ACTIVE, crl.NEXT_CHECK_DUE
+ LIMIT %d
+`, batch_size)
+}
+
+// WorkItem.Parse()
+// Parse one SELECTed row to configure one work item.
+func (wi *WorkItem) Parse(rs *sql.Rows) error {
+ return rs.Scan(&wi.ca_id, &wi.crl_url, &wi.this_update, &wi.next_update, &wi.last_checked, &wi.issuer_cert)
+}
+
+func (wi *WorkItem) checkErr(err error) {
+ if err != nil {
+ wi.error_message.String = err.Error()
+ wi.error_message.Valid = true
+ panic(err)
+ }
+}
+
+// WorkItem.Perform()
+// Do the work for one item.
+func (wi *WorkItem) Perform(db *sql.DB, w *Work) {
+ wi.error_message.String = ""
+ wi.error_message.Valid = false
+
+ // Retrieve the CRL
+ var err error
+ var crl *pkix.CertificateList
+ var body []byte
+ if strings.HasPrefix(strings.ToLower(wi.crl_url), "ldap") {
+ // TODO: Support LDAP CRL URLs
+ wi.error_message.String = "Unsupported URL scheme"
+ wi.error_message.Valid = true
+ log.Printf("%s: %s", wi.error_message.String, wi.crl_url)
+ return
+ } else {
+ // Fetch the CRL via HTTP(S)
+ req, err := http.NewRequest("GET", wi.crl_url, nil)
+ wi.checkErr(err)
+ req.Header.Add("User-Agent", "crt.sh")
+ req.Header.Add("If-Modified-Since", wi.this_update.Format(http.TimeFormat))
+
+ resp, err := w.http_client.Do(req)
+ wi.checkErr(err)
+ defer resp.Body.Close()
+ if resp.StatusCode == 304 {
+ log.Printf("Not Modified (304): %s", wi.crl_url)
+ return
+ }
+ if resp.StatusCode != 200 {
+ wi.error_message.String = fmt.Sprintf("HTTP %d", resp.StatusCode)
+ wi.error_message.Valid = true
+ log.Printf("%s: %s", wi.error_message.String, wi.crl_url)
+ return
+ }
+
+ // Extract the HTTP response body
+ body, err = ioutil.ReadAll(resp.Body)
+ wi.checkErr(err)
+ }
+
+ // Progress report
+ log.Printf("Downloaded (%d bytes): %s", len(body), wi.crl_url)
+
+ // Parse the CRL
+ crl, err = x509.ParseCRL(body)
+ wi.checkErr(err)
+
+ // Extract various fields from this CRL
+ var temp_this_update = wi.this_update
+ wi.this_update = crl.TBSCertList.ThisUpdate
+ wi.next_update = crl.TBSCertList.NextUpdate
+
+ // Check that this CRL is newer than the last one we processed
+ if temp_this_update.Sub(wi.this_update) >= 0 {
+ log.Printf("Not Modified (thisUpdate): %s", wi.crl_url)
+ return
+ }
+
+ // Parse the supplied issuer certificate
+ cert, err := x509.ParseCertificate(wi.issuer_cert)
+ checkErr(err)
+
+ // Check this CRL's signature using the supplied issuer certificate
+ err = cert.CheckCRLSignature(crl)
+ wi.checkErr(err)
+
+ // Show progress report
+ log.Printf("Verified: %s", wi.crl_url)
+
+ // Calculate SHA-256(CRL)
+ wi.crl_sha256 = sha256.Sum256(body)
+
+ // TODO: Check crl.HasExpired(time.Now) ?
+ // TODO: Set inactive if "latest" CRL is ancient?
+ // TODO: Deactivate if duplicate of another CDP?
+
+ // Begin a new transaction, prepare the UPSERT statement, and defer the COMMIT statement.
+ tx, err := db.Begin()
+ defer tx.Commit()
+ stmt := tx.Stmt(w.upsert_statement)
+ defer stmt.Close()
+
+ // Loop through revoked certs, UPSERTing each one into the DB
+ for _, revoked_cert := range crl.TBSCertList.RevokedCertificates {
+ // Get the CRL Entry Reason Code (if specified)
+ var reason_code sql.NullInt64
+ reason_code.Valid = false
+ for _, ext := range revoked_cert.Extensions {
+ if ext.Id.Equal([]int{2, 5, 29, 21}) {
+ if bytes.HasPrefix(ext.Value, []byte{10, 1}) { // ENUMERATED, length=1
+ reason_code.Int64 = int64(ext.Value[2])
+ reason_code.Valid = true
+ }
+ }
+ }
+
+ // Convert the revoked serial number to a hex string
+ var serial_string = fmt.Sprintf("%X", revoked_cert.SerialNumber)
+ if revoked_cert.SerialNumber.Sign() >= 0 {
+ if len(serial_string) % 2 != 0 {
+ serial_string = "0" + serial_string
+ } else if serial_string[0] >= 56 { // 56 = "8" in ASCII
+ serial_string = "00" + serial_string
+ }
+ } else {
+ // TODO: Handle negative serial numbers properly
+ log.Printf("NEGATIVE serial number: %X", revoked_cert.SerialNumber)
+ }
+
+ // UPSERT this CRL entry
+ result, err := stmt.Exec(wi.ca_id, serial_string, reason_code, revoked_cert.RevocationTime)
+ wi.checkErr(err)
+ rows_affected, err := result.RowsAffected()
+ wi.checkErr(err)
+ if rows_affected != 1 {
+ wi.checkErr(errors.New("UPSERT failed"))
+ }
+ }
+
+ log.Printf("Processed (%d revocations): %s", len(crl.TBSCertList.RevokedCertificates), wi.crl_url)
+}
+
+// Work.UpdateStatement()
+// Prepare the UPDATE statement to be run after processing each work item.
+func (w *Work) UpdateStatement() string {
+ return `
+UPDATE crl
+ SET CRL_SHA256=$1,
+ THIS_UPDATE=$2::timestamp,
+ NEXT_UPDATE=$3::timestamp,
+ LAST_CHECKED=statement_timestamp(),
+ NEXT_CHECK_DUE=statement_timestamp() + interval '1 hour',
+ ERROR_MESSAGE=$4::text
+ WHERE CA_ID=$5
+ AND DISTRIBUTION_POINT_URL=$6
+`
+}
+
+// WorkItem.Update()
+// Update the DB with the results of the work for this item.
+func (wi *WorkItem) Update(update_statement *sql.Stmt) (sql.Result, error) {
+ return update_statement.Exec(wi.crl_sha256[:], wi.this_update, wi.next_update, wi.error_message, wi.ca_id, wi.crl_url)
+}
diff --git a/processor_main.go b/processor_main.go
new file mode 100644
index 0000000..a7d1c6f
--- /dev/null
+++ b/processor_main.go
@@ -0,0 +1,174 @@
+/* processor_main - PostgreSQL-based workload engine
+ * Written by Rob Stradling
+ * Copyright (C) 2016-2017 COMODO CA Limited
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package main
+
+import (
+ "database/sql"
+ "flag"
+ "fmt"
+ "log"
+ "os"
+ "os/signal"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+ _ "github.com/lib/pq"
+)
+
+func checkErr(err error) {
+ if err != nil {
+ panic(err)
+ }
+}
+
+func recoverErr(context string) {
+ if r := recover(); r != nil {
+ log.Printf("ERROR: %v [%s]", r, context)
+ }
+}
+
+func doUpdateWorkItem(wi *WorkItem, update_statement *sql.Stmt) {
+ result, err := wi.Update(update_statement)
+ if err != nil {
+ log.Printf("ERROR: UPDATE failed (%v)\n", err.Error)
+ } else {
+ rows_affected, err := result.RowsAffected()
+ if err != nil {
+ log.Printf("ERROR: UPDATE failed (%v)\n", err.Error)
+ } else if rows_affected < 1 {
+ log.Println("ERROR: No rows UPDATEd")
+ }
+ }
+}
+
+func doBatchOfWork(db *sql.DB, w *Work, batch_size int, concurrent_items int) int {
+ // Fetch a batch of work to do from the DB
+ log.Println("Initializing...")
+ w.Begin(db)
+ log.Println("Preparing...")
+ select_query := w.SelectQuery(batch_size)
+ log.Printf("Executing...%s", select_query)
+ rows, err := db.Query(select_query)
+ checkErr(err)
+ defer rows.Close()
+
+ // Prepare the UPDATE statement that will be run after performing each work item
+ update_statement, err := db.Prepare(w.UpdateStatement())
+ checkErr(err)
+ defer update_statement.Close()
+
+ // Do the batch of work, throttling the number of concurrent work items
+ log.Println("Performing...")
+ var wg sync.WaitGroup
+ var chan_concurrency = make(chan int, concurrent_items)
+ var i int
+ for i = 0; rows.Next(); i++ {
+ var wi WorkItem
+ err = wi.Parse(rows)
+ checkErr(err)
+ wg.Add(1)
+ go func() {
+ defer func() {
+ wg.Done()
+ }()
+ defer doUpdateWorkItem(&wi, update_statement)
+ chan_concurrency <- 1
+ defer func() { <-chan_concurrency }()
+ defer recoverErr(wi.crl_url)
+ wi.Perform(db, w)
+ }()
+ }
+
+ // Wait for all work items to complete
+ wg.Wait()
+ w.End()
+
+ return i
+}
+
+func main() {
+ defer recoverErr("main")
+
+ // Configure signal handling
+ chan_signals := make(chan os.Signal, 20)
+ signal.Notify(chan_signals, os.Interrupt, syscall.SIGTERM)
+
+ // Parse common command line flags
+ var conn_info string
+ flag.StringVar(&conn_info, "conninfo", fmt.Sprintf("user=certwatch dbname=certwatch connect_timeout=5 sslmode=disable application_name=%s", os.Args[0][(strings.LastIndex(os.Args[0], "/") + 1):len(os.Args[0])]), "DB connection info")
+ var conn_open int
+ flag.IntVar(&conn_open, "connopen", 5, "Maximum number of open connections to the DB [0=unlimited]")
+ var conn_idle int
+ flag.IntVar(&conn_idle, "connidle", 0, "Maximum number of connections in the idle connection pool")
+ var interval time.Duration
+ flag.DurationVar(&interval, "interval", time.Second * 30, "How often to check for more work [0s=exit when no more work to do]")
+ var batch_size int
+ flag.IntVar(&batch_size, "batch", 100, "Maximum number of items per batch of work")
+ var concurrent_items int
+ flag.IntVar(&concurrent_items, "concurrent", 10, "Maximum number of items processed simultaneously")
+
+ // Parse any custom flags
+ var work Work
+ custom_flags := work.CustomFlags()
+ flag.Parse()
+ work.Init()
+
+ // Show configuration
+ log.Printf("Configuration:\n conninfo: %s\n connopen: %d\n connidle: %d\n interval: %v\n batch: %d\n concurrent: %d\n%s", conn_info, conn_open, conn_idle, interval, batch_size, concurrent_items, custom_flags)
+
+ // Connect to the database
+ log.Println("Connecting...")
+ db, err := sql.Open("postgres", conn_info)
+ checkErr(err)
+ defer db.Close()
+ db.SetMaxOpenConns(conn_open)
+ db.SetMaxIdleConns(conn_idle)
+
+ // Perform work in batches
+ next_time := time.Now()
+ keep_looping := true
+ for keep_looping {
+ // Perform one batch of work
+ items_processed := doBatchOfWork(db, &work, batch_size, concurrent_items)
+
+ // Exit if interval=0s and there's no more work to do
+ if (items_processed == 0) && (interval == 0) {
+ break
+ }
+
+ // Schedule the next batch of work
+ next_time = next_time.Add(interval)
+ if (items_processed > 0) || (next_time.Before(time.Now())) {
+ next_time = time.Now()
+ }
+
+ // Have a rest if possible. Process any pending SIGINT or SIGTERM.
+ log.Println("Resting...")
+ select {
+ case sig := <-chan_signals:
+ log.Printf("Signal received: %v\n", sig)
+ keep_looping = false
+ case <-time.After(next_time.Sub(time.Now())):
+ }
+ }
+
+ // We're done
+ log.Println("Goodbye!")
+}