Skip to content

Commit

Permalink
Merge pull request #215 from trivago/SyslogMetadata
Browse files Browse the repository at this point in the history
Syslog metadata support
  • Loading branch information
arnecls authored Dec 21, 2017
2 parents 271f166 + 9b4906d commit 01a9a43
Showing 1 changed file with 80 additions and 5 deletions.
85 changes: 80 additions & 5 deletions consumer/syslogd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
package consumer

import (
"os"
"strconv"
"sync"
"time"

"github.com/trivago/gollum/core"
"github.com/trivago/tgo/tnet"
"gopkg.in/mcuadros/go-syslog.v2"
syslog "gopkg.in/mcuadros/go-syslog.v2"
"gopkg.in/mcuadros/go-syslog.v2/format"
"sync"
)

// Syslogd consumer plugin
Expand All @@ -32,11 +36,12 @@ import (
// - Address: Defines the IP address or UNIX socket to listen to.
// This can take one of the four forms below, to listen on a TCP, UDP
// or UNIX domain socket. However, see the "Format" option for details on
// transport support by different formats. Default: "udp://0.0.0.0:514"
// transport support by different formats.
// * [hostname|ip]:<tcp-port>
// * tcp://<hostname|ip>:<tcp-port>
// * udp://<hostname|ip>:<udp-port>
// * unix://<filesystem-path>
// By default this parameter is set to "udp://0.0.0.0:514"
//
// - Format: Defines which syslog standard the server will support.
// Three standards, listed below, are currently available. All
Expand All @@ -45,6 +50,18 @@ import (
// * RFC3164 (https://tools.ietf.org/html/rfc3164) - unix, udp
// * RFC5424 (https://tools.ietf.org/html/rfc5424) - unix, udp
// * RFC6587 (https://tools.ietf.org/html/rfc6587) - unix, upd, tcp
// By default this parameter is set to "RFC6587".
//
// - SetMetadata: When set to true, syslog based metadata will be attached to
// the message. The metadata fields added depend on the protocol version used.
// RFC3164 supports: tag, timestamp, hostname, priority, facility, severity.
// RFC5424 and RFC6587 support: app_name, version, proc_id , msg_id, timestamp,
// hostname, priority, facility, severity.
// By default this parameter is set to "false".
//
// - TimestampFormat: When using SetMetadata this string denotes the go time
// format used to convert syslog timestamps into strings.
// By default this parameter is set to "2006-01-02T15:04:05.000 MST".
//
// Examples
//
Expand All @@ -69,6 +86,8 @@ type Syslogd struct {
format format.Format // RFC3164, RFC5424 or RFC6587?
protocol string
address string
withMetadata bool `config:"SetMetadata" default:"false"`
timestampFormat string `config:"TimestampFormat" default:"2006-01-02T15:04:05.000 MST"`
}

func init() {
Expand Down Expand Up @@ -118,12 +137,55 @@ func (cons *Syslogd) Configure(conf core.PluginConfigReader) {
func (cons *Syslogd) Handle(parts format.LogParts, code int64, err error) {
content := ""
isString := false
metaData := core.Metadata{}

switch cons.format {
case syslog.RFC3164:
content, isString = parts["content"].(string)

if cons.withMetadata {
hostname, _ := parts["hostname"].(string)
tag, _ := parts["tag"].(string)
priority, _ := parts["priority"].(int)
facility, _ := parts["facility"].(int)
severity, _ := parts["severity"].(int)
timestamp, _ := parts["timestamp"].(time.Time)

metaData.SetValue("tag", []byte(tag))
metaData.SetValue("timestamp", []byte(timestamp.Format(cons.timestampFormat)))

metaData.SetValue("hostname", []byte(hostname))
metaData.SetValue("priority", []byte(strconv.Itoa(priority)))
metaData.SetValue("facility", []byte(strconv.Itoa(facility)))
metaData.SetValue("severity", []byte(strconv.Itoa(severity)))
}

case syslog.RFC5424, syslog.RFC6587:
content, isString = parts["message"].(string)

if cons.withMetadata {
hostname, _ := parts["hostname"].(string)
app, _ := parts["app_name"].(string)
version, _ := parts["version"].(string)
procID, _ := parts["proc_id"].(string)
msgID, _ := parts["msg_id"].(string)
priority, _ := parts["priority"].(int)
facility, _ := parts["facility"].(int)
severity, _ := parts["severity"].(int)
timestamp, _ := parts["timestamp"].(time.Time)

metaData.SetValue("app_name", []byte(app))
metaData.SetValue("version", []byte(version))
metaData.SetValue("proc_id", []byte(procID))
metaData.SetValue("msg_id", []byte(msgID))
metaData.SetValue("timestamp", []byte(timestamp.Format(cons.timestampFormat)))

metaData.SetValue("hostname", []byte(hostname))
metaData.SetValue("priority", []byte(strconv.Itoa(priority)))
metaData.SetValue("facility", []byte(strconv.Itoa(facility)))
metaData.SetValue("severity", []byte(strconv.Itoa(severity)))
}

default:
cons.Logger.Error("Could not determine the format to retrieve message/content")
}
Expand All @@ -133,7 +195,11 @@ func (cons *Syslogd) Handle(parts format.LogParts, code int64, err error) {
return
}

cons.Enqueue([]byte(content))
if cons.withMetadata {
cons.EnqueueWithMetadata([]byte(content), metaData)
} else {
cons.Enqueue([]byte(content))
}
}

// Consume opens a new syslog socket.
Expand All @@ -146,7 +212,16 @@ func (cons *Syslogd) Consume(workers *sync.WaitGroup) {
switch cons.protocol {
case "unix":
if err := server.ListenUnixgram(cons.address); err != nil {
cons.Logger.Error("Failed to open unix://", cons.address)
if errRemove := os.Remove(cons.address); errRemove != nil {
cons.Logger.WithError(errRemove).Error("Failed to remove exisiting socket")
} else {
cons.Logger.Warning("Found existing socket ", cons.address, ". Removing.")
err = server.ListenUnixgram(cons.address)
}

if err != nil {
cons.Logger.WithError(err).Error("Failed to open unix://", cons.address)
}
}
case "udp":
if err := server.ListenUDP(cons.address); err != nil {
Expand Down

0 comments on commit 01a9a43

Please sign in to comment.