普罗米修斯 第一部分:DiscoveryManager和ScrapeManager

首先

Prometheus被广泛用作为一种灵活的开源监控工具,用于收集指标数据、设置警报,并且还成为了CNCF的毕业项目。

在本篇(第一部)中,简单介绍了Prometheus的基本运作方式。这份笔记是我基于个人兴趣所阅读的,希望能帮助其他对相似内容感兴趣的人更好地把握整体概况。

总体来说,感觉就像下图所示(稍微有些零乱)。这次主要关注 Discovery 和 Scape 部分。 WebUI、PromQL、AlertManager、Storage等等这些部分,本次(目前)省略。

prometheus.png

组件

首先简要介绍接下来出现的组件:

    1. 目标:Prometheus抓取的对象

发现管理器:管理服务发现(SD)。每个SD的任务是从配置文件中更新目标。

提供者:服务发现的类型(例如kubernetes,azure)。

发现器:各个SD的实现

抓取管理器:管理抓取池的集合,并在目标由发现管理器更新时进行相应的更新。

抓取池:将目标组转换为实际的抓取对象。另外,抓取池具有多个抓取器。

抓取器:

TSDB:存储度量数据的数据库(与存储相关的内容不在本次讨论范围内)。

整体的趋势

    1. 初始化Manager类。

设置reloader变量为reloader。

应用scrapeManager的配置。

应用notificationManager的配置。

应用discoveryManagerScrape的配置。

其他。

将各个Manager的Run()和reload处理器设置到run.Group中。

通过g.Run()执行注册的RunGroup,开始所有操作。

关于服务发现

服务发现是由DiscoveryManager管理的提供者(如kubernetes、http等ScrapeConfig的类型)进行发现的。有两个重要的点,即Run和reloader。分别介绍如下。

跑()

DiscoveryManager是在prometheus/main.go文件中进行初始化,并在Run()函数中启动时调用sender()函数。

func (m *Manager) Run() error {
    go m.sender()
    for range m.ctx.Done() {
        m.cancelDiscoverers()
        return m.ctx.Err()
    }
    return nil
}

如果这个sender在For循环中,当triggerSend通道上存在消息时,它会向m.syncCh发送m.allGroups()。 (对于不知道Go语言中的<-操作符的人,请参考通道的引用)

func (m *Manager) sender() {
    ticker := time.NewTicker(m.updatert)
    defer ticker.Stop()

    for {
        select {
        case <-m.ctx.Done():
            return
        case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
            select {
            case <-m.triggerSend: // triggerSend チャンネルにメッセージがあれば
                sentUpdates.WithLabelValues(m.name).Inc() // sentUpdatesメトリクスをインクリメント
                select {
                case m.syncCh <- m.allGroups(): // allGroups()で targetgroupsを syncCh チャンネルに送信
                default:
                    delayedUpdates.WithLabelValues(m.name).Inc()
                    level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
                    select {
                    case m.triggerSend <- struct{}{}: // 次のLoopでもう一度更新するようにtriggerSendにメッセージを送信
                    default:
                    }
                }
            default:
            }
        }
    }
}

当触发器函数(triggerSend)在updater()的循环中发生更改时,将发送一条消息。

func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
    for {
        select {
        case <-ctx.Done():
            return
        case tgs, ok := <-updates: // updates があったら
            receivedUpdates.WithLabelValues(m.name).Inc() // receiveUpdatesをインクリメント
            if !ok {
                level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name)
                return
            }

            for _, s := range p.subs { // providerのSubsucribers 同じProviderに紐づくscrape jobたちにたいして
                m.updateGroup(poolKey{setName: s, provider: p.name}, tgs) // targetgroup.Groupを更新 setNameはconfigurationのjob_nameとprover.nameは `kubernetes`, `http`など 
            }

            select {
            case m.triggerSend <- struct{}{}: // triggerSendにメッセージを送信
            default:
            }
        }
    }
}

