『Go言語による並行処理』を読んでContextチョットワカルようになったので記録に残します。

Contextの解決したい問題

    1. Goは並行処理のためにゴルーチンを簡単に生成できる

 

    1. 多くの場合ゴルーチンは他のゴルーチンと連携しており、子のゴルーチンが処理を続けるべきかを知ることでリソースを効率的に利用できる

 

    それを実現する標準的なパターンが必要となりContextパッケージができた

使う時にどうするのが良いのか

実際に確かめてみました。
サンプルコードはgithubにあげてます。

structure image on mermaid

Proxy

別の場所からリクエストを受け取って次のサーバー流す場合。
Proxy自身も個別にタイムアウト設定を持っており渡した先で一定時間に終わらない場合にタイムアウトをさせます。
childCtx, cancel := context.WithCancel(ctx)でcancel関数とそれとつながったContextを生成しそれを次のサーバーに渡し、Proxyがtimeoutだと判断したらcancel()を呼びます。

childCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
  // Call server's method with cancelable context by proxy
  res, err := p.Server.Call(childCtx, task)
  ch <- ProxiedResponce{
    Responce: res,
    Error:    err,
  }
}()
go func() {
  // Call cancel after proxy timeout
  <-time.After(p.Timeout)
  cancel()
}()

サンプルの結果は以下のとおりです。

# Responce表示は[(計測時間)/(TaskのTimeout設定)]
# Clientは`70ms`でTimeoutするContextを渡す

# Proxyのtimeoutは45ms
# 30msで完了する`Fast`Serverにリクエストを渡した場合
Proxy: through[TO:45ms] Fast: Timeout [20.1622ms/20ms] Timeout by Server # time 20msなのでServerにてTimeout
Proxy: through[TO:45ms] Fast: Complete [30.9573ms/60ms] <nil>
Proxy: through[TO:45ms] Fast: Complete [30.9573ms/40ms] <nil>
Proxy: through[TO:45ms] Fast: Complete [30.9573ms/80ms] <nil>

# 90msで完了する`Slow`Serverにリクエストを渡した場合
Proxy: through[TO:45ms] Slow: Timeout [20.1622ms/20ms] Timeout by Server
Proxy: through[TO:45ms] Slow: Timeout [40.8834ms/40ms] Timeout by Server
Proxy: through[TO:45ms] Slow: Cancel [45.0796ms/80ms] context canceled # ProxyによるCancel
Proxy: through[TO:45ms] Slow: Cancel [45.0796ms/60ms] context canceled # ProxyによるCancel

# Proxyのtimeoutを75msに変更
# 30msで完了する`Fast`Serverにリクエストを渡した場合
Proxy: through[TO:75ms] Fast: Timeout [20.1622ms/20ms] Timeout by Server # time 20msなのでServerにてTimeout
Proxy: through[TO:75ms] Fast: Complete [30.9573ms/60ms] <nil>
Proxy: through[TO:75ms] Fast: Complete [30.9573ms/80ms] <nil>
Proxy: through[TO:75ms] Fast: Complete [30.9573ms/40ms] <nil>

# 90msで完了する`Slow`Serverにリクエストを渡した場合
Proxy: through[TO:75ms] Slow: Timeout [20.1622ms/20ms] Timeout by Server
Proxy: through[TO:75ms] Slow: Timeout [40.8041ms/40ms] Timeout by Server
Proxy: through[TO:75ms] Slow: Timeout [61.0672ms/60ms] Timeout by Server
Proxy: through[TO:75ms] Slow: Cancel [70.9447ms/80ms] context deadline exceeded # Clientの指定したTimeoutによるCancel

コード全体を示します。Clientに返すのにサーバーからの戻り値を受けていますがプロキシ自身がエラーメッセージを生成して戻したりWithTimeoutでより短いコンテキストを生成しなおすというパターンもありそうです。
キャンセル時のRollback完了まで確認が必要か、誰がキャンセルをしたのか明示が必要か、などによってそのあたり変わりそうですね。

// ProxyProcessWithTimeout pass the request to the next process
// and interrupts the request by its own time out setting
type ProxyProcessWithTimeout struct {
    Name    string
    Server  Process
    Timeout time.Duration
}

func (p *ProxyProcessWithTimeout) Call(ctx context.Context, task Task) (Responce, error) {
    type ProxiedResponce struct {
        Responce Responce
        Error    error
    }
    // Generate goroutine
    ch := make(chan ProxiedResponce)

    // Get a cancel function to cancel by proxy
    childCtx, cancel := context.WithCancel(ctx)
    defer cancel()
    go func() {
        // Call server's method with cancelable context by proxy
        res, err := p.Server.Call(childCtx, task)
        ch <- ProxiedResponce{
            Responce: res,
            Error:    err,
        }
    }()
    go func() {
        // Call cancel after proxy timeout
        <-time.After(p.Timeout)
        cancel()
    }()

    // wait responce
    pres := <-ch
    res := Responce(fmt.Sprintf("%s: through[TO:%s] %s", p.Name, p.Timeout, string(pres.Responce)))
    return res, pres.Error
}

Process

重い処理を行う場合は受け取ったContextから割り込みを適切に受けるのが良いです。
一定の短いループに分解できるのであれば途中で<-ctx.Done()を確認して中断をすべきかを判断させます。

