Diffstat (limited to 'processor_main.go')
-rw-r--r--   processor_main.go   174
1 file changed, 174 insertions(+), 0 deletions(-)
diff --git a/processor_main.go b/processor_main.go
new file mode 100644
index 0000000..a7d1c6f
--- /dev/null
+++ b/processor_main.go
@@ -0,0 +1,174 @@
+/* processor_main - PostgreSQL-based workload engine
+ * Written by Rob Stradling
+ * Copyright (C) 2016-2017 COMODO CA Limited
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package main
+
+import (
+    "database/sql"
+    "flag"
+    "fmt"
+    "log"
+    "os"
+    "os/signal"
+    "strings"
+    "sync"
+    "syscall"
+    "time"
+    _ "github.com/lib/pq"
+)
+
+func checkErr(err error) {
+    if err != nil {
+        panic(err)
+    }
+}
+
+func recoverErr(context string) {
+    if r := recover(); r != nil {
+        log.Printf("ERROR: %v [%s]", r, context)
+    }
+}
+
+func doUpdateWorkItem(wi *WorkItem, update_statement *sql.Stmt) {
+    result, err := wi.Update(update_statement)
+    if err != nil {
+        log.Printf("ERROR: UPDATE failed (%v)\n", err)
+    } else {
+        rows_affected, err := result.RowsAffected()
+        if err != nil {
+            log.Printf("ERROR: RowsAffected() failed (%v)\n", err)
+        } else if rows_affected < 1 {
+            log.Println("ERROR: No rows UPDATEd")
+        }
+    }
+}
+
+func doBatchOfWork(db *sql.DB, w *Work, batch_size int, concurrent_items int) int {
+    // Fetch a batch of work to do from the DB
+    log.Println("Initializing...")
+    w.Begin(db)
+    log.Println("Preparing...")
+    select_query := w.SelectQuery(batch_size)
+    log.Printf("Executing...%s", select_query)
+    rows, err := db.Query(select_query)
+    checkErr(err)
+    defer rows.Close()
+
+    // Prepare the UPDATE statement that will be run after performing each work item
+    update_statement, err := db.Prepare(w.UpdateStatement())
+    checkErr(err)
+    defer update_statement.Close()
+
+    // Do the batch of work, throttling the number of concurrent work items
+    log.Println("Performing...")
+    var wg sync.WaitGroup
+    var chan_concurrency = make(chan int, concurrent_items)
+    var i int
+    for i = 0; rows.Next(); i++ {
+        var wi WorkItem
+        err = wi.Parse(rows)
+        checkErr(err)
+        wg.Add(1)
+        go func() {
+            defer func() {
+                wg.Done()
+            }()
+            defer doUpdateWorkItem(&wi, update_statement)
+            chan_concurrency <- 1
+            defer func() { <-chan_concurrency }()
+            defer recoverErr(wi.crl_url)
+            wi.Perform(db, w)
+        }()
+    }
+
+    // Wait for all work items to complete
+    wg.Wait()
+    w.End()
+
+    return i
+}
+
+func main() {
+    defer recoverErr("main")
+
+    // Configure signal handling
+    chan_signals := make(chan os.Signal, 20)
+    signal.Notify(chan_signals, os.Interrupt, syscall.SIGTERM)
+
+    // Parse common command line flags
+    var conn_info string
+    flag.StringVar(&conn_info, "conninfo", fmt.Sprintf("user=certwatch dbname=certwatch connect_timeout=5 sslmode=disable application_name=%s", os.Args[0][strings.LastIndex(os.Args[0], "/")+1:]), "DB connection info")
+    var conn_open int
+    flag.IntVar(&conn_open, "connopen", 5, "Maximum number of open connections to the DB [0=unlimited]")
+    var conn_idle int
+    flag.IntVar(&conn_idle, "connidle", 0, "Maximum number of connections in the idle connection pool")
+    var interval time.Duration
+    flag.DurationVar(&interval, "interval", time.Second * 30, "How often to check for more work [0s=exit when no more work to do]")
+    var batch_size int
+    flag.IntVar(&batch_size, "batch", 100, "Maximum number of items per batch of work")
+    var concurrent_items int
+    flag.IntVar(&concurrent_items, "concurrent", 10, "Maximum number of items processed simultaneously")
+
+    // Parse any custom flags
+    var work Work
+    custom_flags := work.CustomFlags()
+    flag.Parse()
+    work.Init()
+
+    // Show configuration
+    log.Printf("Configuration:\n conninfo: %s\n connopen: %d\n connidle: %d\n interval: %v\n batch: %d\n concurrent: %d\n%s", conn_info, conn_open, conn_idle, interval, batch_size, concurrent_items, custom_flags)
+
+    // Connect to the database
+    log.Println("Connecting...")
+    db, err := sql.Open("postgres", conn_info)
+    checkErr(err)
+    defer db.Close()
+    db.SetMaxOpenConns(conn_open)
+    db.SetMaxIdleConns(conn_idle)
+
+    // Perform work in batches
+    next_time := time.Now()
+    keep_looping := true
+    for keep_looping {
+        // Perform one batch of work
+        items_processed := doBatchOfWork(db, &work, batch_size, concurrent_items)
+
+        // Exit if interval=0s and there's no more work to do
+        if (items_processed == 0) && (interval == 0) {
+            break
+        }
+
+        // Schedule the next batch of work
+        next_time = next_time.Add(interval)
+        if (items_processed > 0) || (next_time.Before(time.Now())) {
+            next_time = time.Now()
+        }
+
+        // Have a rest if possible. Process any pending SIGINT or SIGTERM.
+        log.Println("Resting...")
+        select {
+        case sig := <-chan_signals:
+            log.Printf("Signal received: %v\n", sig)
+            keep_looping = false
+        case <-time.After(next_time.Sub(time.Now())):
+        }
+    }
+
+    // We're done
+    log.Println("Goodbye!")
+}
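
The Work and WorkItem types referenced above are not defined in processor_main.go; each processor built around this file supplies its own implementations. The sketch below is not part of this commit: it uses a made-up example_queue table and hypothetical field names (only crl_url comes from the code above), and only illustrates the method set that processor_main.go calls: CustomFlags, Init, Begin, SelectQuery, UpdateStatement and End on Work, plus Parse, Perform and Update on WorkItem.

// work_example.go - illustrative sketch only; NOT part of this commit.
package main

import (
    "database/sql"
    "fmt"
)

// Work holds per-processor state shared by every item in a batch.
type Work struct {
}

// WorkItem represents one row selected by Work.SelectQuery().
// The example_queue table and the id field are hypothetical.
type WorkItem struct {
    id      int64
    crl_url string // processor_main.go passes this to recoverErr()
}

// CustomFlags registers any processor-specific flags (it is called before
// flag.Parse()) and returns a description for the configuration log line.
func (w *Work) CustomFlags() string {
    return ""
}

// Init runs once, after flag parsing.
func (w *Work) Init() {
}

// Begin is called at the start of every batch.
func (w *Work) Begin(db *sql.DB) {
}

// SelectQuery returns the SQL that selects up to batch_size pending items.
func (w *Work) SelectQuery(batch_size int) string {
    return fmt.Sprintf("SELECT id, crl_url FROM example_queue WHERE processed_at IS NULL LIMIT %d", batch_size)
}

// UpdateStatement returns the SQL that is prepared once per batch and then
// executed (via WorkItem.Update) after each item has been performed.
func (w *Work) UpdateStatement() string {
    return "UPDATE example_queue SET processed_at=statement_timestamp() WHERE id=$1"
}

// End is called once all of the batch's goroutines have finished.
func (w *Work) End() {
}

// Parse populates the WorkItem from one row of the SELECT query.
func (wi *WorkItem) Parse(rows *sql.Rows) error {
    return rows.Scan(&wi.id, &wi.crl_url)
}

// Perform does the actual processing for one item; any panic is caught and
// logged by recoverErr() in processor_main.go.
func (wi *WorkItem) Perform(db *sql.DB, w *Work) {
    // Fetch and process wi.crl_url here.
}

// Update marks the item as processed, using the prepared UPDATE statement.
func (wi *WorkItem) Update(update_statement *sql.Stmt) (sql.Result, error) {
    return update_statement.Exec(wi.id)
}

Built together with processor_main.go, such a processor could then be invoked along the lines of ./example_processor -connopen 10 -interval 1m -batch 250 -concurrent 20 (binary name and values shown here are illustrative).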