feat(msg): new integration test - CreateSnapshots
All checks were successful
continuous-integration/drone/pr Build is passing

This commit is contained in:
Dawid Wysokiński 2023-02-24 07:04:07 +01:00
parent a530fdbe99
commit a901f87b38
Signed by: Kichiyaki
GPG Key ID: B5445E357FB8B892
3 changed files with 3753 additions and 5 deletions

View File

@ -69,7 +69,6 @@ func (p *PlayerConsumer) Register(router *message.Router) {
}
func (p *PlayerConsumer) refresh(msg *message.Message) ([]*message.Message, error) {
var payload model.ServerRefreshedEvPayload
if err := p.marshaler.Unmarshal(msg, &payload); err != nil {
p.logger.Error("invalid payload, dropping message", err, watermill.LogFields{

View File

@ -73,7 +73,6 @@ func TestUpdateData(t *testing.T) {
amqp.GenerateQueueNameTopicNameWithSuffix("_update_data_village"),
generateExchangeAndRoutingKeyName,
)
serverPublisher := msg.NewServerPublisher(serverPub, marshaler)
// filesys is set later in this test to os.DirFS("./testdata/updatedata/stage{n}")
var filesys fs.FS
@ -131,7 +130,14 @@ func TestUpdateData(t *testing.T) {
villageSvc := service.NewVillage(villageRepo, client)
playerSnapshotSvc := service.NewPlayerSnapshot(playerSnapshotRepo, playerSvc)
tribeSnapshotSvc := service.NewTribeSnapshot(tribeSnapshotRepo, tribeSvc)
jobSvc := service.NewJob(versionSvc, serverSvc, serverPublisher, nil, nil, nil)
jobSvc := service.NewJob(
versionSvc,
serverSvc,
msg.NewServerPublisher(serverPub, marshaler),
nil,
nil,
nil,
)
runRouterWithContext(
t,
@ -301,7 +307,6 @@ func TestUpdateEnnoblements(t *testing.T) {
amqp.GenerateQueueNameTopicNameWithSuffix("_update_ennoblements_ennoblement"),
generateExchangeAndRoutingKeyName,
)
ennoblementPublisher := msg.NewEnnoblementPublisher(ennoblementPub, marshaler)
// filesys is set later in this test to os.DirFS("./testdata/updateennoblements/stage{n}")
var filesys fs.FS
@ -353,7 +358,14 @@ func TestUpdateEnnoblements(t *testing.T) {
versionSvc := service.NewVersion(versionRepo)
serverSvc := service.NewServer(serverRepo, client)
ennoblementSvc := service.NewEnnoblement(ennoblementRepo, client)
jobSvc := service.NewJob(versionSvc, serverSvc, nil, ennoblementPublisher, nil, nil)
jobSvc := service.NewJob(
versionSvc,
serverSvc,
nil,
msg.NewEnnoblementPublisher(ennoblementPub, marshaler),
nil,
nil,
)
runRouterWithContext(
t,
@ -409,6 +421,218 @@ func TestUpdateEnnoblements(t *testing.T) {
runUpdateEnnoblements(2)
}
func TestCreateSnapshots(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("skipping long-running test")
}
ctxTimeout, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
ctx, stop := signal.NotifyContext(ctxTimeout, os.Interrupt, syscall.SIGTERM)
defer stop()
generateExchangeAndRoutingKeyName := func(topic string) string {
return topic + "_create_snapshots"
}
marshaler := msg.GobMarshaler{}
rmqConn := newRabbitMQConnection(t)
nopLogger := watermill.NopLogger{}
serverPub, serverSub := newRabbitMQPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_create_snapshots_server"),
generateExchangeAndRoutingKeyName,
)
playerPub, playerSub := newRabbitMQPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_create_snapshots_player"),
generateExchangeAndRoutingKeyName,
)
tribePub, tribeSub := newRabbitMQPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_create_snapshots_tribe"),
generateExchangeAndRoutingKeyName,
)
// db
db := bundbtest.NewDB(t)
require.NoError(t, bundbtest.NewFixture(db).Load(ctx, os.DirFS("./testdata/createsnapshots"), "fixture.yml"))
// repos
versionRepo := bundb.NewVersion(db)
serverRepo := bundb.NewServer(db)
playerRepo := bundb.NewPlayer(db)
playerSnapshotRepo := bundb.NewPlayerSnapshot(db)
tribeRepo := bundb.NewTribe(db)
tribeSnapshotRepo := bundb.NewTribeSnapshot(db)
// services
versionSvc := service.NewVersion(versionRepo)
serverSvc := service.NewServer(serverRepo, nil)
playerSvc := service.NewPlayer(playerRepo, nil, nil)
playerSnapshotSvc := service.NewPlayerSnapshot(playerSnapshotRepo, playerSvc)
tribeSvc := service.NewTribe(tribeRepo, nil)
tribeSnapshotSvc := service.NewTribeSnapshot(tribeSnapshotRepo, tribeSvc)
jobSvc := service.NewJob(
versionSvc,
serverSvc,
nil,
nil,
msg.NewSnapshotPublisher(playerPub, marshaler),
nil,
)
runRouterWithContext(
t,
ctx,
msg.NewServerConsumer(marshaler, serverPub, serverSub, nopLogger, serverSvc),
msg.NewPlayerConsumer(marshaler, playerPub, playerSub, nopLogger, playerSvc, playerSnapshotSvc),
msg.NewTribeConsumer(marshaler, tribePub, tribeSub, nopLogger, tribeSvc, tribeSnapshotSvc),
)
require.NoError(t, jobSvc.CreateSnapshots(ctx))
now := time.Now()
assert.Eventually(t, func() bool {
servers, err := serverRepo.List(ctx, domain.ListServersParams{
Special: domain.NullBool{
Bool: false,
Valid: true,
},
})
if err != nil {
return false
}
for _, s := range servers {
if !isWithinDuration(now, s.PlayerSnapshotsCreatedAt, time.Minute) ||
!isWithinDuration(now, s.TribeSnapshotsCreatedAt, time.Minute) {
return false
}
}
return true
}, 60*time.Second, time.Second, "servers")
require.NoError(t, ctx.Err())
assert.Eventually(t, func() bool {
tribes, err := tribeRepo.List(ctx, domain.ListTribesParams{
Sort: []domain.TribeSort{
{By: domain.TribeSortByID, Direction: domain.SortDirectionASC},
},
})
if err != nil {
return false
}
snapshots, err := tribeSnapshotRepo.List(ctx, domain.ListTribeSnapshotsParams{
Sort: []domain.TribeSnapshotSort{
{By: domain.TribeSnapshotSortByID, Direction: domain.SortDirectionASC},
},
})
if err != nil {
return false
}
if len(snapshots) != len(tribes) {
return false
}
for _, trb := range tribes {
idx := -1
for i, s := range snapshots {
if !cmp.Equal(domain.TribeSnapshot{
OpponentsDefeated: trb.OpponentsDefeated,
ID: 0,
TribeID: trb.ID,
ServerKey: trb.ServerKey,
NumMembers: trb.NumMembers,
NumVillages: trb.NumVillages,
Points: trb.Points,
AllPoints: trb.AllPoints,
Rank: trb.Rank,
Dominance: trb.Dominance,
Date: now,
CreatedAt: now,
}, s, cmpopts.IgnoreFields(domain.TribeSnapshot{}, "ID"), cmpopts.EquateApproxTime(24*time.Hour)) {
continue
}
idx = i
break
}
if idx == -1 {
return false
}
}
return true
}, 60*time.Second, time.Second, "tribe snapshots")
require.NoError(t, ctx.Err())
assert.Eventually(t, func() bool {
players, err := playerRepo.List(ctx, domain.ListPlayersParams{
Sort: []domain.PlayerSort{
{By: domain.PlayerSortByID, Direction: domain.SortDirectionASC},
},
})
if err != nil {
return false
}
snapshots, err := playerSnapshotRepo.List(ctx, domain.ListPlayerSnapshotsParams{
Sort: []domain.PlayerSnapshotSort{
{By: domain.PlayerSnapshotSortByID, Direction: domain.SortDirectionASC},
},
})
if err != nil {
return false
}
if len(snapshots) != len(players) {
return false
}
for _, p := range players {
idx := -1
for i, s := range snapshots {
if !cmp.Equal(domain.PlayerSnapshot{
OpponentsDefeated: p.OpponentsDefeated,
ID: 0,
PlayerID: p.ID,
ServerKey: p.ServerKey,
NumVillages: p.NumVillages,
Points: p.Points,
Rank: p.Rank,
TribeID: p.TribeID,
Date: now,
CreatedAt: now,
}, s, cmpopts.IgnoreFields(domain.PlayerSnapshot{}, "ID"), cmpopts.EquateApproxTime(24*time.Hour)) {
continue
}
idx = i
break
}
if idx == -1 {
return false
}
}
return true
}, 60*time.Second, time.Second, "player snapshots")
}
type urlChangerTransport struct {
url string
transport http.RoundTripper

File diff suppressed because it is too large Load Diff