这个更新器(updater)是通过参数可知的,他被调用给提供者(provider),并在startProvider()中调用。

func (m *Manager) startProvider(ctx context.Context, p *provider) {
    level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
    ctx, cancel := context.WithCancel(ctx)
    updates := make(chan []*targetgroup.Group)

    m.discoverCancel = append(m.discoverCancel, cancel)

    go p.d.Run(ctx, updates)
    go m.updater(ctx, p, updates)
}

startProvider是一个函数。

    1. 创建一个通道,其中要存放类型为targetgroup.Group的数组命名为updates。

 

    1. 调用Discoverer的Run(ctx, updates)来对应provider。

调用updater(ctx, p, updates)。

当Discover的Run和updater传递相同的通道时,Discoverer发现目标组并将其更新为updates,然后在updater()函数中调用Manager的updateGroup方法。

在此时,还没有调用startProvider。它将在reloader中被调用。

重新加载器(应用配置)

reloader 在 prometheus/main.go 文件中,定义了一个 reloaders 的集合,在 Initial configuration loading 阶段时会通过 reloadConfig() 方法进行调用。

DiscoveryManager的reloader是通过discoveryManagerScrape.ApplyConfig(c)实现的。c是一个scrape_config的内容,其中包含一个以JobName为键、以ServiceDiscoveryConfigs为值的映射。

        }, {
            name: "scrape_sd",
            reloader: func(cfg *config.Config) error {
                c := make(map[string]discovery.Configs)
                for _, v := range cfg.ScrapeConfigs {
                    c[v.JobName] = v.ServiceDiscoveryConfigs
                }
                return discoveryManagerScrape.ApplyConfig(c)
            },

那么,让我们来看看应用配置(cfg)。

// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
    m.mtx.Lock()
    defer m.mtx.Unlock()

    for pk := range m.targets {
        if _, ok := cfg[pk.setName]; !ok {
            discoveredTargets.DeleteLabelValues(m.name, pk.setName)
        }
    }
    m.cancelDiscoverers()
    m.targets = make(map[poolKey]map[string]*targetgroup.Group)
    m.providers = nil
    m.discoverCancel = nil

    failedCount := 0
    for name, scfg := range cfg {
        failedCount += m.registerProviders(scfg, name)
        discoveredTargets.WithLabelValues(m.name, name).Set(0)
    }
    failedConfigs.WithLabelValues(m.name).Set(float64(failedCount))

    for _, prov := range m.providers {
        m.startProvider(m.ctx, prov)
    }

    return nil
}
    1. 取消发现者 (cancelDiscoverers())

 

    1. 根据每个配置 (config) 在 registerProviders() 中注册提供者 (provider登记)

 

    针对每个提供者 (provider) 启动 (startProvider())

最后在Run函数中,最近看到的startProvider被调用,并与updater()函数连接在一起。

registerProviders会根据接收到的configuration来为每种需要监视的Service Discovery类型创建provider,并通过相同的provider可以查看多个目标,每个jobName将被放入对应provider的subs列表中。

startProvider会以goroutine方式运行与其对应的discoverer的Run和updater,就像上面所看到的一样。

在此之前,可以理解从ScrapeConfig更新TargetGroups的整体流程。

prometheus-discovery-manager.png

最后,变量 m.Targets 中存储了目标,但由于在从 Discover Manager 到 Scraper Manager 传递时发生了更改,因此需要明确指定类型。(关于 Scraper Manager 的详细信息在下一章中解释)。

首先,targetgroup.Group 的定义如下,包括 Targets、Labels 和 Source。

// Group is a set of targets with a common label set(production , test, staging etc.).
type Group struct {
    // Targets is a list of targets identified by a label set. Each target is
    // uniquely identifiable in the group by its address label.
    Targets []model.LabelSet
    // Labels is a set of labels that is common across all targets in the group.
    Labels model.LabelSet

    // Source is an identifier that describes a group of targets.
    Source string
}

