Skip to main content

Fluency Platform APIs

The following internal utilities are provided. These are used to interact with internal built-in Platform functions.

Platform_Metric_Counter

  • Platform_Metric_Counter(name, labels, increment)
    • Write a Counter metric named name, with the given labels, to the Prometheus database
    • if metric counter does not exist in the database, a new metric will be created with increment
    • otherwise, increment will be added to the current counter value of the metric
// Example: count one imported Office365 event and its byte size for a customer.
// `obj` and `size` are expected to be defined by the surrounding script.
let customer = obj["@customer"]

// Label set shared by both counters below.
let labels = {
  namespace: "fluency",
  app: "import",
  eventType: "Office365",
  customer
}

// Each call creates the counter if it does not exist yet, otherwise adds the
// increment to its current value.
Platform_Metric_Counter("fluency_import_count", labels, 1)
Platform_Metric_Counter("fluency_import_bytes", labels, size)

Platform_Metric_QueryBuild

  • Platform_Metric_QueryBuild(options)
    • build a promQL query
    • options: {metric, select, duration, stat, groupBy, aggregate, sort, limit}
    • for the select option, the select labels are:
      • =: Select labels that are exactly equal to the provided string.
      • !=: Select labels that are not equal to the provided string.
      • =~: Select labels that regex-match the provided string.
      • !~: Select labels that do not regex-match the provided string.

Platform_Metric_Query

  • Platform_Metric_Query(query, time)
    • return a fpl table

Platform_Metric_QueryRange

  • Platform_Metric_QueryRange(query, from, to, step)
    • return a fpl stream
// Example: query the top 3 components by summed increase of
// platform_component_bytes over a 1h duration, then fetch a range series
// restricted to just those components.
function main() {
  // Step 1: build the top-3 query with the query builder.
  // A raw promQL string may be used in place of a built query, e.g.
  //   `sum by(component) (increase(platform_component_bytes[5m]))`
  let topQuery = Platform_Metric_QueryBuild({
    metric: "platform_component_bytes",
    duration: "1h",
    stat: "increase",
    aggregate: "sum",
    groupBy: "component",
    sort: "topk",
    limit: 3
  })
  let table = Platform_Metric_Query(topQuery, "@h")

  // Step 2: collect the component value of every returned row and join them
  // into a regex-match (=~) label selector.
  let names = table.Map((row) => {
    return row.component
  })
  let selector = sprintf(`component=~"%s"`, names.Join("|"))
  printf("%s", selector)

  // Step 3: same metric and aggregation, limited to the selected components,
  // evaluated from "-24h@h" to "@h" with a "1h" step.
  let rangeQuery = Platform_Metric_QueryBuild({
    metric: "platform_component_bytes",
    select: selector,
    duration: "1h",
    stat: "increase",
    aggregate: "sum",
    groupBy: "component"
  })
  let stream = Platform_Metric_QueryRange(rangeQuery, "-24h@h", "@h", "1h")

  // Raw promQL alternative for both calls, e.g.
  //   `sum by(eventType) (increase(fluency_import_bytes[1h]))`
  return {table, stream}
}

Platform_Metric_Sort

  • Platform_Metric_Sort({metric, select, groupBy, from, to, sort, limit})
    • return top/bottom N rows
    • metric: metric name (must be a counter type)
    • select: metric label select
    • groupBy: groupBy field(s), string or list of strings
    • from/to: time range in relative or absolute time format
    • sort: "topk" or "bottomk"
    • limit: number of rows

Platform_Metric_Sort_Histogram

  • Platform_Metric_Sort_Histogram({metric, select, groupBy, from, to, interval, sort, limit})
    • return top/bottom N metrics
    • metric: metric name (must be a counter type)
    • select: metric label select
    • groupBy: groupBy field(s), string or list of strings
    • from/to: time range in relative or absolute time format
    • sort: "topk" or "bottomk"
    • limit: number of rows
    • interval: histogram interval "1h", "1d", "1w", "1m"
// Example: top 10 import sources by fluency_import_bytes over the requested
// time range, returned both as a table and as a 1h-interval histogram.
// from/to default to "-24h@h" and "@h".
function main({from="-24h@h", to="@h"}) {
  // Settings shared by the table and the histogram request.
  let sortOptions = {
    metric: "fluency_import_bytes",
    from,
    to,
    groupBy: "importSource",
    sort: "topk",
    limit: 10
  }

  // promQL: topk(10, sum by (importSource) (increase(fluency_import_bytes[24h])))
  let table = Platform_Metric_Sort(sortOptions)

  // The histogram call reuses the same options object plus a bucket interval.
  // promQL: (sum by (importSource) (increase(fluency_import_bytes{importSource="foo" or importSource="bar"}[1h]))) [24h:1h]
  sortOptions.interval = "1h"
  let histogram = Platform_Metric_Sort_Histogram(sortOptions)

  return {table, histogram}
}

