Skip to content

Commit

Permalink
enhance: Support aliyun as oss source
Browse files Browse the repository at this point in the history
Aliyun OSS was banned for minio adaptation issue. This PR add it back
after verification.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia committed Aug 22, 2024
1 parent 7acafae commit 71f13a5
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 8 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/milvus-io/birdwatcher
go 1.18

require (
github.com/aliyun/credentials-go v1.3.6
github.com/aliyun/credentials-go v1.3.7
github.com/apache/arrow/go/v8 v8.0.0
github.com/apache/pulsar-client-go v0.6.1-0.20210728062540-29414db801a7
github.com/blang/semver/v4 v4.0.0
Expand All @@ -25,7 +25,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/streamnative/pulsarctl v0.5.0
github.com/stretchr/testify v1.8.3
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.865
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.982
go.etcd.io/etcd/api/v3 v3.5.4
go.etcd.io/etcd/client/v3 v3.5.4
go.etcd.io/etcd/server/v3 v3.5.4
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ github.com/alibabacloud-go/debug v1.0.0 h1:3eIEQWfay1fB24PQIEzXAswlVJtdQok8f3EVN
github.com/alibabacloud-go/debug v1.0.0/go.mod h1:8gfgZCCAC3+SCzjWtY053FrOcd4/qlH6IHTI4QyICOc=
github.com/alibabacloud-go/tea v1.2.2 h1:aTsR6Rl3ANWPfqeQugPglfurloyBJY85eFy7Gc1+8oU=
github.com/alibabacloud-go/tea v1.2.2/go.mod h1:CF3vOzEMAG+bR4WOql8gc2G9H3EkH3ZLAQdpmpXMgwk=
github.com/aliyun/credentials-go v1.3.6 h1:K5STbhaWjoj5Ht0juOj9mWE2lGelShHLzu5QR3cQ5X8=
github.com/aliyun/credentials-go v1.3.6/go.mod h1:1LxUuX7L5YrZUWzBrRyk0SwSdH4OmPrib8NVePL3fxM=
github.com/aliyun/credentials-go v1.3.7 h1:f1XaxzMlyxvcRtHBWF6W3bWHWa2q26xNDjSnujXWgfM=
github.com/aliyun/credentials-go v1.3.7/go.mod h1:1LxUuX7L5YrZUWzBrRyk0SwSdH4OmPrib8NVePL3fxM=
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
Expand Down Expand Up @@ -802,8 +802,8 @@ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.865 h1:LcUqBlKC4j15LhT303yQDX/XxyHG4haEQqbHgZZA4SY=
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.865/go.mod h1:r5r4xbfxSaeR04b166HGsBa/R4U3SueirEUpXGuw+Q0=
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.982 h1:gxat/4F9zSOQRT2Kr9XvoakNyeWWXoLDPpdQruWfA2I=
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.982/go.mod h1:r5r4xbfxSaeR04b166HGsBa/R4U3SueirEUpXGuw+Q0=
github.com/testcontainers/testcontainers-go v0.0.10/go.mod h1:2kePcwMHd3ix/BU3cTDuhvggUgMBAit+qcWwadeMXok=
github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down
86 changes: 86 additions & 0 deletions oss/aliyun.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package oss

import (
"github.com/aliyun/credentials-go/credentials" // >= v1.2.6
"github.com/cockroachdb/errors"
"github.com/minio/minio-go/v7"
minioCred "github.com/minio/minio-go/v7/pkg/credentials"
)

type Credential interface {
credentials.Credential
}

func processMinioAliyunOptions(p MinioClientParam, opts *minio.Options) error {
if p.UseIAM {
credProvider, err := NewAliyunCredentialProvider()
if err != nil {
return err
}
opts.Creds = minioCred.New(credProvider)

Check failure on line 21 in oss/aliyun.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)
} else {
opts.Creds = minioCred.NewStaticV4(p.AK, p.SK, "")
}
opts.BucketLookup = minio.BucketLookupDNS
return nil
}

// CredentialProvider implements "github.com/minio/minio-go/v7/pkg/credentials".Provider
// also implements transport
type CredentialProvider struct {
// aliyunCreds doesn't provide a way to get the expire time, so we use the cache to check if it's expired
// when aliyunCreds.GetAccessKeyId is different from the cache, we know it's expired
akCache string
aliyunCreds Credential
}

func NewAliyunCredentialProvider() (minioCred.Provider, error) {
aliyunCreds, err := credentials.NewCredential(nil)
if err != nil {
return nil, errors.Wrap(err, "failed to create aliyun credential")
}
// backend, err := minio.DefaultTransport(true)
// if err != nil {
// return nil, errors.Wrap(err, "failed to create default transport")
// }
// credentials.GetCredential()
return &CredentialProvider{aliyunCreds: aliyunCreds}, nil
}

// Retrieve returns nil if it successfully retrieved the value.
// Error is returned if the value were not obtainable, or empty.
// according to the caller minioCred.Credentials.Get(),
// it already has a lock, so we don't need to worry about concurrency
func (c *CredentialProvider) Retrieve() (minioCred.Value, error) {
ret := minioCred.Value{}
ak, err := c.aliyunCreds.GetAccessKeyId()
if err != nil {
return ret, errors.Wrap(err, "failed to get access key id from aliyun credential")
}
ret.AccessKeyID = *ak
sk, err := c.aliyunCreds.GetAccessKeySecret()
if err != nil {
return minioCred.Value{}, errors.Wrap(err, "failed to get access key secret from aliyun credential")
}
securityToken, err := c.aliyunCreds.GetSecurityToken()
if err != nil {
return minioCred.Value{}, errors.Wrap(err, "failed to get security token from aliyun credential")
}
ret.SecretAccessKey = *sk
c.akCache = *ak
ret.SessionToken = *securityToken
return ret, nil
}

