尝试在Golang中挑战CQRS+ES – 准备篇
首先
我想知道在Go语言中是否可以实现CQRS+ES?于是我从能实现的地方开始尝试。
由于这只是一个挑战,我希望不要过于严格地定义DDD和CQRS。特别是当涉及到DDD的值对象或者需要进行序列化时,在Go语言中在领域内进行封装可能会感到很麻烦,所以我不打算在这方面太过努力。
准备
首先,將作為準備共同使用的物品整理成common套件。
集约路径
package common
import (
"sort"
"github.com/google/uuid"
"github.com/pkg/errors"
)
type AggregateContext interface {
AggregateID() string
StreamVersion() int64
SetStreamVersion(int64)
UncommittedEvents() []EventContext
AppendUncommittedEvent(EventContext)
CommitEvent(EventContext)
CommittedEvents() []EventContext
Replay([]*StoredEvent) error
}
func NewAggregateID() (string, error) {
id, err := uuid.NewUUID()
if err != nil {
return "", errors.Wrap(err, "IDの生成に失敗しました")
}
return id.String(), nil
}
type AggregateBase struct {
aggregateID string
streamVersion int64
uncommittedEventMap map[string]EventContext
committedEventMap map[string]EventContext
}
func NewAggregateBase(aggregateID string) *AggregateBase {
return &AggregateBase{
aggregateID: aggregateID,
streamVersion: int64(0),
uncommittedEventMap: make(map[string]EventContext),
committedEventMap: make(map[string]EventContext),
}
}
func (a *AggregateBase) AggregateID() string {
return a.aggregateID
}
func (a *AggregateBase) StreamVersion() int64 {
return a.streamVersion
}
func (a *AggregateBase) SetStreamVersion(value int64) {
a.streamVersion = value
}
func (a *AggregateBase) UncommittedEvents() []EventContext {
if a.uncommittedEventMap == nil {
a.uncommittedEventMap = make(map[string]EventContext)
}
results := make([]EventContext, 0, len(a.uncommittedEventMap))
for _, event := range a.uncommittedEventMap {
results = append(results, event)
}
sort.SliceStable(results, func(i, j int) bool {
return results[i].GetOccurredOn() < results[j].GetOccurredOn()
})
return results
}
func (a *AggregateBase) AppendUncommittedEvent(event EventContext) {
if a.uncommittedEventMap == nil {
a.uncommittedEventMap = make(map[string]EventContext)
}
a.uncommittedEventMap[event.GetEventID()] = event
}
func (a *AggregateBase) CommittedEvents() []EventContext {
if a.committedEventMap == nil {
a.committedEventMap = make(map[string]EventContext)
}
results := make([]EventContext, 0, len(a.committedEventMap))
for _, event := range a.committedEventMap {
results = append(results, event)
}
sort.SliceStable(results, func(i, j int) bool {
return results[i].GetOccurredOn() < results[j].GetOccurredOn()
})
return results
}
func (a *AggregateBase) CommitEvent(event EventContext) {
if a.uncommittedEventMap == nil {
a.uncommittedEventMap = make(map[string]EventContext)
}
delete(a.uncommittedEventMap, event.GetEventID())
if a.committedEventMap == nil {
a.committedEventMap = make(map[string]EventContext)
}
a.committedEventMap[event.GetEventID()] = event
}
大致上来解释…
-
- AggregateIDが一意なID。DDD的には集約毎に値オブジェクトにした方がいいんだろうなとは思う。
-
- StreamVersionがイベントソーシングのバージョン管理で永続化されるとSetStreamVersionでインクリメントされる。
-
- UncommittedEventsが適用前のイベントで、イベントがAppendUncommittedEventでここに突っ込まれる。
-
- CommittedEventsが適用済のイベントで、永続化やリプレイ時にここに適用された際にCommitEventでイベントが入る。
- Replayは永続化ストアから再生する時に呼ばれる。これは各集約ルートで実装。
活动
package common
import (
"github.com/google/uuid"
"github.com/pkg/errors"
)
type EventContext interface {
GetEventID() string
GetEventType() string
GetOccurredOn() int64
}
func NewEventID() (string, error) {
id, err := uuid.NewUUID()
if err != nil {
return "", errors.Wrap(err, "IDの生成に失敗しました")
}
return id.String(), nil
}
没有特别需要解释的,但为什么在接口上要加上”Get”呢?因为在实现时可能出现名称冲突。
永久存储店
package common
type PersistenceContext interface {
ReplayAggregate(a AggregateContext) error
Save(a AggregateContext) error
}
type FakePersistence struct{}
func (p *FakePersistence) ReplayAggregate(a AggregateContext) error {
storedEvents := make([]*StoredEvent, 0)
return a.Replay(storedEvents)
}
func (p *FakePersistence) Save(a AggregateContext) error {
for _, e := range a.UncommittedEvents() {
/*
storedEvent := &StoredEvent {
AggregateID: a.AggregateID(),
StreamVersion: a.StreamVersion(),
OccurredOn: e.OccurredOn(),
EventType: e.EventType(),
Data: []byte(e),
}
db save stored event
*/
a.CommitEvent(e)
}
return nil
}
type PersistenceQueryContext interface {
QueryEvents(id string, base, limit int64) ([]*StoredEvent, error)
}
type FakePersistenceQuery struct{}
func (p *FakePersistenceQuery) QueryEvents(id string, base, limit int64) ([]*StoredEvent, error) {
storedEvents := make([]*StoredEvent, 0)
/*
load stored events from db
*/
return storedEvents, nil
}
type StoredEvent struct {
AggregateID string
StreamVersion int64
OccurredOn int64
EventType string
Data []byte
}
伪造(Fake)有点困难。实际上我尝试使用Cassandra(gocql)来实现,但是如果深入讨论就会变得太长,所以在这里省略使用Fake。
-
- FakePersistenceをコマンド側が利用し、集約ルートの永続化やリプレイを行います。
-
- FakePersistenceQueryをクエリ側が利用し、指定された範囲(baseとlimit)のバージョンのイベントを取得できるようにします。
- goではC#や、Javaのようにblobデータから単純にstructに戻せないのでStoredEventにラップして、再生時はその中のEventType(構造体名)を見てイベントを元のstructに戻します。
消息传送
package common
import (
"context"
"github.com/google/uuid"
"github.com/pkg/errors"
)
type MessageContext interface {
GetMessageID() string
GetMessageType() string
}
func NewMessageID() (string, error) {
id, err := uuid.NewUUID()
if err != nil {
return "", errors.Wrap(err, "IDの生成に失敗しました")
}
return id.String(), nil
}
type MessagingProducerContext interface {
Publish(m MessageContext) error
}
type FakeMessagingProducer struct{}
func (p *FakeMessagingProducer) Publish(m MessageContext) error {
/*
create message for messaging system
message header = m.MessageType()
message body = []byte(m)
messaging system send message
*/
return nil
}
type MessagingConsumerContext interface {
Consume(ctx context.Context, msg chan<- MessageContext) error
}
type FakeMessagingConsumer struct{}
func (c *FakeMessagingConsumer) Consume(ctx context.Context, msg chan<- MessageContext) error {
/*
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
delivery = from messaging system
msg <- delivery to MessageContext
}
}
*/
return nil
}
这个也是一个很费劲的假货,但我们也实际上试用了Rabbitmq(amqp)。将频道作为消费者使用是为了那个目的。
-
- FakeMessagingProducerが送信側です。
-
- FakeMessagingConsumerが受信側です。Channelを使って受け待ちします。
- こちらもstructを再生するためにヘッダー等にMessageType(構造体名)と入れます。
结束
暂时先到这里作为准备篇。
因为我感到担忧,不知道在Fake方面能不能实际实现,而且代码本身也有些沉重,我在想能不能更像Go语言一样轻松一点呢……。