Processor Guidelines

The following page details guidelines and best practices for creating a Processor for use in Fluency Platform.

Anatomy of a Processor

Main Function

Each processor (and every FPL script in general) requires a main() function. In the Platform, the argument to main() is normally destructured into obj and size.

Requirements:

  • inclusion of the following line, to set the @type key to "event"
    obj["@type"] = "event"
  • a return value of either pass, abort, drop or error

    • either as a string, or
    • as an object with a status key
  • the @parser key is nice-to-have, and helps with identification of the processing path

Example: simple passthrough

// Description:
// Default system event passthrough

// Data input format: ({ obj, size }) or ( doc )
function main({obj, size}) {
    obj["@type"] = "event"
    obj["@parser"] = "fpl"
    return {"status":"pass"}
}
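Because main() may return either a bare string or an object with a status key, anything that consumes the result has to accept both shapes. The sketch below is plain JavaScript rather than FPL, and normalizeStatus() is a hypothetical helper written only for illustration. It is not a Platform function and does not describe how the Platform itself evaluates return values.

```javascript
// Hypothetical helper (not part of FPL): reduce either return form to a status string.
const VALID_STATUSES = ["pass", "abort", "drop", "error"];

function normalizeStatus(result) {
  // Accept "pass" as well as {"status": "pass"}.
  const status = (typeof result === "string") ? result : (result && result.status);
  if (!VALID_STATUSES.includes(status)) {
    throw new Error("unexpected processor result: " + JSON.stringify(result));
  }
  return status;
}

// The passthrough processor from the example above, written as plain JavaScript.
function main({obj, size}) {
  obj["@type"] = "event";
  obj["@parser"] = "fpl";
  return {"status": "pass"};
}

const record = {"@message": "hello"};
console.log(normalizeStatus(main({obj: record, size: 5}))); // pass
console.log(record["@type"]);                               // event
```

Note that main() modifies the record it receives, so the caller sees the new @type and @parser keys on the same object it passed in.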

Main function sections

The main function can be logically divided into sections for better organization.

Filtering, selection and Message / Tag extraction

This section performs initial filtering of the input (the event record) and determines whether the input should A) proceed through this parser (Pipe), B) exit this parser and be sent to the next Pipe or Event Sink, or C) be dropped and removed from processing completely.

Message / tag extraction usually takes place here as well, because these values are typically used to filter / select the event records.

Example:

function main({obj, size}) {
    ...
    let msg = obj["@message"]
    if (!msg) {
        return {"status":"abort"}
    }

    if (!startsWith(msg, "some-pattern")) {
        return {"status":"abort"}
    }

    let tags = obj["@tags"]
    if (!tags) {
        return "abort"
    }
    if (!tags.Some( (_, tag) => startsWith(tag, "some-pattern" ))) {
        return "abort"
    }
    ...
}

To signify exit to the next Pipe, we can use the abort return value:

    return {"status":"abort"} // or "abort"
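The three outcomes described in this section (proceed, exit to the next Pipe, or drop) map onto the pass, abort and drop return values. The following is a minimal sketch in plain JavaScript, not FPL. The function name selectRecord() and the local startsWith() stand-in are assumptions made for this example, and the decision to drop records without a message is only an illustration of a drop policy, not a recommendation.

```javascript
// Stand-in for the startsWith() helper used in the FPL examples (assumption for this sketch).
function startsWith(s, prefix) {
  return typeof s === "string" && s.startsWith(prefix);
}

// Hypothetical selection step showing all three outcomes.
function selectRecord({obj, size}) {
  const msg = obj["@message"];
  if (!msg) {
    return "drop";   // C) nothing to parse: remove the record from processing
  }
  if (!startsWith(msg, "some-pattern")) {
    return "abort";  // B) not for this parser: send to the next Pipe or Event Sink
  }
  obj["@type"] = "event";
  return "pass";     // A) the record proceeds through this parser
}

console.log(selectRecord({obj: {}, size: 0}));                               // drop
console.log(selectRecord({obj: {"@message": "other data"}, size: 10}));      // abort
console.log(selectRecord({obj: {"@message": "some-pattern x=1"}, size: 16})); // pass
```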

Output fields settings

Once the event passes selection, this portion of the code sets the required fields for the output. Note that the destructured obj from the input is modified in place and becomes the de facto output.

Example:

function main({obj, size}) {
    ...
    obj["@type"] = "event"
    obj["@parser"] = "fpl-ProductNameType"
    obj["@parserVersion"] = "20231105-1"
    obj["@event_type"] = "system_productName" // system product name
    obj["@eventType"] = "ProductName" // descriptive product / event name

    ...
    // parse event
    ...

    // added after parsing: this field name must match the @event_type value (system_productName)
    // obj["@system_productName"] = f
    ...
}

Event Parsing

The main body of the processor concerns code related to event parsing. Normally, this code is heavy in string or object manipulation functions. The end goal of this section is to convert the message string into a JSON object.

Example:

function main({obj, size}) {
    ...

    let m = mergeTagWithMessage(obj)
    let f = decoder_MixedKeyValue(m)

    // or

    if (tags.Some( (_, tag) => content(tag, "Firewall" ))) {
        selected = true
        f = parseFirewallLogs(msg)
    }

    ...
}

function parseFirewallLogs(msg) {
    ...
    let start = indexOf(msg, " ")
    let m = subString(msg, start+1, len(msg))
    for k, v = range resultMap {
        let key = toLower(k)
        ...
    }
    ...
}

Some functions, such as decoder_MixedKeyValue(), are built-in. Others are user-defined as needed.
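
To make the goal of this section concrete (message string in, JSON object out), here is a minimal sketch of a user-defined key=value parser. It is written in plain JavaScript rather than FPL, and parseKeyValue() is a hypothetical name. It is not the implementation of decoder_MixedKeyValue(), whose exact behaviour is not described here.

```javascript
// Hypothetical user-defined parser: turns 'k1=v1 k2="v 2"' into an object.
function parseKeyValue(msg) {
  const fields = {};
  // A key, then '=', then either a double-quoted value or a run of non-space characters.
  const pattern = /([A-Za-z0-9_.-]+)=(?:"([^"]*)"|(\S*))/g;
  let match;
  while ((match = pattern.exec(msg)) !== null) {
    const key = match[1].toLowerCase(); // normalize key case, as toLower(k) does above
    fields[key] = (match[2] !== undefined) ? match[2] : match[3];
  }
  return fields;
}

const f = parseKeyValue('devname=fw01 Action=deny msg="port scan" proto=6');
console.log(JSON.stringify(f));
// {"devname":"fw01","action":"deny","msg":"port scan","proto":"6"}
```

All values come out as strings, which is why a later adjustment step is usually needed to convert numeric fields.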

Further Adjustments

In some cases, further modifications to the parsed object are needed. We can organize these modifications into a separate function.

Example:

function main({obj, size}) {
    ...
    let f = decoder_CEF(msg)
    ...
    dataFieldAdjustments(f)
    ...
}

function dataFieldAdjustments(doc) {
    ...
    let dur = (doc["duration"] ? parseInt(doc["duration"]) : 0)
    doc["dur"] = dur
    doc["protocol"] = parseProto(doc["proto"])
    ...
}
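
The parseProto() call above refers to a user-defined helper that is not shown. One plausible shape for it, sketched in plain JavaScript under the assumption that the proto field carries an IP protocol number as text, is a small lookup table. The table contents, the "unknown" fallback and the exact return values are illustrative choices, not the Platform's behaviour.

```javascript
// Hypothetical implementation of the user-defined parseProto() helper.
// The keys are IANA-assigned IP protocol numbers.
const PROTO_NAMES = {1: "icmp", 6: "tcp", 17: "udp"};

function parseProto(proto) {
  const n = parseInt(proto, 10);
  if (Number.isNaN(n)) {
    return "unknown";                 // missing or non-numeric input
  }
  return PROTO_NAMES[n] || String(n); // unmapped numbers are kept as text
}

function dataFieldAdjustments(doc) {
  // Store the duration as a number, defaulting to 0 when the field is absent.
  doc["dur"] = doc["duration"] ? parseInt(doc["duration"], 10) : 0;
  doc["protocol"] = parseProto(doc["proto"]);
}

const doc = {duration: "42", proto: "6"};
dataFieldAdjustments(doc);
console.log(doc.dur, doc.protocol); // 42 tcp
```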

Metrics generation

An important feature of the Platform is the ability to collect and present metrics based on the processed data.

See the Metrics section, under "Additional functions" below.

Example:

function main({obj, size}) {
    ...
    // device name
    let deviceName = (f["devname"] ? f["devname"] : "unknown")
    recordDeviceMetrics(obj, size, deviceName)
    ...
}

function recordDeviceMetrics(obj, size, deviceName) {
    ...
    ...
}

Advanced: Metaflow normalization

If the input data record is from a network device, it is possible to normalize this data into a general metaflow object. In Fluency, this data is also sent to the Fusion Engine for correlation, hence the function name generateFusionEvent().

See the "Additional functions" section below.

Example:

function main({obj, size}) {
    ...
    obj["@metaflow"] = generateFusionEvent(f, obj["@timestamp"])
    ...
}

function generateFusionEvent(f, ts) {
    ...
    ...
}

Additional Functions

Additional functions can be added as needed during processing.

Metrics: recordDeviceMetrics()

Device Metrics, or metrics related to the operation of the Platform ingress, can be generated during Processing. At this point in the processing pipeline, the event object is in memory, and is in the best position to be evaluated for all types of metrics.

Example:

function main({obj, size}) {
    ...
    // device name
    let deviceName = (f["devname"] ? f["devname"] : "unknown")
    recordDeviceMetrics(obj, size, deviceName)
    ...
}

function recordDeviceMetrics(obj, size, deviceName) {
    let sender = obj["@sender"]
    let source = obj["@source"]

    // look up the device by name; register it if it is not yet known
    let deviceEntry = Fluency_Device_LookupName(deviceName)
    if (!deviceEntry) {
        deviceEntry = {
            name:deviceName,
            ips: [sender],
            group:"FPL-detect: Product Name / Data type",
            device: {
                name:"ProductName",
                category:"ProductType"
            }
        }
        Fluency_Device_Add(deviceEntry)
    }
    let dimensions = {
        namespace:"fluency",
        app:"import",
        eventType:"ProductName", // descriptive product / event name
        syslogSender:sender,
        // syslogDevice:deviceEntry.name,
        customer: "default",
        importSource: deviceEntry.name,
        deviceType: deviceEntry.device.name
    }
    if (deviceEntry.group) {
        dimensions.group = deviceEntry.group
    }
    // one event counted, plus its size in bytes
    Platform_Metric_Counter("fluency_import_count", dimensions, 1)
    Platform_Metric_Counter("fluency_import_bytes", dimensions, size)
}

Note: Fluency provides many built-in functions for metric generation. In most cases, the recordDeviceMetrics() function shown above can be used with slight modifications.
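
To see what the two counter calls accumulate, the pattern can be exercised outside the Platform with an in-memory stand-in. The sketch below is plain JavaScript. The Platform_Metric_Counter() defined here is a local stub that only mirrors the (name, dimensions, value) call shape used above, its additive semantics are an assumption, and recordImport() is a hypothetical, reduced version of recordDeviceMetrics().

```javascript
// In-memory stand-in for the Platform_Metric_Counter() built-in (assumed semantics:
// add `value` to the counter identified by the metric name plus its dimensions).
const counters = new Map();

function Platform_Metric_Counter(name, dimensions, value) {
  // Sort dimension keys so that key order does not create distinct counters.
  const label = Object.keys(dimensions).sort()
    .map((k) => k + "=" + dimensions[k]).join(",");
  const id = name + "{" + label + "}";
  counters.set(id, (counters.get(id) || 0) + value);
}

// Hypothetical, reduced version of recordDeviceMetrics().
function recordImport(sender, deviceName, size) {
  const dimensions = {namespace: "fluency", app: "import",
                      syslogSender: sender, importSource: deviceName};
  Platform_Metric_Counter("fluency_import_count", dimensions, 1);
  Platform_Metric_Counter("fluency_import_bytes", dimensions, size);
}

recordImport("10.0.0.1", "fw01", 120);
recordImport("10.0.0.1", "fw01", 80);
for (const [id, total] of counters) {
  console.log(id, total);
}
```

Two events of 120 and 80 bytes from the same sender and device leave the count counter at 2 and the bytes counter at 200, which is the per-device view the dimensions are meant to provide.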

Metaflow: generateFusionEvent()

In network traffic analysis, a basic requirement to generate a Metaflow record is the presence of five pieces of information, a 5-tuple. These are:

  • Source IP address (sip)
  • Destination IP address (dip)
  • Source port (sp)
  • Destination port (dp)
  • Network protocol (prot)

Together, these values define a specific communication record between two machines on a network.

Additional information, such as timestamp (ts), duration (dur) of the connection, bandwidth (rxB, txB, rxP, txP), and other meta fields can be added to enhance the record on top of this basic group.
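
The field names listed above can be assembled into a single record as follows. This is a minimal sketch in plain JavaScript, not the Platform's generateFusionEvent(). The output keys (sip, dip, sp, dp, prot, ts, dur, rxB, txB, rxP, txP) come from the text above, while the input keys (srcip, dstip, srcport, dstport, proto, duration, rcvdbyte, sentbyte, rcvdpkt, sentpkt), the toInt() helper and the choice to return null for an incomplete 5-tuple are all assumptions that will differ per product.

```javascript
// Hypothetical helper: parse an integer, falling back to 0 for missing or invalid input.
function toInt(value) {
  const n = parseInt(value, 10);
  return Number.isNaN(n) ? 0 : n;
}

// Hypothetical sketch of a metaflow builder. Input key names are assumptions.
function generateFusionEvent(f, ts) {
  // Without a complete 5-tuple there is no flow to describe.
  const required = ["srcip", "dstip", "srcport", "dstport", "proto"];
  if (!required.every((k) => f[k] !== undefined && f[k] !== "")) {
    return null;
  }
  return {
    sip: f["srcip"],
    dip: f["dstip"],
    sp: toInt(f["srcport"]),
    dp: toInt(f["dstport"]),
    prot: toInt(f["proto"]),
    ts: ts,
    dur: toInt(f["duration"]),
    rxB: toInt(f["rcvdbyte"]),
    txB: toInt(f["sentbyte"]),
    rxP: toInt(f["rcvdpkt"]),
    txP: toInt(f["sentpkt"])
  };
}

const flow = generateFusionEvent(
  {srcip: "10.0.0.5", dstip: "8.8.8.8", srcport: "51515", dstport: "53", proto: "17"},
  1699142400000
);
console.log(flow.sip, flow.dp, flow.prot); // 10.0.0.5 53 17
```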

When given data from a network device (such as a firewall or router), it is often useful to normalize the data in this context.

In Fluency we can create such a normalized data object, and place it under the @metaflow field, via the following line of code:

    obj["@metaflow"] = generateFusionEvent(f, obj["@timestamp"])

Like the main() function, this function has several parts.