时间:2025-09-29 17:42
人气:
作者:admin
作者:张富春(ahfuzhang),转载时请注明作者和引用链接,谢谢!
以最经典的计算 qps 的曲线为例,vmselect 内部是如何计算的?
通常会在 grafana 中配置一个 line chart,然后使用以下的 promql 表达式来计算每分钟的请求量:
sum by (path) (increase(http_request_total{job="myApp"}[1m]))
grafana 会向所配置数据源的 vmselect 发送类似的请求:
POST /select/0/prometheus/api/v1/query_range HTTP/1.1
Host: xxx
Content-Type: application/x-www-form-urlencoded
start=${开始时间}&end=${结束时间}&step=15s&query=sum by (path) (increase(http_request_total{job="myApp"}[1m]))
TL;DR
可以直接跳到下一节看源码分析
| 文件 | 函数 | 调用代码 | 说明 |
|---|---|---|---|
| app/vmselect/main.go | func main() | Main 函数 | |
| go httpserver.Serve(listenAddrs, requestHandler | 启动http 服务 | ||
| func requestHandler | http的callback 函数 | ||
| return selectHandler(qt, startTime, w, r, p, at) | 执行 http://vmselect:8481/select/ 这个路径 | ||
| func selectHandler | 查询的处理函数 | ||
| prometheus.QueryRangeHandler(qt, startTime, at, w, r) | promql 查询的处理函数 /query_range 这条API 的处理代码 |
||
| app/vmselect/prometheus/ prometheus.go |
func QueryRangeHandler | ||
| queryRangeHandler(qt, startTime, at, w, query | 从http协议中取出参数,执行范围查询 | ||
| func queryRangeHandler | |||
| result, err := promql.Exec(qt, ec, query, false) | 组织好 promql.EvalConfig 对象 | ||
| app/vmselect/promql/exec.go | func Exec | 执行查询表达式的函数 | |
| e, err := parsePromQLWithCache(q) | 解析查询表达式 | ||
| qid := activeQueriesV.Add(ec, q) | 记录当前正在查询哪个表达式 | ||
| rv, err := evalExpr(qt, ec, e) | 执行解析后的表达式, metricsql.Expr对象 | ||
| app/vmselect/promql/eval.go | func evalExpr | evalExpr会根据 promql的结构嵌套执行,直到叶子节点。 | |
| rv, err := evalExprInternal(qt, ec, e) | |||
| func evalExprInternal | 逐个种类判断,一共八个种类。 是哪种表达式,就执行对应的分支 | ||
| rv, err := evalAggrFunc(qtChild, ec, ae) | 执行聚合表达式。(选择最常见的一种表达式来分析) | ||
| func evalAggrFunc | |||
| callbacks := getIncrementalAggrFuncCallbacks(ae.Name) | 根据表达式中的聚合函数名,找到对应的执行代码。 例如:函数 sum() 对应着一个 golang 的 func | ||
| fe, nrf := tryGetArgRollupFuncWithMetricExpr(ae) | 如果 sum() 里面还有类似 increase() 这样的 rollup 函数,则执行这一步 | ||
| args, re, err := evalRollupFuncArgs(qt, ec, fe) | 先执行 rollup() 函数里面的表达式 | ||
| rf, err := nrf(args) | 得到表达式的结果后,再执行 rollup() 函数 | ||
| func evalRollupFuncArgs | |||
| ts, err := evalExpr(qt, ec, arg) | 嵌套执行表达式,又回到函数 func evalExpr | ||
| 内层一般都是 metrics 表达式 | 这里开始展示执行到了叶子节点的情况。 | ||
| rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil) | |||
| func evalRollupFunc | |||
| return evalRollupFuncWithoutAt(qt, ec, funcName, rf, expr, re, iafc) | |||
| func evalRollupFuncWithoutAt | |||
| rvs, err = evalRollupFuncWithMetricExpr(qt, ecNew, funcName, rf, expr, me, iafc, re.Window) | |||
| func evalRollupFuncWithMetricExpr | |||
| tss, err := evalRollupFuncNoCache(qt, ec, funcName, rf, expr, me, iafc, window, pointsPerSeries) | |||
| func evalRollupFuncNoCache | |||
| tfss := searchutil.ToTagFilterss(me.LabelFilterss) | 把 metrics 相关的表达式,变成标签过滤的对象 | ||
| sq = storage.NewMultiTenantSearchQuery(ts, minTimestamp, ec.End, tfss, ec.MaxSeries) | 把 [][]storage.TagFilter 构造成 SearchQuery 对象 | ||
| rss, isPartial, err := netstorage.ProcessSearchQuery(qt, ec.DenyPartialResponse, sq, ec.Deadline) | 把请求发到 storage 节点,得到了 Results 对象 | ||
| evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps) | |||
| func evalRollupNoIncrementalAggregate | |||
| rss.RunParallel(qt, ... | 当 vmstorage 返回 metricBlock 数据块后,开始并行执行,做各种聚合运算。 | ||
| app/vmselect/netstorage/ netstorage.go |
func (rss *Results) RunParallel | ||
| rowsProcessedTotal, err := rss.runParallel(qt, f) | 在与核数相匹配的协程中并行执行 | ||
| func (rss *Results) runParallel | |||
| err = tsw.do(&tmpResult.rs, 0) | 每个 time series的数据上调用 do 方法 | ||
| func (tsw *timeseriesWork) do | |||
| err := tsw.pts.Unpack(r, rss.tbfs, rss.tr) | 把 metricBlock 数据进行反序列化,变成与 data point 数量相等的 []timestamp 和 []values | ||
| func (pts *packedTimeseries) Unpack | |||
| dedupInterval := storage.GetDedupInterval() | 查询时,拉取全局的 dedup 间隔配置 | ||
| mergeSortBlocks(dst, sbh, dedupInterval) | 去重逻辑。去掉重复的 timestamp |
QueryRangeHandler 中进行处理Exec() 函数来处理查询parsePromQL() 来解析 promql 表达式,把表达式变成 8 中基本语句的嵌套。8种语句包含:
metricsql.MetricExpr: metric 的过滤表达式,主要是 tag 层面的过滤metricsql.RollupExpr: 可以理解为 increase(), rate() 这样的区间聚合函数metricsql.FuncExpr: 执行 metricsQL 内部提供的函数,例如 label_replace() 等metricsql.AggrFuncExpr: 执行聚合函数,例如 max, sum, avg 等metricsql.BinaryOpExpr: 执行布尔表达式的运算,主要有:and / or / unlessmetricsql.NumberExpr: 数值常量表达式metricsql.StringExpr: 把字符串看成一个独立的时间序列metricsql.DurationExpr: 产生新的 timestamp 的序列evalExpr(), 直到执行完成整个表达式evalExpr() 函数,做 sum 等表达式的计算最后,为了便于分析 VictoriaMetrics 系列的源码,我又专门建立了一个源码仓库来存放增加了注释的源码:
https://github.com/ahfuzhang/code_comments
Have Fun. ????