下一个是m.Targets,其在Manager类型中也存在。

    // Some Discoverers(eg. k8s) send only the updates for a given target group
    // so we use map[tg.Source]*targetgroup.Group to know which group to update.
    targets map[poolKey]map[string]*targetgroup.Group

以下是poolKey。

type poolKey struct {
    setName  string
    provider string
}

如果需要更具体地确定Targets,它可以被描述为以下两个阶段的Map。

DiscoveryManager.Targets: poolKey (setName和provider) → tg.Source → *targetgroup.Group

DiscoveryManager目标: poolKey (设置名称和提供者) → tg.Source → *targetgroup.Group

当这个被传递给Scrape Manager时,在allGroups()方法中,Map以Source作为键被展开并转换为List。

ScrapeManager.tsets: setName → []*targetgroup.Group
ScrapeManager.tsets: setName(设置名称) → []*targetgroup.Group(目标群组列表)

func (m *Manager) allGroups() map[string][]*targetgroup.Group {
    m.mtx.RLock()
    defer m.mtx.RUnlock()

    tSets := map[string][]*targetgroup.Group{}
    n := map[string]int{}
    for pkey, tsets := range m.targets {
        for _, tg := range tsets {
            // Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
            // to signal that it needs to stop all scrape loops for this target set.
            tSets[pkey.setName] = append(tSets[pkey.setName], tg)
            n[pkey.setName] += len(tg.Targets)
        }
    }
    for setName, v := range n {
        discoveredTargets.WithLabelValues(m.name, setName).Set(float64(v))
    }
    return tSets
}

难的是,源自于这个Targets的是将 updates 通道 (chan []*targetgroup.Group) 传递给Discoverer,每个Discoverer都会发送消息,因此详细的实现在目前是看不见的。

关于Scrape

ScrapeManager负责管理多个ScrapePool,每个ScrapePool都有一个ScrapeLoop。

与Service Discovery几乎相同,我们将查看Manager的Run和 reloader。

跑步()

运行(tsets <-chan map[string][]*targetgroup.Group)

当Run开始时,tsets参数作为一个接收通道,包含了一个包含目标组映射的地图。

看一下prometheus/main.go中调用Run的部分。

        // Scrape manager.
        g.Add(
            func() error {
                // When the scrape manager receives a new targets list
                // it needs to read a valid config for each job.
                // It depends on the config being in sync with the discovery manager so
                // we wait until the config is fully loaded.
                <-reloadReady.C

                err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
                level.Info(logger).Log("msg", "Scrape manager stopped")
                return err
            },
            func(err error) {
                // Scrape manager needs to be stopped before closing the local TSDB
                // so that it doesn't try to write samples to a closed storage.
                level.Info(logger).Log("msg", "Stopping scrape manager...")
                scrapeManager.Stop()
            },
        )

运行 scrapeManager.Run(discoveryManagerScrape.SyncCh()) 并且可以看出 discoveryManagerScrape.SyncCh() 是作为参数返回的。 (discoveryManagerScrape 是用于刮取的发现管理器,还有一个用于通知的discoveryManagerNotify。)

在发现管理器中,SyncCh是一个熟悉的变量名!

func (m *Manager) sender() {
    ...
              case m.syncCh <- m.allGroups(): // allGroups()で targetgroupsを syncCh チャンネルに送信
    ...
}

在 `sender()` 函数中,向信道发送 `m.allGroups()`。当查看 `SyncCh()` 函数时,可以如预期地看到它以只读方式返回 `m.syncCh`。

// SyncCh returns a read only channel used by all the clients to receive target updates.
func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
    return m.syncCh
}

我发现 DiscoveryManager一直在更新从ScrapeConfig转换而来的TargetGroups,并通过通道将其持续传递给ScrapeManager!

我理解了这段代码https://github.com/prometheus/prometheus/blob/f5655c47e8d88220fab4c0a034ab480d4bbd537b/scrape/manager.go#L121-L122中Scrape Manager的说明的含义。

// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups from the discovery manager.

这边是进入Run模块的地方。

func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
    go m.reloader()
    for {
        select {
        case ts := <-tsets:
            m.updateTsets(ts)

            select {
            case m.triggerReload <- struct{}{}:
            default:
            }

        case <-m.graceShut:
            return nil
        }
    }
}
    1. 在调用reloader()函数之后,

 

    1. 对于使用for循环遍历参数tsets表示的通道来说,如果有新的消息到来,

 

    1. 那么先通过m.updateTsets(ts)来更新目标集合,

 

    然后再向m.triggerReload通道发送消息(在reloader中也会出现)。

由于Run正在阅读reloader,所以下一个reloader是

func (m *Manager) reloader() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-m.graceShut:
            return
        case <-ticker.C:
            select {
            case <-m.triggerReload:
                m.reload()
            case <-m.graceShut:
                return
            }
        }
    }
}

每隔5秒检查m.triggerReload是否存在,如果存在则调用reload()函数。

func (m *Manager) reload() {
    m.mtxScrape.Lock()
    var wg sync.WaitGroup
    for setName, groups := range m.targetSets {
        if _, ok := m.scrapePools[setName]; !ok {
            scrapeConfig, ok := m.scrapeConfigs[setName]
            if !ok {
                level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
                continue
            }
            sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
            if err != nil {
                level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
                continue
            }
            m.scrapePools[setName] = sp
        }

        wg.Add(1)
        // Run the sync in parallel as these take a while and at high load can't catch up.
        go func(sp *scrapePool, groups []*targetgroup.Group) {
            sp.Sync(groups)
            wg.Done()
        }(m.scrapePools[setName], groups)

    }
    m.mtxScrape.Unlock()
    wg.Wait()
}
    1. 如果在m.targetSets目标集上没有检查与scrapePools对应的内容,则在newScrapePool中创建scrape pool,并将其存储在m.scrapePool中

 

    对于ScrapePool进行与目标组的同步操作

ScrapePool具有以下类型,其中loops是一个scrapeLoop数组,storage.Appendable用于保存已经爬取的内容。activeTargets保持最新状态以进行Scrape操作的目标。

// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
    appendable storage.Appendable
    logger     log.Logger
    cancel     context.CancelFunc

    // mtx must not be taken after targetMtx.
    mtx            sync.Mutex
    config         *config.ScrapeConfig
    client         *http.Client
    loops          map[uint64]loop
    targetLimitHit bool // Internal state to speed up the target_limit checks.

    targetMtx sync.Mutex
    // activeTargets and loops must always be synchronized to have the same
    // set of hashes.
    activeTargets  map[uint64]*Target
    droppedTargets []*Target

    // Constructor for new scrape loops. This is settable for testing convenience.
    newLoop func(scrapeLoopOptions) loop
}

在Sync()函数中,我们使用targetsFromGroup来获取列表的目标组,并调用sp.sync(all)来只同步这些目标组中的资源。

// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
    sp.mtx.Lock()
    defer sp.mtx.Unlock()
    start := time.Now()

    sp.targetMtx.Lock()
    var all []*Target
    sp.droppedTargets = []*Target{}
    for _, tg := range tgs {
        targets, failures := targetsFromGroup(tg, sp.config)
        for _, err := range failures {
            level.Error(sp.logger).Log("msg", "Creating target failed", "err", err)
        }
        targetSyncFailed.WithLabelValues(sp.config.JobName).Add(float64(len(failures)))
        for _, t := range targets {
            if t.Labels().Len() > 0 {
                all = append(all, t)
            } else if t.DiscoveredLabels().Len() > 0 {
                sp.droppedTargets = append(sp.droppedTargets, t)
            }
        }
    }
    sp.targetMtx.Unlock()
    sp.sync(all)

    targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
        time.Since(start).Seconds(),
    )
    targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}

