普罗米修斯 第一部分:DiscoveryManager和ScrapeManager
首先
Prometheus被广泛用作为一种灵活的开源监控工具,用于收集指标数据、设置警报,并且还成为了CNCF的毕业项目。
在本篇(第一部)中,简单介绍了Prometheus的基本运作方式。这份笔记是我基于个人兴趣所阅读的,希望能帮助其他对相似内容感兴趣的人更好地把握整体概况。
总体来说,感觉就像下图所示(稍微有些零乱)。这次主要关注 Discovery 和 Scape 部分。 WebUI、PromQL、AlertManager、Storage等等这些部分,本次(目前)省略。
组件
首先简要介绍接下来出现的组件:
-
- 目标:Prometheus抓取的对象
发现管理器:管理服务发现(SD)。每个SD的任务是从配置文件中更新目标。
提供者:服务发现的类型(例如kubernetes,azure)。
发现器:各个SD的实现
抓取管理器:管理抓取池的集合,并在目标由发现管理器更新时进行相应的更新。
抓取池:将目标组转换为实际的抓取对象。另外,抓取池具有多个抓取器。
抓取器:
TSDB:存储度量数据的数据库(与存储相关的内容不在本次讨论范围内)。
整体的趋势
-
- 初始化Manager类。
设置reloader变量为reloader。
应用scrapeManager的配置。
应用notificationManager的配置。
应用discoveryManagerScrape的配置。
其他。
将各个Manager的Run()和reload处理器设置到run.Group中。
通过g.Run()执行注册的RunGroup,开始所有操作。
关于服务发现
服务发现是由DiscoveryManager管理的提供者(如kubernetes、http等ScrapeConfig的类型)进行发现的。有两个重要的点,即Run和reloader。分别介绍如下。
跑()
DiscoveryManager是在prometheus/main.go文件中进行初始化,并在Run()函数中启动时调用sender()函数。
func (m *Manager) Run() error {
go m.sender()
for range m.ctx.Done() {
m.cancelDiscoverers()
return m.ctx.Err()
}
return nil
}
如果这个sender在For循环中,当triggerSend通道上存在消息时,它会向m.syncCh发送m.allGroups()。 (对于不知道Go语言中的<-操作符的人,请参考通道的引用)
func (m *Manager) sender() {
ticker := time.NewTicker(m.updatert)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
select {
case <-m.triggerSend: // triggerSend チャンネルにメッセージがあれば
sentUpdates.WithLabelValues(m.name).Inc() // sentUpdatesメトリクスをインクリメント
select {
case m.syncCh <- m.allGroups(): // allGroups()で targetgroupsを syncCh チャンネルに送信
default:
delayedUpdates.WithLabelValues(m.name).Inc()
level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
select {
case m.triggerSend <- struct{}{}: // 次のLoopでもう一度更新するようにtriggerSendにメッセージを送信
default:
}
}
default:
}
}
}
}
当触发器函数(triggerSend)在updater()的循环中发生更改时,将发送一条消息。
func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates: // updates があったら
receivedUpdates.WithLabelValues(m.name).Inc() // receiveUpdatesをインクリメント
if !ok {
level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name)
return
}
for _, s := range p.subs { // providerのSubsucribers 同じProviderに紐づくscrape jobたちにたいして
m.updateGroup(poolKey{setName: s, provider: p.name}, tgs) // targetgroup.Groupを更新 setNameはconfigurationのjob_nameとprover.nameは `kubernetes`, `http`など
}
select {
case m.triggerSend <- struct{}{}: // triggerSendにメッセージを送信
default:
}
}
}
}
这个更新器(updater)是通过参数可知的,他被调用给提供者(provider),并在startProvider()中调用。
func (m *Manager) startProvider(ctx context.Context, p *provider) {
level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
ctx, cancel := context.WithCancel(ctx)
updates := make(chan []*targetgroup.Group)
m.discoverCancel = append(m.discoverCancel, cancel)
go p.d.Run(ctx, updates)
go m.updater(ctx, p, updates)
}
startProvider是一个函数。
-
- 创建一个通道,其中要存放类型为targetgroup.Group的数组命名为updates。
-
- 调用Discoverer的Run(ctx, updates)来对应provider。
调用updater(ctx, p, updates)。
当Discover的Run和updater传递相同的通道时,Discoverer发现目标组并将其更新为updates,然后在updater()函数中调用Manager的updateGroup方法。
在此时,还没有调用startProvider。它将在reloader中被调用。
重新加载器(应用配置)
reloader 在 prometheus/main.go 文件中,定义了一个 reloaders 的集合,在 Initial configuration loading 阶段时会通过 reloadConfig() 方法进行调用。
DiscoveryManager的reloader是通过discoveryManagerScrape.ApplyConfig(c)实现的。c是一个scrape_config的内容,其中包含一个以JobName为键、以ServiceDiscoveryConfigs为值的映射。
}, {
name: "scrape_sd",
reloader: func(cfg *config.Config) error {
c := make(map[string]discovery.Configs)
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfigs
}
return discoveryManagerScrape.ApplyConfig(c)
},
那么,让我们来看看应用配置(cfg)。
// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
m.mtx.Lock()
defer m.mtx.Unlock()
for pk := range m.targets {
if _, ok := cfg[pk.setName]; !ok {
discoveredTargets.DeleteLabelValues(m.name, pk.setName)
}
}
m.cancelDiscoverers()
m.targets = make(map[poolKey]map[string]*targetgroup.Group)
m.providers = nil
m.discoverCancel = nil
failedCount := 0
for name, scfg := range cfg {
failedCount += m.registerProviders(scfg, name)
discoveredTargets.WithLabelValues(m.name, name).Set(0)
}
failedConfigs.WithLabelValues(m.name).Set(float64(failedCount))
for _, prov := range m.providers {
m.startProvider(m.ctx, prov)
}
return nil
}
-
- 取消发现者 (cancelDiscoverers())
-
- 根据每个配置 (config) 在 registerProviders() 中注册提供者 (provider登记)
- 针对每个提供者 (provider) 启动 (startProvider())
最后在Run函数中,最近看到的startProvider被调用,并与updater()函数连接在一起。
registerProviders会根据接收到的configuration来为每种需要监视的Service Discovery类型创建provider,并通过相同的provider可以查看多个目标,每个jobName将被放入对应provider的subs列表中。
startProvider会以goroutine方式运行与其对应的discoverer的Run和updater,就像上面所看到的一样。
在此之前,可以理解从ScrapeConfig更新TargetGroups的整体流程。
最后,变量 m.Targets 中存储了目标,但由于在从 Discover Manager 到 Scraper Manager 传递时发生了更改,因此需要明确指定类型。(关于 Scraper Manager 的详细信息在下一章中解释)。
首先,targetgroup.Group 的定义如下,包括 Targets、Labels 和 Source。
// Group is a set of targets with a common label set(production , test, staging etc.).
type Group struct {
// Targets is a list of targets identified by a label set. Each target is
// uniquely identifiable in the group by its address label.
Targets []model.LabelSet
// Labels is a set of labels that is common across all targets in the group.
Labels model.LabelSet
// Source is an identifier that describes a group of targets.
Source string
}
下一个是m.Targets,其在Manager类型中也存在。
// Some Discoverers(eg. k8s) send only the updates for a given target group
// so we use map[tg.Source]*targetgroup.Group to know which group to update.
targets map[poolKey]map[string]*targetgroup.Group
以下是poolKey。
type poolKey struct {
setName string
provider string
}
如果需要更具体地确定Targets,它可以被描述为以下两个阶段的Map。
DiscoveryManager.Targets: poolKey (setName和provider) → tg.Source → *targetgroup.Group
DiscoveryManager目标: poolKey (设置名称和提供者) → tg.Source → *targetgroup.Group
当这个被传递给Scrape Manager时,在allGroups()方法中,Map以Source作为键被展开并转换为List。
ScrapeManager.tsets: setName → []*targetgroup.Group
ScrapeManager.tsets: setName(设置名称) → []*targetgroup.Group(目标群组列表)
func (m *Manager) allGroups() map[string][]*targetgroup.Group {
m.mtx.RLock()
defer m.mtx.RUnlock()
tSets := map[string][]*targetgroup.Group{}
n := map[string]int{}
for pkey, tsets := range m.targets {
for _, tg := range tsets {
// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
// to signal that it needs to stop all scrape loops for this target set.
tSets[pkey.setName] = append(tSets[pkey.setName], tg)
n[pkey.setName] += len(tg.Targets)
}
}
for setName, v := range n {
discoveredTargets.WithLabelValues(m.name, setName).Set(float64(v))
}
return tSets
}
难的是,源自于这个Targets的是将 updates 通道 (chan []*targetgroup.Group) 传递给Discoverer,每个Discoverer都会发送消息,因此详细的实现在目前是看不见的。
关于Scrape
ScrapeManager负责管理多个ScrapePool,每个ScrapePool都有一个ScrapeLoop。
与Service Discovery几乎相同,我们将查看Manager的Run和 reloader。
跑步()
运行(tsets <-chan map[string][]*targetgroup.Group)
当Run开始时,tsets参数作为一个接收通道,包含了一个包含目标组映射的地图。
看一下prometheus/main.go中调用Run的部分。
// Scrape manager.
g.Add(
func() error {
// When the scrape manager receives a new targets list
// it needs to read a valid config for each job.
// It depends on the config being in sync with the discovery manager so
// we wait until the config is fully loaded.
<-reloadReady.C
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
func(err error) {
// Scrape manager needs to be stopped before closing the local TSDB
// so that it doesn't try to write samples to a closed storage.
level.Info(logger).Log("msg", "Stopping scrape manager...")
scrapeManager.Stop()
},
)
运行 scrapeManager.Run(discoveryManagerScrape.SyncCh()) 并且可以看出 discoveryManagerScrape.SyncCh() 是作为参数返回的。 (discoveryManagerScrape 是用于刮取的发现管理器,还有一个用于通知的discoveryManagerNotify。)
在发现管理器中,SyncCh是一个熟悉的变量名!
func (m *Manager) sender() {
...
case m.syncCh <- m.allGroups(): // allGroups()で targetgroupsを syncCh チャンネルに送信
...
}
在 `sender()` 函数中,向信道发送 `m.allGroups()`。当查看 `SyncCh()` 函数时,可以如预期地看到它以只读方式返回 `m.syncCh`。
// SyncCh returns a read only channel used by all the clients to receive target updates.
func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
return m.syncCh
}
我发现 DiscoveryManager一直在更新从ScrapeConfig转换而来的TargetGroups,并通过通道将其持续传递给ScrapeManager!
我理解了这段代码https://github.com/prometheus/prometheus/blob/f5655c47e8d88220fab4c0a034ab480d4bbd537b/scrape/manager.go#L121-L122中Scrape Manager的说明的含义。
// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups from the discovery manager.
这边是进入Run模块的地方。
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
go m.reloader()
for {
select {
case ts := <-tsets:
m.updateTsets(ts)
select {
case m.triggerReload <- struct{}{}:
default:
}
case <-m.graceShut:
return nil
}
}
}
-
- 在调用reloader()函数之后,
-
- 对于使用for循环遍历参数tsets表示的通道来说,如果有新的消息到来,
-
- 那么先通过m.updateTsets(ts)来更新目标集合,
- 然后再向m.triggerReload通道发送消息(在reloader中也会出现)。
由于Run正在阅读reloader,所以下一个reloader是
func (m *Manager) reloader() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-m.graceShut:
return
case <-ticker.C:
select {
case <-m.triggerReload:
m.reload()
case <-m.graceShut:
return
}
}
}
}
每隔5秒检查m.triggerReload是否存在,如果存在则调用reload()函数。
func (m *Manager) reload() {
m.mtxScrape.Lock()
var wg sync.WaitGroup
for setName, groups := range m.targetSets {
if _, ok := m.scrapePools[setName]; !ok {
scrapeConfig, ok := m.scrapeConfigs[setName]
if !ok {
level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
continue
}
sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
if err != nil {
level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
continue
}
m.scrapePools[setName] = sp
}
wg.Add(1)
// Run the sync in parallel as these take a while and at high load can't catch up.
go func(sp *scrapePool, groups []*targetgroup.Group) {
sp.Sync(groups)
wg.Done()
}(m.scrapePools[setName], groups)
}
m.mtxScrape.Unlock()
wg.Wait()
}
-
- 如果在m.targetSets目标集上没有检查与scrapePools对应的内容,则在newScrapePool中创建scrape pool,并将其存储在m.scrapePool中
- 对于ScrapePool进行与目标组的同步操作
ScrapePool具有以下类型,其中loops是一个scrapeLoop数组,storage.Appendable用于保存已经爬取的内容。activeTargets保持最新状态以进行Scrape操作的目标。
// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
appendable storage.Appendable
logger log.Logger
cancel context.CancelFunc
// mtx must not be taken after targetMtx.
mtx sync.Mutex
config *config.ScrapeConfig
client *http.Client
loops map[uint64]loop
targetLimitHit bool // Internal state to speed up the target_limit checks.
targetMtx sync.Mutex
// activeTargets and loops must always be synchronized to have the same
// set of hashes.
activeTargets map[uint64]*Target
droppedTargets []*Target
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(scrapeLoopOptions) loop
}
在Sync()函数中,我们使用targetsFromGroup来获取列表的目标组,并调用sp.sync(all)来只同步这些目标组中的资源。
// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
sp.mtx.Lock()
defer sp.mtx.Unlock()
start := time.Now()
sp.targetMtx.Lock()
var all []*Target
sp.droppedTargets = []*Target{}
for _, tg := range tgs {
targets, failures := targetsFromGroup(tg, sp.config)
for _, err := range failures {
level.Error(sp.logger).Log("msg", "Creating target failed", "err", err)
}
targetSyncFailed.WithLabelValues(sp.config.JobName).Add(float64(len(failures)))
for _, t := range targets {
if t.Labels().Len() > 0 {
all = append(all, t)
} else if t.DiscoveredLabels().Len() > 0 {
sp.droppedTargets = append(sp.droppedTargets, t)
}
}
}
sp.targetMtx.Unlock()
sp.sync(all)
targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
time.Since(start).Seconds(),
)
targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}
在 sync(targets) 中,更新传入的 targets 的 activeTargets 和 loops。如果目标不在 activeTargets 中,则创建 targetScraper 并进行更新。同时,删除重复的目标。最后,调用 loop 的 run()。
func (sp *scrapePool) sync(targets []*Target) {
var (
uniqueLoops = make(map[uint64]loop)
interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout)
bodySizeLimit = int64(sp.config.BodySizeLimit)
sampleLimit = int(sp.config.SampleLimit)
labelLimits = &labelLimits{
labelLimit: int(sp.config.LabelLimit),
labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
}
honorLabels = sp.config.HonorLabels
honorTimestamps = sp.config.HonorTimestamps
mrc = sp.config.MetricRelabelConfigs
)
sp.targetMtx.Lock()
for _, t := range targets {
hash := t.hash()
if _, ok := sp.activeTargets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit}
l := sp.newLoop(scrapeLoopOptions{
target: t,
scraper: s,
sampleLimit: sampleLimit,
labelLimits: labelLimits,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
mrc: mrc,
})
sp.activeTargets[hash] = t
sp.loops[hash] = l
uniqueLoops[hash] = l
} else {
// This might be a duplicated target.
if _, ok := uniqueLoops[hash]; !ok {
uniqueLoops[hash] = nil
}
// Need to keep the most updated labels information
// for displaying it in the Service Discovery web page.
sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
}
}
var wg sync.WaitGroup
// Stop and remove old targets and scraper loops.
for hash := range sp.activeTargets {
if _, ok := uniqueLoops[hash]; !ok {
wg.Add(1)
go func(l loop) {
l.stop()
wg.Done()
}(sp.loops[hash])
delete(sp.loops, hash)
delete(sp.activeTargets, hash)
}
}
sp.targetMtx.Unlock()
targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
forcedErr := sp.refreshTargetLimitErr()
for _, l := range sp.loops {
l.setForcedError(forcedErr)
}
for _, l := range uniqueLoops {
if l != nil {
go l.run(interval, timeout, nil)
}
}
// Wait for all potentially stopped scrapers to terminate.
// This covers the case of flapping targets. If the server is under high load, a new scraper
// may be active and tries to insert. The old scraper that didn't terminate yet could still
// be inserting a previous sample set.
wg.Wait()
}
在loop.run()函数中,调用了scrapeAndReport()函数,该函数又调用了scrape()、appender()和report()函数。在发送HTTP请求以获取指标后,应该会进行保存操作(与scrape、appender和report函数的具体细节无关)。
总结
1. 在Prometheus的主函数中,大致流程是什么?
2. DiscoveryManager和ScrapeManager的大致流程是什么?
3. 信息是如何从DiscoveryManager传递到ScrapeManager的?
我能够看到
下次,计划对Storage环境和NotificationManager进行全面介绍,以便可以全面了解整体情况。
其他方面的感想是,如何有效地描绘内部逻辑是困难的。
链接
本次目标代码:
– Prometheus的main.go:https://github.com/prometheus/prometheus/blob/main/cmd/prometheus/main.go
– DiscoverManager:https://github.com/prometheus/prometheus/blob/main/discovery/manager.go
– ScrapeManager:https://github.com/prometheus/prometheus/blob/main/scrape/manager.go