Platform_Metric_Alert_Counter_Stop

  • Platform_Metric_Alert_Counter_Stop(options)
    • alert if a counter stops increasing for some time
    • options: {metric, select, groupBy, duration, lookback, interval, history}
    • metric: metric name (must be a counter type)
    • select: metric label select
    • groupBy: groupBy field(s), string or list of strings
    • duration: detection threshold. default is "10m"
    • lookback: lookback offset. default is "1h"
    • interval: polling interval. default is "1m"
    • history: alert record duration, default is "1h"
    • if no alert found, return undefined.
    • else return alerts.
  // Example: watch the platform_component_total counter, grouped by "id", and
  // emit an alert for any group whose counter has stopped increasing.
  let options = {
metric: `platform_component_total`,
groupBy: "id",
duration: "10m", // detection threshold
lookback: "1h", // lookback offset
interval: "1m", // polling interval
history: "1h" // alert record duration
}
// Returns undefined when no alert is found, otherwise the alerts.
let alerts = Platform_Metric_Alert_Counter_Stop(options)
if alerts {
// NOTE(review): the last two arguments look like a severity and a duration in
// seconds — confirm against the Emit documentation.
alerts.Emit("Component_Stop", "component stopped for 10 minutes", "warn", 3600)
}

Platform_LoadComponent

  • Platform_LoadComponent()
    • return all components (datasource, datasink, router and pipe)
  // Build a lookup table that translates a component id into its name.
let components = Platform_LoadComponent()
let idMap = {}
// Each passes two arguments to the callback; only the second (the component) is used here.
components.Each((_, component) => {
  idMap[component.id] = component.name
})

Platform_Cache_Check

  • Platform_Cache_Check(cacheName)
    • return true if cache exists
// Example: check for, register, populate and read a cache named "cache1".
// Platform_Cache_Check returns true if the cache exists.
let flag = Platform_Cache_Check("cache1")
printf("flag %v", flag)
// Register the cache with an expire time of 3600 seconds; the call returns
// false if the cache is already registered.
let ok = Platform_Cache_Register("cache1", {expire: 3600})
printf("ok %v", ok)
// Only populate the cache when this call registered it.
if ok {
Platform_Cache_Set("cache1", "foo", "bar")
// NOTE(review): keys and values presumably pair up by position (k1 => v1, k2 => v2) — confirm.
Platform_Cache_SetMultiple("cache1", ["k1", "k2"], ["v1", "v2"])
}

// Platform_Cache_Get returns undefined if the key is not found.
let b = Platform_Cache_Get("cache1", "foo")
printf("b: %s", b)

Platform_Cache_Register

  • Platform_Cache_Register(cacheName, options)
    • register a cache
    • return true if success
    • return false if cache is already registered
    • options: {expire=0}
    • cache expire time in seconds, default is 0 (never expire)

Platform_Cache_Set

  • Platform_Cache_Set(cacheName, key, value)
    • Set a key value pair to cache

Platform_Cache_SetMultiple

  • Platform_Cache_SetMultiple(cacheName, keys, values)
    • Set multiple key value pairs to cache

Platform_Cache_Get

  • Platform_Cache_Get(cacheName, key)
    • get the value of key from cache and returns it
    • return undefined if key is not found

Platform_Channel(channel, eventEnvelop)

  • sends an event to a channel
  • the event will be sent to all rules in the channel
  • any runtime exceptions will be ignored

Platform_Notification_Email(options)

  • sends an email notification
  • options: {to, cc, bcc, subject, html, text}
  • to: email address or list of email addresses
  • cc: email address or list of email addresses
  • bcc: email address or list of email addresses
  • subject: email subject
  • html: email body in html format
  • text: email body in text format
   // Example: render an alert event into an HTML body and a subject line, then
   // send it as an email. `event` and `config` are expected to be provided by
   // the surrounding script.
   //
   // The body template is stored in `bodyTemplate` rather than `template` so
   // the variable does not collide with the built-in template() function that
   // is called below to render the subject.
let bodyTemplate = `<p>Time: {{ .time }}</p><p>Alert: <b>{{.name}}</b> ({{ .description }})</p>`
let subjectTemplate = `Fluency Platform Alert: {{.name}} - {{ .action }}: {{.displayName}}`
let html = htmlTemplate(bodyTemplate, event)
let subject = template(subjectTemplate, event)

// Recipients come from the configuration; bcc and text are not set in this example.
let options = {
  to: config.to,
  cc: config.cc,
  subject,
  html
}
Platform_Notification_Email(options)

Platform_Notification_Slack(integrationName, options)

  • sends a slack notification
  • integrationName: slack integration name
  • options: {channel, message}
  • channel: slack channel name
  • message: slack message
  // Example: render an alert event into a Slack message and post it to the
  // #fluency_grid channel through the configured Slack integration. `event`
  // and `config` are expected to be provided by the surrounding script.
  //
  // The message template is stored in `messageTemplate` rather than `template`
  // so the variable does not collide with the built-in template() function
  // that renders it.
let messageTemplate = `
Alert: *{{.name}}*
Description: *{{.description}}*
Severity: *{{.severity}}*
Action: *{{.action}}*
Source: *{{.source}}*
`
let message = template(messageTemplate, event)
let integrationName = config.integrationName
let options = {
  channel: "#fluency_grid",
  message
}
Platform_Notification_Slack(integrationName, options)

Platform_Sink

  • Platform_Sink(sink, eventEnvelop)
    • sends the event to the specified data sink

sleep

  • sleep(delayInMillisecond)
    • puts the component to sleep for the specified duration
sleep(1000)  // sleep for one second (the argument is the delay in milliseconds)