在 sync(targets) 中,更新传入的 targets 的 activeTargets 和 loops。如果目标不在 activeTargets 中,则创建 targetScraper 并进行更新。同时,删除重复的目标。最后,调用 loop 的 run()。

func (sp *scrapePool) sync(targets []*Target) {
    var (
        uniqueLoops   = make(map[uint64]loop)
        interval      = time.Duration(sp.config.ScrapeInterval)
        timeout       = time.Duration(sp.config.ScrapeTimeout)
        bodySizeLimit = int64(sp.config.BodySizeLimit)
        sampleLimit   = int(sp.config.SampleLimit)
        labelLimits   = &labelLimits{
            labelLimit:            int(sp.config.LabelLimit),
            labelNameLengthLimit:  int(sp.config.LabelNameLengthLimit),
            labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
        }
        honorLabels     = sp.config.HonorLabels
        honorTimestamps = sp.config.HonorTimestamps
        mrc             = sp.config.MetricRelabelConfigs
    )

    sp.targetMtx.Lock()
    for _, t := range targets {
        hash := t.hash()

        if _, ok := sp.activeTargets[hash]; !ok {
            s := &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit}
            l := sp.newLoop(scrapeLoopOptions{
                target:          t,
                scraper:         s,
                sampleLimit:     sampleLimit,
                labelLimits:     labelLimits,
                honorLabels:     honorLabels,
                honorTimestamps: honorTimestamps,
                mrc:             mrc,
            })

            sp.activeTargets[hash] = t
            sp.loops[hash] = l

            uniqueLoops[hash] = l
        } else {
            // This might be a duplicated target.
            if _, ok := uniqueLoops[hash]; !ok {
                uniqueLoops[hash] = nil
            }
            // Need to keep the most updated labels information
            // for displaying it in the Service Discovery web page.
            sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
        }
    }

    var wg sync.WaitGroup

    // Stop and remove old targets and scraper loops.
    for hash := range sp.activeTargets {
        if _, ok := uniqueLoops[hash]; !ok {
            wg.Add(1)
            go func(l loop) {
                l.stop()
                wg.Done()
            }(sp.loops[hash])

            delete(sp.loops, hash)
            delete(sp.activeTargets, hash)
        }
    }

    sp.targetMtx.Unlock()

    targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
    forcedErr := sp.refreshTargetLimitErr()
    for _, l := range sp.loops {
        l.setForcedError(forcedErr)
    }
    for _, l := range uniqueLoops {
        if l != nil {
            go l.run(interval, timeout, nil)
        }
    }
    // Wait for all potentially stopped scrapers to terminate.
    // This covers the case of flapping targets. If the server is under high load, a new scraper
    // may be active and tries to insert. The old scraper that didn't terminate yet could still
    // be inserting a previous sample set.
    wg.Wait()
}

在loop.run()函数中,调用了scrapeAndReport()函数,该函数又调用了scrape()、appender()和report()函数。在发送HTTP请求以获取指标后,应该会进行保存操作(与scrape、appender和report函数的具体细节无关)。

prometheus-scrape-manager.png

总结

1. 在Prometheus的主函数中,大致流程是什么?
2. DiscoveryManager和ScrapeManager的大致流程是什么?
3. 信息是如何从DiscoveryManager传递到ScrapeManager的?

我能够看到

下次,计划对Storage环境和NotificationManager进行全面介绍,以便可以全面了解整体情况。

其他方面的感想是,如何有效地描绘内部逻辑是困难的。

链接

本次目标代码:
– Prometheus的main.go:https://github.com/prometheus/prometheus/blob/main/cmd/prometheus/main.go
– DiscoverManager:https://github.com/prometheus/prometheus/blob/main/discovery/manager.go
– ScrapeManager:https://github.com/prometheus/prometheus/blob/main/scrape/manager.go

广告
将在 10 秒后关闭
bannerAds