for {
  // 何らかの処理...
  // ...
  //
  select {
      // Cancel, Time
      case <-ctx.Done():
        // rollback 
        return 
      default:
        // 終了条件
  }
}

サンプルの結果は以下のとおりです。

# Timeout [(計測時間)/(TaskのTimeout設定)]
# Clientは`70ms`でTimeoutするContextを渡す
# Proxyは`75ms`でContextをCancelする

Proxy: through[TO:75ms] l:1ms: Complete [loop: 20] [33.804ms/40ms] <nil>
Proxy: through[TO:75ms] l:1ms: Timeout [41.9723ms/40ms] Timeout by Simulation
Proxy: through[TO:75ms] l:1ms: Cancel [70.7212ms/80ms] context deadline exceeded # Contextのシグナルを受けて中断できている
Proxy: through[TO:75ms] l:1ms: Cancel [70.7212ms/80ms] context deadline exceeded

サンプルの全体は以下のとおりです。
Tick時間かかる処理をn回行う(task.Valueから取る)とし、ループのたびにcontextを確認します。
例では1tick=1msで動かしてますが20loopの実行に30msかかってますね。どこに時間がかかってるかまた調べます。

// ProcessSimulation simurate the real process.
// This is an example of a process that can be suspended from Context
type ProcessSimulation struct {
    Name string
    Tick time.Duration
}

func (p *ProcessSimulation) Call(ctx context.Context, task Task) (Responce, error) {
    start := time.Now()
    loopMax, ok := task.Value.(int)
    if !ok {
        return Responce(""), errors.Errorf("Task Value is not int")
    }
    if loopMax < 1 {
        return Responce(""), errors.Errorf("Task.Value must int type and 1 or more")
    }

    // Heavy loop process
    var counter int
    timeout := time.After(task.Timeout)
    for {
        if counter >= loopMax {
            return Responce(fmt.Sprintf("%s: Complete [loop: %d] [%s/%s]", p.Name, counter, time.Since(start), task.Timeout)), nil
        }
        counter++

        // Here is check Context and time out setting.
        select {
        case <-ctx.Done():
            // Add Rollback process when need
            return Responce(fmt.Sprintf("%s: Cancel [%s/%s]", p.Name, time.Since(start), task.Timeout)), ctx.Err()
        case <-timeout:
            // Add Rollback process when need
            return Responce(fmt.Sprintf("%s: Timeout [%s/%s]", p.Name, time.Since(start), task.Timeout)), errors.Errorf("Timeout by Simulation")
        case <-time.After(p.Tick):
        }
    }
}

ソースコードを読む

次はなぜWith***で設定をするのか、複数回WithTimeout()を呼んだらどうなるのか知るためにソースコード(context.go)を読んでみました。

    • ContextはWith***(ctx,args)のたびに新しいインスタンスを生成している

 

    • それらはContext内で宣言された*emptyCtxを終端に持つ連結リストの子孫である

 

    • Cancelは常に親から子に伝播し親は影響を受けない

 

    Valueは子から親に向かってのみ参照する

という実装になっていました。これで安心してキャンセルできますね。

Cancelの実装

Contextのキャンセルの実装はcancelCtxにあります。Timeout,DeadlineもcancelCtxを埋め込んであり指定の時間が来たらキャンセルをするだけです。
そのキャンセルの実装が次の部分です

    func (c *cancelCtx) cancel(removeFromParent bool, err error)

自身の状態を見てキャンセルされていない場合は状態を変更し(err fieldをnon-nilにする)子にも同じerrorを伝え、子を連結リストから取り除きます。
また自身を親の持つ子のリスト取り除きます。

親子関係の設定はWith***の際にfunc propagateCancel(parent Context, child canceler)を呼ぶことで行われます。

    func propagateCancel(parent Context, child canceler)

Valueの実装

WithValue(ctx, key, value)は指定されたkeyとvalueとctxへの参照を持つ*valueCtxを返します。
Value(key)が呼ばれるとkeyが一致するまで親の方向をたどります。
すべての親はbackground *emptyCtxであり常にnilを返すのでkeyが一致する最も近い*valueCtxの値かnilのどちらかが返ってきます。

    func WithValue

Done()から得られる値

Doneの戻り値は<-chan struct{}です。
クローズしてあるチャンネルはその型のデフォルト値とtrueを返します。
故にどこでどれだけDone()で待っていてもチャンネル閉じられたなら処理は次に進みます。

まとめ

以上Contextのキャンセル処理について試しました。
今思うと上記サンプルのResponceにはerrorを含んだほうが良かった気がします。
こういうのを考えるのは楽しいですね。

Contextをhttp.Requestなどから渡されるところから入ると単なるキャリアのように感じますが(私がそうでした)パフォーマンスを出すために裏側でゴルーチンが動いておりその背景からContextを引き継いでいるのだということでいろいろ納得をしました。
パフォーマンスを出すうえでゴルーチンで並行処理をすると思うのでその際は以下に注意して作ると良さそうです。

Contextを引数にとって割り込み可能に作る
重い処理はどこで割り込ませるのか、Rollback処理、エラーの伝播を設計時に考えておく