Skip to content

Commit

Permalink
fix: make sure canal have context
Browse files Browse the repository at this point in the history
  • Loading branch information
trim21 committed Oct 13, 2024
1 parent 69c2923 commit 9e21e2f
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 18 deletions.
13 changes: 8 additions & 5 deletions canal/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ func (e *eventHandler) Close() error {
return nil
}

func (e *eventHandler) OnUserPasswordChange(id model.UserID) error {
func (e *eventHandler) OnUserPasswordChange(ctx context.Context, id model.UserID) error {
e.log.Info("user change password", log.User(id))

if err := e.session.RevokeUser(context.Background(), id); err != nil {
if err := e.session.RevokeUser(ctx, id); err != nil {
e.log.Error("failed to revoke user", log.User(id), zap.Error(err))
return errgo.Wrap(err, "session.RevokeUser")
}
Expand All @@ -110,14 +110,17 @@ func (e *eventHandler) onMessage(key, value []byte) error {

e.log.Debug("new message", zap.String("table", p.Source.Table))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var err error
switch p.Source.Table {
case "chii_subject_fields":
err = e.OnSubjectField(key, p)
err = e.OnSubjectField(ctx, key, p)
case "chii_subjects":
err = e.OnSubject(key, p)
err = e.OnSubject(ctx, key, p)
case "chii_members":
err = e.OnUserChange(key, p)
err = e.OnUserChange(ctx, key, p)
}

return err
Expand Down
14 changes: 7 additions & 7 deletions canal/on_subject.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,32 @@ import (
"github.com/bangumi/server/internal/model"
)

func (e *eventHandler) OnSubject(key json.RawMessage, payload Payload) error {
func (e *eventHandler) OnSubject(ctx context.Context, key json.RawMessage, payload Payload) error {
var k SubjectKey
if err := json.Unmarshal(key, &k); err != nil {
return nil
}

return e.onSubjectChange(k.ID, payload.Op)
return e.onSubjectChange(ctx, k.ID, payload.Op)
}

func (e *eventHandler) OnSubjectField(key json.RawMessage, payload Payload) error {
func (e *eventHandler) OnSubjectField(ctx context.Context, key json.RawMessage, payload Payload) error {
var k SubjectFieldKey
if err := json.Unmarshal(key, &k); err != nil {
return nil
}

return e.onSubjectChange(k.ID, payload.Op)
return e.onSubjectChange(ctx, k.ID, payload.Op)
}

func (e *eventHandler) onSubjectChange(subjectID model.SubjectID, op string) error {
func (e *eventHandler) onSubjectChange(ctx context.Context, subjectID model.SubjectID, op string) error {
switch op {
case opCreate, opUpdate, opSnapshot:
if err := e.search.OnSubjectUpdate(context.TODO(), subjectID); err != nil {
if err := e.search.OnSubjectUpdate(ctx, subjectID); err != nil {
return errgo.Wrap(err, "search.OnSubjectUpdate")
}
case opDelete:
if err := e.search.OnSubjectDelete(context.TODO(), subjectID); err != nil {
if err := e.search.OnSubjectDelete(ctx, subjectID); err != nil {
return errgo.Wrap(err, "search.OnSubjectDelete")
}
}
Expand Down
12 changes: 6 additions & 6 deletions canal/on_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/bangumi/server/internal/pkg/logger/log"
)

func (e *eventHandler) OnUserChange(key json.RawMessage, payload Payload) error {
func (e *eventHandler) OnUserChange(ctx context.Context, key json.RawMessage, payload Payload) error {
var k UserKey
if err := json.Unmarshal(key, &k); err != nil {
e.log.Error("failed to unmarshal json", zap.Error(err))
Expand All @@ -53,14 +53,14 @@ func (e *eventHandler) OnUserChange(key json.RawMessage, payload Payload) error
}

if before.Password != after.Password {
err := e.OnUserPasswordChange(k.ID)
err := e.OnUserPasswordChange(ctx, k.ID)
if err != nil {
e.log.Error("failed to clear cache", zap.Error(err))
}
}

if before.NewNotify != after.NewNotify {
e.redis.Publish(context.Background(), fmt.Sprintf("event-user-notify-%d", k.ID), redisUserChannel{
e.redis.Publish(ctx, fmt.Sprintf("event-user-notify-%d", k.ID), redisUserChannel{
UserID: k.ID,
NewNotify: after.NewNotify,
})
Expand All @@ -72,14 +72,14 @@ func (e *eventHandler) OnUserChange(key json.RawMessage, payload Payload) error
}

e.log.Debug("clear user avatar cache", log.User(k.ID))
go e.clearImageCache(after.Avatar)
go e.clearImageCache(context.Background(), after.Avatar)
}
}

return nil
}

func (e *eventHandler) clearImageCache(avatar string) {
func (e *eventHandler) clearImageCache(ctx context.Context, avatar string) {
p, q, ok := strings.Cut(avatar, "?")
if !ok {
p = avatar
Expand All @@ -93,7 +93,7 @@ func (e *eventHandler) clearImageCache(avatar string) {

e.log.Debug("clear image for prefix", zap.String("avatar", avatar), zap.String("prefix", p))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

pages := s3.NewListObjectsV2Paginator(
Expand Down

0 comments on commit 9e21e2f

Please sign in to comment.