Fluency Platform APIs
The following internal utilities are provided. These are used to interact with internal built-in Platform functions.
Platform_Metric_Counter
- Platform_Metric_Counter(name, labels, increment)
- writes a counter metric called name, tagged with the given labels, to the Prometheus database
- if metric counter does not exist in the database, a new metric will be created with increment
- otherwise, increment will be added to the current counter value of the metric
// Label set attached to both counters; the customer label is read from obj["@customer"].
let labels = {
    namespace: "fluency",
    app: "import",
    eventType: "Office365",
    customer: obj["@customer"]
}
// Add 1 to the import count, then add `size` to the import byte counter.
Platform_Metric_Counter("fluency_import_count", labels, 1)
Platform_Metric_Counter("fluency_import_bytes", labels, size)
Platform_Metric_QueryBuild
- Platform_Metric_QueryBuild(options)
- build a promQL query
- options: {metric, select, duration, stat, groupBy, aggregate, sort, limit}
- for the select option, the supported label matchers are:
- =: Select labels that are exactly equal to the provided string.
- !=: Select labels that are not equal to the provided string.
- =~: Select labels that regex-match the provided string.
- !~: Select labels that do not regex-match the provided string.
Platform_Metric_Query
- Platform_Metric_Query(query, time)
- return a fpl table
Platform_Metric_QueryRange
- Platform_Metric_QueryRange(query, from, to, step)
- return a fpl stream
function main() {
    // Step 1: instant query for the top 3 components, ranked by the summed
    // increase of platform_component_bytes over a 1h window.
    // A hand-written promQL aggregation would look like:
    //   sum by(component) (increase(platform_component_bytes[5m]))
    let topOptions = {
        metric: "platform_component_bytes",
        duration: "1h",
        stat: "increase",
        aggregate: "sum",
        groupBy: "component",
        sort: "topk",
        limit: 3
    }
    let topQuery = Platform_Metric_QueryBuild(topOptions)
    let table = Platform_Metric_Query(topQuery, "@h")

    // Step 2: turn the component names from the table into a regex label selector.
    let names = table.Map((row) => {
        return row.component
    })
    let select = sprintf(`component=~"%s"`, names.Join("|"))
    printf("%s", select)

    // Step 3: range query restricted to those components,
    // from "-24h@h" to "@h" in "1h" steps.
    let rangeOptions = {
        metric: "platform_component_bytes",
        select,
        duration: "1h",
        stat: "increase",
        aggregate: "sum",
        groupBy: "component"
    }
    let rangeQuery = Platform_Metric_QueryBuild(rangeOptions)
    let stream = Platform_Metric_QueryRange(rangeQuery, "-24h@h", "@h", "1h")

    // Alternative using hand-written promQL instead of Platform_Metric_QueryBuild:
    //   let query = `sum by(eventType) (increase(fluency_import_bytes[1h]))`
    //   let table = Platform_Metric_Query(query, "@h")
    //   let stream = Platform_Metric_QueryRange(query, "-48h@h", "@h", "1h")
    return {table, stream}
}
Platform_Metric_Sort
- Platform_Metric_Sort({metric, select, groupBy, from, to, sort, limit})
- return top/bottom N rows
- metric: metric name (must be a counter type)
- select: metric label select
- groupBy: groupBy field(s), string or list of strings
- from/to: time range in relative or absolute time format
- sort: "topk" or "bottomk"
- limit: number of rows
Platform_Metric_Sort_Histogram
- Platform_Metric_Sort_Histogram({metric, select, groupBy, from, to, interval, sort, limit})
- return top/bottom N metrics
- metric: metric name (must be a counter type)
- select: metric label select
- groupBy: groupBy field(s), string or list of strings
- from/to: time range in relative or absolute time format
- sort: "topk" or "bottomk"
- limit: number of rows
- interval: histogram interval "1h", "1d", "1w", "1m"
function main({from="-24h@h", to="@h"}) {
    // Options shared by the table and the histogram query:
    // top 10 importSource values of fluency_import_bytes within [from, to].
    let sortOptions = {
        metric: "fluency_import_bytes",
        from,
        to,
        groupBy: "importSource",
        sort: "topk",
        limit: 10
    }

    // promQL: topk(10, sum by (importSource) (increase(fluency_import_bytes[24h])))
    let table = Platform_Metric_Sort(sortOptions)

    // The histogram call reuses the same options object, extended with a 1h interval.
    sortOptions.interval = "1h"
    // promQL: (sum by (importSource) (increase(fluency_import_bytes{importSource="foo" or importSource="bar"}[1h]))) [24h:1h]
    let histogram = Platform_Metric_Sort_Histogram(sortOptions)

    return {table, histogram}
}
Platform_Metric_Alert_Counter_Stop
- Platform_Metric_Alert_Counter_Stop(options)
- alerts if a counter stops increasing for some time
- options: {metric, select, groupBy, duration, lookback, interval, history}
- metric: metric name (must be a counter type)
- select: metric label select
- groupBy: groupBy field(s), string or list of strings
- duration: detection threshold. default is "10m"
- lookback: lookback offset. default is "1h"
- interval: polling interval. default is "1m"
- history: alert record duration, default is "1h"
- if no alert found, return undefined.
- else return alerts.
let options = {
metric: `platform_component_total`,
groupBy: "id",
duration: "10m",
lookback: "1h",
interval: "1m",
history: "1h"
}
let alerts = Platform_Metric_Alert_Counter_Stop(options)
if alerts {
alerts.Emit("Component_Stop", "component stopped for 10 minutes", "warn", 3600)
}
Platform_LoadComponent
- Platform_LoadComponent()
- return all components (datasource, datasink, router and pipe)
// Build a lookup table that translates a component id into the component name.
let idMap = {}
Platform_LoadComponent().Each((_, component) => {
    idMap[component.id] = component.name
})
Platform_Cache_Check
- Platform_Cache_Check(cacheName)
- return true if cache exists
let flag = Platform_Cache_Check("cache1")
printf("flag %v", flag)
let ok = Platform_Cache_Register("cache1", {expire: 3600})
printf("ok %v", ok)
if ok {
Platform_Cache_Set("cache1", "foo", "bar")
Platform_Cache_SetMultiple("cache1", ["k1", "k2"], ["v1", "v2"])
}
let b = Platform_Cache_Get("cache1", "foo")
printf("b: %s", b)
Platform_Cache_Register
- Platform_Cache_Register(cacheName, options)
- register a cache
- return true if success
- return false if cache is already registered
- options: {expire=0}
- cache expire time in seconds, default is 0 (never expire)
Platform_Cache_Set
- Platform_Cache_Set(cacheName, key, value)
- Set a key value pair to cache
Platform_Cache_SetMultiple
- Platform_Cache_SetMultiple(cacheName, keys, values)
- Set multiple key value pairs to cache
Platform_Cache_Get
- Platform_Cache_Get(cacheName, key)
- get the value of key from cache and returns it
- return undefined if key is not found
Platform_Channel(channel, eventEnvelop)
- sends an event to a channel
- the event will be sent to all rules in the channel
- any runtime exceptions will be ignored
Platform_Notification_Email(options)
- sends an email notification
- options: {to, cc, bcc, subject, html, text}
- to: email address or list of email addresses
- cc: email address or list of email addresses
- bcc: email address or list of email addresses
- subject: email subject
- html: email body in html format
- text: email body in text format
// Render the HTML body and the subject line from the event, then send the email.
// The body template is deliberately NOT stored in a variable called `template`:
// that name would shadow the built-in template() function used for the subject below.
let bodyTemplate = `<p>Time: {{ .time }}</p><p>Alert: <b>{{.name}}</b> ({{ .description }})</p>`
let subjectTemplate = `Fluency Platform Alert: {{.name}} - {{ .action }}: {{.displayName}}`
let html = htmlTemplate(bodyTemplate, event)
let subject = template(subjectTemplate, event)
// Recipients are taken from the component configuration.
let options = {
    to: config.to,
    cc: config.cc,
    subject,
    html
}
Platform_Notification_Email(options)
Platform_Notification_Slack(integrationName, options)
- sends a slack notification
- integrationName: slack integration name
- options: {channel, message}
- channel: slack channel name
- message: slack message
// Slack message layout, rendered from the event fields.
// The string is kept in `messageTemplate` rather than `template`, so the
// built-in template() function called below is not shadowed by a string.
let messageTemplate = `
Alert: *{{.name}}*
Description: *{{.description}}*
Severity: *{{.severity}}*
Action: *{{.action}}*
Source: *{{.source}}*
`
let message = template(messageTemplate, event)
// The Slack integration to use is taken from the component configuration.
let integrationName = config.integrationName
let options = {
    channel: "#fluency_grid",
    message
}
Platform_Notification_Slack(integrationName, options)
Platform_Sink
- Platform_Sink(sink, eventEnvelop)
- sends the event to the specified data sink
sleep
- sleep(delayInMillisecond)
- puts the component to sleep for the specified duration
sleep(1000) // the delay is given in milliseconds, so this sleeps for one second