WARNING: Version 1.3 of Packetbeat has passed its EOL date.
This documentation is no longer being maintained and may be removed. If you are running this version, we strongly advise you to upgrade. For the latest information, see the current release documentation.
Protocol Modules
Packetbeat’s source code is split into Go packages or modules. The protocol modules can be found in individual folders in the beats/packetbeat/protos directory in the Beats GitHub repository.
Before starting, we recommend reading through the source code of some of the existing modules. For TCP-based protocols, the MySQL or HTTP modules are good models to follow. For UDP protocols, you can look at the DNS module.
All protocol modules implement the TcpProtocolPlugin or the UdpProtocolPlugin interface (or both) from the following listing (found in beats/packetbeat/protos/protos.go).
    // Functions to be exported by a protocol plugin
    type ProtocolPlugin interface {
        // Called to initialize the Plugin
        Init(test_mode bool, results publish.Transactions) error

        // Called to return the configured ports
        GetPorts() []int
    }

    type TcpProtocolPlugin interface {
        ProtocolPlugin

        // Called when TCP payload data is available for parsing.
        Parse(pkt *Packet, tcptuple *common.TcpTuple, dir uint8, private ProtocolData) ProtocolData

        // Called when the FIN flag is seen in the TCP stream.
        ReceivedFin(tcptuple *common.TcpTuple, dir uint8, private ProtocolData) ProtocolData

        // Called when a packets are missing from the tcp
        // stream.
        GapInStream(tcptuple *common.TcpTuple, dir uint8, nbytes int, private ProtocolData) (priv ProtocolData, drop bool)

        // ConnectionTimeout returns the per stream connection timeout.
        // Return <=0 to set default tcp module transaction timeout.
        ConnectionTimeout() time.Duration
    }

    type UdpProtocolPlugin interface {
        ProtocolPlugin

        // ParseUdp is invoked when UDP payload data is available for parsing.
        ParseUdp(pkt *Packet)
    }
At a high level, the protocol plugins receive raw packet data via the Parse() or ParseUdp() methods and produce objects that will be indexed into Elasticsearch.
The Parse() and ParseUdp() methods are called for every sniffed packet that uses one of the ports of the protocol, as defined by the GetPorts() function. They receive the packet in a Packet object, which carries the following information:
- The timestamp of the packet.
- The source IP address, source port, destination IP address, and destination port combination.
- The application layer payload (that is, without the IP and TCP/UDP headers) as a byte slice.
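The struct listing itself is not reproduced in this text. As a rough sketch only, a type holding the three pieces of information described above could look like the following. The field names (Ts, Tuple, Payload), the IPPortTuple stand-in type, and the helper functions are assumptions made for illustration and are not copied from protos.go.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// IPPortTuple is a stand-in for the address tuple type (assumed shape).
type IPPortTuple struct {
	SrcIP, DstIP     net.IP
	SrcPort, DstPort uint16
}

// Packet mirrors the three pieces of information described in the text.
// The field names are assumptions.
type Packet struct {
	Ts      time.Time   // timestamp of the packet
	Tuple   IPPortTuple // source/destination IP address and port combination
	Payload []byte      // application layer payload, without IP and TCP/UDP headers
}

// describe renders a one-line summary of a packet.
func describe(p *Packet) string {
	return fmt.Sprintf("%s:%d -> %s:%d (%d bytes)",
		p.Tuple.SrcIP, p.Tuple.SrcPort, p.Tuple.DstIP, p.Tuple.DstPort, len(p.Payload))
}

// examplePacket builds a packet with made-up addresses for demonstration.
func examplePacket() *Packet {
	return &Packet{
		Ts: time.Now(),
		Tuple: IPPortTuple{
			SrcIP: net.ParseIP("10.0.0.1"), SrcPort: 34567,
			DstIP: net.ParseIP("10.0.0.2"), DstPort: 3306,
		},
		Payload: []byte("ping"),
	}
}

func main() {
	fmt.Println(describe(examplePacket()))
}
```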
The objects are sent using the publisher.Client interface defined in the beats/libbeat/publisher directory in the Beats GitHub repository.
Besides the Parse() function, the TCP layer also calls the ReceivedFin() function when a TCP stream is closed, and it calls the GapInStream() function when packet loss is detected in a TCP stream. The protocol module can use these callbacks to make decisions about what to do with partial data that it receives. For example, for the HTTP/1.0 protocol, the end of connection is used to know when the message is finished.
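To illustrate how these callbacks can cooperate, here is a minimal, self-contained sketch of a protocol whose message ends only when the connection closes. The function signatures are simplified and the streamState type is hypothetical; a real module would implement the TcpProtocolPlugin methods shown earlier.

```go
package main

import "fmt"

// streamState is hypothetical per-stream private data.
type streamState struct {
	body []byte // bytes buffered so far
}

// parse buffers the payload, because the message is only complete
// once the connection is closed.
func parse(payload []byte, private interface{}) interface{} {
	st, _ := private.(*streamState)
	if st == nil {
		st = &streamState{}
	}
	st.body = append(st.body, payload...)
	return st
}

// gapInStream discards the buffered data and asks the TCP layer to drop
// the stream, since a message with missing bytes cannot be trusted.
func gapInStream(nbytes int, private interface{}) (interface{}, bool) {
	return nil, true
}

// receivedFin treats the end of the connection as the end of the message.
func receivedFin(private interface{}) ([]byte, bool) {
	st, _ := private.(*streamState)
	if st == nil || len(st.body) == 0 {
		return nil, false
	}
	return st.body, true
}

func main() {
	var priv interface{}
	priv = parse([]byte("hello "), priv)
	priv = parse([]byte("world"), priv)
	if msg, ok := receivedFin(priv); ok {
		fmt.Printf("complete message: %q\n", msg)
	}
}
```

Dropping the stream on a gap is only one possible policy; a module that can resynchronize on message boundaries might keep the stream instead.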
Registering Your Plugin
To configure your plugin, you need to add a configuration struct to the Protocols struct in config/config.go. This struct will be filled by goyaml on startup.
    type Protocols struct {
        Icmp     Icmp
        Dns      Dns
        Http     Http
        Memcache Memcache
        Mysql    Mysql
        Mongodb  Mongodb
        Pgsql    Pgsql
        Redis    Redis
        Thrift   Thrift
    }
Next, create an ID for the new plugin in protos/protos.go:
    // Protocol constants.
    const (
        UnknownProtocol Protocol = iota
        HttpProtocol
        MysqlProtocol
        RedisProtocol
        PgsqlProtocol
        ThriftProtocol
        MongodbProtocol
        DnsProtocol
        MemcacheProtocol
    )

    // Protocol names
    var ProtocolNames = []string{
        "unknown",
        "http",
        "mysql",
        "redis",
        "pgsql",
        "thrift",
        "mongodb",
        "dns",
        "memcache",
    }
The protocol names must be in the same order as their corresponding protocol IDs. Additionally, each protocol name must match its configuration name.
Finally, register your new protocol plugin in the EnabledProtocolPlugins map in packetbeat.go:
    var EnabledProtocolPlugins map[protos.Protocol]protos.ProtocolPlugin = map[protos.Protocol]protos.ProtocolPlugin{
        protos.HttpProtocol:     new(http.Http),
        protos.MemcacheProtocol: new(memcache.Memcache),
        protos.MysqlProtocol:    new(mysql.Mysql),
        protos.PgsqlProtocol:    new(pgsql.Pgsql),
        protos.RedisProtocol:    new(redis.Redis),
        protos.ThriftProtocol:   new(thrift.Thrift),
        protos.MongodbProtocol:  new(mongodb.Mongodb),
        protos.DnsProtocol:      new(dns.Dns),
    }
Once the module is registered, it can be configured, and packets will be processed.
Before implementing all the logic for your new protocol module, it can be helpful to first register the module and implement the minimal plugin interface for printing a debug message on received packets. This way you can test the plugin registration to ensure that it’s working correctly.
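As a rough sketch of such a minimal plugin, the following self-contained program implements the TCP plugin methods and only prints a debug message for each packet. The Echo module, its port, and the local stand-in types (Packet, TcpTuple, ProtocolData, Transactions) are hypothetical and exist only so that the example compiles on its own. A real module would use the types from the protos, common, and publish packages instead.

```go
package main

import (
	"fmt"
	"time"
)

// Local stand-ins for the Packetbeat types named in the interface listing.
type (
	Packet       struct{ Payload []byte }
	TcpTuple     struct{ SrcPort, DstPort uint16 }
	ProtocolData interface{}
	Transactions chan map[string]interface{}
)

// TcpProtocolPlugin restates the TCP plugin interface using the stand-ins.
type TcpProtocolPlugin interface {
	Init(testMode bool, results Transactions) error
	GetPorts() []int
	Parse(pkt *Packet, tcptuple *TcpTuple, dir uint8, private ProtocolData) ProtocolData
	ReceivedFin(tcptuple *TcpTuple, dir uint8, private ProtocolData) ProtocolData
	GapInStream(tcptuple *TcpTuple, dir uint8, nbytes int, private ProtocolData) (ProtocolData, bool)
	ConnectionTimeout() time.Duration
}

// Echo is a hypothetical module that only logs what it receives.
type Echo struct {
	ports   []int
	results Transactions
	seen    int // number of packets passed to Parse
}

func (e *Echo) Init(testMode bool, results Transactions) error {
	e.ports = []int{7} // hypothetical default port
	e.results = results
	return nil
}

func (e *Echo) GetPorts() []int { return e.ports }

func (e *Echo) Parse(pkt *Packet, tcptuple *TcpTuple, dir uint8, private ProtocolData) ProtocolData {
	e.seen++
	fmt.Printf("echo: %d payload bytes, dir=%d\n", len(pkt.Payload), dir)
	return private
}

func (e *Echo) ReceivedFin(tcptuple *TcpTuple, dir uint8, private ProtocolData) ProtocolData {
	return private
}

func (e *Echo) GapInStream(tcptuple *TcpTuple, dir uint8, nbytes int, private ProtocolData) (ProtocolData, bool) {
	return private, true // drop the stream on packet loss
}

// Returning a value <= 0 selects the default timeout.
func (e *Echo) ConnectionTimeout() time.Duration { return 0 }

// Compile-time check that Echo satisfies the interface.
var _ TcpProtocolPlugin = (*Echo)(nil)

func main() {
	e := &Echo{}
	if err := e.Init(true, nil); err != nil {
		panic(err)
	}
	e.Parse(&Packet{Payload: []byte("hello")}, &TcpTuple{SrcPort: 40000, DstPort: 7}, 0, nil)
}
```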
The TCP Parse Function
For TCP protocols, the Parse() function is the heart of the module. As mentioned earlier, this function is called for every TCP packet that contains data on the configured ports.
It is important to understand that because TCP is a stream-based protocol, the packet boundaries don’t necessarily match the application layer message boundaries. For example, a packet can contain only a part of the message, it can contain a complete message, or it can contain multiple messages.
If you see a packet in the middle of the stream, you have no guarantee that its first byte is the beginning of a message. However, if the packet is the first one seen in a given TCP stream, then you can assume it is the beginning of a message.
The Parse() function needs to deal with these facts, which generally means that it needs to keep state across multiple packets.
Let’s have a look again at its signature:
func Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uint8, private protos.ProtocolData) protos.ProtocolData
We’ve already talked about the first parameter, which contains the packet data. The rest of the parameters and the return value are used for maintaining state inside the TCP stream.
The tcptuple is a unique identifier for the TCP stream that the packet is part of. You can use the tcptuple.Hashable() function to get a value that you can store in a map. The dir flag gives you the direction in which the packet is flowing inside the TCP stream. The two possible values are TcpDirectionOriginal if the packet goes in the same direction as the first packet from the stream and TcpDirectionReverse if the packet goes in the other direction.
The private parameter can be used by the module to store state in the TCP stream. The module would typically cast this at run time to a type of its choice, modify it as needed, and then return the modified value. The next time the TCP layer calls Parse() or another function from the TcpProtocolPlugin interface, it will call the function with the modified private value.
Here is an example of how the MySQL module handles the private data:
    priv := mysqlPrivateData{}
    if private != nil {
        var ok bool
        priv, ok = private.(mysqlPrivateData)
        if !ok {
            priv = mysqlPrivateData{}
        }
    }

    [ ... ]

    return priv
Most modules then use a logic similar to the following to deal with incomplete data (this example is also from MySQL):
    ok, complete := mysqlMessageParser(priv.Data[dir])
    if !ok {
        // drop this tcp stream. Will retry parsing with the next
        // segment in it
        priv.Data[dir] = nil
        logp.Debug("mysql", "Ignore MySQL message. Drop tcp stream.")
        return priv
    }

    if complete {
        mysql.messageComplete(tcptuple, dir, stream)
    } else {
        // wait for more data
        break
    }
The mysqlMessageParser() is the function that tries to parse a single MySQL message. Its implementation is MySQL-specific, so it’s not interesting to us for this guide. It returns two values: ok, which is false if there was a parsing error from which we cannot recover, and complete, which indicates whether a complete and valid message was separated from the stream. These two values are used for deciding what to do next. In case of errors, we drop the stream. If there are no errors, but the message is not yet complete, we do nothing and wait for more data. Finally, if the message is complete, we go to the next level.
This block of code is called in a loop so that it can separate multiple messages found in the same packet.
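To make the loop concrete, here is a self-contained sketch for a hypothetical newline-delimited protocol. It is not the MySQL module’s code: the streamState type, the parseMessage() function, and the length limit are all invented for illustration. It shows the same three outcomes (error, incomplete, complete) and how the buffered data is carried from one packet to the next through the private value.

```go
package main

import (
	"bytes"
	"fmt"
)

// streamState is hypothetical private data: one buffer per direction.
type streamState struct {
	Data [2][]byte
}

// parseMessage tries to separate one newline-terminated message from buf.
// ok is false on an unrecoverable error; complete is false if more data is needed.
func parseMessage(buf []byte) (msg, rest []byte, ok, complete bool) {
	const maxLen = 1024 // hypothetical sanity limit
	i := bytes.IndexByte(buf, '\n')
	switch {
	case i < 0 && len(buf) > maxLen:
		return nil, nil, false, false
	case i < 0:
		return nil, buf, true, false
	}
	return buf[:i], buf[i+1:], true, true
}

// parse appends the payload to the stream state and returns the updated
// state together with every message completed by this packet.
// dir must be 0 or 1.
func parse(payload []byte, dir uint8, private interface{}) (interface{}, [][]byte) {
	priv, ok := private.(streamState)
	if !ok {
		priv = streamState{}
	}
	priv.Data[dir] = append(priv.Data[dir], payload...)

	var msgs [][]byte
	for len(priv.Data[dir]) > 0 {
		msg, rest, ok, complete := parseMessage(priv.Data[dir])
		if !ok {
			priv.Data[dir] = nil // drop the buffered data
			break
		}
		if !complete {
			break // wait for more data
		}
		msgs = append(msgs, msg)
		priv.Data[dir] = rest
	}
	return priv, msgs
}

// feed pushes payloads through parse and collects the messages as strings.
func feed(payloads ...string) []string {
	var private interface{}
	var out []string
	for _, p := range payloads {
		var msgs [][]byte
		private, msgs = parse([]byte(p), 0, private)
		for _, m := range msgs {
			out = append(out, string(m))
		}
	}
	return out
}

func main() {
	// The second message is split across two packets.
	fmt.Println(feed("GET a\nGE", "T b\nGET c\n"))
}
```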
The UDP ParseUdp Function
If the protocol you are working on runs on top of UDP, then all the complexities that TCP parsers and decoders need to deal with around extracting messages from packets are no longer relevant.
For an example, see the ParseUdp() function from the DNS module.
Correlation
Most protocols that Packetbeat supports today are request-response oriented. Packetbeat indexes into Elasticsearch a document for each request-response pair (called a transaction). This way we can have data from the request and the response in the same document and measure the response time.
But this can be different for your protocol. For example, for an asynchronous protocol like AMQP, it makes more sense to index a document for every message, and then no correlation is necessary. On the other hand, for a session-based protocol like SIP, it might make sense to index a document for a SIP transaction or for a full SIP dialog, which can have more than two messages.
The TCP stream or UDP ports are usually good indicators that two messages belong to the same transaction. Therefore, most protocol implementations in Packetbeat use a map keyed by the tcptuple for correlating the requests with the responses. One thing you should be careful about is to expire and remove incomplete transactions from this map. For example, we might see a request that creates an entry in the map, but if we never see the reply, we need to remove the request from memory on a timer. Otherwise, we risk leaking memory.
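The following self-contained sketch shows one way such a correlation map with expiry could be structured. The correlator type, its method names, and the streamKey stand-in for a hashable TCP tuple are hypothetical. The expire() method is called explicitly here; in a real module it would be driven by a timer.

```go
package main

import (
	"fmt"
	"time"
)

// streamKey stands in for the hashable form of a TCP tuple.
type streamKey struct {
	SrcIP, DstIP     string
	SrcPort, DstPort uint16
}

type pendingRequest struct {
	request string
	seenAt  time.Time
}

// correlator pairs responses with earlier requests on the same stream.
type correlator struct {
	timeout time.Duration
	pending map[streamKey]pendingRequest
}

func newCorrelator(timeout time.Duration) *correlator {
	return &correlator{timeout: timeout, pending: map[streamKey]pendingRequest{}}
}

// onRequest remembers the request until a response or expiry.
func (c *correlator) onRequest(k streamKey, request string, now time.Time) {
	c.pending[k] = pendingRequest{request: request, seenAt: now}
}

// onResponse returns the matching request and the response time, if any.
func (c *correlator) onResponse(k streamKey, now time.Time) (string, time.Duration, bool) {
	p, ok := c.pending[k]
	if !ok {
		return "", 0, false
	}
	delete(c.pending, k)
	return p.request, now.Sub(p.seenAt), true
}

// expire removes requests older than the timeout and reports how many were removed.
func (c *correlator) expire(now time.Time) int {
	removed := 0
	for k, p := range c.pending {
		if now.Sub(p.seenAt) > c.timeout {
			delete(c.pending, k)
			removed++
		}
	}
	return removed
}

// demo answers one of two requests and lets the other one expire.
func demo() (matched bool, responseTime time.Duration, expired int) {
	t0 := time.Unix(0, 0)
	c := newCorrelator(10 * time.Second)
	a := streamKey{"10.0.0.1", "10.0.0.2", 40000, 3306}
	b := streamKey{"10.0.0.3", "10.0.0.2", 40001, 3306}
	c.onRequest(a, "SELECT 1", t0)
	c.onRequest(b, "SELECT 2", t0)
	_, responseTime, matched = c.onResponse(a, t0.Add(5*time.Millisecond))
	expired = c.expire(t0.Add(11 * time.Second))
	return matched, responseTime, expired
}

func main() {
	matched, rt, expired := demo()
	fmt.Println(matched, rt, expired)
}
```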
Sending the Result
After the correlation step, you should have a JSON-like object that can be sent to Elasticsearch for indexing. You send the object by publishing it through the publisher client interface, which is received by the Init function. The publisher client accepts structures of type common.MapStr, which is essentially a map[string]interface{} with a few more convenience methods added (see the beats/libbeat/common package in the Beats GitHub repository).
As an example, here is the relevant code from the Redis module:
    event := common.MapStr{
        "@timestamp":   common.Time(requ.Ts),
        "type":         "redis",
        "status":       error,
        "responsetime": responseTime,
        "redis":        returnValue,
        "method":       common.NetString(bytes.ToUpper(requ.Method)),
        "resource":     requ.Path,
        "query":        requ.Message,
        "bytes_in":     uint64(requ.Size),
        "bytes_out":    uint64(resp.Size),
        "src":          src,
        "dst":          dst,
    }
    if redis.SendRequest {
        event["request"] = requ.Message
    }
    if redis.SendResponse {
        event["response"] = resp.Message
    }

    return event
The following fields are required and their presence will be checked by system tests:
- @timestamp. Set this to the timestamp of the first packet from the message and cast it to common.Time like in the example.
- type. Set this to the protocol name.
- status. The status of the transaction. Use either common.OK_STATUS or common.ERROR_STATUS. If the protocol doesn’t have responses or a meaning of status code, use OK.
- resource. This should represent what is requested, with the exact meaning depending on the protocol. For HTTP, this is the URL. For SQL databases, this is the table name. For key-value stores, this is the key. If nothing seems to make sense to put in this field, use the empty string.
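As a small illustration of the required fields, the sketch below builds an event and checks that none of them is missing. It is self-contained, so MapStr is a local stand-in for common.MapStr, and the status string values, the "echo" protocol name, and the helper functions are assumptions rather than libbeat definitions.

```go
package main

import (
	"fmt"
	"time"
)

// MapStr is a local stand-in for common.MapStr.
type MapStr map[string]interface{}

// Assumed status values; a real module would use the libbeat constants.
const (
	okStatus    = "OK"
	errorStatus = "Error"
)

// requiredFields lists the fields named in the text above.
var requiredFields = []string{"@timestamp", "type", "status", "resource"}

// newEvent fills in the required fields for one transaction.
func newEvent(ts time.Time, proto, status, resource string) MapStr {
	return MapStr{
		"@timestamp": ts,
		"type":       proto,
		"status":     status,
		"resource":   resource, // may be the empty string
	}
}

// missingFields returns the required fields that are absent from the event,
// in the order of requiredFields.
func missingFields(event MapStr) []string {
	var missing []string
	for _, f := range requiredFields {
		if _, ok := event[f]; !ok {
			missing = append(missing, f)
		}
	}
	return missing
}

// exampleEvent builds an event for a hypothetical "echo" protocol.
func exampleEvent() MapStr {
	return newEvent(time.Unix(0, 0), "echo", okStatus, "")
}

func main() {
	fmt.Println(missingFields(exampleEvent()))
	fmt.Println(missingFields(MapStr{"type": "echo"}))
}
```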
Helpers
Parsing Helpers
In libbeat you can also find some helpers for implementing parsers for binary and text-based protocols. The Bytes_* functions are the most low-level helpers for binary protocols that use network byte order. These functions can be found in the beats/libbeat/common module in the Beats GitHub repository.
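The Bytes_* helpers themselves are not listed here. To show what reading fields in network byte order (big-endian) involves, the following self-contained sketch uses Go’s standard encoding/binary package as a stand-in. The header layout (a 2-byte ID followed by a 4-byte length) is hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// header is a hypothetical 6-byte binary protocol header.
type header struct {
	ID     uint16 // bytes 0-1, network byte order
	Length uint32 // bytes 2-5, network byte order
}

// parseHeader decodes the header from the start of b.
func parseHeader(b []byte) (header, error) {
	if len(b) < 6 {
		return header{}, errors.New("short header")
	}
	return header{
		ID:     binary.BigEndian.Uint16(b[0:2]),
		Length: binary.BigEndian.Uint32(b[2:6]),
	}, nil
}

func main() {
	h, err := parseHeader([]byte{0x01, 0x02, 0x00, 0x00, 0x01, 0x00})
	if err != nil {
		panic(err)
	}
	fmt.Printf("id=%d length=%d\n", h.ID, h.Length)
}
```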
In addition to these very low-level helpers, a stream buffer for parsing TCP-based streams, or simply UDP packets with integrated error handling, is provided by beats/libbeat/common/streambuf. The following example demonstrates using the stream buffer for parsing the Memcache protocol UDP header:
    func parseUdpHeader(buf *streambuf.Buffer) (mcUdpHeader, error) {
        var h mcUdpHeader
        h.requestId, _ = buf.ReadNetUint16()
        h.seqNumber, _ = buf.ReadNetUint16()
        h.numDatagrams, _ = buf.ReadNetUint16()
        buf.Advance(2) // ignore reserved
        return h, buf.Err()
    }
The stream buffer is also used to implement the binary and text-based protocols for memcache.
    header := buf.Snapshot()
    buf.Advance(memcacheHeaderSize)

    msg := parser.message
    if msg.IsRequest {
        msg.vbucket, _ = header.ReadNetUint16At(6)
    } else {
        msg.status, _ = header.ReadNetUint16At(6)
    }

    cas, _ := header.ReadNetUint64At(16)
    if cas != 0 {
        setCasUnique(msg, cas)
    }
    msg.opaque, _ = header.ReadNetUint32At(12)

    // check message length
    extraLen, _ := header.ReadNetUint8At(4)
    keyLen, _ := header.ReadNetUint16At(2)
    totalLen, _ := header.ReadNetUint32At(8)

    [...]

    if extraLen > 0 {
        tmp, _ := buf.Collect(int(extraLen))
        extras := streambuf.NewFixed(tmp)
        var err error
        if msg.IsRequest && requestArgs != nil {
            err = parseBinaryArgs(parser, requestArgs, header, extras)
        } else if responseArgs != nil {
            err = parseBinaryArgs(parser, responseArgs, header, extras)
        }
        if err != nil {
            msg.AddNotes(err.Error())
        }
    }

    if keyLen > 0 {
        key, _ := buf.Collect(int(keyLen))
        keys := []memcacheString{memcacheString{key}}
        msg.keys = keys
    }

    if valueLen == 0 {
        return parser.yield(buf.BufferConsumed())
    }
The stream buffer also implements a number of interfaces defined in the standard "io" package and can easily be used to serialize some packets for testing parsers (see beats/packetbeat/protos/memcache/binary_test.go).
Module Helpers
Packetbeat provides the module beats/packetbeat/protos/applayer with definitions that are common to all application layer protocols. For example, using the Transaction type from applayer guarantees that the final document will have all common required fields defined. Just embed applayer.Transaction in your own application layer transaction type to make use of it. Here is an example from the memcache protocol:
    type transaction struct {
        applayer.Transaction

        command *commandType

        request  *message
        response *message
    }

    func (t *transaction) Event(event common.MapStr) error {
        // use applayer.Transaction to write common required fields
        if err := t.Transaction.Event(event); err != nil {
            logp.Warn("error filling generic transaction fields: %v", err)
            return err
        }

        mc := common.MapStr{}
        event["memcache"] = mc

        [...]

        return nil
    }
Use applayer.Message in conjunction with applayer.Transaction for creating the transaction and applayer.Stream to manage your stream buffers for parsing.