// IsExpired returns if the credentials are no longer valid, and need
// to be retrieved.
// according to the caller minioCred.Credentials.IsExpired(),
// it already has a lock, so we don't need to worry about concurrency
func (c CredentialProvider) IsExpired() bool {
ak, err := c.aliyunCreds.GetAccessKeyId()
if err != nil {
return true
}
return *ak != c.akCache
}
26 changes: 24 additions & 2 deletions oss/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
"github.com/minio/minio-go/v7/pkg/credentials"
)

const (
CloudProviderGCP = "gcp"
CloudProviderAWS = "aws"
CloudProviderAliyun = "aliyun"
CloudProviderAzure = "azure"
CloudProviderTencent = "tencent"
)

type MinioClientParam struct {
Addr string
Port string
Expand Down Expand Up @@ -40,24 +48,38 @@ func NewMinioClient(ctx context.Context, p MinioClientParam) (*MinioClient, erro
endpoint := fmt.Sprintf("%s:%s", p.Addr, p.Port)

switch p.CloudProvider {
case "aws":
case CloudProviderAWS:
processMinioAwsOptions(p, opts)
case "gcp":
case CloudProviderGCP:
// adhoc to remove port of gcs address to let minio-go know it's gcs
if strings.Contains(endpoint, GcsDefaultAddress) {
endpoint = GcsDefaultAddress
}
processMinioGcpOptions(p, opts)
case CloudProviderAliyun:
processMinioAliyunOptions(p, opts)
case CloudProviderTencent:
// processMinioTencentOptions(p, opts)
// cos address issue WIP
fallthrough
case CloudProviderAzure:
// TODO support azure
fallthrough
default:
return nil, errors.Newf("Cloud provider %s not supported yet", p.CloudProvider)
}
fmt.Printf("Start to connect to oss endpoind: %s\n", endpoint)
client, err := minio.New(endpoint, opts)
if err != nil {
fmt.Println("new client failed: ", err.Error())
return nil, err
}

fmt.Println("Connection successful!")

ok, err := client.BucketExists(ctx, p.BucketName)
if err != nil {
fmt.Printf("check bucket %s exists failed: %s\n", p.BucketName, err.Error())
return nil, err
}
if !ok {
Expand Down
72 changes: 72 additions & 0 deletions oss/tencent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package oss

import (
"github.com/cockroachdb/errors"
"github.com/minio/minio-go/v7"
minioCred "github.com/minio/minio-go/v7/pkg/credentials"
"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common"
)

func processMinioTencentOptions(p MinioClientParam, opts *minio.Options) error {
if p.UseIAM {
credProvider, err := NewTencentCredentialProvider()
if err != nil {
return err
}
opts.Creds = minioCred.New(credProvider)

Check failure on line 17 in oss/tencent.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)
} else {
opts.Creds = minioCred.NewStaticV4(p.AK, p.SK, "")
}
opts.BucketLookup = minio.BucketLookupDNS
return nil
}

// TencentCredentialProvider implements "github.com/minio/minio-go/v7/pkg/credentials".Provider
// also implements transport
type TencentCredentialProvider struct {
// tencentCreds doesn't provide a way to get the expired time, so we use the cache to check if it's expired
// when tencentCreds.GetSecretId is different from the cache, we know it's expired
akCache string
tencentCreds common.CredentialIface
}

func NewTencentCredentialProvider() (minioCred.Provider, error) {
provider, err := common.DefaultTkeOIDCRoleArnProvider()
if err != nil {
return nil, errors.Wrap(err, "failed to create tencent credential provider")
}

cred, err := provider.GetCredential()
if err != nil {
return nil, errors.Wrap(err, "failed to get tencent credential")
}
return &TencentCredentialProvider{tencentCreds: cred}, nil
}

// Retrieve returns nil if it successfully retrieved the value.
// Error is returned if the value were not obtainable, or empty.
// according to the caller minioCred.Credentials.Get(),
// it already has a lock, so we don't need to worry about concurrency
func (c *TencentCredentialProvider) Retrieve() (minioCred.Value, error) {
ret := minioCred.Value{}
ak := c.tencentCreds.GetSecretId()
ret.AccessKeyID = ak
c.akCache = ak

sk := c.tencentCreds.GetSecretKey()
ret.SecretAccessKey = sk

securityToken := c.tencentCreds.GetToken()
ret.SessionToken = securityToken
return ret, nil
}

// IsExpired returns if the credentials are no longer valid, and need
// to be retrieved.
// according to the caller minioCred.Credentials.IsExpired(),
// it already has a lock, so we don't need to worry about concurrency
func (c TencentCredentialProvider) IsExpired() bool {
ak := c.tencentCreds.GetSecretId()
return ak != c.akCache
}
4 changes: 4 additions & 0 deletions states/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (s *InstanceState) GetMinioClientFromCfg(ctx context.Context, minioAddr str
var ak, sk string
var useIAM string
var useSSL string
var region string

for _, config := range configurations {
switch config.GetKey() {
Expand All @@ -81,6 +82,8 @@ func (s *InstanceState) GetMinioClientFromCfg(ctx context.Context, minioAddr str
addr = config.GetValue()
case "minio.port":
port = config.GetValue()
case "minio.region":
region = config.GetValue()
case "minio.bucketname":
bucketName = config.GetValue()
case "minio.rootpath":
Expand All @@ -101,6 +104,7 @@ func (s *InstanceState) GetMinioClientFromCfg(ctx context.Context, minioAddr str

mp := oss.MinioClientParam{
CloudProvider: cloudProvider,
Region: region,
Addr: addr,
Port: port,
AK: ak,
Expand Down

0 comments on commit 71f13a5

Please sign in to comment.