-
Notifications
You must be signed in to change notification settings - Fork 362
/
Copy pathimport.go
332 lines (288 loc) · 10.1 KB
/
import.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
package commands
import (
"errors"
"fmt"
"strings"
"github.com/activecm/rita-legacy/config"
"github.com/activecm/rita-legacy/parser"
"github.com/activecm/rita-legacy/pkg/remover"
"github.com/activecm/rita-legacy/resources"
"github.com/activecm/rita-legacy/util"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli"
)
func init() {
importCommand := cli.Command{
Name: "import",
Usage: "Import zeek logs into a target database",
UsageText: "rita import [command options] <import directory|file> [<import directory|file>...] <database name>\n\n" +
"Logs directly in <import directory> will be imported into a database" +
" named <database name>.",
Flags: []cli.Flag{
ConfigFlag,
threadFlag,
deleteFlag,
rollingFlag,
totalChunksFlag,
currentChunkFlag,
},
Action: func(c *cli.Context) error {
importer := NewImporter(c)
err := importer.run()
fmt.Println(updateCheck(getConfigFilePath(c)))
return err
},
}
bootstrapCommands(importCommand)
}
type (
//Importer ...
Importer struct {
res *resources.Resources
configFile string
args cli.Args
importFiles []string
targetDatabase string
deleteOldData bool
userRolling bool
userTotalChunks int
userCurrChunk int
threads int
}
)
// NewImporter ....
func NewImporter(c *cli.Context) *Importer {
return &Importer{
configFile: getConfigFilePath(c),
args: c.Args(),
deleteOldData: c.Bool("delete"),
userRolling: c.Bool("rolling"),
userTotalChunks: c.Int("numchunks"),
userCurrChunk: c.Int("chunk"),
threads: util.Max(c.Int("threads")/2, 1),
}
}
// parseArgs handles parsing the positional import arguments
func (i *Importer) parseArgs() error {
if len(i.args) < 2 {
return cli.NewExitError("\n\t[!] Both <files/directory to import> and <database name> are required.", -1)
}
i.targetDatabase = i.args[len(i.args)-1] // the last argument
i.importFiles = i.args[:len(i.args)-1] // all except the last argument
//check if one argument is set but not the other
if i.importFiles[0] == "" || i.targetDatabase == "" {
return cli.NewExitError("\n\t[!] Both <files/directory to import> and <database name> are required.", -1)
}
// check if import directory is okay to read from
err := checkFilesExist(i.importFiles)
if err != nil {
return err
}
err = i.checkForInvalidDBChars(i.targetDatabase)
if err != nil {
return cli.NewExitError(err.Error(), -1)
}
return nil
}
func checkFilesExist(files []string) error {
for _, file := range files {
if !util.Exists(file) {
return cli.NewExitError(fmt.Errorf("\n\t[!] %v cannot be found", file), -1)
}
}
return nil
}
// parseFlags validates the user supplied flags against the current state of
// the target database and determines what settings should be set in the
// rolling configuration
func parseFlags(dbExists bool, dbIsRolling bool, dbCurrChunk int, dbTotalChunks int,
userIsRolling bool, userCurrChunk int, userTotalChunks int, cfgDefaultChunks int,
deleteOldData bool) (config.RollingStaticCfg, error) {
cfg := config.RollingStaticCfg{}
// a user-provided value for either of the chunk options implies rolling
if userTotalChunks != -1 || userCurrChunk != -1 {
userIsRolling = true
}
// ensure the user specifies a rolling import if the database exists
// and is not a rolling database.
if !deleteOldData && (dbExists && !dbIsRolling) && !userIsRolling {
return cfg, errors.New(
"\t[!] New data cannot be imported into a non-rolling database. " +
"Run with --rolling to convert this database into a rolling database",
)
}
// set cfg.TotalChunks
if userTotalChunks != -1 { // user gave the total number of chunks via command line
// it's currently an error to try to reduce the total number of chunks in an existing rolling database
if dbExists && dbIsRolling && userTotalChunks < dbTotalChunks {
return cfg, fmt.Errorf(
"\t[!] Cannot modify the total number of chunks in an existing database [ %d ]",
dbTotalChunks,
)
}
// use the user-provided value
cfg.TotalChunks = userTotalChunks
} else { // user didn't specify the total number of chunks
if !dbExists && !userIsRolling {
// if the database doesn't exist and wasn't specified to be rolling
// then assume only one chunk
cfg.TotalChunks = 1
} else if deleteOldData && dbExists && !dbIsRolling && !userIsRolling {
// if the user is re-importing in to a non-rolling database
// then assume only one chunk
cfg.TotalChunks = 1
} else if dbExists && dbIsRolling {
// if the database is already rolling use the existing value
cfg.TotalChunks = dbTotalChunks
} else {
// otherwise we're converting a non-rolling database or creating a new rolling database
// and the user didn't specify so use the default value
cfg.TotalChunks = cfgDefaultChunks
}
}
//set cfg.CurrentChunk
if userCurrChunk != -1 { // user gave the current chunk via command line
// use the user-provided value
cfg.CurrentChunk = userCurrChunk
} else { // user didn't specify the current chunk
if !dbExists {
// if the database doesn't exist then assume this is the first chunk
cfg.CurrentChunk = 0
} else if deleteOldData && dbExists && !dbIsRolling {
// if the user wants to re-import into a non-rolling database
// then assume we want to replace the first chunk
cfg.CurrentChunk = 0
} else if deleteOldData && dbIsRolling {
// replace the last chunk if the user specified --delete but not --chunk
cfg.CurrentChunk = dbCurrChunk
} else {
// otherwise increment tne current value, wrapping back to 0 when needed
cfg.CurrentChunk = (dbCurrChunk + 1) % cfg.TotalChunks
}
}
cfg.Rolling = dbIsRolling || userIsRolling
// preserve the default chunks setting (even though we don't use it after this currently)
cfg.DefaultChunks = cfgDefaultChunks
// validate current chunk number
if cfg.CurrentChunk < 0 ||
cfg.CurrentChunk >= cfg.TotalChunks {
return cfg, fmt.Errorf(
"\t[!] Current chunk number [ %d ] must be 0 or greater and less than the total number of chunks [ %d ]",
cfg.CurrentChunk,
cfg.TotalChunks,
)
}
return cfg, nil
}
// run runs the importer
func (i *Importer) run() error {
// verify command line arguments
err := i.parseArgs()
if err != nil {
return err
}
i.res = resources.InitResources(i.configFile)
// set up target database
i.res.DB.SelectDB(i.targetDatabase)
// set up the rolling configuration
// grab the current rolling settings from the MetaDB
exists, isRolling, currChunk, totalChunks, err := i.res.MetaDB.GetRollingSettings(i.targetDatabase)
if err != nil {
return cli.NewExitError(fmt.Errorf("\n\t[!] Error while reading existing database settings: %v", err.Error()), -1)
}
// validate the user given flags against the rolling settings from the MetaDB
// and determine the rolling configuration
rollingCfg, err := parseFlags(
exists, isRolling, currChunk, totalChunks,
i.userRolling, i.userCurrChunk, i.userTotalChunks, i.res.Config.S.Rolling.DefaultChunks,
i.deleteOldData,
)
if err != nil {
return cli.NewExitError(err.Error(), -1)
}
i.res.Config.S.Rolling = rollingCfg
importer, err := parser.NewFSImporter(i.res)
if len(importer.GetInternalSubnets()) == 0 {
return cli.NewExitError("Internal subnets are not defined. Please set the InternalSubnets section of the config file.", -1)
}
if err != nil {
return cli.NewExitError(fmt.Errorf("error creating new file system importer: %v", err.Error()), -1)
}
indexedFiles := importer.CollectFileDetails(i.importFiles, i.threads)
// if no compatible files for import were found, exit
if len(indexedFiles) == 0 {
return cli.NewExitError("No compatible log files found", -1)
}
if i.deleteOldData {
err := i.handleDeleteOldData()
if err != nil {
return cli.NewExitError(fmt.Errorf("error deleting old data: %v", err.Error()), -1)
}
}
i.res.Log.Infof("Importing %v\n", i.importFiles)
fmt.Printf("\n\t[+] Importing %v:\n", i.importFiles)
// about to import into and convert an existing, non-rolling database
if exists && !isRolling && rollingCfg.Rolling {
i.res.Log.Infof("Non-rolling database %v will be converted to rolling\n", i.targetDatabase)
fmt.Printf("\t[+] Non-rolling database %v will be converted to rolling\n", i.targetDatabase)
}
/*
// Uncomment these lines to enable CPU profiling
f, err := os.Create("./cpu.pprof")
if err != nil {
log.Fatal("could not create CPU profile: ", err)
}
defer f.Close() // error handling omitted for example
if err := pprof.StartCPUProfile(f); err != nil {
log.Fatal("could not start CPU profile: ", err)
}
defer pprof.StopCPUProfile()
*/
importer.Run(indexedFiles, i.threads)
i.res.Log.Infof("Finished importing %v\n", i.importFiles)
return nil
}
func (i *Importer) handleDeleteOldData() error {
if !i.res.Config.S.Rolling.Rolling {
fmt.Printf("\t[+] Removing database: %s\n", i.targetDatabase)
err := deleteSingleDatabase(i.res, i.targetDatabase, false)
if err != nil {
i.res.Log.WithFields(log.Fields{
"database": i.targetDatabase,
"err": err.Error(),
}).Warn("Failed to remove database before import")
// Don't stop execution if the old database doesn't exist.
if err.Error() == "No records for database found" {
fmt.Printf("\t[-] %s\n", err.Error())
} else {
return err
}
}
return nil
}
// Remove the analysis results for the chunk
targetChunk := i.res.Config.S.Rolling.CurrentChunk
removerRepo := remover.NewMongoRemover(i.res.DB, i.res.Config, i.res.Log)
err := removerRepo.Remove(targetChunk)
if err != nil {
return err
}
err = i.res.MetaDB.SetChunk(targetChunk, i.targetDatabase, false)
if err != nil {
return err
}
// Remove the file records so they get imported again
err = i.res.MetaDB.RemoveFilesByChunk(i.targetDatabase, targetChunk)
if err != nil {
return err
}
return nil
}
// validates target db name
func (i *Importer) checkForInvalidDBChars(db string) error {
invalidChars := "/\\.,*<>:|?$#"
if strings.ContainsAny(db, invalidChars) {
return fmt.Errorf("\n\t[!] database cannot contain the characters < /, \\, ., \", *, <, >, :, |, ?, $ > as well as spaces or the null character")
}
return nil
}