diff options
author | Rob Stradling <rob@comodo.com> | 2017-01-18 14:37:59 +0000 |
---|---|---|
committer | Rob Stradling <rob@comodo.com> | 2017-01-18 14:37:59 +0000 |
commit | abd0416387d8c8680b5293fa21e7e6d1f80b0a3d (patch) | |
tree | e5b3b6c06c49eca7ecddd750aa9c41fcf7aed57e | |
parent | 793ad6ede2b9ba5da97bc65b270b502698b31301 (diff) | |
download | crl_monitor-abd0416387d8c8680b5293fa21e7e6d1f80b0a3d.zip crl_monitor-abd0416387d8c8680b5293fa21e7e6d1f80b0a3d.tar.gz crl_monitor-abd0416387d8c8680b5293fa21e7e6d1f80b0a3d.tar.bz2 |
Initial code drop
-rw-r--r-- | Makefile | 8 | ||||
-rw-r--r-- | crl_monitor.go | 277 | ||||
-rw-r--r-- | processor_main.go | 174 |
3 files changed, 459 insertions, 0 deletions
diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f5962b6 --- /dev/null +++ b/Makefile @@ -0,0 +1,8 @@ +all: clean crl_monitor + +# Tidy up files created by compiler/linker. +clean: + rm -f crl_monitor + +crl_monitor: + GOPATH=/root/go go build crl_monitor.go processor_main.go diff --git a/crl_monitor.go b/crl_monitor.go new file mode 100644 index 0000000..343872a --- /dev/null +++ b/crl_monitor.go @@ -0,0 +1,277 @@ +/* crl_monitor - Certificate Revocation List Monitor + * Written by Rob Stradling + * Copyright (C) 2016-2017 COMODO CA Limited + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package main + +import ( + "bytes" + "crypto/sha256" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "database/sql" + "errors" + "flag" + "fmt" + "io/ioutil" + "log" + "net/http" + "strings" + "time" +) + +type Work struct { + timeout time.Duration + transport http.Transport + http_client http.Client + upsert_statement *sql.Stmt +} + +type WorkItem struct { + ca_id int32 + crl_url string + this_update time.Time + next_update time.Time + last_checked time.Time + issuer_cert []byte + error_message sql.NullString + crl_sha256 [sha256.Size]byte +} + +func checkRedirectURL(req *http.Request, via []*http.Request) error { + // Fixup incorrectly encoded redirect URLs + req.URL.RawQuery = strings.Replace(req.URL.RawQuery, " ", "%20", -1) + return nil +} + +func (w *Work) CustomFlags() string { + flag.DurationVar(&w.timeout, "timeout", 15 * time.Second, "HTTP timeout") + return fmt.Sprintf(" timeout: %s\n", w.timeout) +} + +func (w *Work) Init() { + w.transport = http.Transport { TLSClientConfig: &tls.Config { InsecureSkipVerify: true } } + w.http_client = http.Client { CheckRedirect: checkRedirectURL, Timeout: w.timeout, Transport: &w.transport } +} + +// Work.Begin +// Do any DB stuff that needs to happen before a batch of work. +func (w *Work) Begin(db *sql.DB) { + us, err := db.Prepare(` +INSERT INTO crl_revoked ( + CA_ID, SERIAL_NUMBER, REASON_CODE, + REVOCATION_DATE, LAST_SEEN_CHECK_DATE +) +VALUES ( + $1, decode($2, 'hex'), $3::smallint, + $4, + statement_timestamp() +) +ON CONFLICT ON CONSTRAINT crlr_pk + DO UPDATE SET REASON_CODE = $3::smallint, + REVOCATION_DATE = $4, + LAST_SEEN_CHECK_DATE = statement_timestamp() +`) + checkErr(err) + + w.upsert_statement = us +} + +// Work.End +// Do any DB stuff that needs to happen after a batch of work. +func (w *Work) End() { + w.upsert_statement.Close() +} + +// Work.Prepare() +// Prepare the driving SELECT query. +func (w *Work) SelectQuery(batch_size int) string { + return fmt.Sprintf(` +SELECT crl.CA_ID, crl.DISTRIBUTION_POINT_URL, coalesce(crl.THIS_UPDATE, 'epoch'::timestamp), coalesce(crl.NEXT_UPDATE, 'epoch'::timestamp), coalesce(crl.LAST_CHECKED, 'epoch'::timestamp), c.CERTIFICATE + FROM crl LEFT JOIN LATERAL + (SELECT c.CERTIFICATE + FROM ca_certificate cac, certificate c + WHERE crl.CA_ID = cac.CA_ID + AND cac.CERTIFICATE_ID = c.ID + LIMIT 1) c ON TRUE + WHERE crl.IS_ACTIVE = 't' + AND crl.NEXT_CHECK_DUE < statement_timestamp() + ORDER BY crl.IS_ACTIVE, crl.NEXT_CHECK_DUE + LIMIT %d +`, batch_size) +} + +// WorkItem.Parse() +// Parse one SELECTed row to configure one work item. +func (wi *WorkItem) Parse(rs *sql.Rows) error { + return rs.Scan(&wi.ca_id, &wi.crl_url, &wi.this_update, &wi.next_update, &wi.last_checked, &wi.issuer_cert) +} + +func (wi *WorkItem) checkErr(err error) { + if err != nil { + wi.error_message.String = err.Error() + wi.error_message.Valid = true + panic(err) + } +} + +// WorkItem.Perform() +// Do the work for one item. +func (wi *WorkItem) Perform(db *sql.DB, w *Work) { + wi.error_message.String = "" + wi.error_message.Valid = false + + // Retrieve the CRL + var err error + var crl *pkix.CertificateList + var body []byte + if strings.HasPrefix(strings.ToLower(wi.crl_url), "ldap") { + // TODO: Support LDAP CRL URLs + wi.error_message.String = "Unsupported URL scheme" + wi.error_message.Valid = true + log.Printf("%s: %s", wi.error_message.String, wi.crl_url) + return + } else { + // Fetch the CRL via HTTP(S) + req, err := http.NewRequest("GET", wi.crl_url, nil) + wi.checkErr(err) + req.Header.Add("User-Agent", "crt.sh") + req.Header.Add("If-Modified-Since", wi.this_update.Format(http.TimeFormat)) + + resp, err := w.http_client.Do(req) + wi.checkErr(err) + defer resp.Body.Close() + if resp.StatusCode == 304 { + log.Printf("Not Modified (304): %s", wi.crl_url) + return + } + if resp.StatusCode != 200 { + wi.error_message.String = fmt.Sprintf("HTTP %d", resp.StatusCode) + wi.error_message.Valid = true + log.Printf("%s: %s", wi.error_message.String, wi.crl_url) + return + } + + // Extract the HTTP response body + body, err = ioutil.ReadAll(resp.Body) + wi.checkErr(err) + } + + // Progress report + log.Printf("Downloaded (%d bytes): %s", len(body), wi.crl_url) + + // Parse the CRL + crl, err = x509.ParseCRL(body) + wi.checkErr(err) + + // Extract various fields from this CRL + var temp_this_update = wi.this_update + wi.this_update = crl.TBSCertList.ThisUpdate + wi.next_update = crl.TBSCertList.NextUpdate + + // Check that this CRL is newer than the last one we processed + if temp_this_update.Sub(wi.this_update) >= 0 { + log.Printf("Not Modified (thisUpdate): %s", wi.crl_url) + return + } + + // Parse the supplied issuer certificate + cert, err := x509.ParseCertificate(wi.issuer_cert) + checkErr(err) + + // Check this CRL's signature using the supplied issuer certificate + err = cert.CheckCRLSignature(crl) + wi.checkErr(err) + + // Show progress report + log.Printf("Verified: %s", wi.crl_url) + + // Calculate SHA-256(CRL) + wi.crl_sha256 = sha256.Sum256(body) + + // TODO: Check crl.HasExpired(time.Now) ? + // TODO: Set inactive if "latest" CRL is ancient? + // TODO: Deactivate if duplicate of another CDP? + + // Begin a new transaction, prepare the UPSERT statement, and defer the COMMIT statement. + tx, err := db.Begin() + defer tx.Commit() + stmt := tx.Stmt(w.upsert_statement) + defer stmt.Close() + + // Loop through revoked certs, UPSERTing each one into the DB + for _, revoked_cert := range crl.TBSCertList.RevokedCertificates { + // Get the CRL Entry Reason Code (if specified) + var reason_code sql.NullInt64 + reason_code.Valid = false + for _, ext := range revoked_cert.Extensions { + if ext.Id.Equal([]int{2, 5, 29, 21}) { + if bytes.HasPrefix(ext.Value, []byte{10, 1}) { // ENUMERATED, length=1 + reason_code.Int64 = int64(ext.Value[2]) + reason_code.Valid = true + } + } + } + + // Convert the revoked serial number to a hex string + var serial_string = fmt.Sprintf("%X", revoked_cert.SerialNumber) + if revoked_cert.SerialNumber.Sign() >= 0 { + if len(serial_string) % 2 != 0 { + serial_string = "0" + serial_string + } else if serial_string[0] >= 56 { // 56 = "8" in ASCII + serial_string = "00" + serial_string + } + } else { + // TODO: Handle negative serial numbers properly + log.Printf("NEGATIVE serial number: %X", revoked_cert.SerialNumber) + } + + // UPSERT this CRL entry + result, err := stmt.Exec(wi.ca_id, serial_string, reason_code, revoked_cert.RevocationTime) + wi.checkErr(err) + rows_affected, err := result.RowsAffected() + wi.checkErr(err) + if rows_affected != 1 { + wi.checkErr(errors.New("UPSERT failed")) + } + } + + log.Printf("Processed (%d revocations): %s", len(crl.TBSCertList.RevokedCertificates), wi.crl_url) +} + +// Work.UpdateStatement() +// Prepare the UPDATE statement to be run after processing each work item. +func (w *Work) UpdateStatement() string { + return ` +UPDATE crl + SET CRL_SHA256=$1, + THIS_UPDATE=$2::timestamp, + NEXT_UPDATE=$3::timestamp, + LAST_CHECKED=statement_timestamp(), + NEXT_CHECK_DUE=statement_timestamp() + interval '1 hour', + ERROR_MESSAGE=$4::text + WHERE CA_ID=$5 + AND DISTRIBUTION_POINT_URL=$6 +` +} + +// WorkItem.Update() +// Update the DB with the results of the work for this item. +func (wi *WorkItem) Update(update_statement *sql.Stmt) (sql.Result, error) { + return update_statement.Exec(wi.crl_sha256[:], wi.this_update, wi.next_update, wi.error_message, wi.ca_id, wi.crl_url) +} diff --git a/processor_main.go b/processor_main.go new file mode 100644 index 0000000..a7d1c6f --- /dev/null +++ b/processor_main.go @@ -0,0 +1,174 @@ +/* processor_main - PostgreSQL-based workload engine + * Written by Rob Stradling + * Copyright (C) 2016-2017 COMODO CA Limited + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package main + +import ( + "database/sql" + "flag" + "fmt" + "log" + "os" + "os/signal" + "strings" + "sync" + "syscall" + "time" + _ "github.com/lib/pq" +) + +func checkErr(err error) { + if err != nil { + panic(err) + } +} + +func recoverErr(context string) { + if r := recover(); r != nil { + log.Printf("ERROR: %v [%s]", r, context) + } +} + +func doUpdateWorkItem(wi *WorkItem, update_statement *sql.Stmt) { + result, err := wi.Update(update_statement) + if err != nil { + log.Printf("ERROR: UPDATE failed (%v)\n", err.Error) + } else { + rows_affected, err := result.RowsAffected() + if err != nil { + log.Printf("ERROR: UPDATE failed (%v)\n", err.Error) + } else if rows_affected < 1 { + log.Println("ERROR: No rows UPDATEd") + } + } +} + +func doBatchOfWork(db *sql.DB, w *Work, batch_size int, concurrent_items int) int { + // Fetch a batch of work to do from the DB + log.Println("Initializing...") + w.Begin(db) + log.Println("Preparing...") + select_query := w.SelectQuery(batch_size) + log.Printf("Executing...%s", select_query) + rows, err := db.Query(select_query) + checkErr(err) + defer rows.Close() + + // Prepare the UPDATE statement that will be run after performing each work item + update_statement, err := db.Prepare(w.UpdateStatement()) + checkErr(err) + defer update_statement.Close() + + // Do the batch of work, throttling the number of concurrent work items + log.Println("Performing...") + var wg sync.WaitGroup + var chan_concurrency = make(chan int, concurrent_items) + var i int + for i = 0; rows.Next(); i++ { + var wi WorkItem + err = wi.Parse(rows) + checkErr(err) + wg.Add(1) + go func() { + defer func() { + wg.Done() + }() + defer doUpdateWorkItem(&wi, update_statement) + chan_concurrency <- 1 + defer func() { <-chan_concurrency }() + defer recoverErr(wi.crl_url) + wi.Perform(db, w) + }() + } + + // Wait for all work items to complete + wg.Wait() + w.End() + + return i +} + +func main() { + defer recoverErr("main") + + // Configure signal handling + chan_signals := make(chan os.Signal, 20) + signal.Notify(chan_signals, os.Interrupt, syscall.SIGTERM) + + // Parse common command line flags + var conn_info string + flag.StringVar(&conn_info, "conninfo", fmt.Sprintf("user=certwatch dbname=certwatch connect_timeout=5 sslmode=disable application_name=%s", os.Args[0][(strings.LastIndex(os.Args[0], "/") + 1):len(os.Args[0])]), "DB connection info") + var conn_open int + flag.IntVar(&conn_open, "connopen", 5, "Maximum number of open connections to the DB [0=unlimited]") + var conn_idle int + flag.IntVar(&conn_idle, "connidle", 0, "Maximum number of connections in the idle connection pool") + var interval time.Duration + flag.DurationVar(&interval, "interval", time.Second * 30, "How often to check for more work [0s=exit when no more work to do]") + var batch_size int + flag.IntVar(&batch_size, "batch", 100, "Maximum number of items per batch of work") + var concurrent_items int + flag.IntVar(&concurrent_items, "concurrent", 10, "Maximum number of items processed simultaneously") + + // Parse any custom flags + var work Work + custom_flags := work.CustomFlags() + flag.Parse() + work.Init() + + // Show configuration + log.Printf("Configuration:\n conninfo: %s\n connopen: %d\n connidle: %d\n interval: %v\n batch: %d\n concurrent: %d\n%s", conn_info, conn_open, conn_idle, interval, batch_size, concurrent_items, custom_flags) + + // Connect to the database + log.Println("Connecting...") + db, err := sql.Open("postgres", conn_info) + checkErr(err) + defer db.Close() + db.SetMaxOpenConns(conn_open) + db.SetMaxIdleConns(conn_idle) + + // Perform work in batches + next_time := time.Now() + keep_looping := true + for keep_looping { + // Perform one batch of work + items_processed := doBatchOfWork(db, &work, batch_size, concurrent_items) + + // Exit if interval=0s and there's no more work to do + if (items_processed == 0) && (interval == 0) { + break + } + + // Schedule the next batch of work + next_time = next_time.Add(interval) + if (items_processed > 0) || (next_time.Before(time.Now())) { + next_time = time.Now() + } + + // Have a rest if possible. Process any pending SIGINT or SIGTERM. + log.Println("Resting...") + select { + case sig := <-chan_signals: + log.Printf("Signal received: %v\n", sig) + keep_looping = false + case <-time.After(next_time.Sub(time.Now())): + } + } + + // We're done + log.Println("Goodbye!") +} |