diff --git a/canal/event.go b/canal/event.go index fec87c3d..67efec00 100644 --- a/canal/event.go +++ b/canal/event.go @@ -85,10 +85,10 @@ func (e *eventHandler) Close() error { return nil } -func (e *eventHandler) OnUserPasswordChange(id model.UserID) error { +func (e *eventHandler) OnUserPasswordChange(ctx context.Context, id model.UserID) error { e.log.Info("user change password", log.User(id)) - if err := e.session.RevokeUser(context.Background(), id); err != nil { + if err := e.session.RevokeUser(ctx, id); err != nil { e.log.Error("failed to revoke user", log.User(id), zap.Error(err)) return errgo.Wrap(err, "session.RevokeUser") } @@ -110,14 +110,17 @@ func (e *eventHandler) onMessage(key, value []byte) error { e.log.Debug("new message", zap.String("table", p.Source.Table)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var err error switch p.Source.Table { case "chii_subject_fields": - err = e.OnSubjectField(key, p) + err = e.OnSubjectField(ctx, key, p) case "chii_subjects": - err = e.OnSubject(key, p) + err = e.OnSubject(ctx, key, p) case "chii_members": - err = e.OnUserChange(key, p) + err = e.OnUserChange(ctx, key, p) } return err diff --git a/canal/on_subject.go b/canal/on_subject.go index fbe70e8b..c60a4287 100644 --- a/canal/on_subject.go +++ b/canal/on_subject.go @@ -23,32 +23,32 @@ import ( "github.com/bangumi/server/internal/model" ) -func (e *eventHandler) OnSubject(key json.RawMessage, payload Payload) error { +func (e *eventHandler) OnSubject(ctx context.Context, key json.RawMessage, payload Payload) error { var k SubjectKey if err := json.Unmarshal(key, &k); err != nil { return nil } - return e.onSubjectChange(k.ID, payload.Op) + return e.onSubjectChange(ctx, k.ID, payload.Op) } -func (e *eventHandler) OnSubjectField(key json.RawMessage, payload Payload) error { +func (e *eventHandler) OnSubjectField(ctx context.Context, key json.RawMessage, payload Payload) error { var k SubjectFieldKey if err := json.Unmarshal(key, &k); err != nil { return nil } - return e.onSubjectChange(k.ID, payload.Op) + return e.onSubjectChange(ctx, k.ID, payload.Op) } -func (e *eventHandler) onSubjectChange(subjectID model.SubjectID, op string) error { +func (e *eventHandler) onSubjectChange(ctx context.Context, subjectID model.SubjectID, op string) error { switch op { case opCreate, opUpdate, opSnapshot: - if err := e.search.OnSubjectUpdate(context.TODO(), subjectID); err != nil { + if err := e.search.OnSubjectUpdate(ctx, subjectID); err != nil { return errgo.Wrap(err, "search.OnSubjectUpdate") } case opDelete: - if err := e.search.OnSubjectDelete(context.TODO(), subjectID); err != nil { + if err := e.search.OnSubjectDelete(ctx, subjectID); err != nil { return errgo.Wrap(err, "search.OnSubjectDelete") } } diff --git a/canal/on_user.go b/canal/on_user.go index 27c5af9b..990a4044 100644 --- a/canal/on_user.go +++ b/canal/on_user.go @@ -32,7 +32,7 @@ import ( "github.com/bangumi/server/internal/pkg/logger/log" ) -func (e *eventHandler) OnUserChange(key json.RawMessage, payload Payload) error { +func (e *eventHandler) OnUserChange(ctx context.Context, key json.RawMessage, payload Payload) error { var k UserKey if err := json.Unmarshal(key, &k); err != nil { e.log.Error("failed to unmarshal json", zap.Error(err)) @@ -53,14 +53,14 @@ func (e *eventHandler) OnUserChange(key json.RawMessage, payload Payload) error } if before.Password != after.Password { - err := e.OnUserPasswordChange(k.ID) + err := e.OnUserPasswordChange(ctx, k.ID) if err != nil { e.log.Error("failed to clear cache", zap.Error(err)) } } if before.NewNotify != after.NewNotify { - e.redis.Publish(context.Background(), fmt.Sprintf("event-user-notify-%d", k.ID), redisUserChannel{ + e.redis.Publish(ctx, fmt.Sprintf("event-user-notify-%d", k.ID), redisUserChannel{ UserID: k.ID, NewNotify: after.NewNotify, }) @@ -72,14 +72,14 @@ func (e *eventHandler) OnUserChange(key json.RawMessage, payload Payload) error } e.log.Debug("clear user avatar cache", log.User(k.ID)) - go e.clearImageCache(after.Avatar) + go e.clearImageCache(context.Background(), after.Avatar) } } return nil } -func (e *eventHandler) clearImageCache(avatar string) { +func (e *eventHandler) clearImageCache(ctx context.Context, avatar string) { p, q, ok := strings.Cut(avatar, "?") if !ok { p = avatar @@ -93,7 +93,7 @@ func (e *eventHandler) clearImageCache(avatar string) { e.log.Debug("clear image for prefix", zap.String("avatar", avatar), zap.String("prefix", p)) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() pages := s3.NewListObjectsV2Paginator(