-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmanager.go
161 lines (130 loc) · 3.26 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package zenfo
import (
"database/sql"
"fmt"
"log"
"sync"
_ "github.com/lib/pq" // postgres driver
)
// Manager handles running all workers
type Manager struct {
workers []Worker
db *sql.DB
log chan string
DryRun bool
DeleteRows bool
}
// NewManager builds internal worker map and returns new Mananger object
func NewManager(dbName, dbUser string) (*Manager, error) {
m := new(Manager)
// TODO It'd be nice to keep worker code inside a subdir, e.g. workers/
// Currently it creates circular imports
m.workers = []Worker{
&Aczc{},
&Sfzc{},
&Jikoji{},
}
// TODO probably abstract all the DB logic to its own class
if !m.DryRun {
db, err := sql.Open("postgres", fmt.Sprintf("user=%s dbname=%s sslmode=disable", dbUser, dbName))
if err != nil {
return nil, err
}
m.db = db
}
return m, nil
}
// Run iterates all workers for execution
func (m *Manager) Run() error {
// http client
client := NewClient()
var (
venueSelect *sql.Stmt
venueInsert *sql.Stmt
eventInsert *sql.Stmt
)
if !m.DryRun {
// TODO Insert or update to maintain event IDs
if m.DeleteRows {
r, err := m.db.Exec("DELETE FROM events")
if err != nil {
return err
}
count, err := r.RowsAffected()
if err != nil {
return err
}
log.Printf("Deleted %d rows!\n", count)
}
var err error
venueSelect, err = m.db.Prepare(`SELECT id FROM venues WHERE name=$1`)
if err != nil {
return err
}
venueInsert, err = m.db.Prepare(`INSERT INTO venues (name, addr, geo, website, phone, email) VALUES ($1, $2, point($3, $4), $5, $6, $7) RETURNING id`)
if err != nil {
return err
}
eventInsert, err = m.db.Prepare("INSERT INTO events (venue_id, name, blurb, description, start_date, end_date, url) VALUES ($1, $2, $3, $4, $5, $6, $7)")
if err != nil {
return err
}
}
var wg sync.WaitGroup
for _, worker := range m.workers {
wg.Add(2)
w := worker
out := make(chan string)
errs := make(chan error)
// Handle errors
go func() {
if err := <-errs; err != nil {
log.Fatalf("[%s] -> FATAL: %s\n", w.Name(), err)
}
// Wait for any error before marking as done
wg.Done()
}()
// Print worker log to screen
go func() {
for msg := range out {
log.Printf("[%s] -> %s\n", w.Name(), msg)
}
// Also wait for log output before marking done
wg.Done()
}()
go func() {
defer close(out)
defer close(errs)
if err := w.Init(client, out); err != nil {
errs <- err
}
log.Printf("Running worker: %s - %s\n", w.Name(), w.Desc())
events, err := w.Events()
if err != nil {
errs <- err
}
for _, e := range events {
//log.Printf("event=%+v\n", e)
venue := e.Venue
var venueID int
// Create any new venues found from events
if !m.DryRun {
err := venueSelect.QueryRow(venue.Name).Scan(&venueID)
if err == sql.ErrNoRows {
if err := venueInsert.QueryRow(venue.Name, venue.Addr, venue.Lat, venue.Lng, venue.Website, venue.Phone, venue.Email).Scan(&venueID); err != nil {
errs <- err
}
} else if err != nil {
errs <- err
}
// Store event
if _, err := eventInsert.Exec(venueID, e.Name, e.Blurb, e.Desc, e.Start, e.End, e.URL); err != nil {
errs <- err
}
}
}
}()
}
wg.Wait()
return nil
}