-
Notifications
You must be signed in to change notification settings - Fork 3
/
stream_exports.go
83 lines (73 loc) · 1.71 KB
/
stream_exports.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package authb
import (
"errors"
"github.com/nats-io/jwt/v2"
)
// streamExports groups the stream-export operations of an account. It embeds
// *AccountData, so its methods work directly on the account's claim
// (s.Claim.Exports) and on the account's export helpers.
type streamExports struct {
*AccountData
}
// Get looks up the stream export registered for subject.
//
// It returns the export found by getStreamExport, or ErrNotFound when that
// lookup yields nil.
func (s *streamExports) Get(subject string) (StreamExport, error) {
	if found := s.getStreamExport(subject); found != nil {
		return found, nil
	}
	return nil, ErrNotFound
}
// AddWithConfig adds the export carried by e to the account and then calls
// update.
//
// It returns an "invalid stream export" error when e is nil, is not a
// *StreamExportImpl, or is an interface value wrapping a nil
// *StreamExportImpl. Errors from addExport and update are returned unchanged.
func (s *streamExports) AddWithConfig(e StreamExport) error {
	// The comma-ok assertion fails (without panicking) for a nil interface
	// and for any other implementation. The extra pointer check rejects an
	// interface holding a typed nil *StreamExportImpl, which would otherwise
	// panic on the field access below.
	be, ok := e.(*StreamExportImpl)
	if !ok || be == nil {
		return errors.New("invalid stream export")
	}
	if err := s.addExport(be.export); err != nil {
		return err
	}
	return s.update()
}
// Add creates a new export of kind jwt.Stream with the given name and subject
// and returns it.
//
// Any error from newExport is returned as-is. If the export cannot be looked
// up afterwards, a "could not find stream" error is returned.
func (s *streamExports) Add(name string, subject string) (StreamExport, error) {
	if err := s.newExport(name, subject, jwt.Stream); err != nil {
		return nil, err
	}
	// update replaces the pointer held in the claim, so the export has to be
	// looked up again rather than reused from the creation step.
	created := s.getStreamExport(subject)
	if created != nil {
		return created, nil
	}
	return nil, errors.New("could not find stream")
}
// Set replaces the account's non-service exports with the given stream
// exports. Entries in the claim for which IsService reports true are kept.
//
// Each export is added through AddWithConfig. The first failure is returned
// immediately, at which point the previous non-service exports have already
// been removed from the claim. After all exports are added, update is called
// once more and its result returned.
func (s *streamExports) Set(exports ...StreamExport) error {
	// Carry over only the service exports; everything else is dropped.
	var services []*jwt.Export
	for i := range s.Claim.Exports {
		if current := s.Claim.Exports[i]; current.IsService() {
			services = append(services, current)
		}
	}
	s.Claim.Exports = services

	for i := range exports {
		if err := s.AddWithConfig(exports[i]); err != nil {
			return err
		}
	}
	return s.update()
}
// Delete removes the export registered for subject by delegating to
// deleteExport, and returns that call's results unchanged.
func (s *streamExports) Delete(subject string) (bool, error) {
	// NOTE(review): the false flag presumably selects stream (not service)
	// exports — confirm against deleteExport.
	removed, err := s.deleteExport(subject, false)
	return removed, err
}
// GetByName returns the first export in the claim that is a stream and whose
// Name equals name, wrapped in a new StreamExportImpl bound to this account's
// data. It returns ErrNotFound when no such export exists.
func (s *streamExports) GetByName(name string) (StreamExport, error) {
	for _, candidate := range s.Claim.Exports {
		if !candidate.IsStream() || candidate.Name != name {
			continue
		}
		match := &StreamExportImpl{}
		match.data = s.AccountData
		match.export = candidate
		return match, nil
	}
	return nil, ErrNotFound
}
// List returns the exports reported by getStreamExports.
func (s *streamExports) List() []StreamExport {
	all := s.getStreamExports()
	return all
}