Flux 查询

基本查询结构

Flux 是一种函数式数据脚本语言，查询通过 |> 管道连接函数构建。

// Query the last hour of user CPU usage for a single host.
// Chained filter() calls and a single conjunctive predicate are equivalent.
from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_user" and
    r.host == "server01"
  )

// Absolute time range: all of January 2024 (stop is exclusive)
rangeStart = 2024-01-01T00:00:00Z
rangeStop = 2024-02-01T00:00:00Z

from(bucket: "metrics")
  |> range(start: rangeStart, stop: rangeStop)

// Multiple conditions: one measurement, either of two locations.
// Consecutive filter() calls combine as a logical AND.
from(bucket: "sensors")
  |> range(start: -30m)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == "room1" or r.location == "room2")

aggregateWindow

使用聚合函数将数据降采样到规则时间窗口中。

// Downsample to 5-minute averages; createEmpty: false skips
// windows that contain no points instead of emitting null rows.
from(bucket: "metrics")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> filter(fn: (r) => r._field == "usage_user")
  |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)

// Other aggregation functions (each emits one value per 1h window)
|> aggregateWindow(every: 1h, fn: sum)     // total of values in window
|> aggregateWindow(every: 1h, fn: max)     // window maximum
|> aggregateWindow(every: 1h, fn: min)     // window minimum
|> aggregateWindow(every: 1h, fn: count)   // number of points in window
|> aggregateWindow(every: 1h, fn: last)    // newest point in window
|> aggregateWindow(every: 1h, fn: first)   // oldest point in window

// Custom function in aggregateWindow.
// NOTE: Flux has no percentile() function — use quantile() with
// q in [0.0, 1.0] (q: 0.99 is the 99th percentile). The anonymous
// function must accept the window's stream via tables=<- and the
// target column name, and forward the column to quantile().
|> aggregateWindow(
  every: 1h,
  fn: (column, tables=<-) => tables |> quantile(q: 0.99, column: column)
)

map、keep、drop、rename

// map: rewrite each row — convert Celsius to Fahrenheit and tag the unit.
// `r with` keeps all existing columns and overrides/adds the listed ones.
from(bucket: "metrics")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> map(fn: (r) => ({r with _value: r._value * 9.0 / 5.0 + 32.0, unit: "fahrenheit"}))

// keep: retain only the listed columns; all others are removed
|> keep(columns: ["_time", "_value", "host", "_field"])

// drop: remove the listed columns (here the range bounds added by range())
|> drop(columns: ["_start", "_stop"])

// rename: map old column names to new ones (old: "new")
|> rename(columns: {_value: "metric_value", host: "server"})

join 与 pivot

// Join two streams on time and host, then compute their ratio.
cpuStream = from(bucket: "metrics")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> filter(fn: (r) => r._field == "usage_user")

memStream = from(bucket: "metrics")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "mem")
  |> filter(fn: (r) => r._field == "used_percent")

// The table keys ("cpu", "mem") become column suffixes in the
// joined output: _value_cpu and _value_mem.
join(tables: {cpu: cpuStream, mem: memStream}, on: ["_time", "host"])
  |> map(fn: (r) => ({r with ratio: r._value_cpu / r._value_mem}))

// pivot: reshape field rows into columns — one output row per unique
// rowKey combination, one column per distinct value in columnKey,
// filled from valueColumn.
from(bucket: "metrics")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> pivot(
    rowKey:    ["_time", "host"],
    columnKey: ["_field"],
    valueColumn: "_value"
  )
// Result columns: _time, host, usage_user, usage_system, usage_idle

任务与变量

// Task options: run every hour, delayed 5 minutes so late-arriving
// points are included in the window.
option task = { name: "hourly_downsampling", every: 1h, offset: 5m }

// Query window length matches the task interval (last run period)
start = -task.every

data = from(bucket: "raw_metrics")
  |> range(start: start)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> aggregateWindow(every: 1h, fn: mean)

// Write the downsampled result into a separate bucket
data |> to(bucket: "downsampled_metrics", org: "myorg")

// Useful functions (all operate per input table, not globally)
|> limit(n: 100)                          // first N rows of each table
|> sort(columns: ["_time"], desc: true)   // sort by time, newest first
|> unique(column: "host")                 // rows with unique host values
|> distinct(column: "_field")             // distinct values of _field
|> tail(n: 10)                            // last N rows of each table
|> count()                                // number of rows per table
|> mean()                                 // average of _value per table